VTK-m  2.2
AdvectAlgorithm.h
Go to the documentation of this file.
1 //============================================================================
2 // Copyright (c) Kitware, Inc.
3 // All rights reserved.
4 // See LICENSE.txt for details.
5 //
6 // This software is distributed WITHOUT ANY WARRANTY; without even
7 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8 // PURPOSE. See the above copyright notice for more information.
9 //============================================================================
10 
11 #ifndef vtk_m_filter_flow_internal_AdvectAlgorithm_h
12 #define vtk_m_filter_flow_internal_AdvectAlgorithm_h
13 
18 
19 namespace vtkm
20 {
21 namespace filter
22 {
23 namespace flow
24 {
25 namespace internal
26 {
27 
28 template <typename DSIType>
29 class AdvectAlgorithm
30 {
31 public:
32  using ParticleType = typename DSIType::PType;
33 
34  AdvectAlgorithm(const vtkm::filter::flow::internal::BoundsMap& bm,
35  std::vector<DSIType>& blocks,
36  bool useAsyncComm)
37  : Blocks(blocks)
38  , BoundsMap(bm)
39  , NumRanks(this->Comm.size())
40  , Rank(this->Comm.rank())
41  , UseAsynchronousCommunication(useAsyncComm)
42  {
43  }
44 
45  void Execute(const vtkm::cont::ArrayHandle<ParticleType>& seeds, vtkm::FloatDefault stepSize)
46  {
47  this->SetStepSize(stepSize);
48  this->SetSeeds(seeds);
49  this->Go();
50  }
51 
52  vtkm::cont::PartitionedDataSet GetOutput() const
53  {
55  for (const auto& b : this->Blocks)
56  {
58  if (b.GetOutput(ds))
59  output.AppendPartition(ds);
60  }
61  return output;
62  }
63 
64  void SetStepSize(vtkm::FloatDefault stepSize) { this->StepSize = stepSize; }
65 
66  void SetSeeds(const vtkm::cont::ArrayHandle<ParticleType>& seeds)
67  {
68  this->ClearParticles();
69 
70  vtkm::Id n = seeds.GetNumberOfValues();
71  auto portal = seeds.ReadPortal();
72 
73  std::vector<std::vector<vtkm::Id>> blockIDs;
74  std::vector<ParticleType> particles;
75  for (vtkm::Id i = 0; i < n; i++)
76  {
77  const ParticleType p = portal.Get(i);
78  std::vector<vtkm::Id> ids = this->BoundsMap.FindBlocks(p.GetPosition());
79 
80  //Note: For duplicate blocks, this will give the seeds to the rank that are first in the list.
81  if (!ids.empty())
82  {
83  auto ranks = this->BoundsMap.FindRank(ids[0]);
84  if (!ranks.empty() && this->Rank == ranks[0])
85  {
86  particles.emplace_back(p);
87  blockIDs.emplace_back(ids);
88  }
89  }
90  }
91  this->SetSeedArray(particles, blockIDs);
92  }
93 
94  //Advect all the particles.
95  virtual void Go()
96  {
97  vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
98  this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128);
99 
100  this->ComputeTotalNumParticles();
101 
102  while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
103  {
104  std::vector<ParticleType> v;
105  vtkm::Id numTerm = 0, blockId = -1;
106  if (this->GetActiveParticles(v, blockId))
107  {
108  //make this a pointer to avoid the copy?
109  auto& block = this->GetDataSet(blockId);
110  DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
111  block.Advect(bb, this->StepSize);
112  numTerm = this->UpdateResult(bb);
113  }
114 
115  vtkm::Id numTermMessages = 0;
116  this->Communicate(messenger, numTerm, numTermMessages);
117 
118  this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
119  if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
120  throw vtkm::cont::ErrorFilterExecution("Particle count error");
121  }
122  }
123 
124  virtual void ClearParticles()
125  {
126  this->Active.clear();
127  this->Inactive.clear();
128  this->ParticleBlockIDsMap.clear();
129  }
130 
131  void ComputeTotalNumParticles()
132  {
133  vtkm::Id numLocal = static_cast<vtkm::Id>(this->Inactive.size());
134  for (const auto& it : this->Active)
135  numLocal += it.second.size();
136 
137 #ifdef VTKM_ENABLE_MPI
138  vtkmdiy::mpi::all_reduce(this->Comm, numLocal, this->TotalNumParticles, std::plus<vtkm::Id>{});
139 #else
140  this->TotalNumParticles = numLocal;
141 #endif
142  }
143 
144  DataSetIntegrator<DSIType, ParticleType>& GetDataSet(vtkm::Id id)
145  {
146  for (auto& it : this->Blocks)
147  if (it.GetID() == id)
148  return it;
149 
150  throw vtkm::cont::ErrorFilterExecution("Bad block");
151  }
152 
153  virtual void SetSeedArray(const std::vector<ParticleType>& particles,
154  const std::vector<std::vector<vtkm::Id>>& blockIds)
155  {
156  VTKM_ASSERT(particles.size() == blockIds.size());
157 
158  auto pit = particles.begin();
159  auto bit = blockIds.begin();
160  while (pit != particles.end() && bit != blockIds.end())
161  {
162  vtkm::Id blockId0 = (*bit)[0];
163  this->ParticleBlockIDsMap[pit->GetID()] = *bit;
164  if (this->Active.find(blockId0) == this->Active.end())
165  this->Active[blockId0] = { *pit };
166  else
167  this->Active[blockId0].emplace_back(*pit);
168  pit++;
169  bit++;
170  }
171  }
172 
173  virtual bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId)
174  {
175  particles.clear();
176  blockId = -1;
177  if (this->Active.empty())
178  return false;
179 
180  //If only one, return it.
181  if (this->Active.size() == 1)
182  {
183  blockId = this->Active.begin()->first;
184  particles = std::move(this->Active.begin()->second);
185  this->Active.clear();
186  }
187  else
188  {
189  //Find the blockId with the most particles.
190  std::size_t maxNum = 0;
191  auto maxIt = this->Active.end();
192  for (auto it = this->Active.begin(); it != this->Active.end(); it++)
193  {
194  auto sz = it->second.size();
195  if (sz > maxNum)
196  {
197  maxNum = sz;
198  maxIt = it;
199  }
200  }
201 
202  if (maxNum == 0)
203  {
204  this->Active.clear();
205  return false;
206  }
207 
208  blockId = maxIt->first;
209  particles = std::move(maxIt->second);
210  this->Active.erase(maxIt);
211  }
212 
213  return !particles.empty();
214  }
215 
216  void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger,
217  vtkm::Id numLocalTerminations,
218  vtkm::Id& numTermMessages)
219  {
220  std::vector<ParticleType> outgoing;
221  std::vector<vtkm::Id> outgoingRanks;
222 
223  this->GetOutgoingParticles(outgoing, outgoingRanks);
224 
225  std::vector<ParticleType> incoming;
226  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDs;
227  numTermMessages = 0;
228  bool block = false;
229 #ifdef VTKM_ENABLE_MPI
230  block = this->GetBlockAndWait(messenger.UsingSyncCommunication(), numLocalTerminations);
231 #endif
232 
233  messenger.Exchange(outgoing,
234  outgoingRanks,
235  this->ParticleBlockIDsMap,
236  numLocalTerminations,
237  incoming,
238  incomingBlockIDs,
239  numTermMessages,
240  block);
241 
242  //Cleanup what was sent.
243  for (const auto& p : outgoing)
244  this->ParticleBlockIDsMap.erase(p.GetID());
245 
246  this->UpdateActive(incoming, incomingBlockIDs);
247  }
248 
249  void GetOutgoingParticles(std::vector<ParticleType>& outgoing,
250  std::vector<vtkm::Id>& outgoingRanks)
251  {
252  outgoing.clear();
253  outgoingRanks.clear();
254 
255  outgoing.reserve(this->Inactive.size());
256  outgoingRanks.reserve(this->Inactive.size());
257 
258  std::vector<ParticleType> particlesStaying;
259  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> particlesStayingBlockIDs;
260  //Send out Everything.
261  for (const auto& p : this->Inactive)
262  {
263  const auto& bid = this->ParticleBlockIDsMap[p.GetID()];
264  VTKM_ASSERT(!bid.empty());
265 
266  auto ranks = this->BoundsMap.FindRank(bid[0]);
267  VTKM_ASSERT(!ranks.empty());
268 
269  if (ranks.size() == 1)
270  {
271  if (ranks[0] == this->Rank)
272  {
273  particlesStaying.emplace_back(p);
274  particlesStayingBlockIDs[p.GetID()] = this->ParticleBlockIDsMap[p.GetID()];
275  }
276  else
277  {
278  outgoing.emplace_back(p);
279  outgoingRanks.emplace_back(ranks[0]);
280  }
281  }
282  else
283  {
284  //Decide where it should go...
285 
286  //Random selection:
287  vtkm::Id outRank = std::rand() % ranks.size();
288  if (outRank == this->Rank)
289  {
290  particlesStayingBlockIDs[p.GetID()] = this->ParticleBlockIDsMap[p.GetID()];
291  particlesStaying.emplace_back(p);
292  }
293  else
294  {
295  outgoing.emplace_back(p);
296  outgoingRanks.emplace_back(outRank);
297  }
298  }
299  }
300 
301  this->Inactive.clear();
302  VTKM_ASSERT(outgoing.size() == outgoingRanks.size());
303 
304  VTKM_ASSERT(particlesStaying.size() == particlesStayingBlockIDs.size());
305  if (!particlesStaying.empty())
306  this->UpdateActive(particlesStaying, particlesStayingBlockIDs);
307  }
308 
309  virtual void UpdateActive(const std::vector<ParticleType>& particles,
310  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap)
311  {
312  VTKM_ASSERT(particles.size() == idsMap.size());
313 
314  for (auto pit = particles.begin(); pit != particles.end(); pit++)
315  {
316  vtkm::Id particleID = pit->GetID();
317  const auto& it = idsMap.find(particleID);
318  VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
319  vtkm::Id blockId = it->second[0];
320  this->Active[blockId].emplace_back(*pit);
321  }
322 
323  for (const auto& it : idsMap)
324  this->ParticleBlockIDsMap[it.first] = it.second;
325  }
326 
327  virtual void UpdateInactive(const std::vector<ParticleType>& particles,
328  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap)
329  {
330  VTKM_ASSERT(particles.size() == idsMap.size());
331 
332  this->Inactive.insert(this->Inactive.end(), particles.begin(), particles.end());
333  for (const auto& it : idsMap)
334  this->ParticleBlockIDsMap[it.first] = it.second;
335  }
336 
337  vtkm::Id UpdateResult(const DSIHelperInfo<ParticleType>& stuff)
338  {
339  this->UpdateActive(stuff.InBounds.Particles, stuff.InBounds.BlockIDs);
340  this->UpdateInactive(stuff.OutOfBounds.Particles, stuff.OutOfBounds.BlockIDs);
341 
342  vtkm::Id numTerm = static_cast<vtkm::Id>(stuff.TermID.size());
343  //Update terminated particles.
344  if (numTerm > 0)
345  {
346  for (const auto& id : stuff.TermID)
347  this->ParticleBlockIDsMap.erase(id);
348  }
349 
350  return numTerm;
351  }
352 
353 
354  virtual bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm)
355  {
356  bool haveNoWork = this->Active.empty() && this->Inactive.empty();
357 
358  //Using syncronous communication we should only block and wait if we have no particles
359  if (syncComm)
360  {
361  return haveNoWork;
362  }
363  else
364  {
365  //Otherwise, for asyncronous communication, there are only two cases where blocking would deadlock.
366  //1. There are active particles.
367  //2. numLocalTerm + this->TotalNumberOfTerminatedParticles == this->TotalNumberOfParticles
368  //So, if neither are true, we can safely block and wait for communication to come in.
369 
370  if (haveNoWork &&
371  (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
372  return true;
373 
374  return false;
375  }
376  }
377 
378  //Member data
379  // {blockId, std::vector of particles}
380  std::unordered_map<vtkm::Id, std::vector<ParticleType>> Active;
381  std::vector<DSIType> Blocks;
382  vtkm::filter::flow::internal::BoundsMap BoundsMap;
383  vtkmdiy::mpi::communicator Comm = vtkm::cont::EnvironmentTracker::GetCommunicator();
384  std::vector<ParticleType> Inactive;
385  vtkm::Id MaxNumberOfSteps = 0;
386  vtkm::Id NumRanks;
387  //{particleId : {block IDs}}
388  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
389  vtkm::Id Rank;
390  vtkm::FloatDefault StepSize;
391  vtkm::Id TotalNumParticles = 0;
392  vtkm::Id TotalNumTerminatedParticles = 0;
393  bool UseAsynchronousCommunication = true;
394 };
395 
396 }
397 }
398 }
399 } //vtkm::filter::flow::internal
400 
401 #endif //vtk_m_filter_flow_internal_AdvectAlgorithm_h
vtkm::cont::ArrayHandle
Manages an array-worth of data.
Definition: ArrayHandle.h:300
BoundsMap.h
vtkm
Groups connected points that have the same field value.
Definition: Atomic.h:19
VTKM_ASSERT
#define VTKM_ASSERT(condition)
Definition: Assert.h:43
vtkm::cont::ErrorFilterExecution
This class is primarily intended to filters to throw in the control environment to indicate an execut...
Definition: ErrorFilterExecution.h:27
vtkm::cont::DataSet
Contains and manages the geometric data structures that VTK-m operates on.
Definition: DataSet.h:57
vtkm::cont::ArrayHandle::GetNumberOfValues
vtkm::Id GetNumberOfValues() const
Returns the number of entries in the array.
Definition: ArrayHandle.h:468
DataSetIntegrator.h
ParticleMessenger.h
vtkm::cont::PartitionedDataSet::AppendPartition
void AppendPartition(const vtkm::cont::DataSet &ds)
Add DataSet ds to the end of the list of partitions.
vtkm::Id
vtkm::Int64 Id
Base type to use to index arrays.
Definition: Types.h:227
vtkm::cont::EnvironmentTracker::GetCommunicator
static const vtkmdiy::mpi::communicator & GetCommunicator()
vtkm::cont::ArrayHandle::ReadPortal
ReadPortalType ReadPortal() const
Get an array portal that can be used in the control environment.
Definition: ArrayHandle.h:433
vtkm::FloatDefault
vtkm::Float32 FloatDefault
The floating point type to use when no other precision is specified.
Definition: Types.h:236
PartitionedDataSet.h
vtkm::cont::PartitionedDataSet
Comprises a set of vtkm::cont::DataSet objects.
Definition: PartitionedDataSet.h:26