11 #ifndef vtk_m_filter_flow_internal_Messenger_h
12 #define vtk_m_filter_flow_internal_Messenger_h
16 #include <vtkm/thirdparty/diy/diy.h>
23 #ifdef VTKM_ENABLE_MPI
39 VTKM_CONT Messenger(vtkmdiy::mpi::communicator& comm,
bool useAsyncComm);
42 #ifdef VTKM_ENABLE_MPI
43 this->CleanupRequests();
47 int GetRank()
const {
return this->Rank; }
48 int GetNumRanks()
const {
return this->NumRanks; }
50 #ifdef VTKM_ENABLE_MPI
51 VTKM_CONT void RegisterTag(
int tag, std::size_t numRecvs, std::size_t size);
53 bool UsingSyncCommunication()
const {
return !this->UsingAsyncCommunication(); }
54 bool UsingAsyncCommunication()
const {
return this->UseAsynchronousCommunication; }
/// Computes a buffer size from the message size @p msgSz.
/// NOTE(review): presumably msgSz plus some per-message overhead; the
/// definition is not in this header, so confirm there.
static std::size_t CalcMessageBufferSize(std::size_t msgSz);
59 void InitializeBuffers();
60 void CheckPendingSendRequests();
61 void CleanupRequests(
int tag = TAG_ANY);
// Sends the buffer `buff` to `dst` under message tag `tag`.
// The visible statements test UseAsynchronousCommunication and forward the
// same (dst, tag, buff) arguments to SendDataAsync and SendDataSync.
// NOTE(review): the function braces and, presumably, an `else` between the two
// calls are not present in this listing (the embedded line numbers skip 63, 66
// and 68). Confirm against the real header that exactly one path runs per call.
// NOTE(review): `dst` is presumably the destination rank; confirm.
62 void SendData(
int dst,
int tag, vtkmdiy::MemoryBuffer& buff)
64 if (this->UseAsynchronousCommunication)
65 this->SendDataAsync(dst, tag, buff);
// Synchronous path (see the note above about the missing `else`).
67 this->SendDataSync(dst, tag, buff);
// Receives data for the given set of tags into `buffers`, a vector of
// (int, MemoryBuffer) pairs, and returns the bool produced by the selected
// receive routine. `blockAndWait` defaults to false and is passed through
// unchanged.
// The visible statements test UseAsynchronousCommunication and return the
// result of RecvDataAsync; the final statement returns RecvDataSync.
// NOTE(review): the function braces and, presumably, an `else` are not present
// in this listing (the embedded line numbers skip 72, 75 and 77); confirm
// against the real header.
// NOTE(review): the int in each pair is presumably the sending rank; confirm.
69 bool RecvData(
const std::set<int>& tags,
70 std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
71 bool blockAndWait =
false)
73 if (this->UseAsynchronousCommunication)
74 return this->RecvDataAsync(tags, buffers, blockAndWait);
// Reached when the asynchronous branch above did not return.
76 return this->RecvDataSync(tags, buffers, blockAndWait);
80 void SendDataAsync(
int dst,
int tag,
const vtkmdiy::MemoryBuffer& buff);
81 void SendDataSync(
int dst,
int tag, vtkmdiy::MemoryBuffer& buff);
82 bool RecvDataAsync(
const std::set<int>& tags,
83 std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
85 bool RecvDataSync(
const std::set<int>& tags,
86 std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
/// Posts a receive for @p tag.
/// NOTE(review): presumably uses the size registered for the tag; confirm in
/// the definition.
void PostRecv(int tag);

/// Posts a receive for @p tag with an explicit size @p sz.
/// @p src defaults to -1.
/// NOTE(review): -1 presumably means "any source"; confirm in the definition.
void PostRecv(int tag, std::size_t sz, int src = -1);
96 std::size_t id, numPackets, packet, packetSz, dataSz;
99 void PrepareForSend(
int tag,
const vtkmdiy::MemoryBuffer& buff, std::vector<char*>& buffList);
100 vtkm::Id GetMsgID() {
return this->MsgID++; }
/// Binary predicate over two raw buffers.
/// NOTE(review): presumably orders packets of one message by packet index;
/// confirm in the definition.
static bool PacketCompare(const char* a, const char* b);
102 void ProcessReceivedBuffers(std::vector<char*>& incomingBuffers,
103 std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers);
106 using RequestTagPair = std::pair<MPI_Request, int>;
107 using RankIdPair = std::pair<int, int>;
111 std::map<int, std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>> SyncSendBuffers;
112 std::map<int, std::pair<std::size_t, std::size_t>> MessageTagInfo;
117 std::map<RequestTagPair, char*> RecvBuffers;
118 std::map<RankIdPair, std::list<char*>> RecvPackets;
119 std::map<RequestTagPair, char*> SendBuffers;
/// Tag constant with value -1; CleanupRequests() uses it as its default
/// argument.
static constexpr int TAG_ANY = -1;

/// Communication-mode flag, true by default. It is read by
/// UsingAsyncCommunication(), SendData() and RecvData().
bool UseAsynchronousCommunication = true;
123 void CheckRequests(
const std::map<RequestTagPair, char*>& buffer,
124 const std::set<int>& tags,
126 std::vector<RequestTagPair>& reqTags);
/// Compile-time rank count of 1 and rank index of 0, returned by
/// GetNumRanks() and GetRank().
/// NOTE(review): presumably the fallback used when VTKM_ENABLE_MPI is not
/// defined; the surrounding preprocessor branch is not visible here.
static constexpr int NumRanks = 1;
static constexpr int Rank = 0;
136 template <
typename T>
137 std::ostream&
operator<<(std::ostream& os,
const std::vector<T>& v)
140 for (std::size_t i = 0; i < v.size(); ++i)
143 if (i != v.size() - 1)
155 #endif // vtk_m_filter_flow_internal_Messenger_h