11 #ifndef vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
12 #define vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
// Threaded variant of AdvectAlgorithm<DSIType>: the visible code starts a
// std::thread that runs Worker()/Work() while the calling thread collects
// worker results and communicates through a ParticleMessenger.
// NOTE(review): this listing has gaps -- braces, several method signatures,
// return statements and some member declarations are not visible. The comments
// below describe only what the visible lines show.
31 template <
typename DSIType>
32 class AdvectAlgorithmThreaded :
public AdvectAlgorithm<DSIType>
// Particle type used throughout the class, taken from the integrator type.
35 using ParticleType =
typename DSIType::PType;
// Constructor: forwards bm, blocks and useAsyncComm to the AdvectAlgorithm
// base, initialises WorkerActivate to false, then calls SetCopySeedFlag(true)
// on every entry of this->Blocks.
// NOTE(review): the declaration of useAsyncComm and any other member
// initialisers are not visible in this excerpt.
37 AdvectAlgorithmThreaded(
const vtkm::filter::flow::internal::BoundsMap& bm,
38 std::vector<DSIType>& blocks,
40 : AdvectAlgorithm<DSIType>(bm, blocks, useAsyncComm)
42 , WorkerActivate(false)
47 for (
auto& block : this->Blocks)
48 block.SetCopySeedFlag(
true);
// Start-up sequence (enclosing method signature not visible): compute the
// total particle count, then launch one std::thread whose entry point is the
// static Worker() with this object as its argument.
53 this->ComputeTotalNumParticles();
55 std::vector<std::thread> workerThreads;
56 workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker,
this));
// Iterates over the launched threads; the loop body is not visible here
// (presumably a join -- TODO confirm).
61 for (
auto& t : workerThreads)
// Override that holds this->Mutex while delegating to the base-class
// GetActiveParticles, and records the base result in WorkerActivate.
// NOTE(review): the return statement is not visible; presumably returns val.
66 bool GetActiveParticles(std::vector<ParticleType>& particles,
vtkm::Id& blockId)
override
68 std::lock_guard<std::mutex> lock(this->Mutex);
69 bool val = this->AdvectAlgorithm<DSIType>::GetActiveParticles(particles, blockId);
70 this->WorkerActivate = val;
// Override that acts when `particles` is non-empty: takes this->Mutex,
// delegates to the base-class UpdateActive, notifies all waiters on
// WorkerActivateCondition and sets WorkerActivate to true.
// NOTE(review): the extent of the if-body is not visible in this excerpt.
74 void UpdateActive(
const std::vector<ParticleType>& particles,
75 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& idsMap)
override
77 if (!particles.empty())
79 std::lock_guard<std::mutex> lock(this->Mutex);
80 this->AdvectAlgorithm<DSIType>::UpdateActive(particles, idsMap);
83 this->WorkerActivateCondition.notify_all();
84 this->WorkerActivate =
true;
// Fragment of a method whose signature and remaining statements are not
// visible: it takes this->Mutex.
90 std::lock_guard<std::mutex> lock(this->Mutex);
// Fragment of another method (signature not visible): takes this->Mutex and
// later wakes every thread waiting on WorkerActivateCondition.
96 std::lock_guard<std::mutex> lock(this->Mutex);
98 this->WorkerActivateCondition.notify_all();
// Static thread entry point: calls Work() on the supplied algorithm object.
// This is the function handed to std::thread above.
101 static void Worker(AdvectAlgorithmThreaded* algo) { algo->Work(); }
// Fragment (signature not visible): blocks on WorkerActivateCondition with
// this->Mutex until either WorkerActivate or Done is true.
105 std::unique_lock<std::mutex> lock(this->Mutex);
106 this->WorkerActivateCondition.wait(lock, [
this] {
return WorkerActivate || Done; });
// Takes this->Mutex and obtains a reference to the WorkerResults entry for
// blockId. operator[] creates an empty entry if none exists yet.
// NOTE(review): the statement that stores `b` is not visible in this excerpt.
109 void UpdateWorkerResult(
vtkm::Id blockId, DSIHelperInfo<ParticleType>& b)
111 std::lock_guard<std::mutex> lock(this->Mutex);
112 auto& it = this->WorkerResults[blockId];
// Worker loop (enclosing signature not visible; presumably the Work() called
// by Worker() -- TODO confirm): until CheckDone() returns true, fetch active
// particles, advect them on the block returned by GetDataSet(blockId) and hand
// the helper info to UpdateWorkerResult.
// NOTE(review): the declaration of blockId is not visible here.
118 while (!this->CheckDone())
120 std::vector<ParticleType> v;
122 if (this->GetActiveParticles(v, blockId))
124 auto& block = this->GetDataSet(blockId);
125 DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
126 block.Advect(bb, this->StepSize);
127 this->UpdateWorkerResult(blockId, bb);
// Management section (enclosing signature not visible). When
// UseAsynchronousCommunication is false the message below is used; the call
// that consumes it is not visible.
// NOTE(review): the two adjacent string literals concatenate without a space,
// yielding "...AdvectAlgorithmThreaded.Forcing asynchronous communication.".
136 if (!this->UseAsynchronousCommunication)
139 "Synchronous communication not supported for AdvectAlgorithmThreaded."
140 "Forcing asynchronous communication.");
// The messenger is always constructed with useAsync == true, regardless of
// UseAsynchronousCommunication.
// NOTE(review): the meaning of the trailing arguments 1 and 128 is not
// visible in this file -- check the ParticleMessenger constructor.
143 bool useAsync =
true;
144 vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
145 this->Comm, useAsync, this->BoundsMap, 1, 128);
// Loop until the terminated-particle count reaches the total particle count.
147 while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
// Take whatever results the worker has published so far.
149 std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults;
150 this->GetWorkerResults(workerResults);
// Accumulate the value returned by UpdateResult for every collected result.
// NOTE(review): the declarations of numTerm and numTermMessages are not
// visible in this excerpt.
153 for (
auto& it : workerResults)
155 for (
auto& r : it.second)
156 numTerm += this->UpdateResult(r);
160 this->Communicate(messenger, numTerm, numTermMessages);
// Add the locally counted terminations and those reported via messages.
162 this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
// Detects the terminated count exceeding the total; the handling of that
// case is not visible here.
163 if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
// Override that, under this->Mutex, returns true only when the base-class
// GetBlockAndWait returns true, WorkerActivate is false and WorkerResults is
// empty.
171 bool GetBlockAndWait(
const bool& syncComm,
const vtkm::Id& numLocalTerm)
override
173 std::lock_guard<std::mutex> lock(this->Mutex);
177 return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm, numLocalTerm) &&
178 !this->WorkerActivate && this->WorkerResults.empty());
// Under this->Mutex, hands the accumulated WorkerResults to `results` and
// empties the member; does this only when WorkerResults is non-empty.
// NOTE(review): this is a copy assignment followed by clear(); a swap or move
// would avoid copying the map -- confirm before changing.
181 void GetWorkerResults(
182 std::unordered_map<
vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>>& results)
186 std::lock_guard<std::mutex> lock(this->Mutex);
187 if (!this->WorkerResults.empty())
189 results = this->WorkerResults;
190 this->WorkerResults.clear();
// Flag read by the worker wait predicate (together with WorkerActivate).
194 std::atomic<bool> Done;
// Condition variable the worker waits on; notified by UpdateActive and by the
// notify_all fragment above.
197 std::condition_variable WorkerActivateCondition;
// Per-block results published through UpdateWorkerResult and drained by
// GetWorkerResults; every visible access is made with this->Mutex held.
198 std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults;
206 #endif //vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h