11 #ifndef vtk_m_filter_flow_internal_ParticleExchanger_h
12 #define vtk_m_filter_flow_internal_ParticleExchanger_h
23 template <
typename ParticleType>
24 class ParticleExchanger
27 #ifdef VTKM_ENABLE_MPI
28 ParticleExchanger(vtkmdiy::mpi::communicator& comm)
29 : MPIComm(vtkmdiy::mpi::mpi_cast(comm.handle()))
30 , NumRanks(comm.size())
33 ParticleExchanger(vtkmdiy::mpi::communicator&
vtkmNotUsed(comm))
37 #ifdef VTKM_ENABLE_MPI
38 ~ParticleExchanger() {}
41 bool HaveWork()
const {
return !this->SendBuffers.empty(); }
43 void Exchange(
const std::vector<ParticleType>& outData,
44 const std::vector<vtkm::Id>& outRanks,
45 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
46 std::vector<ParticleType>& inData,
47 std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap)
51 if (this->NumRanks == 1)
52 this->SerialExchange(outData, outBlockIDsMap, inData, inDataBlockIDsMap);
53 #ifdef VTKM_ENABLE_MPI
56 this->CleanupSendBuffers(
true);
57 this->SendParticles(outData, outRanks, outBlockIDsMap);
58 this->RecvParticles(inData, inDataBlockIDsMap);
64 void SerialExchange(
const std::vector<ParticleType>& outData,
65 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap,
66 std::vector<ParticleType>& inData,
67 std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap)
70 for (
const auto& p : outData)
72 const auto& bids = outBlockIDsMap.find(p.GetID())->second;
73 inData.emplace_back(p);
74 inDataBlockIDsMap[p.GetID()] = bids;
78 #ifdef VTKM_ENABLE_MPI
79 using ParticleCommType = std::pair<ParticleType, std::vector<vtkm::Id>>;
81 void CleanupSendBuffers(
bool checkRequests)
85 for (
auto& entry : this->SendBuffers)
87 this->SendBuffers.clear();
91 if (this->SendBuffers.empty())
94 std::vector<MPI_Request> requests;
95 for (
auto& req : this->SendBuffers)
96 requests.emplace_back(req.first);
101 auto requestsOrig = requests;
103 std::vector<MPI_Status> status(requests.size());
104 std::vector<int> indices(requests.size());
106 int err = MPI_Testsome(requests.size(), requests.data(), &num, indices.data(), status.data());
108 if (err != MPI_SUCCESS)
110 "Error with MPI_Testsome in ParticleExchanger::CleanupSendBuffers");
114 for (
int i = 0; i < num; i++)
116 std::size_t idx =
static_cast<std::size_t
>(indices[i]);
117 const auto& req = requestsOrig[idx];
119 auto it = this->SendBuffers.find(req);
120 if (it == this->SendBuffers.end())
122 "Missing request in ParticleExchanger::CleanupSendBuffers");
126 this->SendBuffers.erase(it);
132 void SendParticles(
const std::vector<ParticleType>& outData,
133 const std::vector<vtkm::Id>& outRanks,
134 const std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& outBlockIDsMap)
140 std::size_t n = outData.size();
141 std::unordered_map<int, std::vector<ParticleCommType>> sendData;
144 for (std::size_t i = 0; i < n; i++)
146 const auto& bids = outBlockIDsMap.find(outData[i].GetID())->second;
148 sendData[outRanks[i]].emplace_back(std::make_pair(outData[i], bids));
152 for (
auto& si : sendData)
153 this->SendParticlesToDst(si.first, si.second);
156 void SendParticlesToDst(
int dst,
const std::vector<ParticleCommType>& data)
158 if (dst == this->Rank)
165 vtkmdiy::MemoryBuffer* bb =
new vtkmdiy::MemoryBuffer();
166 vtkmdiy::save(*bb, data);
171 MPI_Isend(bb->buffer.data(), bb->size(), MPI_BYTE, dst, this->Tag, this->MPIComm, &req);
172 if (err != MPI_SUCCESS)
174 this->SendBuffers[req] = bb;
177 void RecvParticles(std::vector<ParticleType>& inData,
178 std::unordered_map<
vtkm::Id, std::vector<vtkm::Id>>& inDataBlockIDsMap)
const
181 inDataBlockIDsMap.clear();
183 std::vector<vtkmdiy::MemoryBuffer> buffers;
189 int err = MPI_Iprobe(MPI_ANY_SOURCE, this->Tag, this->MPIComm, &flag, &status);
190 if (err != MPI_SUCCESS)
192 "Error in MPI_Probe in ParticleExchanger::RecvParticles");
199 err = MPI_Get_count(&status, MPI_BYTE, &incomingSize);
200 if (err != MPI_SUCCESS)
202 "Error in MPI_Probe in ParticleExchanger::RecvParticles");
204 std::vector<char> recvBuff;
205 recvBuff.resize(incomingSize);
206 MPI_Status recvStatus;
208 err = MPI_Recv(recvBuff.data(),
215 if (err != MPI_SUCCESS)
217 "Error in MPI_Probe in ParticleExchanger::RecvParticles");
220 vtkmdiy::MemoryBuffer memBuff;
221 memBuff.save_binary(recvBuff.data(), incomingSize);
224 std::vector<ParticleCommType> data;
227 for (
const auto& d : data)
229 const auto& particle = d.first;
230 const auto& bids = d.second;
231 inDataBlockIDsMap[particle.GetID()] = bids;
232 inData.emplace_back(particle);
243 std::unordered_map<MPI_Request, vtkmdiy::MemoryBuffer*> SendBuffers;
257 #endif //vtk_m_filter_flow_internal_ParticleExchanger_h