11 #ifndef vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
12 #define vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
// Threaded variant of AdvectAlgorithm. The visible code starts a std::thread
// that runs Worker(this) and coordinates it with the calling thread through
// this->Mutex, WorkerActivateCondition, and the WorkerActivate / Done flags.
// NOTE(review): this excerpt omits lines (braces, some method signatures and
// statements); the comments below describe only what is visible and mark
// everything else as an assumption to confirm.
31 template <
typename DSIType,
template <
typename>
class ResultType,
typename ParticleType>
32 class AdvectAlgorithmThreaded :
public AdvectAlgorithm<DSIType, ResultType, ParticleType>
// Constructor: forwards the bounds map and the block list to the base class
// and starts with WorkerActivate cleared.
// @param bm      bounds map handed to the AdvectAlgorithm base.
// @param blocks  blocks handed to the AdvectAlgorithm base (non-const reference).
35 AdvectAlgorithmThreaded(
const vtkm::filter::flow::internal::BoundsMap& bm,
36 std::vector<DSIType>& blocks)
37 : AdvectAlgorithm<DSIType, ResultType, ParticleType>(bm, blocks)
39 , WorkerActivate(false)
// Turn the copy-seed flag on for every block.
// NOTE(review): presumably needed because seed vectors built on the worker
// thread do not outlive their use elsewhere -- confirm against DSIType.
44 for (
auto& block : this->Blocks)
45 block.SetCopySeedFlag(
true);
// --- Function whose signature is not visible in this excerpt ---
// Counts the locally held particles (active + inactive) and passes that count
// to ComputeTotalNumParticles.
50 vtkm::Id nLocal =
static_cast<vtkm::Id>(this->Active.size() + this->Inactive.size());
51 this->ComputeTotalNumParticles(nLocal);
// Launch one thread whose entry point is the static Worker function, passing
// this object as its argument.
53 std::vector<std::thread> workerThreads;
54 workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker,
this));
// Visit every started thread; the loop body is not visible here
// (presumably a join -- confirm).
59 for (
auto& t : workerThreads)
// Override that wraps the base-class GetActiveParticles under this->Mutex and
// records its boolean result in WorkerActivate.
// @param particles  output vector forwarded to the base implementation
//                   (the forwarded argument list is not visible -- confirm).
// @param blockId    output block id, likewise forwarded.
// NOTE(review): the return statement is not visible; presumably returns val.
64 bool GetActiveParticles(std::vector<ParticleType>& particles,
vtkm::Id& blockId)
override
66 std::lock_guard<std::mutex> lock(this->Mutex);
67 bool val = this->AdvectAlgorithm<DSIType, ResultType, ParticleType>::GetActiveParticles(
69 this->WorkerActivate = val;
// Override that hands new particles to the base-class UpdateActive and wakes
// any thread waiting on WorkerActivateCondition. Does nothing visible when
// `particles` is empty.
// @param particles  particles to add to the active set.
// @param idsMap     map from vtkm::Id to a vector of vtkm::Id, forwarded
//                   unchanged to the base implementation.
73 void UpdateActive(
const std::vector<ParticleType>& particles,
74 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& idsMap)
override
76 if (!particles.empty())
78 std::lock_guard<std::mutex> lock(this->Mutex);
79 this->AdvectAlgorithm<DSIType, ResultType, ParticleType>::UpdateActive(particles, idsMap);
// The notification is issued before the flag is set. Waiters re-check their
// predicate only after re-acquiring this->Mutex, so this ordering is safe
// provided both statements run while `lock` is held.
// NOTE(review): brace scope is not visible -- confirm.
82 this->WorkerActivateCondition.notify_all();
83 this->WorkerActivate =
true;
// --- Function whose signature is not visible in this excerpt ---
// Takes this->Mutex; the guarded statement is not visible.
89 std::lock_guard<std::mutex> lock(this->Mutex);
// --- Function whose signature is not visible in this excerpt ---
// Takes this->Mutex, then wakes every thread waiting on
// WorkerActivateCondition. Any statement between the two is not visible.
95 std::lock_guard<std::mutex> lock(this->Mutex);
97 this->WorkerActivateCondition.notify_all();
// Thread entry point: runs Work() on the given algorithm instance.
100 static void Worker(AdvectAlgorithmThreaded* algo) { algo->Work(); }
// --- Function whose signature is not visible in this excerpt ---
// Blocks on WorkerActivateCondition until WorkerActivate or Done is true.
// std::unique_lock is used because condition_variable::wait must be able to
// release and re-acquire the mutex.
104 std::unique_lock<std::mutex> lock(this->Mutex);
105 this->WorkerActivateCondition.wait(lock, [
this] {
return WorkerActivate || Done; });
// Accesses the WorkerResults entry for blockId under this->Mutex.
// operator[] on the unordered_map creates an empty vector if the key is absent.
// @param blockId  key into WorkerResults.
// @param b        result object for that block.
// NOTE(review): the statement that uses `it` and `b` is not visible;
// presumably b is appended to the entry -- confirm.
108 void UpdateWorkerResult(
vtkm::Id blockId, DSIHelperInfoType& b)
110 std::lock_guard<std::mutex> lock(this->Mutex);
111 auto& it = this->WorkerResults[blockId];
// --- Function whose signature is not visible in this excerpt ---
// Loops until CheckDone() returns true. Each pass asks for active particles
// and, when some are returned, advects them through their block and hands the
// result to UpdateWorkerResult.
117 while (!this->CheckDone())
119 std::vector<ParticleType> v;
// NOTE(review): the declaration of blockId is not visible in this excerpt.
121 if (this->GetActiveParticles(v, blockId))
123 auto& block = this->GetDataSet(blockId);
// Bundle the particles with the bounds map and the particle-to-block-id map.
124 DSIHelperInfoType bb =
125 DSIHelperInfo<ParticleType>(v, this->BoundsMap, this->ParticleBlockIDsMap);
126 block.Advect(bb, this->StepSize, this->NumberOfSteps);
127 this->UpdateWorkerResult(blockId, bb);
// NOTE(review): what happens when GetActiveParticles returns false is not
// visible here.
// --- Function whose signature is not visible in this excerpt ---
// Builds a ParticleMessenger and loops until the terminated-particle count
// reaches the total particle count.
// NOTE(review): the meaning of the literal arguments 1 and 128 is not
// established by this excerpt.
136 vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
137 this->Comm, this->BoundsMap, 1, 128);
139 while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
// Fetch the results gathered so far into a local map.
141 std::unordered_map<vtkm::Id, std::vector<DSIHelperInfoType>> workerResults;
142 this->GetWorkerResults(workerResults);
// Accumulate into numTerm the value UpdateResult returns for each result.
// NOTE(review): the declarations of numTerm and numTermMessages are not visible.
145 for (
auto& it : workerResults)
147 for (
auto& r : it.second)
148 numTerm += this->UpdateResult(r.Get<DSIHelperInfo<ParticleType>>());
152 this->Communicate(messenger, numTerm, numTermMessages);
// Add both counts (numTerm and numTermMessages) to the running total.
154 this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
// Check for the total being exceeded; the handling is not visible here.
155 if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
// Override that, under this->Mutex, combines three conditions with &&: the
// base-class GetBlockAndWait(numLocalTerm) result, WorkerActivate being
// false, and WorkerResults being empty.
// @param numLocalTerm  forwarded unchanged to the base implementation.
// NOTE(review): the start of this expression is not visible; presumably the
// conjunction is the return value -- confirm.
163 bool GetBlockAndWait(
const vtkm::Id& numLocalTerm)
override
165 std::lock_guard<std::mutex> lock(this->Mutex);
168 this->AdvectAlgorithm<DSIType, ResultType, ParticleType>::GetBlockAndWait(numLocalTerm) &&
169 !this->WorkerActivate && this->WorkerResults.empty());
// Under this->Mutex, if WorkerResults is non-empty, copy-assigns it to
// `results` and then clears WorkerResults.
// @param results  output map; overwritten only when there is something to hand
//                 over in the visible code.
// NOTE(review): lines before the lock are not visible.
172 void GetWorkerResults(std::unordered_map<
vtkm::Id, std::vector<DSIHelperInfoType>>& results)
176 std::lock_guard<std::mutex> lock(this->Mutex);
177 if (!this->WorkerResults.empty())
179 results = this->WorkerResults;
180 this->WorkerResults.clear();
// Termination flag; part of the predicate the condition-variable wait checks.
184 std::atomic<bool> Done;
// Condition variable notified via notify_all() in the code above.
// NOTE(review): the declarations of Mutex and WorkerActivate are not visible
// in this excerpt.
187 std::condition_variable WorkerActivateCondition;
// Results keyed by block id (the key used in UpdateWorkerResult), handed over
// and cleared by GetWorkerResults.
188 std::unordered_map<vtkm::Id, std::vector<DSIHelperInfoType>> WorkerResults;
196 #endif //vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h