VTK-m  2.2
Messenger.h
Go to the documentation of this file.
1 //============================================================================
2 // Copyright (c) Kitware, Inc.
3 // All rights reserved.
4 // See LICENSE.txt for details.
5 //
6 // This software is distributed WITHOUT ANY WARRANTY; without even
7 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8 // PURPOSE. See the above copyright notice for more information.
9 //============================================================================
10 
11 #ifndef vtk_m_filter_flow_internal_Messenger_h
12 #define vtk_m_filter_flow_internal_Messenger_h
13 
14 #include <vtkm/Types.h>
16 #include <vtkm/thirdparty/diy/diy.h>
17 
18 #include <list>
19 #include <map>
20 #include <set>
21 #include <vector>
22 
23 #ifdef VTKM_ENABLE_MPI
24 #include <mpi.h>
25 #endif
26 
27 namespace vtkm
28 {
29 namespace filter
30 {
31 namespace flow
32 {
33 namespace internal
34 {
35 
36 class VTKM_FILTER_FLOW_EXPORT Messenger
37 {
38 public:
39  VTKM_CONT Messenger(vtkmdiy::mpi::communicator& comm, bool useAsyncComm);
40  VTKM_CONT virtual ~Messenger()
41  {
42 #ifdef VTKM_ENABLE_MPI
43  this->CleanupRequests();
44 #endif
45  }
46 
47  int GetRank() const { return this->Rank; }
48  int GetNumRanks() const { return this->NumRanks; }
49 
50 #ifdef VTKM_ENABLE_MPI
51  VTKM_CONT void RegisterTag(int tag, std::size_t numRecvs, std::size_t size);
52 
53  bool UsingSyncCommunication() const { return !this->UsingAsyncCommunication(); }
54  bool UsingAsyncCommunication() const { return this->UseAsynchronousCommunication; }
55 
56 protected:
57  static std::size_t CalcMessageBufferSize(std::size_t msgSz);
58 
59  void InitializeBuffers();
60  void CheckPendingSendRequests();
61  void CleanupRequests(int tag = TAG_ANY);
62  void SendData(int dst, int tag, vtkmdiy::MemoryBuffer& buff)
63  {
64  if (this->UseAsynchronousCommunication)
65  this->SendDataAsync(dst, tag, buff);
66  else
67  this->SendDataSync(dst, tag, buff);
68  }
69  bool RecvData(const std::set<int>& tags,
70  std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
71  bool blockAndWait = false)
72  {
73  if (this->UseAsynchronousCommunication)
74  return this->RecvDataAsync(tags, buffers, blockAndWait);
75  else
76  return this->RecvDataSync(tags, buffers, blockAndWait);
77  }
78 
79 private:
80  void SendDataAsync(int dst, int tag, const vtkmdiy::MemoryBuffer& buff);
81  void SendDataSync(int dst, int tag, vtkmdiy::MemoryBuffer& buff);
82  bool RecvDataAsync(const std::set<int>& tags,
83  std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
84  bool blockAndWait);
85  bool RecvDataSync(const std::set<int>& tags,
86  std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
87  bool blockAndWait);
88  void PostRecv(int tag);
89  void PostRecv(int tag, std::size_t sz, int src = -1);
90 
91 
92  //Message headers.
93  typedef struct
94  {
95  int rank, tag;
96  std::size_t id, numPackets, packet, packetSz, dataSz;
97  } Header;
98 
99  void PrepareForSend(int tag, const vtkmdiy::MemoryBuffer& buff, std::vector<char*>& buffList);
100  vtkm::Id GetMsgID() { return this->MsgID++; }
101  static bool PacketCompare(const char* a, const char* b);
102  void ProcessReceivedBuffers(std::vector<char*>& incomingBuffers,
103  std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers);
104 
105  // Send/Recv buffer management structures.
106  using RequestTagPair = std::pair<MPI_Request, int>;
107  using RankIdPair = std::pair<int, int>;
108 
109  //Member data
110  // <tag, {dst, buffer}>
111  std::map<int, std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>> SyncSendBuffers;
112  std::map<int, std::pair<std::size_t, std::size_t>> MessageTagInfo;
113  MPI_Comm MPIComm;
114  std::size_t MsgID;
115  int NumRanks;
116  int Rank;
117  std::map<RequestTagPair, char*> RecvBuffers;
118  std::map<RankIdPair, std::list<char*>> RecvPackets;
119  std::map<RequestTagPair, char*> SendBuffers;
120  static constexpr int TAG_ANY = -1;
121  bool UseAsynchronousCommunication = true;
122 
123  void CheckRequests(const std::map<RequestTagPair, char*>& buffer,
124  const std::set<int>& tags,
125  bool BlockAndWait,
126  std::vector<RequestTagPair>& reqTags);
127 
128 #else
129 protected:
130  static constexpr int NumRanks = 1;
131  static constexpr int Rank = 0;
132 #endif
133 };
134 
135 
136 template <typename T>
137 std::ostream& operator<<(std::ostream& os, const std::vector<T>& v)
138 {
139  os << "[";
140  for (std::size_t i = 0; i < v.size(); ++i)
141  {
142  os << v[i];
143  if (i != v.size() - 1)
144  os << ", ";
145  }
146  os << "]";
147  return os;
148 }
149 
150 }
151 }
152 }
153 } // vtkm::filter::flow::internal
154 
155 #endif // vtk_m_filter_flow_internal_Messenger_h
vtkm
Groups connected points that have the same field value.
Definition: Atomic.h:19
Types.h
vtkm::operator<<
std::ostream & operator<<(std::ostream &stream, const vtkm::Bounds &bounds)
Helper function for printing bounds during testing.
Definition: Bounds.h:248
VTKM_CONT
#define VTKM_CONT
Definition: ExportMacros.h:57
vtkm::Id
vtkm::Int64 Id
Base type to use to index arrays.
Definition: Types.h:227
VTKM_FILTER_FLOW_EXPORT
#define VTKM_FILTER_FLOW_EXPORT
Definition: vtkm_filter_flow_export.h:44
vtkm_filter_flow_export.h