VTK-m  2.2
ParticleExchanger.h
Go to the documentation of this file.
1 //============================================================================
2 // Copyright (c) Kitware, Inc.
3 // All rights reserved.
4 // See LICENSE.txt for details.
5 //
6 // This software is distributed WITHOUT ANY WARRANTY; without even
7 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8 // PURPOSE. See the above copyright notice for more information.
9 //============================================================================
10 
11 #ifndef vtk_m_filter_flow_internal_ParticleExchanger_h
12 #define vtk_m_filter_flow_internal_ParticleExchanger_h
13 
14 namespace vtkm
15 {
16 namespace filter
17 {
18 namespace flow
19 {
20 namespace internal
21 {
22 
23 template <typename ParticleType>
24 class ParticleExchanger
25 {
26 public:
27 #ifdef VTKM_ENABLE_MPI
28  ParticleExchanger(vtkmdiy::mpi::communicator& comm)
29  : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
30  , NumRanks(comm.size())
31  , Rank(comm.rank())
32 #else
33  ParticleExchanger(vtkmdiy::mpi::communicator& vtkmNotUsed(comm))
34 #endif
35  {
36  }
37 #ifdef VTKM_ENABLE_MPI
38  ~ParticleExchanger() {} //{ this->CleanupSendBuffers(false); }
39 #endif
40 
41  bool HaveWork() const { return !this->SendBuffers.empty(); }
42 
43  void Exchange(const std::vector<ParticleType>& outData,
44  const std::vector<vtkm::Id>& outRanks,
45  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
46  std::vector<ParticleType>& inData,
47  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap)
48  {
49  VTKM_ASSERT(outData.size() == outRanks.size());
50 
51  if (this->NumRanks == 1)
52  this->SerialExchange(outData, outBlockIDsMap, inData, inDataBlockIDsMap);
53 #ifdef VTKM_ENABLE_MPI
54  else
55  {
56  this->CleanupSendBuffers(true);
57  this->SendParticles(outData, outRanks, outBlockIDsMap);
58  this->RecvParticles(inData, inDataBlockIDsMap);
59  }
60 #endif
61  }
62 
63 private:
64  void SerialExchange(const std::vector<ParticleType>& outData,
65  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
66  std::vector<ParticleType>& inData,
67  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap)
68  {
69  //Copy output to input.
70  for (const auto& p : outData)
71  {
72  const auto& bids = outBlockIDsMap.find(p.GetID())->second;
73  inData.emplace_back(p);
74  inDataBlockIDsMap[p.GetID()] = bids;
75  }
76  }
77 
78 #ifdef VTKM_ENABLE_MPI
79  using ParticleCommType = std::pair<ParticleType, std::vector<vtkm::Id>>;
80 
81  void CleanupSendBuffers(bool checkRequests)
82  {
83  if (!checkRequests)
84  {
85  for (auto& entry : this->SendBuffers)
86  delete entry.second;
87  this->SendBuffers.clear();
88  return;
89  }
90 
91  if (this->SendBuffers.empty())
92  return;
93 
94  std::vector<MPI_Request> requests;
95  for (auto& req : this->SendBuffers)
96  requests.emplace_back(req.first);
97 
98  //MPI_Testsome will update the complete requests to MPI_REQUEST_NULL.
99  //Because we are using the MPI_Request as a key in SendBuffers, we need
100  //to make a copy.
101  auto requestsOrig = requests;
102 
103  std::vector<MPI_Status> status(requests.size());
104  std::vector<int> indices(requests.size());
105  int num = 0;
106  int err = MPI_Testsome(requests.size(), requests.data(), &num, indices.data(), status.data());
107 
108  if (err != MPI_SUCCESS)
110  "Error with MPI_Testsome in ParticleExchanger::CleanupSendBuffers");
111 
112  if (num > 0)
113  {
114  for (int i = 0; i < num; i++)
115  {
116  std::size_t idx = static_cast<std::size_t>(indices[i]);
117  const auto& req = requestsOrig[idx];
118  //const auto& stat = status[idx];
119  auto it = this->SendBuffers.find(req);
120  if (it == this->SendBuffers.end())
122  "Missing request in ParticleExchanger::CleanupSendBuffers");
123 
124  //Delete the buffer and remove from SendBuffers.
125  delete it->second;
126  this->SendBuffers.erase(it);
127  //std::cout<<this->Rank<<" SendBuffer: Delete"<<std::endl;
128  }
129  }
130  }
131 
132  void SendParticles(const std::vector<ParticleType>& outData,
133  const std::vector<vtkm::Id>& outRanks,
134  const std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap)
135  {
136  if (outData.empty())
137  return;
138 
139  //create the send data: vector of particles, vector of vector of blockIds.
140  std::size_t n = outData.size();
141  std::unordered_map<int, std::vector<ParticleCommType>> sendData;
142 
143  // dst, vector of pair(particles, blockIds)
144  for (std::size_t i = 0; i < n; i++)
145  {
146  const auto& bids = outBlockIDsMap.find(outData[i].GetID())->second;
147  //sendData[outRanks[i]].emplace_back(std::make_pair(std::move(outData[i]), std::move(bids)));
148  sendData[outRanks[i]].emplace_back(std::make_pair(outData[i], bids));
149  }
150 
151  //Send to dst, vector<pair<particle, bids>>
152  for (auto& si : sendData)
153  this->SendParticlesToDst(si.first, si.second);
154  }
155 
156  void SendParticlesToDst(int dst, const std::vector<ParticleCommType>& data)
157  {
158  if (dst == this->Rank)
159  {
160  VTKM_LOG_S(vtkm::cont::LogLevel::Error, "Error. Sending a particle to yourself.");
161  return;
162  }
163 
164  //Serialize vector(pair(particle, bids)) and send.
165  vtkmdiy::MemoryBuffer* bb = new vtkmdiy::MemoryBuffer();
166  vtkmdiy::save(*bb, data);
167  bb->reset();
168 
169  MPI_Request req;
170  int err =
171  MPI_Isend(bb->buffer.data(), bb->size(), MPI_BYTE, dst, this->Tag, this->MPIComm, &req);
172  if (err != MPI_SUCCESS)
173  throw vtkm::cont::ErrorFilterExecution("Error in MPI_Isend inside Messenger::SendData");
174  this->SendBuffers[req] = bb;
175  }
176 
177  void RecvParticles(std::vector<ParticleType>& inData,
178  std::unordered_map<vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap) const
179  {
180  inData.resize(0);
181  inDataBlockIDsMap.clear();
182 
183  std::vector<vtkmdiy::MemoryBuffer> buffers;
184 
185  MPI_Status status;
186  while (true)
187  {
188  int flag = 0;
189  int err = MPI_Iprobe(MPI_ANY_SOURCE, this->Tag, this->MPIComm, &flag, &status);
190  if (err != MPI_SUCCESS)
192  "Error in MPI_Probe in ParticleExchanger::RecvParticles");
193 
194  if (flag == 0) //no message arrived we are done.
195  break;
196 
197  //Otherwise, recv the incoming data
198  int incomingSize;
199  err = MPI_Get_count(&status, MPI_BYTE, &incomingSize);
200  if (err != MPI_SUCCESS)
202  "Error in MPI_Probe in ParticleExchanger::RecvParticles");
203 
204  std::vector<char> recvBuff;
205  recvBuff.resize(incomingSize);
206  MPI_Status recvStatus;
207 
208  err = MPI_Recv(recvBuff.data(),
209  incomingSize,
210  MPI_BYTE,
211  status.MPI_SOURCE,
212  status.MPI_TAG,
213  this->MPIComm,
214  &recvStatus);
215  if (err != MPI_SUCCESS)
217  "Error in MPI_Probe in ParticleExchanger::RecvParticles");
218 
219  //Add incoming data to inData and inDataBlockIds.
220  vtkmdiy::MemoryBuffer memBuff;
221  memBuff.save_binary(recvBuff.data(), incomingSize);
222  memBuff.reset();
223 
224  std::vector<ParticleCommType> data;
225  vtkmdiy::load(memBuff, data);
226  memBuff.reset();
227  for (const auto& d : data)
228  {
229  const auto& particle = d.first;
230  const auto& bids = d.second;
231  inDataBlockIDsMap[particle.GetID()] = bids;
232  inData.emplace_back(particle);
233  }
234 
235  //Note, we don't terminate the while loop here. We want to go back and
236  //check if any messages came in while buffers were being processed.
237  }
238  }
239 
240  MPI_Comm MPIComm;
241  vtkm::Id NumRanks;
242  vtkm::Id Rank;
243  std::unordered_map<MPI_Request, vtkmdiy::MemoryBuffer*> SendBuffers;
244  int Tag = 100;
245 #else
246  vtkm::Id NumRanks = 1;
247  vtkm::Id Rank = 0;
248 #endif
249 };
250 
251 }
252 }
253 }
254 } //vtkm::filter::flow::internal
255 
256 
257 #endif //vtk_m_filter_flow_internal_ParticleExchanger_h
vtkm::exec::arg::load
T load(const U &u, vtkm::Id v)
Definition: FetchTagArrayDirectIn.h:36
vtkm
Groups connected points that have the same field value.
Definition: Atomic.h:19
VTKM_ASSERT
#define VTKM_ASSERT(condition)
Definition: Assert.h:43
vtkm::cont::ErrorFilterExecution
This class is primarily intended to filters to throw in the control environment to indicate an execut...
Definition: ErrorFilterExecution.h:27
vtkm::Id
vtkm::Int64 Id
Base type to use to index arrays.
Definition: Types.h:227
vtkmNotUsed
#define vtkmNotUsed(parameter_name)
Simple macro to identify a parameter as unused.
Definition: ExportMacros.h:128
vtkm::cont::LogLevel::Error
@ Error
Important but non-fatal errors, such as device fail-over.
VTKM_LOG_S
#define VTKM_LOG_S(level,...)
Writes a message using stream syntax to the indicated log level.
Definition: Logging.h:208