11 #ifndef vtk_m_filter_flow_internal_AdvectAlgorithm_h
12 #define vtk_m_filter_flow_internal_AdvectAlgorithm_h
28 template <
typename DSIType>
32 using ParticleType =
typename DSIType::PType;
34 AdvectAlgorithm(
const vtkm::filter::flow::internal::BoundsMap& bm,
35 std::vector<DSIType>& blocks,
39 , NumRanks(this->Comm.size())
40 , Rank(this->Comm.rank())
41 , UseAsynchronousCommunication(useAsyncComm)
47 this->SetStepSize(stepSize);
48 this->SetSeeds(seeds);
55 for (
const auto& b : this->Blocks)
68 this->ClearParticles();
73 std::vector<std::vector<vtkm::Id>> blockIDs;
74 std::vector<ParticleType> particles;
77 const ParticleType p = portal.Get(i);
78 std::vector<vtkm::Id> ids = this->BoundsMap.FindBlocks(p.GetPosition());
83 auto ranks = this->BoundsMap.FindRank(ids[0]);
84 if (!ranks.empty() && this->Rank == ranks[0])
86 particles.emplace_back(p);
87 blockIDs.emplace_back(ids);
91 this->SetSeedArray(particles, blockIDs);
97 vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
98 this->Comm, this->UseAsynchronousCommunication, this->BoundsMap, 1, 128);
100 this->ComputeTotalNumParticles();
102 while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
104 std::vector<ParticleType> v;
106 if (this->GetActiveParticles(v, blockId))
109 auto& block = this->GetDataSet(blockId);
110 DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
111 block.Advect(bb, this->StepSize);
112 numTerm = this->UpdateResult(bb);
116 this->Communicate(messenger, numTerm, numTermMessages);
118 this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
119 if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
124 virtual void ClearParticles()
126 this->Active.clear();
127 this->Inactive.clear();
128 this->ParticleBlockIDsMap.clear();
131 void ComputeTotalNumParticles()
134 for (
const auto& it : this->Active)
135 numLocal += it.second.size();
137 #ifdef VTKM_ENABLE_MPI
138 vtkmdiy::mpi::all_reduce(this->Comm, numLocal, this->TotalNumParticles, std::plus<vtkm::Id>{});
140 this->TotalNumParticles = numLocal;
144 DataSetIntegrator<DSIType, ParticleType>& GetDataSet(
vtkm::Id id)
146 for (
auto& it : this->Blocks)
147 if (it.GetID() ==
id)
153 virtual void SetSeedArray(
const std::vector<ParticleType>& particles,
154 const std::vector<std::vector<vtkm::Id>>& blockIds)
158 auto pit = particles.begin();
159 auto bit = blockIds.begin();
160 while (pit != particles.end() && bit != blockIds.end())
163 this->ParticleBlockIDsMap[pit->GetID()] = *bit;
164 if (this->Active.find(blockId0) == this->Active.end())
165 this->Active[blockId0] = { *pit };
167 this->Active[blockId0].emplace_back(*pit);
173 virtual bool GetActiveParticles(std::vector<ParticleType>& particles,
vtkm::Id& blockId)
177 if (this->Active.empty())
181 if (this->Active.size() == 1)
183 blockId = this->Active.begin()->first;
184 particles = std::move(this->Active.begin()->second);
185 this->Active.clear();
190 std::size_t maxNum = 0;
191 auto maxIt = this->Active.end();
192 for (
auto it = this->Active.begin(); it != this->Active.end(); it++)
194 auto sz = it->second.size();
204 this->Active.clear();
208 blockId = maxIt->first;
209 particles = std::move(maxIt->second);
210 this->Active.erase(maxIt);
213 return !particles.empty();
216 void Communicate(vtkm::filter::flow::internal::ParticleMessenger<ParticleType>& messenger,
220 std::vector<ParticleType> outgoing;
221 std::vector<vtkm::Id> outgoingRanks;
223 this->GetOutgoingParticles(outgoing, outgoingRanks);
225 std::vector<ParticleType> incoming;
226 std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> incomingBlockIDs;
229 #ifdef VTKM_ENABLE_MPI
230 block = this->GetBlockAndWait(messenger.UsingSyncCommunication(), numLocalTerminations);
233 messenger.Exchange(outgoing,
235 this->ParticleBlockIDsMap,
236 numLocalTerminations,
243 for (
const auto& p : outgoing)
244 this->ParticleBlockIDsMap.erase(p.GetID());
246 this->UpdateActive(incoming, incomingBlockIDs);
249 void GetOutgoingParticles(std::vector<ParticleType>& outgoing,
250 std::vector<vtkm::Id>& outgoingRanks)
253 outgoingRanks.clear();
255 outgoing.reserve(this->Inactive.size());
256 outgoingRanks.reserve(this->Inactive.size());
258 std::vector<ParticleType> particlesStaying;
259 std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> particlesStayingBlockIDs;
261 for (
const auto& p : this->Inactive)
263 const auto& bid = this->ParticleBlockIDsMap[p.GetID()];
266 auto ranks = this->BoundsMap.FindRank(bid[0]);
269 if (ranks.size() == 1)
271 if (ranks[0] == this->Rank)
273 particlesStaying.emplace_back(p);
274 particlesStayingBlockIDs[p.GetID()] = this->ParticleBlockIDsMap[p.GetID()];
278 outgoing.emplace_back(p);
279 outgoingRanks.emplace_back(ranks[0]);
287 vtkm::Id outRank = std::rand() % ranks.size();
288 if (outRank == this->Rank)
290 particlesStayingBlockIDs[p.GetID()] = this->ParticleBlockIDsMap[p.GetID()];
291 particlesStaying.emplace_back(p);
295 outgoing.emplace_back(p);
296 outgoingRanks.emplace_back(outRank);
301 this->Inactive.clear();
302 VTKM_ASSERT(outgoing.size() == outgoingRanks.size());
304 VTKM_ASSERT(particlesStaying.size() == particlesStayingBlockIDs.size());
305 if (!particlesStaying.empty())
306 this->UpdateActive(particlesStaying, particlesStayingBlockIDs);
309 virtual void UpdateActive(
const std::vector<ParticleType>& particles,
310 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& idsMap)
314 for (
auto pit = particles.begin(); pit != particles.end(); pit++)
317 const auto& it = idsMap.find(particleID);
318 VTKM_ASSERT(it != idsMap.end() && !it->second.empty());
320 this->Active[blockId].emplace_back(*pit);
323 for (
const auto& it : idsMap)
324 this->ParticleBlockIDsMap[it.first] = it.second;
327 virtual void UpdateInactive(
const std::vector<ParticleType>& particles,
328 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& idsMap)
332 this->Inactive.insert(this->Inactive.end(), particles.begin(), particles.end());
333 for (
const auto& it : idsMap)
334 this->ParticleBlockIDsMap[it.first] = it.second;
337 vtkm::Id UpdateResult(
const DSIHelperInfo<ParticleType>& stuff)
339 this->UpdateActive(stuff.InBounds.Particles, stuff.InBounds.BlockIDs);
340 this->UpdateInactive(stuff.OutOfBounds.Particles, stuff.OutOfBounds.BlockIDs);
346 for (
const auto&
id : stuff.TermID)
347 this->ParticleBlockIDsMap.erase(
id);
354 virtual bool GetBlockAndWait(
const bool& syncComm,
const vtkm::Id& numLocalTerm)
356 bool haveNoWork = this->Active.empty() && this->Inactive.empty();
371 (numLocalTerm + this->TotalNumTerminatedParticles < this->TotalNumParticles))
380 std::unordered_map<vtkm::Id, std::vector<ParticleType>> Active;
381 std::vector<DSIType> Blocks;
382 vtkm::filter::flow::internal::BoundsMap BoundsMap;
384 std::vector<ParticleType> Inactive;
388 std::unordered_map<vtkm::Id, std::vector<vtkm::Id>> ParticleBlockIDsMap;
392 vtkm::Id TotalNumTerminatedParticles = 0;
393 bool UseAsynchronousCommunication =
true;
401 #endif //vtk_m_filter_flow_internal_AdvectAlgorithm_h