BCP_message_pvm.cpp
Go to the documentation of this file.
1 // Copyright (C) 2000, International Business Machines
2 // Corporation and others. All Rights Reserved.
3 
4 #include "BcpConfig.h"
5 #if defined(COIN_HAS_PVM)
6 
#include <algorithm>
#include <cctype>
#include <cmath>
#include <cstdio>
#include <cstring>
#include <vector>
9 
10 #include <pvm3.h>
11 
12 #include "BCP_math.hpp"
13 #include "BCP_error.hpp"
14 #include "BCP_buffer.hpp"
15 #include "BCP_vector.hpp"
16 #include "BCP_message_pvm.hpp"
17 
18 //#############################################################################
19 
20 BCP_pvm_environment::~BCP_pvm_environment()
21 {
22  check_error( pvm_exit(), "~BCP_pvm_environment()");
23 }
24 
25 //-----------------------------------------------------------------------------
26 
27 void
28 BCP_pvm_environment::check_error(const int code, const char* str) const
29 {
30  if (code < 0){
31  printf("%s returned error code %i.\n", str, code);
32  throw BCP_fatal_error(" ERROR in PVM -- exiting.\n");
33  }
34 }
35 
36 //-----------------------------------------------------------------------------
37 
38 int
39 BCP_pvm_environment::register_process(USER_initialize* user_init)
40 {
41  int pid = pvm_mytid();
42  check_error(pid, "pvm_mytid()");
43  int parent = parent_process();
44  if (parent < 0)
45  check_error(pvm_catchout(stdout),"register_process -- pvm_catchout\n");
46  /* set stdout to be line buffered so that pvm_catchout will work faster */
47  setvbuf(stdout, (char *)NULL, _IOLBF, 0);
48  return pid;
49 }
50 
51 int
52 BCP_pvm_environment::parent_process()
53 {
54  int pid = pvm_parent();
55  if (pid == PvmNoParent)
56  return -1;
57  check_error(pid, "pvm_parent()");
58  return pid;
59 }
60 
61 bool
62 BCP_pvm_environment::alive(const int pid)
63 {
64  return pvm_pstat(pid) == PvmOk;
65 }
66 
68 BCP_pvm_environment::alive(const BCP_proc_array& parray)
69 {
70  BCP_vec<int>::const_iterator first = parray.procs().begin();
71  BCP_vec<int>::const_iterator last = parray.procs().end();
72  while (first != last) {
73  if (! alive(*first))
74  break;
75  ++first;
76  }
77  return BCP_vec<int>::const_iterator>(first);
78 }
79 
80 //-----------------------------------------------------------------------------
81 
82 void
83 BCP_pvm_environment::send(const int target,
84  const BCP_message_tag tag)
85 {
86  // create an empty buffer and send it with the tag
87  check_error( pvm_initsend(PvmDataInPlace), "send() - initsend");
88  check_error( pvm_send(target, tag), "send() - send");
89 }
90 
91 void
92 BCP_pvm_environment::send(const int target,
93  const BCP_message_tag tag, const BCP_buffer& buf)
94 {
95  check_error( pvm_initsend(PvmDataInPlace), "send() - initsend");
96  check_error( pvm_pkbyte(const_cast<char*>(buf.data()), buf.size(), 1),
97  "send() - pkbyte");
98  check_error( pvm_send(target, tag), "send() - send");
99 }
100 
101 //-----------------------------------------------------------------------------
102 
103 void
104 BCP_pvm_environment::multicast(const BCP_proc_array* const target,
105  const BCP_message_tag tag)
106 {
107  check_error( pvm_initsend(PvmDataInPlace), "multicast() - initsend");
108  check_error( pvm_mcast(&target->procs()[0], target->size(), tag),
109  "multicast() - send");
110 }
111 
112 void
113 BCP_pvm_environment::multicast(const BCP_proc_array* const target,
114  const BCP_message_tag tag,
115  const BCP_buffer& buf)
116 {
117  check_error( pvm_initsend(PvmDataInPlace), "multicast() - initsend");
118  check_error( pvm_pkbyte(const_cast<char*>(buf.data()), buf.size(), 1),
119  "multicast() - pkbyte");
120  check_error( pvm_mcast(&target->procs()[0], target->size(), tag),
121  "multicast() - send");
122 }
123 
124 void
125 BCP_pvm_environment::multicast(BCP_vec<int>::const_iterator beg,
127  const BCP_message_tag tag) {
128  int* pids = BCP_process_vec_2_int(beg, end, "multicast() - parray_2_int");
129  check_error( pvm_initsend(PvmDataInPlace), "multicast() - initsend");
130  check_error( pvm_mcast(pids, end - beg, tag), "multicast() - send");
131  delete[] pids;
132 }
133 
134 void
135 BCP_pvm_environment::multicast(BCP_vec<int>::const_iterator beg,
137  const BCP_message_tag tag,
138  const BCP_buffer& buf) {
139  int* pids = BCP_process_vec_2_int(beg, end, "multicast() - parray_2_int");
140  check_error( pvm_initsend(PvmDataInPlace), "multicast() - initsend");
141  check_error( pvm_pkbyte(const_cast<char*>(buf.data()), buf.size(), 1),
142  "multicast() - pkbyte");
143  check_error( pvm_mcast(pids, end - beg, tag), "multicast() - send");
144  delete[] pids;
145 }
146 
147 //-----------------------------------------------------------------------------
148 
149 void
150 BCP_pvm_environment::receive(const int const source,
151  const BCP_message_tag tag, BCP_buffer& buf,
152  const double timeout) {
153  buf.clear();
154  delete buf._sender; buf._sender = 0;
155 
156  int pid = source == BCP_AnyProcess? -1 : BCP_is_pvm_id(source, "receive()");
157  struct timeval tout;
158  int bufid = 0;
159  int msgtag = tag == BCP_Msg_AnyMessage ? -1 : tag;
160  if (timeout < 0) {
161  if (source != BCP_AnyProcess) {
162  // waiting for a particular process. check from time to time that
163  // it's still alive
164  tout.tv_sec = 10; tout.tv_usec = 0;
165  do {
166  check_error( bufid = pvm_trecv(pid, msgtag, &tout),
167  "receive() - trecv");
168  if (pvm_pstat(pid) != PvmOk)
169  throw BCP_fatal_error("receive() - source died.\n");
170  } while (! bufid);
171  }else{
172  // waiting for anyone
173  check_error( bufid = pvm_recv(pid, msgtag), "receive() - recv");
174  }
175  } else {
176  tout.tv_sec = static_cast<int>(floor(timeout));
177  tout.tv_usec = static_cast<int>(floor((timeout - tout.tv_sec)*1e6));
178  check_error( bufid = pvm_trecv(pid, msgtag, &tout), "receive() - trecv");
179  }
180  if (! bufid) {
182  return;
183  }
184 
185  int bytes;
186  check_error( pvm_bufinfo(bufid, &bytes, &msgtag, &pid),
187  "receive() - bufinfo");
188  buf.make_fit(bytes);
189  buf._msgtag = static_cast<BCP_message_tag>(msgtag);
190  delete buf._sender;
191  buf._sender = new BCP_pvm_id(pid);
192  buf._size = bytes;
193  check_error( pvm_upkbyte(buf._data, bytes, 1), "receive() - upkbyte");
194 }
195 
196 //-----------------------------------------------------------------------------
197 
198 bool
199 BCP_pvm_environment::probe(const int const source,
200  const BCP_message_tag tag) {
201  int pid = source == BCP_AnyProcess? -1 : BCP_is_pvm_id(source, "probe()");
202  if (source != BCP_AnyProcess) {
203  // probing for message from a particular process. check that it's still
204  // alive
205  if (pvm_pstat(pid) != PvmOk)
206  throw BCP_fatal_error("probe() - source died.\n");
207  }
208  // check if we have a matching message
209  int msgtag = tag == BCP_Msg_AnyMessage ? -1 : tag;
210  int probed = pvm_probe(pid, msgtag);
211  if (probed < 0)
212  throw BCP_fatal_error("probe() - pvm error :-(\n");
213  return probed > 0;
214 }
215 
216 //-----------------------------------------------------------------------------
217 
218 int
219 BCP_pvm_environment::unpack_proc_id(BCP_buffer& buf) {
220  int pid;
221  buf.unpack(pid);
222  return new BCP_pvm_id(pid);
223 }
224 
225 void
226 BCP_pvm_environment::pack_proc_id(BCP_buffer& buf, const int pid) {
227  buf.pack(BCP_is_pvm_id(pid, "pack_proc_id()"));
228 }
229 
230 //-----------------------------------------------------------------------------
231 
// Extract the next word from the character range [ctmp, last).
//
// Leading characters for which isgraph() is false are skipped; the word
// then extends up to (not including) the next isspace() character or
// `last`. On return `ctmp` points just past the word.
//
// Returns a newly allocated, NUL-terminated copy of the word, which the
// caller must release with delete[]; returns 0 if no word is left.
//
// The characters are converted to unsigned char before being classified:
// passing a negative plain char to the <cctype> functions is undefined.
static inline char*
BCP_get_next_word(const char*& ctmp, const char* last)
{
    while (ctmp != last && !std::isgraph(static_cast<unsigned char>(*ctmp)))
        ++ctmp;
    const char* const word = ctmp;
    while (ctmp != last && !std::isspace(static_cast<unsigned char>(*ctmp)))
        ++ctmp;
    if (word == ctmp)
        return 0;

    const std::size_t len = static_cast<std::size_t>(ctmp - word);
    char* new_word = new char[len + 1];
    std::memcpy(new_word, word, len);
    new_word[len] = 0;
    return new_word;
}
246 
247 static void
248 BCP_pvm_split_exe(const BCP_string& exe, char*& exe_name, char**& exe_args)
249 {
250  const char* ctmp = exe.c_str();
251  const char* last = ctmp + exe.length();
252  std::vector<char*> arglist;
253  exe_name = BCP_get_next_word(ctmp, last);
254  while (ctmp != last) {
255  char* word = BCP_get_next_word(ctmp, last);
256  if (word)
257  arglist.push_back(word);
258  }
259  if (arglist.size() == 0) {
260  exe_args = 0;
261  } else {
262  exe_args = new char*[arglist.size() + 1];
263  std::copy(arglist.begin(), arglist.end(), exe_args);
264  exe_args[arglist.size()] = 0;
265  }
266 }
267 
268 int
269 BCP_pvm_environment::start_process(const BCP_string& exe, const bool debug) {
270  int flag = debug ? PvmTaskDebug : 0;
271  int pid;
272  char* exe_name;
273  char** exe_args;
274  BCP_pvm_split_exe(exe, exe_name, exe_args);
275  pvm_spawn(exe_name, exe_args, flag, 0, 1, &pid);
276  delete[] exe_name;
277  if (exe_args != 0) {
278  while (*exe_args != 0) {
279  delete[] *exe_args;
280  ++exe_args;
281  }
282  delete[] exe_args;
283  }
284  check_error(pid, "start_process() - spawn");
285  return new BCP_pvm_id(pid);
286 }
287 
288 int
289 BCP_pvm_environment::start_process(const BCP_string& exe,
290  const BCP_string& machine,
291  const bool debug) {
292  int flag = PvmTaskHost | (debug ? PvmTaskDebug : 0);
293  int pid;
294  char* exe_name;
295  char** exe_args;
296  BCP_pvm_split_exe(exe, exe_name, exe_args);
297  pvm_spawn(exe_name, exe_args, flag,
298  const_cast<char*>(machine.c_str()), 1, &pid);
299  delete[] exe_name;
300  if (exe_args != 0) {
301  while (*exe_args != 0) {
302  delete[] *exe_args;
303  ++exe_args;
304  }
305  delete[] exe_args;
306  }
307  check_error(pid, "start_process() - spawn");
308  return new BCP_pvm_id(pid);
309 }
310 
311 BCP_proc_array*
312 BCP_pvm_environment::start_processes(const BCP_string& exe,
313  const int proc_num,
314  const bool debug) {
315  BCP_vec<int> procs;
316  procs.reserve(proc_num);
317 
318  int flag = debug ? PvmTaskDebug : 0;
319  int* pids = new int[proc_num];
320  char* exe_name;
321  char** exe_args;
322  BCP_pvm_split_exe(exe, exe_name, exe_args);
323  pvm_spawn(exe_name, exe_args, flag, 0, proc_num, pids);
324  delete[] exe_name;
325  if (exe_args != 0) {
326  while (*exe_args != 0) {
327  delete[] *exe_args;
328  ++exe_args;
329  }
330  delete[] exe_args;
331  }
332  for (int i = 0; i != proc_num; ++i)
333  check_error(pids[i], "start_processes() - spawn");
334  for (int i = 0; i != proc_num; ++i)
335  procs.push_back(new BCP_pvm_id(pids[i]));
336  delete[] pids;
337  BCP_proc_array* pa = new BCP_proc_array;
338  pa->add_procs(procs.begin(), procs.end());
339  return pa;
340 }
341 
342 BCP_proc_array*
343 BCP_pvm_environment::start_processes(const BCP_string& exe,
344  const int proc_num,
345  const BCP_vec<BCP_string>& machines,
346  const bool debug){
347  BCP_vec<int> procs;
348  procs.reserve(proc_num);
349  // spawn the jobs one-by-one on the specified machines
350  for (int i = 0; i != proc_num; ++i)
351  procs.push_back(start_process(exe, machines[i%machines.size()], debug));
352  BCP_proc_array* pa = new BCP_proc_array;
353  pa->add_procs(procs.begin(), procs.end());
354  return pa;
355 }
356 
357 //-----------------------------------------------------------------------------
358 
359 // void BCP_pvm_environment::stop_process(const int process) {
360 // check_error( pvm_kill(process, "stop_process()"), "stop_process()");
361 // }
362 
363 // void BCP_pvm_environment::stop_processes(const BCP_proc_array* processes){
364 // BCP_vec<int>::const_iterator first = processes.procs.begin();
365 // BCP_vec<int>::const_iterator last = pprocesses.procs.end();
366 // while (first != last) {
367 // check_error( pvm_kill(*first, "stop_processes()"),"stop_processes()");
368 // ++first;
369 // }
370 // }
371 
372 #endif /* COIN_HAS_PVM */
BCP_message_tag
This enumerative constant describes the message tags different processes of BCP understand.
BCP_buffer & pack(const T &value)
Pack a single object of type T.
Definition: BCP_buffer.hpp:177
Used when receiving, message with any message tag will be received.
void send(OSCommandLine *oscommandline, OSnl2OS *osnl2os)
Used to indicate that there is no message in the buffer of a process.
BCP_buffer & unpack(T &value)
Unpack a single object of type T.
Definition: BCP_buffer.hpp:186
const int BCP_AnyProcess
Definition: BCP_message.hpp:21
int _sender
The process id of the sender of the last received message.
Definition: BCP_buffer.hpp:73
const char * c_str() const
Definition: BCP_string.hpp:19
iterator begin()
Return an iterator to the beginning of the object.
Definition: BCP_vector.hpp:99
void reserve(const size_t n)
Reallocate the object to make space for n entries.
This class is a very simple implementation of a constant length string.
Definition: BCP_string.hpp:13
void push_back(const_reference x)
Append x to the end of the vector.
This class is an abstract base class for the initializer class the user has to provide.
Definition: BCP_USER.hpp:160
fint end
int length() const
Definition: BCP_string.hpp:16
char * _data
Pointer to the buffer itself.
Definition: BCP_buffer.hpp:82
void clear()
Completely clear the buffer.
Definition: BCP_buffer.hpp:168
Currently there isn't any error handling in BCP.
Definition: BCP_error.hpp:20
size_t size() const
Return the current number of entries.
Definition: BCP_vector.hpp:116
iterator end()
Return an iterator to the end of the object.
Definition: BCP_vector.hpp:104
The class BCP_vec serves the same purpose as the vector class in the standard template library...
Definition: BCP_vector.hpp:24
size_t _size
The current size of the message (the first _size bytes of the buffer).
Definition: BCP_buffer.hpp:80
This class describes the message buffer used for all processes of BCP.
Definition: BCP_buffer.hpp:39
void make_fit(const int add_size)
Reallocate the buffer if necessary so that at least add_size number of additional bytes will fit into...
Definition: BCP_buffer.hpp:153
const char * data() const
Return a const pointer to the data stored in the buffer.
Definition: BCP_buffer.hpp:97
BCP_message_tag _msgtag
The message tag of the last received message.
Definition: BCP_buffer.hpp:70
int size() const
Return the size of the current message in the buffer.
Definition: BCP_buffer.hpp:95