BCP_message_mpi.cpp
Go to the documentation of this file.
1 // BCP_message_mpi.cpp is a declaration of BCP_message_mpi
2 // Sonya Marcarelli & Igor Vasil'ev (vil@icc.ru)
3 // All Rights Reserved.
4 
5 #include "BcpConfig.h"
6 #if defined(COIN_HAS_MPI)
7 
8 #include <cstdio>
9 #include <cmath>
10 
11 #define MPICH_SKIP_MPICXX
12 #include <mpi.h>
13 
14 #include "BCP_error.hpp"
15 #include "BCP_process.hpp"
16 #include "BCP_buffer.hpp"
17 #include "BCP_vector.hpp"
18 #include "BCP_message_mpi.hpp"
19 
20 bool BCP_mpi_environment::mpi_init_called = false;
21 int BCP_mpi_environment::num_proc = 0;
22 int BCP_mpi_environment::seqproc = 0;
23 
24 //#############################################################################
25 
26 int BCP_mpi_environment::is_mpi(int argc, char *argv[])
27 {
28  int pid;
29  if (! mpi_init_called) {
30  MPI_Init(&argc, &argv);
31  // MPI_Init may or may not have succeeded. In any case check if we can
32  // get the number of procs
33  if (MPI_Comm_size(MPI_COMM_WORLD, &num_proc) != MPI_SUCCESS) {
34  // Now it's certain. Not an MPI environment
35  return -1;
36  }
37  mpi_init_called = true;
38  }
39  MPI_Comm_rank(MPI_COMM_WORLD, &pid);
40  return pid;
41 }
42 
43 //#############################################################################
44 
45 
46 BCP_mpi_environment::BCP_mpi_environment(int argc, char *argv[])
47 {
48  /* Initialize the MPI environment. */
49  is_mpi(argc, argv);
50 }
51 
52 //-----------------------------------------------------------------------------
53 
54 BCP_mpi_environment::~BCP_mpi_environment()
55 {
56  MPI_Finalize();
57  mpi_init_called = false;
58 }
59 
60 //-----------------------------------------------------------------------------
61 
62 void
63 BCP_mpi_environment::check_error(const int code, const char* str) const
64 {
65  if (code != MPI_SUCCESS){
66  printf("%s returned error code %i.\n", str, code);
67  throw BCP_fatal_error(" ERROR in mpi -- exiting.\n");
68  }
69 }
70 
71 //-----------------------------------------------------------------------------
72 
73 int
74 BCP_mpi_environment::register_process(USER_initialize* user_init)
75 {
76  int pid;
77  MPI_Comm_rank(MPI_COMM_WORLD, &pid);
78  setvbuf(stdout, (char *)NULL, _IOLBF, 0);
79  return pid;
80 }
81 
82 int
83 BCP_mpi_environment::parent_process()
84 {
85  int pid;
86  MPI_Comm_rank(MPI_COMM_WORLD, &pid);
87  if (pid == 0)
88  //I am the master and I have no parent
89  return -1;
90  //The master process has got always pid=0
91  return 0;
92 }
93 
94 bool
95 BCP_mpi_environment::alive(const int pid)
96 {
97  //In Mpi is not possible check if a process is alive
98  /* FIXME */
99  return true;
100 }
101 
102 const int*
103 BCP_mpi_environment::alive(int num, const int* pids)
104 {
105  //In Mpi is not possible check if a process is alive
106  return NULL;
107 }
108 
109 //-----------------------------------------------------------------------------
110 
111 void
112 BCP_mpi_environment::send(const int target, const BCP_message_tag tag)
113 {
114  //create an empty buffer and send it with the tag
115  void * buf = NULL;
116  check_error(MPI_Send(&buf, 0, MPI_CHAR,
117  target, tag, MPI_COMM_WORLD),"MPI_Send");
118 }
119 
120 void
121 BCP_mpi_environment::send(const int target,
122  const BCP_message_tag tag, const BCP_buffer& buf)
123 {
124  check_error(MPI_Send(const_cast<char*>(buf.data()), buf.size(), MPI_CHAR,
125  target, tag, MPI_COMM_WORLD),"MPI_Send");
126 }
127 
128 //-----------------------------------------------------------------------------
129 
130 void
131 BCP_mpi_environment::multicast(int num, const int* targets,
132  const BCP_message_tag tag)
133 {
134  void * buf = NULL;
135  for (int i = 0; i < num; ++i) {
136  check_error(MPI_Send(&buf, 0, MPI_CHAR,
137  targets[i], tag, MPI_COMM_WORLD),"MPI_Send");
138  }
139 }
140 
141 void
142 BCP_mpi_environment::multicast(int num, const int* targets,
143  const BCP_message_tag tag,
144  const BCP_buffer& buf)
145 {
146  for (int i = 0; i < num; ++i) {
147  check_error(MPI_Send(const_cast<char*>(buf.data()), buf.size(),
148  MPI_CHAR, targets[i], tag, MPI_COMM_WORLD),
149  "MPI_Send");
150  }
151 }
152 
153 //-----------------------------------------------------------------------------
154 
155 void
156 BCP_mpi_environment::receive(const int source,
157  const BCP_message_tag tag, BCP_buffer& buf,
158  const double timeout)
159 {
160  buf.clear();
161  buf._sender = -1;
162  int pid = (source == BCP_AnyProcess ? MPI_ANY_SOURCE : source);
163  int msgtag = (tag == BCP_Msg_AnyMessage ? MPI_ANY_TAG : tag);
164  int flag = 0;
165  MPI_Status status;
166 
167  if (timeout < 0) {
168  check_error(MPI_Probe(pid, msgtag, MPI_COMM_WORLD, &status),
169  "MPI_Probe");
170  flag = 1;
171  } else {
172  check_error(MPI_Iprobe(pid, msgtag, MPI_COMM_WORLD, &flag, &status),
173  "MPI_Iprobe");
174  }
175  //There is no message
176  if (!flag) {
178  return;
179  }
180 
181  int bytes;
182  //Get the size of message
183  check_error(MPI_Get_count( &status, MPI_CHAR, &bytes ),"MPI_Get_Count");
184  buf.make_fit(bytes);
185  buf._msgtag = static_cast<BCP_message_tag>(status.MPI_TAG);
186  buf._sender = status.MPI_SOURCE;
187  buf._size = bytes;
188 
189  //Receive the content of the message
190  check_error(MPI_Recv(buf._data, bytes, MPI_CHAR,
191  pid, msgtag, MPI_COMM_WORLD, &status),"MPI_Recv");
192 }
193 
194 
195 
196 //-----------------------------------------------------------------------------
197 
198 bool
199 BCP_mpi_environment::probe(const int source, const BCP_message_tag tag)
200 {
201  MPI_Status status;
202  // MPI_Request request;
203  int flag = 0;
204  int pid = (source == BCP_AnyProcess ? MPI_ANY_SOURCE : source);
205  int msgtag = (tag == BCP_Msg_AnyMessage ? MPI_ANY_TAG : tag);
206  check_error(MPI_Iprobe(pid, msgtag, MPI_COMM_WORLD, &flag, &status),
207  "MPI_Iprobe");
208  return flag > 0;
209 }
210 
211 //-----------------------------------------------------------------------------
212 
213 int
214 BCP_mpi_environment::start_process(const BCP_string& exe, const bool debug)
215 {
216 #ifdef COIN_HAS_MPI2
217  // FIXME: implement MPI2 proces spawning
218  printf("Sorry, MPI2 process spawning is not supported yet...\n");
219  abort();
220 #else
221  // Fake it...
222  return ++seqproc;
223 #endif
224 }
225 
226 int
227 BCP_mpi_environment::start_process(const BCP_string& exe,
228  const BCP_string& machine,
229  const bool debug)
230 {
231 #ifdef COIN_HAS_MPI2
232  // FIXME: implement MPI2 proces spawning
233  printf("Sorry, MPI2 process spawning is not supported yet...\n");
234  abort();
235 #else
236  // Fake it...
237  return ++seqproc;
238 #endif
239 }
240 
241 bool
242 BCP_mpi_environment::start_processes(const BCP_string& exe,
243  const int proc_num,
244  const bool debug,
245  int* ids)
246 {
247 #ifdef COIN_HAS_MPI2
248  // FIXME: implement MPI2 proces spawning
249  printf("Sorry, MPI2 process spawning is not supported yet...\n");
250  abort();
251 #else
252  // Fake it...
253  for (int i = 0; i < proc_num; ++i) {
254  ids[i] = ++seqproc;
255  }
256  return true;
257 #endif
258 }
259 
260 bool
261 BCP_mpi_environment::start_processes(const BCP_string& exe,
262  const int proc_num,
263  const BCP_vec<BCP_string>& machines,
264  const bool debug,
265  int* ids)
266 {
267 #ifdef COIN_HAS_MPI2
268  // FIXME: implement MPI2 proces spawning
269  printf("Sorry, MPI2 process spawning is not supported yet...\n");
270  abort();
271 #else
272  // Fake it...
273  for (int i = 0; i < proc_num; ++i) {
274  ids[i] = ++seqproc;
275  }
276  return true;
277 #endif
278 }
279 
280 int BCP_mpi_environment::num_procs() {
281  return num_proc;
282 }
283 
284 //#define MPID_DEV_REQUEST_DECL
285 #if 0
286 struct MPIDI_BGLTS_Request {
287  MPIDI_Message_match match; /* enough information to to MPI matching */
288  MPI_Datatype datatype; /* data type of message */
289  struct MPID_Datatype *datatype_ptr;
290  int ca; /* completion action */
291  char *userbuf; /* user buffer for messages */
292  unsigned userbufcount; /* count (not size) of userbuf */
293  char *uebuf; /* unexpected buffer */
294  unsigned uebuflen; /* length (bytes) of unexpected buffer */
295  MPID_Segment segment; /* segment used for non-contigouous buffers */
296  MPIDI_BGLTS_REQUEST_STATE state; /* state of request */
297  int type; /* type of request, used by gettype and settype */
298  /* one of send, rsend, recv, irecv. etc. */
299  int msgtype; /* type of associated message */
300  /* one of self, eager, etc. */
301  BGLML_Message msgdata;
302  char slack[300];
303  struct MPID_Request *next; /* link to next request */
304  int cancel_pending; /* Cancel State */
305  int dest_rank;
306  int dest_tag;
307  int dest_context_id;
308 } bglts;
309 #endif
310 
311 #ifdef MPID_DEV_REQUEST_DECL
312 int MPIDI_BGLTS_get_num_messages()
313 {
314  register MPID_Request * rreq = MPIDI_Process.recv_posted_head;
315  register int cnt = 0;
316  while (rreq != NULL) {
317  ++cnt;
318  rreq = rreq->bglts.next;
319  }
320  rreq = MPIDI_Process.recv_unexpected_head;
321  while (rreq != NULL) {
322  ++cnt;
323  rreq = rreq->bglts.next;
324  }
325  return cnt;
326 }
327 #else
328 int MPIDI_BGLTS_get_num_messages()
329 {
330  return -1;
331 }
332 #endif
333 
334 #endif /* COIN_HAS_MPI */
BCP_message_tag
This enumerative constant describes the message tags different processes of BCP understand.
Used when receiving, message with any message tag will be received.
void send(OSCommandLine *oscommandline, OSnl2OS *osnl2os)
Used to indicate that there is no message in the buffer of a process.
const int BCP_AnyProcess
Definition: BCP_message.hpp:21
static int match(const char **sp, const char *t)
Definition: OSdtoa.cpp:1604
int _sender
The process id of the sender of the last received message.
Definition: BCP_buffer.hpp:73
This class is a very simple impelementation of a constant length string.
Definition: BCP_string.hpp:13
This class is an abstract base class for the initializer class the user has to provide.
Definition: BCP_USER.hpp:160
char * _data
Pointer to the buffer itself.
Definition: BCP_buffer.hpp:82
void clear()
Completely clear the buffer.
Definition: BCP_buffer.hpp:168
Currently there isn&#39;t any error handling in BCP.
Definition: BCP_error.hpp:20
size_t _size
The current size of the message (the first _size bytes of the buffer).
Definition: BCP_buffer.hpp:80
This class describes the message buffer used for all processes of BCP.
Definition: BCP_buffer.hpp:39
void make_fit(const int add_size)
Reallocate the buffer if necessary so that at least add_size number of additional bytes will fit into...
Definition: BCP_buffer.hpp:153
const char * data() const
Return a const pointer to the data stored in the buffer.
Definition: BCP_buffer.hpp:97
BCP_message_tag _msgtag
The message tag of the last received message.
Definition: BCP_buffer.hpp:70
int size() const
Return the size of the current message in the buffer.
Definition: BCP_buffer.hpp:95