11 #ifndef vtk_m_filter_flow_internal_ParticleMessenger_h
12 #define vtk_m_filter_flow_internal_ParticleMessenger_h
// NOTE(review): the declaration introduced by this template header is not
// visible in this excerpt. The aliases below are the payload types used by the
// ParticleMessenger member functions defined later in the file.
33 template <
typename ParticleType>
// A received control message: (sending rank, integer payload).
37 using MsgCommType = std::pair<int, std::vector<int>>;
// A particle paired with the block IDs that travel with it.
40 using ParticleCommType = std::pair<ParticleType, std::vector<vtkm::Id>>;
// Particles received from one peer: (sending rank, particles with their block IDs).
43 using ParticleRecvCommType = std::pair<int, std::vector<ParticleCommType>>;
// Constructor declaration.
// NOTE(review): only part of the parameter list is visible in this excerpt;
// the definition also refers to useAsyncComm, msgSz and numBlockIds.
// comm: DIY MPI communicator, forwarded to the Messenger base by the definition.
46 VTKM_CONT ParticleMessenger(vtkmdiy::mpi::communicator& comm,
// bm: bounds map; the definition binds the BoundsMap reference member to it.
48 const vtkm::filter::flow::internal::BoundsMap& bm,
// numParticles: handed to RegisterMessages when VTKM_ENABLE_MPI is defined.
50 int numParticles = 128,
// Exchanges particles with the other ranks.
// NOTE(review): the definition also uses numLocalTerm and numTerminateMessages,
// whose declarations are not visible in this excerpt.
//   outData / outRanks  - outgoing particles; outData[i] is sent to outRanks[i].
//   outBlockIDsMap      - block IDs for each outgoing particle, keyed by particle ID.
//   inData              - received particles are appended here.
//   inDataBlockIDsMap   - cleared, then filled with the block IDs of received
//                         particles, keyed by particle ID.
//   blockAndWait        - forwarded to the receive call.
54 VTKM_CONT void Exchange(
const std::vector<ParticleType>& outData,
55 const std::vector<vtkm::Id>& outRanks,
56 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
58 std::vector<ParticleType>& inData,
59 std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
61 bool blockAndWait =
false);
64 #ifdef VTKM_ENABLE_MPI
// First element of a terminate control message. Exchange sends
// { MSG_TERMINATE, count } and accumulates the count of each one it receives.
65 static constexpr
int MSG_TERMINATE = 1;
// Tags passed to RegisterTag/SendData and compared against received buffers to
// tell control messages apart from particle payloads.
67 enum { MESSAGE_TAG = 0x42000, PARTICLE_TAG = 0x42001 };
// Registers MESSAGE_TAG and PARTICLE_TAG with buffer sizes derived from msgSz,
// nParticles and numBlockIds, then initializes the buffers.
69 VTKM_CONT void RegisterMessages(
int msgSz,
int nParticles,
int numBlockIds);
// Sends the particle container c to rank dst under PARTICLE_TAG.
// NOTE(review): the template parameter list looks truncated in this excerpt;
// P and Container are used below but their declarations are not visible.
74 template <
typename,
typename>
76 typename Allocator = std::allocator<P>>
77 inline void SendParticles(
int dst,
const Container<P, Allocator>& c);
// Sends every non-empty container in m to the rank given by its map key.
// NOTE(review): the template parameter list looks truncated in this excerpt;
// P and Container are used below but their declarations are not visible.
81 template <
typename,
typename>
83 typename Allocator = std::allocator<P>>
84 inline void SendParticles(
const std::unordered_map<
int, Container<P, Allocator>>& m);
// Sends msg to rank dst under MESSAGE_TAG; the definition writes this rank
// into the buffer ahead of the message.
87 VTKM_CONT void SendMsg(
int dst,
const std::vector<int>& msg);
// Sends msg to every rank other than this one.
88 VTKM_CONT void SendAllMsg(
const std::vector<int>& msg);
89 VTKM_CONT bool RecvMsg(std::vector<MsgCommType>& msgs) {
return RecvAny(&msgs, NULL,
false); }
// Receives pending control messages and/or particle payloads into the given
// output vectors.
// NOTE(review): the remainder of the parameter list is not visible in this
// excerpt; RecvMsg passes a third, boolean argument.
92 VTKM_CONT bool RecvAny(std::vector<MsgCommType>* msgs,
93 std::vector<ParticleRecvCommType>* recvParticles,
// Bounds map supplied to the constructor. Held by const reference, so the
// referenced object has to outlive this messenger.
95 const vtkm::filter::flow::internal::BoundsMap& BoundsMap;
// Parameter list of a const member function whose name is not visible in this
// excerpt. Presumably this is SerialExchange, whose definition takes matching
// parameters; TODO confirm.
100 const std::vector<ParticleType>& outData,
101 const std::vector<vtkm::Id>& outRanks,
102 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
104 std::vector<ParticleType>& inData,
105 std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
106 bool blockAndWait)
const;
// Computes the buffer size that RegisterMessages registers for PARTICLE_TAG,
// from the particle count and the number of block IDs per particle.
108 static std::size_t CalcParticleBufferSize(std::size_t nParticles, std::size_t numBlockIds = 2);
// Constructor definition.
// NOTE(review): several parameter lines and the braces are not visible in this
// excerpt; useAsyncComm, msgSz and numBlockIds are used but not declared here.
114 template <
typename ParticleType>
115 ParticleMessenger<ParticleType>::ParticleMessenger(
116 vtkmdiy::mpi::communicator& comm,
118 const vtkm::filter::flow::internal::BoundsMap& boundsMap,
// The communicator and the async flag go to the Messenger base.
122 : Messenger(comm, useAsyncComm)
// Binds the reference member; no copy of the bounds map is made.
124 , BoundsMap(boundsMap)
// Tags and buffers are only registered when built with MPI support.
127 #ifdef VTKM_ENABLE_MPI
128 this->RegisterMessages(msgSz, numParticles, numBlockIds);
// Marks the parameter as used — presumably on the non-MPI branch, whose
// preprocessor directive is not visible here; TODO confirm.
132 (void)(numParticles);
// Computes the size used for the PARTICLE_TAG buffers.
// NOTE(review): several lines of this function, including the start of the
// summed expression, are not visible in this excerpt.
137 template <
typename ParticleType>
138 std::size_t ParticleMessenger<ParticleType>::CalcParticleBufferSize(std::size_t nParticles,
139 std::size_t nBlockIds)
// Per-particle size as reported by the particle type itself.
142 std::size_t pSize = ParticleType::Sizeof();
// Serializes a particle p into a scratch buffer — presumably to cross-check
// pSize against the serialized size; the check itself is not visible here.
145 vtkmdiy::MemoryBuffer buff;
147 vtkmdiy::save(buff, p);
// One std::size_t term (presumably the particle vector's length prefix; TODO confirm).
159 +
sizeof(std::size_t)
// One std::size_t per particle (presumably each block-ID vector's length
// prefix; TODO confirm).
164 + nParticles *
sizeof(std::size_t)
// nBlockIds values of vtkm::Id for every particle.
166 + nParticles * nBlockIds *
sizeof(
vtkm::Id);
// Single-rank path used by Exchange: nothing is sent. Every outgoing particle
// is appended to inData and its block IDs are stored in inDataBlockIDsMap under
// the particle's ID. outRanks is unused.
// NOTE(review): some parameter lines of this definition are not visible in
// this excerpt.
170 template <
typename ParticleType>
171 void ParticleMessenger<ParticleType>::SerialExchange(
172 const std::vector<ParticleType>& outData,
173 const std::vector<vtkm::Id>&
vtkmNotUsed(outRanks),
174 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
176 std::vector<ParticleType>& inData,
177 std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
180 for (
auto& p : outData)
// NOTE(review): the iterator returned by find() is dereferenced without being
// compared to end(); a particle ID missing from outBlockIDsMap would be
// undefined behavior.
182 const auto& bids = outBlockIDsMap.find(p.GetID())->second;
183 inData.emplace_back(p);
184 inDataBlockIDsMap[p.GetID()] = bids;
// Sends outgoing particles to their destination ranks, announces locally
// terminated particles, then receives incoming particles and terminate counts.
// NOTE(review): several lines (parameters, braces, the closing of the MPI
// section) are not visible in this excerpt.
189 template <
typename ParticleType>
190 void ParticleMessenger<ParticleType>::Exchange(
191 const std::vector<ParticleType>& outData,
192 const std::vector<vtkm::Id>& outRanks,
193 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
195 std::vector<ParticleType>& inData,
196 std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
// Outputs start from a clean state on every call.
202 numTerminateMessages = 0;
203 inDataBlockIDsMap.clear();
// With a single rank there is nobody to talk to; hand everything to the
// serial path.
205 if (this->GetNumRanks() == 1)
206 return this->SerialExchange(
207 outData, outRanks, outBlockIDsMap, numLocalTerm, inData, inDataBlockIDsMap, blockAndWait);
209 #ifdef VTKM_ENABLE_MPI
// Outgoing particles (with their block IDs) grouped by destination rank.
212 std::unordered_map<int, std::vector<ParticleCommType>> sendData;
214 std::size_t numP = outData.size();
215 for (std::size_t i = 0; i < numP; i++)
// NOTE(review): find() is dereferenced without an end() check, and outRanks[i]
// is indexed without checking that outRanks is as long as outData.
217 const auto& bids = outBlockIDsMap.find(outData[i].GetID())->second;
// NOTE(review): the map key is int while outRanks holds vtkm::Id, so the rank
// is implicitly converted here.
218 sendData[outRanks[i]].emplace_back(std::make_pair(outData[i], bids));
// Tell every other rank how many particles terminated locally.
222 if (numLocalTerm > 0)
223 this->SendAllMsg({ MSG_TERMINATE,
static_cast<int>(numLocalTerm) });
224 this->SendParticles(sendData);
225 this->CheckPendingSendRequests();
// Receive whatever control messages and particle payloads are available.
228 std::vector<ParticleRecvCommType> particleData;
229 std::vector<MsgCommType> msgData;
230 if (RecvAny(&msgData, &particleData, blockAndWait))
// Flatten the per-sender particle lists into the caller's outputs.
232 for (
const auto& it : particleData)
233 for (
const auto& v : it.second)
235 const auto& p = v.first;
236 const auto& bids = v.second;
237 inData.emplace_back(p);
238 inDataBlockIDsMap[p.GetID()] = bids;
// Sum the terminate counts announced by the other ranks.
241 for (
const auto& m : msgData)
// NOTE(review): elements 0 and 1 of the payload are read without a size check.
243 if (m.second[0] == MSG_TERMINATE)
244 numTerminateMessages +=
static_cast<vtkm::Id>(m.second[1]);
251 #ifdef VTKM_ENABLE_MPI
// Registers the two message tags with their buffer sizes and initializes the
// buffers.
254 template <
typename ParticleType>
255 void ParticleMessenger<ParticleType>::RegisterMessages(
int msgSz,
int nParticles,
int numBlockIds)
// Control-message buffers are sized for msgSz + 1 values.
258 std::size_t messageBuffSz = CalcMessageBufferSize(msgSz + 1);
259 std::size_t particleBuffSz = CalcParticleBufferSize(nParticles, numBlockIds);
// One receive per peer rank, capped at 64.
261 int numRecvs = std::min(64, this->GetNumRanks() - 1);
263 this->RegisterTag(ParticleMessenger::MESSAGE_TAG, numRecvs, messageBuffSz);
264 this->RegisterTag(ParticleMessenger::PARTICLE_TAG, numRecvs, particleBuffSz);
266 this->InitializeBuffers();
// Sends a control message to rank dst under MESSAGE_TAG.
270 template <
typename ParticleType>
271 void ParticleMessenger<ParticleType>::SendMsg(
int dst,
const std::vector<int>& msg)
273 vtkmdiy::MemoryBuffer buff;
// Buffer layout: this rank first, then the message payload.
276 vtkmdiy::save(buff, this->GetRank());
277 vtkmdiy::save(buff, msg);
278 this->SendData(dst, ParticleMessenger::MESSAGE_TAG, buff);
282 template <
typename ParticleType>
283 void ParticleMessenger<ParticleType>::SendAllMsg(
const std::vector<int>& msg)
285 for (
int i = 0; i < this->GetNumRanks(); i++)
286 if (i != this->GetRank())
287 this->SendMsg(i, msg);
// Receives buffers for the requested tags and decodes them into msgs and
// recvParticles.
// NOTE(review): many lines of this function (the last parameter, the null
// checks, the buffer decoding and the return statements) are not visible in
// this excerpt.
291 template <
typename ParticleType>
292 bool ParticleMessenger<ParticleType>::RecvAny(std::vector<MsgCommType>* msgs,
293 std::vector<ParticleRecvCommType>* recvParticles,
// Listen for control messages — presumably only when msgs is non-null, as
// RecvMsg passes a null particle vector; the guards are not visible here.
299 tags.insert(ParticleMessenger::MESSAGE_TAG);
// Listen for particle payloads and discard any previous contents of the
// output vector.
304 tags.insert(ParticleMessenger::PARTICLE_TAG);
305 recvParticles->resize(0);
// Each received buffer is paired with the tag it arrived under.
311 std::vector<std::pair<int, vtkmdiy::MemoryBuffer>> buffers;
312 if (!this->RecvData(tags, buffers, blockAndWait))
315 for (
auto& buff : buffers)
// Control message: stored together with the sender's rank.
317 if (buff.first == ParticleMessenger::MESSAGE_TAG)
323 msgs->emplace_back(std::make_pair(sendRank, m));
// Particle payload: stored together with the sender's rank.
325 else if (buff.first == ParticleMessenger::PARTICLE_TAG)
328 std::vector<ParticleCommType> particles;
332 recvParticles->emplace_back(std::make_pair(sendRank, particles));
// Sends the particle container c to rank dst under PARTICLE_TAG.
340 template <
typename ParticleType>
341 template <
typename P,
template <
typename,
typename>
class Container,
typename Allocator>
342 inline void ParticleMessenger<ParticleType>::SendParticles(
int dst,
343 const Container<P, Allocator>& c)
// Sending to ourselves is special-cased.
// NOTE(review): the body of this branch is not visible in this excerpt.
345 if (dst == this->GetRank())
353 vtkmdiy::MemoryBuffer bb;
// Buffer layout: this rank first, then the serialized container.
354 vtkmdiy::save(bb, this->GetRank());
355 vtkmdiy::save(bb, c);
356 this->SendData(dst, ParticleMessenger::PARTICLE_TAG, bb);
360 template <
typename ParticleType>
361 template <
typename P,
template <
typename,
typename>
class Container,
typename Allocator>
362 inline void ParticleMessenger<ParticleType>::SendParticles(
363 const std::unordered_map<
int, Container<P, Allocator>>& m)
365 for (
const auto& mit : m)
366 if (!mit.second.empty())
367 this->SendParticles(mit.first, mit.second);
376 #endif // vtk_m_filter_flow_internal_ParticleMessenger_h