11 #ifndef vtk_m_filter_flow_internal_ParticleMessenger_h
12 #define vtk_m_filter_flow_internal_ParticleMessenger_h
17 #include <vtkm/filter/flow/vtkm_filter_flow_export.h>
// ParticleMessenger is a class template, parameterized on the particle type, that
// publicly derives from vtkm::filter::flow::internal::Messenger.
// NOTE(review): parts of this declaration (braces, access specifiers, some
// parameters and template heads) are not visible in this excerpt; the comments
// below describe only what is shown.
33 template <
typename ParticleType>
34 class VTKM_FILTER_FLOW_EXPORT ParticleMessenger :
public vtkm::filter::flow::internal::Messenger
// A received control message; RecvAny stores these as (sendRank, message ints).
37 using MsgCommType = std::pair<int, std::vector<int>>;
// A particle paired with a list of vtkm::Id values; Exchange builds these from a
// particle and its entry in the outgoing block-ID map.
40 using ParticleCommType = std::pair<ParticleType, std::vector<vtkm::Id>>;
// A received particle batch; RecvAny stores these as (sendRank, particles).
43 using ParticleRecvCommType = std::pair<int, std::vector<ParticleCommType>>;
// Constructor: takes the communicator and a bounds map; numParticles defaults to 128.
// NOTE(review): the remaining constructor parameters are not visible here.
46 VTKM_CONT ParticleMessenger(vtkmdiy::mpi::communicator& comm,
47 const vtkm::filter::flow::internal::BoundsMap& bm,
49 int numParticles = 128,
// Exchange: outData / outBlockIDsMap are read-only inputs, inData / inDataBlockIDsMap
// are written by the call, and blockAndWait defaults to false.
// NOTE(review): the parameters between these (used by the definition as numLocalTerm
// and numTerminateMessages) are not visible in this declaration.
53 VTKM_CONT void Exchange(
const std::vector<ParticleType>& outData,
54 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
56 std::vector<ParticleType>& inData,
57 std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
59 bool blockAndWait =
false);
// Everything below is only declared when MPI support is enabled.
62 #ifdef VTKM_ENABLE_MPI
// Marker stored in element 0 of a terminate message; Exchange sends
// { MSG_TERMINATE, count } and compares element 0 of received messages against it.
63 static constexpr
int MSG_TERMINATE = 1;
// Tags passed to SendData / RegisterTag: MESSAGE_TAG is used by SendMsg,
// PARTICLE_TAG by SendParticles.
65 enum { MESSAGE_TAG = 0x42000, PARTICLE_TAG = 0x42001 };
// Computes buffer sizes from the three counts, registers both tags and initializes
// the buffers (called from the constructor).
67 VTKM_CONT void RegisterMessages(
int msgSz,
int nParticles,
int numBlockIds);
// Sends the container c to rank dst.
// NOTE(review): the leading template parameters (P, Container) are not visible here.
72 template <
typename,
typename>
74 typename Allocator = std::allocator<P>>
75 inline void SendParticles(
int dst,
const Container<P, Allocator>& c);
// Sends every non-empty container in m to the rank given by its key.
// NOTE(review): the leading template parameters (P, Container) are not visible here.
79 template <
typename,
typename>
81 typename Allocator = std::allocator<P>>
82 inline void SendParticles(
const std::unordered_map<
int, Container<P, Allocator>>& m);
// Sends msg to rank dst, tagged MESSAGE_TAG.
85 VTKM_CONT void SendMsg(
int dst,
const std::vector<int>& msg);
// Sends msg to every rank other than this one.
86 VTKM_CONT void SendAllMsg(
const std::vector<int>& msg);
87 VTKM_CONT bool RecvMsg(std::vector<MsgCommType>& msgs) {
return RecvAny(&msgs, NULL,
false); }
// RecvAny: msgs and recvParticles are output pointers for received control messages
// and received particle batches (RecvMsg passes a null recvParticles).
// NOTE(review): the trailing parameter(s) of this declaration are not visible here;
// the definition uses a blockAndWait value.
90 VTKM_CONT bool RecvAny(std::vector<MsgCommType>* msgs,
91 std::vector<ParticleRecvCommType>* recvParticles,
// Bounds map held by const reference; the constructor binds it under
// VTKM_ENABLE_MPI and Exchange calls FindRank on it to pick a destination rank.
93 const vtkm::filter::flow::internal::BoundsMap& BoundsMap;
// Parameter list of a const member function whose first declaration line is not
// visible here; the parameters match the SerialExchange definition (presumably this
// is its declaration — confirm against the full file).
98 const std::vector<ParticleType>& outData,
99 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
101 std::vector<ParticleType>& inData,
102 std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
103 bool blockAndWait)
const;
// Static helper returning a buffer size for nParticles particles; numBlockIds
// defaults to 2.
105 static std::size_t CalcParticleBufferSize(std::size_t nParticles, std::size_t numBlockIds = 2);
// Constructor definition.
// NOTE(review): the remaining parameters, the base-class initializer and the braces
// are not visible in this excerpt.
111 template <
typename ParticleType>
112 ParticleMessenger<ParticleType>::ParticleMessenger(
113 vtkmdiy::mpi::communicator& comm,
114 const vtkm::filter::flow::internal::BoundsMap& boundsMap,
// The BoundsMap reference member is only initialized when MPI is enabled.
119 #ifdef VTKM_ENABLE_MPI
120 , BoundsMap(boundsMap)
// With MPI enabled, size and register the receive buffers up front.
123 #ifdef VTKM_ENABLE_MPI
124 this->RegisterMessages(msgSz, numParticles, numBlockIds);
// Cast to void marks numParticles as used; presumably this sits in the non-MPI
// branch (the #else is not visible here — confirm).
128 (void)(numParticles);
// Computes a std::size_t size from the particle count and the number of block IDs
// per particle.
// NOTE(review): several lines of the body are not visible in this excerpt, including
// the declaration of `p` and the start of the final size expression.
133 template <
typename ParticleType>
134 std::size_t ParticleMessenger<ParticleType>::CalcParticleBufferSize(std::size_t nParticles,
135 std::size_t nBlockIds)
// Size reported by the particle type itself.
138 std::size_t pSize = ParticleType::Sizeof();
// Serialize a particle `p` into a scratch buffer (how the result is used is not
// visible here).
141 vtkmdiy::MemoryBuffer buff;
143 vtkmdiy::save(buff, p);
// Visible tail of the size expression: one std::size_t, plus one std::size_t per
// particle, plus nBlockIds vtkm::Id values per particle.
155 +
sizeof(std::size_t)
160 + nParticles *
sizeof(std::size_t)
162 + nParticles * nBlockIds *
sizeof(
vtkm::Id);
// Exchange path used when there is a single rank (Exchange calls this when
// GetNumRanks() == 1): every outgoing particle is appended to inData and its
// block-ID list is stored in inDataBlockIDsMap under the particle's ID.
// NOTE(review): some signature lines and the braces are not visible in this excerpt.
166 template <
typename ParticleType>
167 void ParticleMessenger<ParticleType>::SerialExchange(
168 const std::vector<ParticleType>& outData,
169 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
171 std::vector<ParticleType>& inData,
172 std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
175 for (
auto& p : outData)
// NOTE(review): the find() result is dereferenced without comparing against end();
// a particle ID missing from outBlockIDsMap would be undefined behavior.
177 const auto& bids = outBlockIDsMap.find(p.GetID())->second;
178 inData.emplace_back(p);
// operator[] inserts or overwrites the entry for this particle ID.
179 inDataBlockIDsMap[p.GetID()] = bids;
// Sends outgoing particles (and a terminate count) to other ranks and collects
// whatever particles and terminate messages have arrived.
// NOTE(review): some signature lines (numLocalTerm, numTerminateMessages,
// blockAndWait), the braces and the closing preprocessor lines are not visible in
// this excerpt.
184 template <
typename ParticleType>
185 void ParticleMessenger<ParticleType>::Exchange(
186 const std::vector<ParticleType>& outData,
187 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
189 std::vector<ParticleType>& inData,
190 std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
// Reset the outputs written by this call.
194 numTerminateMessages = 0;
195 inDataBlockIDsMap.clear();
// With a single rank there is nobody to talk to: hand everything to SerialExchange.
197 if (this->GetNumRanks() == 1)
198 return this->SerialExchange(
199 outData, outBlockIDsMap, numLocalTerm, inData, inDataBlockIDsMap, blockAndWait);
201 #ifdef VTKM_ENABLE_MPI
// Outgoing particles grouped by destination rank.
204 std::unordered_map<int, std::vector<ParticleCommType>> sendData;
206 for (
const auto& p : outData)
// NOTE(review): find() is dereferenced without an end() check, and bids[0] below
// assumes the block-ID list is non-empty.
208 const auto& bids = outBlockIDsMap.find(p.GetID())->second;
// The destination rank is looked up from the first block ID only.
209 int dstRank = this->BoundsMap.FindRank(bids[0]);
210 sendData[dstRank].emplace_back(std::make_pair(p, bids));
// Tell every other rank how many particles terminated locally, if any did.
214 if (numLocalTerm > 0)
215 this->SendAllMsg({ MSG_TERMINATE,
static_cast<int>(numLocalTerm) });
216 this->SendParticles(sendData);
217 this->CheckPendingSendRequests();
// Receive both particle batches and control messages in one call.
220 std::vector<ParticleRecvCommType> particleData;
221 std::vector<MsgCommType> msgData;
222 if (RecvAny(&msgData, &particleData, blockAndWait))
// Unpack each received batch: it.second holds the (particle, block IDs) pairs.
224 for (
const auto& it : particleData)
225 for (
const auto& v : it.second)
227 const auto& p = v.first;
228 const auto& bids = v.second;
229 inData.emplace_back(p);
230 inDataBlockIDsMap[p.GetID()] = bids;
// Accumulate the counts carried by terminate messages.
// NOTE(review): m.second[0] and m.second[1] are indexed without a size check.
233 for (
const auto& m : msgData)
235 if (m.second[0] == MSG_TERMINATE)
236 numTerminateMessages +=
static_cast<vtkm::Id>(m.second[1]);
243 #ifdef VTKM_ENABLE_MPI
246 template <
typename ParticleType>
247 void ParticleMessenger<ParticleType>::RegisterMessages(
int msgSz,
int nParticles,
int numBlockIds)
250 std::size_t messageBuffSz = CalcMessageBufferSize(msgSz + 1);
251 std::size_t particleBuffSz = CalcParticleBufferSize(nParticles, numBlockIds);
253 int numRecvs = std::min(64, this->GetNumRanks() - 1);
255 this->RegisterTag(ParticleMessenger::MESSAGE_TAG, numRecvs, messageBuffSz);
256 this->RegisterTag(ParticleMessenger::PARTICLE_TAG, numRecvs, particleBuffSz);
258 this->InitializeBuffers();
262 template <
typename ParticleType>
263 void ParticleMessenger<ParticleType>::SendMsg(
int dst,
const std::vector<int>& msg)
265 vtkmdiy::MemoryBuffer buff;
268 vtkmdiy::save(buff, this->GetRank());
269 vtkmdiy::save(buff, msg);
270 this->SendData(dst, ParticleMessenger::MESSAGE_TAG, buff);
274 template <
typename ParticleType>
275 void ParticleMessenger<ParticleType>::SendAllMsg(
const std::vector<int>& msg)
277 for (
int i = 0; i < this->GetNumRanks(); i++)
278 if (i != this->GetRank())
279 this->SendMsg(i, msg);
// Receives pending data for a set of tags and unpacks it into *msgs and/or
// *recvParticles.
// NOTE(review): most of this body is not visible in this excerpt (the rest of the
// signature, the declaration of `tags`, any conditions guarding the inserts below,
// the deserialization of each buffer, and the return statements).
283 template <
typename ParticleType>
284 bool ParticleMessenger<ParticleType>::RecvAny(std::vector<MsgCommType>* msgs,
285 std::vector<ParticleRecvCommType>* recvParticles,
// Request control messages.
291 tags.insert(ParticleMessenger::MESSAGE_TAG);
// Request particle batches and empty the particle output before filling it.
296 tags.insert(ParticleMessenger::PARTICLE_TAG);
297 recvParticles->resize(0);
// Each received entry pairs a tag (first) with its raw buffer (second).
303 std::vector<std::pair<int, vtkmdiy::MemoryBuffer>> buffers;
304 if (!this->RecvData(tags, buffers, blockAndWait))
// Dispatch each buffer on its tag.
307 for (
auto& buff : buffers)
309 if (buff.first == ParticleMessenger::MESSAGE_TAG)
// Store the message together with the rank that sent it.
315 msgs->emplace_back(std::make_pair(sendRank, m));
317 else if (buff.first == ParticleMessenger::PARTICLE_TAG)
320 std::vector<ParticleCommType> particles;
// Store the particle batch together with the rank that sent it.
324 recvParticles->emplace_back(std::make_pair(sendRank, particles));
// Sends the container c to rank dst, tagged PARTICLE_TAG.
// NOTE(review): the braces and the lines between the self-send check and the buffer
// setup are not visible in this excerpt.
332 template <
typename ParticleType>
333 template <
typename P,
template <
typename,
typename>
class Container,
typename Allocator>
334 inline void ParticleMessenger<ParticleType>::SendParticles(
int dst,
335 const Container<P, Allocator>& c)
// Special-case a send addressed to this rank itself (the handling is not visible
// here).
337 if (dst == this->GetRank())
// Payload layout: sender rank first, then the serialized container.
345 vtkmdiy::MemoryBuffer bb;
346 vtkmdiy::save(bb, this->GetRank());
347 vtkmdiy::save(bb, c);
348 this->SendData(dst, ParticleMessenger::PARTICLE_TAG, bb);
352 template <
typename ParticleType>
353 template <
typename P,
template <
typename,
typename>
class Container,
typename Allocator>
354 inline void ParticleMessenger<ParticleType>::SendParticles(
355 const std::unordered_map<
int, Container<P, Allocator>>& m)
357 for (
const auto& mit : m)
358 if (!mit.second.empty())
359 this->SendParticles(mit.first, mit.second);
368 #endif // vtk_m_filter_flow_internal_ParticleMessenger_h