VTK-m  2.2
ParticleMessenger.h
Go to the documentation of this file.
1 //============================================================================
2 // Copyright (c) Kitware, Inc.
3 // All rights reserved.
4 // See LICENSE.txt for details.
5 //
6 // This software is distributed WITHOUT ANY WARRANTY; without even
7 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8 // PURPOSE. See the above copyright notice for more information.
9 //============================================================================
10 
11 #ifndef vtk_m_filter_flow_internal_ParticleMessenger_h
12 #define vtk_m_filter_flow_internal_ParticleMessenger_h
13 
#include <vtkm/Particle.h>

#include <list>
#include <map>
#include <set>
#include <utility>
#include <vector>
23 
24 namespace vtkm
25 {
26 namespace filter
27 {
28 namespace flow
29 {
30 namespace internal
31 {
32 
33 template <typename ParticleType>
34 class VTKM_FILTER_FLOW_EXPORT ParticleMessenger : public vtkm::filter::flow::internal::Messenger
35 {
36  //sendRank, message
37  using MsgCommType = std::pair<int, std::vector<int>>;
38 
39  //particle + blockIDs.
40  using ParticleCommType = std::pair<ParticleType, std::vector<vtkm::Id>>;
41 
42  //sendRank, vector of ParticleCommType.
43  using ParticleRecvCommType = std::pair<int, std::vector<ParticleCommType>>;
44 
45 public:
46  VTKM_CONT ParticleMessenger(vtkmdiy::mpi::communicator& comm,
47  bool useAsyncComm,
48  const vtkm::filter::flow::internal::BoundsMap& bm,
49  int msgSz = 1,
50  int numParticles = 128,
51  int numBlockIds = 2);
52  VTKM_CONT ~ParticleMessenger() {}
53 
54  VTKM_CONT void Exchange(const std::vector<ParticleType>& outData,
55  const std::vector<vtkm::Id>& outRanks,
56  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
57  vtkm::Id numLocalTerm,
58  std::vector<ParticleType>& inData,
59  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
60  vtkm::Id& numTerminateMessages,
61  bool blockAndWait = false);
62 
63 protected:
64 #ifdef VTKM_ENABLE_MPI
65  static constexpr int MSG_TERMINATE = 1;
66 
67  enum { MESSAGE_TAG = 0x42000, PARTICLE_TAG = 0x42001 };
68 
69  VTKM_CONT void RegisterMessages(int msgSz, int nParticles, int numBlockIds);
70 
71  // Send/Recv particles
72  VTKM_CONT
73  template <typename P,
74  template <typename, typename>
75  class Container,
76  typename Allocator = std::allocator<P>>
77  inline void SendParticles(int dst, const Container<P, Allocator>& c);
78 
79  VTKM_CONT
80  template <typename P,
81  template <typename, typename>
82  class Container,
83  typename Allocator = std::allocator<P>>
84  inline void SendParticles(const std::unordered_map<int, Container<P, Allocator>>& m);
85 
86  // Send/Recv messages.
87  VTKM_CONT void SendMsg(int dst, const std::vector<int>& msg);
88  VTKM_CONT void SendAllMsg(const std::vector<int>& msg);
89  VTKM_CONT bool RecvMsg(std::vector<MsgCommType>& msgs) { return RecvAny(&msgs, NULL, false); }
90 
91  // Send/Recv datasets.
92  VTKM_CONT bool RecvAny(std::vector<MsgCommType>* msgs,
93  std::vector<ParticleRecvCommType>* recvParticles,
94  bool blockAndWait);
95  const vtkm::filter::flow::internal::BoundsMap& BoundsMap;
96 
97 #endif
98 
99  VTKM_CONT void SerialExchange(
100  const std::vector<ParticleType>& outData,
101  const std::vector<vtkm::Id>& outRanks,
102  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
103  vtkm::Id numLocalTerm,
104  std::vector<ParticleType>& inData,
105  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
106  bool blockAndWait) const;
107 
108  static std::size_t CalcParticleBufferSize(std::size_t nParticles, std::size_t numBlockIds = 2);
109 };
110 
111 //methods
112 
113 VTKM_CONT
114 template <typename ParticleType>
115 ParticleMessenger<ParticleType>::ParticleMessenger(
116  vtkmdiy::mpi::communicator& comm,
117  bool useAsyncComm,
118  const vtkm::filter::flow::internal::BoundsMap& boundsMap,
119  int msgSz,
120  int numParticles,
121  int numBlockIds)
122  : Messenger(comm, useAsyncComm)
123 #ifdef VTKM_ENABLE_MPI
124  , BoundsMap(boundsMap)
125 #endif
126 {
127 #ifdef VTKM_ENABLE_MPI
128  this->RegisterMessages(msgSz, numParticles, numBlockIds);
129 #else
130  (void)(boundsMap);
131  (void)(msgSz);
132  (void)(numParticles);
133  (void)(numBlockIds);
134 #endif
135 }
136 
137 template <typename ParticleType>
138 std::size_t ParticleMessenger<ParticleType>::CalcParticleBufferSize(std::size_t nParticles,
139  std::size_t nBlockIds)
140 {
141  ParticleType pTmp;
142  std::size_t pSize = ParticleType::Sizeof();
143 
144 #ifndef NDEBUG
145  vtkmdiy::MemoryBuffer buff;
146  ParticleType p;
147  vtkmdiy::save(buff, p);
148 
149  //Make sure the buffer size is correct.
150  //If this fires, then the size of the class has changed.
151  VTKM_ASSERT(pSize == buff.size());
152 #endif
153 
154  return
155  // rank
156  sizeof(int)
157  //std::vector<ParticleType> p;
158  //p.size()
159  + sizeof(std::size_t)
160  //nParticles of ParticleType
161  + nParticles * pSize
162  // std::vector<vtkm::Id> blockIDs for each particle.
163  // blockIDs.size() for each particle
164  + nParticles * sizeof(std::size_t)
165  // nBlockIDs of vtkm::Id for each particle.
166  + nParticles * nBlockIds * sizeof(vtkm::Id);
167 }
168 
169 VTKM_CONT
170 template <typename ParticleType>
171 void ParticleMessenger<ParticleType>::SerialExchange(
172  const std::vector<ParticleType>& outData,
173  const std::vector<vtkm::Id>& vtkmNotUsed(outRanks),
174  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
175  vtkm::Id vtkmNotUsed(numLocalTerm),
176  std::vector<ParticleType>& inData,
177  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
178  bool vtkmNotUsed(blockAndWait)) const
179 {
180  for (auto& p : outData)
181  {
182  const auto& bids = outBlockIDsMap.find(p.GetID())->second;
183  inData.emplace_back(p);
184  inDataBlockIDsMap[p.GetID()] = bids;
185  }
186 }
187 
188 VTKM_CONT
189 template <typename ParticleType>
190 void ParticleMessenger<ParticleType>::Exchange(
191  const std::vector<ParticleType>& outData,
192  const std::vector<vtkm::Id>& outRanks,
193  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
194  vtkm::Id numLocalTerm,
195  std::vector<ParticleType>& inData,
196  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap,
197  vtkm::Id& numTerminateMessages,
198  bool blockAndWait)
199 {
200  VTKM_ASSERT(outData.size() == outRanks.size());
201 
202  numTerminateMessages = 0;
203  inDataBlockIDsMap.clear();
204 
205  if (this->GetNumRanks() == 1)
206  return this->SerialExchange(
207  outData, outRanks, outBlockIDsMap, numLocalTerm, inData, inDataBlockIDsMap, blockAndWait);
208 
209 #ifdef VTKM_ENABLE_MPI
210 
211  //dstRank, vector of (particles,blockIDs)
212  std::unordered_map<int, std::vector<ParticleCommType>> sendData;
213 
214  std::size_t numP = outData.size();
215  for (std::size_t i = 0; i < numP; i++)
216  {
217  const auto& bids = outBlockIDsMap.find(outData[i].GetID())->second;
218  sendData[outRanks[i]].emplace_back(std::make_pair(outData[i], bids));
219  }
220 
221  //Do all the sends first.
222  if (numLocalTerm > 0)
223  this->SendAllMsg({ MSG_TERMINATE, static_cast<int>(numLocalTerm) });
224  this->SendParticles(sendData);
225  this->CheckPendingSendRequests();
226 
227  //Check if we have anything coming in.
228  std::vector<ParticleRecvCommType> particleData;
229  std::vector<MsgCommType> msgData;
230  if (RecvAny(&msgData, &particleData, blockAndWait))
231  {
232  for (const auto& it : particleData)
233  for (const auto& v : it.second)
234  {
235  const auto& p = v.first;
236  const auto& bids = v.second;
237  inData.emplace_back(p);
238  inDataBlockIDsMap[p.GetID()] = bids;
239  }
240 
241  for (const auto& m : msgData)
242  {
243  if (m.second[0] == MSG_TERMINATE)
244  numTerminateMessages += static_cast<vtkm::Id>(m.second[1]);
245  }
246  }
247 #endif
248 }
249 
250 
251 #ifdef VTKM_ENABLE_MPI
252 
253 VTKM_CONT
254 template <typename ParticleType>
255 void ParticleMessenger<ParticleType>::RegisterMessages(int msgSz, int nParticles, int numBlockIds)
256 {
257  //Determine buffer size for msg and particle tags.
258  std::size_t messageBuffSz = CalcMessageBufferSize(msgSz + 1);
259  std::size_t particleBuffSz = CalcParticleBufferSize(nParticles, numBlockIds);
260 
261  int numRecvs = std::min(64, this->GetNumRanks() - 1);
262 
263  this->RegisterTag(ParticleMessenger::MESSAGE_TAG, numRecvs, messageBuffSz);
264  this->RegisterTag(ParticleMessenger::PARTICLE_TAG, numRecvs, particleBuffSz);
265 
266  this->InitializeBuffers();
267 }
268 
269 VTKM_CONT
270 template <typename ParticleType>
271 void ParticleMessenger<ParticleType>::SendMsg(int dst, const std::vector<int>& msg)
272 {
273  vtkmdiy::MemoryBuffer buff;
274 
275  //Write data.
276  vtkmdiy::save(buff, this->GetRank());
277  vtkmdiy::save(buff, msg);
278  this->SendData(dst, ParticleMessenger::MESSAGE_TAG, buff);
279 }
280 
281 VTKM_CONT
282 template <typename ParticleType>
283 void ParticleMessenger<ParticleType>::SendAllMsg(const std::vector<int>& msg)
284 {
285  for (int i = 0; i < this->GetNumRanks(); i++)
286  if (i != this->GetRank())
287  this->SendMsg(i, msg);
288 }
289 
290 VTKM_CONT
291 template <typename ParticleType>
292 bool ParticleMessenger<ParticleType>::RecvAny(std::vector<MsgCommType>* msgs,
293  std::vector<ParticleRecvCommType>* recvParticles,
294  bool blockAndWait)
295 {
296  std::set<int> tags;
297  if (msgs)
298  {
299  tags.insert(ParticleMessenger::MESSAGE_TAG);
300  msgs->resize(0);
301  }
302  if (recvParticles)
303  {
304  tags.insert(ParticleMessenger::PARTICLE_TAG);
305  recvParticles->resize(0);
306  }
307 
308  if (tags.empty())
309  return false;
310 
311  std::vector<std::pair<int, vtkmdiy::MemoryBuffer>> buffers;
312  if (!this->RecvData(tags, buffers, blockAndWait))
313  return false;
314 
315  for (auto& buff : buffers)
316  {
317  if (buff.first == ParticleMessenger::MESSAGE_TAG)
318  {
319  int sendRank;
320  std::vector<int> m;
321  vtkmdiy::load(buff.second, sendRank);
322  vtkmdiy::load(buff.second, m);
323  msgs->emplace_back(std::make_pair(sendRank, m));
324  }
325  else if (buff.first == ParticleMessenger::PARTICLE_TAG)
326  {
327  int sendRank;
328  std::vector<ParticleCommType> particles;
329 
330  vtkmdiy::load(buff.second, sendRank);
331  vtkmdiy::load(buff.second, particles);
332  recvParticles->emplace_back(std::make_pair(sendRank, particles));
333  }
334  }
335 
336  return true;
337 }
338 
339 VTKM_CONT
340 template <typename ParticleType>
341 template <typename P, template <typename, typename> class Container, typename Allocator>
342 inline void ParticleMessenger<ParticleType>::SendParticles(int dst,
343  const Container<P, Allocator>& c)
344 {
345  if (dst == this->GetRank())
346  {
347  VTKM_LOG_S(vtkm::cont::LogLevel::Error, "Error. Sending a particle to yourself.");
348  return;
349  }
350  if (c.empty())
351  return;
352 
353  vtkmdiy::MemoryBuffer bb;
354  vtkmdiy::save(bb, this->GetRank());
355  vtkmdiy::save(bb, c);
356  this->SendData(dst, ParticleMessenger::PARTICLE_TAG, bb);
357 }
358 
359 VTKM_CONT
360 template <typename ParticleType>
361 template <typename P, template <typename, typename> class Container, typename Allocator>
362 inline void ParticleMessenger<ParticleType>::SendParticles(
363  const std::unordered_map<int, Container<P, Allocator>>& m)
364 {
365  for (const auto& mit : m)
366  if (!mit.second.empty())
367  this->SendParticles(mit.first, mit.second);
368 }
369 #endif
370 
371 }
372 }
373 }
374 } // vtkm::filter::flow::internal
375 
376 #endif // vtk_m_filter_flow_internal_ParticleMessenger_h
vtkm::exec::arg::load
T load(const U &u, vtkm::Id v)
Definition: FetchTagArrayDirectIn.h:36
BoundsMap.h
vtkm
Groups connected points that have the same field value.
Definition: Atomic.h:19
VTKM_ASSERT
#define VTKM_ASSERT(condition)
Definition: Assert.h:43
VTKM_ENABLE_MPI
#define VTKM_ENABLE_MPI
Definition: Configure.h:309
VTKM_CONT
#define VTKM_CONT
Definition: ExportMacros.h:57
vtkm::Id
vtkm::Int64 Id
Base type to use to index arrays.
Definition: Types.h:227
vtkmNotUsed
#define vtkmNotUsed(parameter_name)
Simple macro to identify a parameter as unused.
Definition: ExportMacros.h:128
vtkm::cont::LogLevel::Error
@ Error
Important but non-fatal errors, such as device fail-over.
VTKM_LOG_S
#define VTKM_LOG_S(level,...)
Writes a message using stream syntax to the indicated log level.
Definition: Logging.h:208
VTKM_FILTER_FLOW_EXPORT
#define VTKM_FILTER_FLOW_EXPORT
Definition: vtkm_filter_flow_export.h:44
vtkm_filter_flow_export.h
Messenger.h
Particle.h