VTK-m  2.2
AdvectAlgorithmThreaded.h
Go to the documentation of this file.
1 //============================================================================
2 // Copyright (c) Kitware, Inc.
3 // All rights reserved.
4 // See LICENSE.txt for details.
5 //
6 // This software is distributed WITHOUT ANY WARRANTY; without even
7 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8 // PURPOSE. See the above copyright notice for more information.
9 //============================================================================
10 
11 #ifndef vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
12 #define vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
13 
19 
20 #include <thread>
21 
22 namespace vtkm
23 {
24 namespace filter
25 {
26 namespace flow
27 {
28 namespace internal
29 {
30 
31 template <typename DSIType>
32 class AdvectAlgorithmThreaded : public AdvectAlgorithm<DSIType>
33 {
34 public:
35  using ParticleType = typename DSIType::PType;
36 
37  AdvectAlgorithmThreaded(const vtkm::filter::flow::internal::BoundsMap& bm,
38  std::vector<DSIType>& blocks,
39  bool useAsyncComm)
40  : AdvectAlgorithm<DSIType>(bm, blocks, useAsyncComm)
41  , Done(false)
42  , WorkerActivate(false)
43  {
44  //For threaded algorithm, the particles go out of scope in the Work method.
45  //When this happens, they are destructed by the time the Manage thread gets them.
46  //Set the copy flag so the std::vector is copied into the ArrayHandle
47  for (auto& block : this->Blocks)
48  block.SetCopySeedFlag(true);
49  }
50 
51  void Go() override
52  {
53  this->ComputeTotalNumParticles();
54 
55  std::vector<std::thread> workerThreads;
56  workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
57  this->Manage();
58 
59  //This will only work for 1 thread. For > 1, the Blocks will need a mutex.
60  VTKM_ASSERT(workerThreads.size() == 1);
61  for (auto& t : workerThreads)
62  t.join();
63  }
64 
65 protected:
66  bool GetActiveParticles(std::vector<ParticleType>& particles, vtkm::Id& blockId) override
67  {
68  std::lock_guard<std::mutex> lock(this->Mutex);
69  bool val = this->AdvectAlgorithm<DSIType>::GetActiveParticles(particles, blockId);
70  this->WorkerActivate = val;
71  return val;
72  }
73 
74  void UpdateActive(const std::vector<ParticleType>& particles,
75  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& idsMap) override
76  {
77  if (!particles.empty())
78  {
79  std::lock_guard<std::mutex> lock(this->Mutex);
80  this->AdvectAlgorithm<DSIType>::UpdateActive(particles, idsMap);
81 
82  //Let workers know there is new work
83  this->WorkerActivateCondition.notify_all();
84  this->WorkerActivate = true;
85  }
86  }
87 
88  bool CheckDone()
89  {
90  std::lock_guard<std::mutex> lock(this->Mutex);
91  return this->Done;
92  }
93 
94  void SetDone()
95  {
96  std::lock_guard<std::mutex> lock(this->Mutex);
97  this->Done = true;
98  this->WorkerActivateCondition.notify_all();
99  }
100 
101  static void Worker(AdvectAlgorithmThreaded* algo) { algo->Work(); }
102 
103  void WorkerWait()
104  {
105  std::unique_lock<std::mutex> lock(this->Mutex);
106  this->WorkerActivateCondition.wait(lock, [this] { return WorkerActivate || Done; });
107  }
108 
109  void UpdateWorkerResult(vtkm::Id blockId, DSIHelperInfo<ParticleType>& b)
110  {
111  std::lock_guard<std::mutex> lock(this->Mutex);
112  auto& it = this->WorkerResults[blockId];
113  it.emplace_back(b);
114  }
115 
116  void Work()
117  {
118  while (!this->CheckDone())
119  {
120  std::vector<ParticleType> v;
121  vtkm::Id blockId = -1;
122  if (this->GetActiveParticles(v, blockId))
123  {
124  auto& block = this->GetDataSet(blockId);
125  DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
126  block.Advect(bb, this->StepSize);
127  this->UpdateWorkerResult(blockId, bb);
128  }
129  else
130  this->WorkerWait();
131  }
132  }
133 
134  void Manage()
135  {
136  if (!this->UseAsynchronousCommunication)
137  {
139  "Synchronous communication not supported for AdvectAlgorithmThreaded."
140  "Forcing asynchronous communication.");
141  }
142 
143  bool useAsync = true;
144  vtkm::filter::flow::internal::ParticleMessenger<ParticleType> messenger(
145  this->Comm, useAsync, this->BoundsMap, 1, 128);
146 
147  while (this->TotalNumTerminatedParticles < this->TotalNumParticles)
148  {
149  std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults;
150  this->GetWorkerResults(workerResults);
151 
152  vtkm::Id numTerm = 0;
153  for (auto& it : workerResults)
154  {
155  for (auto& r : it.second)
156  numTerm += this->UpdateResult(r);
157  }
158 
159  vtkm::Id numTermMessages = 0;
160  this->Communicate(messenger, numTerm, numTermMessages);
161 
162  this->TotalNumTerminatedParticles += (numTerm + numTermMessages);
163  if (this->TotalNumTerminatedParticles > this->TotalNumParticles)
164  throw vtkm::cont::ErrorFilterExecution("Particle count error");
165  }
166 
167  //Let the workers know that we are done.
168  this->SetDone();
169  }
170 
171  bool GetBlockAndWait(const bool& syncComm, const vtkm::Id& numLocalTerm) override
172  {
173  std::lock_guard<std::mutex> lock(this->Mutex);
174  if (this->Done)
175  return true;
176 
177  return (this->AdvectAlgorithm<DSIType>::GetBlockAndWait(syncComm, numLocalTerm) &&
178  !this->WorkerActivate && this->WorkerResults.empty());
179  }
180 
181  void GetWorkerResults(
182  std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>>& results)
183  {
184  results.clear();
185 
186  std::lock_guard<std::mutex> lock(this->Mutex);
187  if (!this->WorkerResults.empty())
188  {
189  results = this->WorkerResults;
190  this->WorkerResults.clear();
191  }
192  }
193 
194  std::atomic<bool> Done;
195  std::mutex Mutex;
196  bool WorkerActivate;
197  std::condition_variable WorkerActivateCondition;
198  std::unordered_map<vtkm::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults;
199 };
200 
201 }
202 }
203 }
204 } //vtkm::filter::flow::internal
205 
206 #endif //vtk_m_filter_flow_internal_AdvectAlgorithmThreaded_h
BoundsMap.h
vtkm
Groups connected points that have the same field value.
Definition: Atomic.h:19
VTKM_ASSERT
#define VTKM_ASSERT(condition)
Definition: Assert.h:43
vtkm::cont::ErrorFilterExecution
This class is primarily intended for filters to throw in the control environment to indicate an execut...
Definition: ErrorFilterExecution.h:27
DataSetIntegrator.h
ParticleMessenger.h
vtkm::cont::LogLevel::Info
@ Info
Information messages (detected hardware, etc) and temporary debugging output.
vtkm::Id
vtkm::Int64 Id
Base type to use to index arrays.
Definition: Types.h:227
AdvectAlgorithm.h
VTKM_LOG_S
#define VTKM_LOG_S(level,...)
Writes a message using stream syntax to the indicated log level.
Definition: Logging.h:208
PartitionedDataSet.h