11 #ifndef vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
12 #define vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
30 template <
typename DSIType>
31 class AdvectAlgorithmThreaded :
public AdvectAlgorithm<DSIType>
34 using ParticleType =
typename DSIType::PType;
36 AdvectAlgorithmThreaded(
const vtkm::filter::flow::internal::BoundsMap& bm,
37 std::vector<DSIType>& blocks)
38 : AdvectAlgorithm<DSIType>(bm, blocks)
44 for (
auto& block : this->Blocks)
45 block.SetCopySeedFlag(
true);
50 std::vector<std::thread> workerThreads;
51 workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker,
this));
56 for (
auto& t : workerThreads)
61 bool HaveWork()
override
63 std::lock_guard<std::mutex> lock(this->Mutex);
64 return this->AdvectAlgorithm<DSIType>::HaveWork() || this->WorkerActivate;
67 virtual bool GetDone()
override
69 std::lock_guard<std::mutex> lock(this->Mutex);
70 #ifndef VTKM_ENABLE_MPI
71 return !this->CheckHaveWork();
73 return this->Terminator.Done();
79 std::lock_guard<std::mutex> lock(this->Mutex);
83 bool GetActiveParticles(std::vector<ParticleType>& particles,
vtkm::Id& blockId)
override
85 std::lock_guard<std::mutex> lock(this->Mutex);
86 bool val = this->AdvectAlgorithm<DSIType>::GetActiveParticles(particles, blockId);
87 this->WorkerActivate = val;
91 void UpdateActive(
const std::vector<ParticleType>& particles,
92 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& idsMap)
override
94 if (!particles.empty())
96 std::lock_guard<std::mutex> lock(this->Mutex);
97 this->AdvectAlgorithm<DSIType>::UpdateActive(particles, idsMap);
100 this->WorkerActivateCondition.notify_all();
101 this->WorkerActivate =
true;
107 std::lock_guard<std::mutex> lock(this->Mutex);
109 this->WorkerActivateCondition.notify_all();
112 static void Worker(AdvectAlgorithmThreaded* algo) { algo->Work(); }
116 std::unique_lock<std::mutex> lock(this->Mutex);
117 this->WorkerActivateCondition.wait(lock, [
this] {
return WorkerActivate || Done; });
120 void UpdateWorkerResult(
vtkm::Id blockId, DSIHelperInfo<ParticleType>& b)
122 std::lock_guard<std::mutex> lock(this->Mutex);
123 auto& it = this->WorkerResults[blockId];
129 while (!this->WorkerGetDone())
131 std::vector<ParticleType> v;
133 if (this->GetActiveParticles(v, blockId))
135 auto& block = this->GetDataSet(blockId);
136 DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
137 block.Advect(bb, this->StepSize);
138 this->UpdateWorkerResult(blockId, bb);
147 while (!this->GetDone())
149 std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults;
150 this->GetWorkerResults(workerResults);
153 for (
auto& it : workerResults)
154 for (
auto& r : it.second)
155 numTerm += this->UpdateResult(r);
157 this->ExchangeParticles();
162 void GetWorkerResults(
163 std::unordered_map<
vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>>& results)
167 std::lock_guard<std::mutex> lock(this->Mutex);
168 if (!this->WorkerResults.empty())
170 results = this->WorkerResults;
171 this->WorkerResults.clear();
178 return this->AdvectAlgorithm<DSIType>::HaveWork() || this->WorkerActivate;
181 std::atomic<bool> Done;
183 bool WorkerActivate =
false;
184 std::condition_variable WorkerActivateCondition;
185 std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults;
193 #endif //vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h