VTK-m  2.2
AdvectAlgorithmThreaded.h
Go to the documentation of this file.
1 //============================================================================
2 // Copyright (c) Kitware, Inc.
3 // All rights reserved.
4 // See LICENSE.txt for details.
5 //
6 // This software is distributed WITHOUT ANY WARRANTY; without even
7 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8 // PURPOSE. See the above copyright notice for more information.
9 //============================================================================
10 
11 #ifndef vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
12 #define vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
13 
18 
19 #include <thread>
20 
21 namespace vtkm
22 {
23 namespace filter
24 {
25 namespace flow
26 {
27 namespace internal
28 {
29 
30 template <typename DSIType>
31 class AdvectAlgorithmThreaded : public AdvectAlgorithm<DSIType>
32 {
33 public:
34  using ParticleType = typename DSIType::PType;
35 
36  AdvectAlgorithmThreaded(const vtkm::filter::flow::internal::BoundsMap& bm,
37  std::vector<DSIType>& blocks)
38  : AdvectAlgorithm<DSIType>(bm, blocks)
39  , Done(false)
40  {
41  //For threaded algorithm, the particles go out of scope in the Work method.
42  //When this happens, they are destructed by the time the Manage thread gets them.
43  //Set the copy flag so the std::vector is copied into the ArrayHandle
44  for (auto& block : this->Blocks)
45  block.SetCopySeedFlag(true);
46  }
47 
48  void Go() override
49  {
50  std::vector<std::thread> workerThreads;
51  workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
52  this->Manage();
53 
54  //This will only work for 1 thread. For > 1, the Blocks will need a mutex.
55  VTKM_ASSERT(workerThreads.size() == 1);
56  for (auto& t : workerThreads)
57  t.join();
58  }
59 
60 protected:
61  bool HaveWork() override
62  {
63  std::lock_guard<std::mutex> lock(this->Mutex);
64  return this->AdvectAlgorithm<DSIType>::HaveWork() || this->WorkerActivate;
65  }
66 
67  virtual bool GetDone() override
68  {
69  std::lock_guard<std::mutex> lock(this->Mutex);
70 #ifndef VTKM_ENABLE_MPI
71  return !this->CheckHaveWork();
72 #else
73  return this->Terminator.Done();
74 #endif
75  }
76 
77  bool WorkerGetDone()
78  {
79  std::lock_guard<std::mutex> lock(this->Mutex);
80  return this->Done;
81  }
82 
83  bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId) override
84  {
85  std::lock_guard<std::mutex> lock(this->Mutex);
86  bool val = this->AdvectAlgorithm<DSIType>::GetActiveParticles(particles, blockId);
87  this->WorkerActivate = val;
88  return val;
89  }
90 
91  void UpdateActive(const std::vector<ParticleType>& particles,
92  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap) override
93  {
94  if (!particles.empty())
95  {
96  std::lock_guard<std::mutex> lock(this->Mutex);
97  this->AdvectAlgorithm<DSIType>::UpdateActive(particles, idsMap);
98 
99  //Let workers know there is new work
100  this->WorkerActivateCondition.notify_all();
101  this->WorkerActivate = true;
102  }
103  }
104 
105  void SetDone()
106  {
107  std::lock_guard<std::mutex> lock(this->Mutex);
108  this->Done = true;
109  this->WorkerActivateCondition.notify_all();
110  }
111 
112  static void Worker(AdvectAlgorithmThreaded* algo) { algo->Work(); }
113 
114  void WorkerWait()
115  {
116  std::unique_lock<std::mutex> lock(this->Mutex);
117  this->WorkerActivateCondition.wait(lock, [this] { return WorkerActivate || Done; });
118  }
119 
120  void UpdateWorkerResult(vtkm::Id blockId, DSIHelperInfo<ParticleType>& b)
121  {
122  std::lock_guard<std::mutex> lock(this->Mutex);
123  auto& it = this->WorkerResults[blockId];
124  it.emplace_back(b);
125  }
126 
127  void Work()
128  {
129  while (!this->WorkerGetDone())
130  {
131  std::vector<ParticleType> v;
132  vtkm::Id blockId = -1;
133  if (this->GetActiveParticles(v, blockId))
134  {
135  auto& block = this->GetDataSet(blockId);
136  DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
137  block.Advect(bb, this->StepSize);
138  this->UpdateWorkerResult(blockId, bb);
139  }
140  else
141  this->WorkerWait();
142  }
143  }
144 
145  void Manage()
146  {
147  while (!this->GetDone())
148  {
149  std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults;
150  this->GetWorkerResults(workerResults);
151 
152  vtkm::Id numTerm = 0;
153  for (auto& it : workerResults)
154  for (auto& r : it.second)
155  numTerm += this->UpdateResult(r);
156 
157  this->ExchangeParticles();
158  }
159  this->SetDone();
160  }
161 
162  void GetWorkerResults(
163  std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>>& results)
164  {
165  results.clear();
166 
167  std::lock_guard<std::mutex> lock(this->Mutex);
168  if (!this->WorkerResults.empty())
169  {
170  results = this->WorkerResults;
171  this->WorkerResults.clear();
172  }
173  }
174 
175 private:
176  bool CheckHaveWork()
177  {
178  return this->AdvectAlgorithm<DSIType>::HaveWork() || this->WorkerActivate;
179  }
180 
181  std::atomic<bool> Done;
182  std::mutex Mutex;
183  bool WorkerActivate = false;
184  std::condition_variable WorkerActivateCondition;
185  std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults;
186 };
187 
188 }
189 }
190 }
191 } //vtkm::filter::flow::internal
192 
193 #endif //vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
BoundsMap.h
vtkm
Groups connected points that have the same field value.
Definition: Atomic.h:19
VTKM_ASSERT
#define VTKM_ASSERT(condition)
Definition: Assert.h:43
DataSetIntegrator.h
vtkm::Id
vtkm::Int64 Id
Base type to use to index arrays.
Definition: Types.h:227
AdvectAlgorithm.h
PartitionedDataSet.h