/home/coin/SVN-release/OS-2.4.0/Bcp/src/Member/BCP_message_pvm.cpp

Go to the documentation of this file.
00001 // Copyright (C) 2000, International Business Machines
00002 // Corporation and others.  All Rights Reserved.
00003 
00004 #include "BcpConfig.h"
00005 #if defined(COIN_HAS_PVM)
00006 
00007 #include <cstdio>
00008 #include <cmath>
00009 
00010 #include <pvm3.h>
00011 
00012 #include "BCP_math.hpp"
00013 #include "BCP_error.hpp"
00014 #include "BCP_buffer.hpp"
00015 #include "BCP_vector.hpp"
00016 #include "BCP_message_pvm.hpp"
00017 
00018 //#############################################################################
00019 
00020 BCP_pvm_environment::~BCP_pvm_environment()
00021 {
00022     check_error( pvm_exit(), "~BCP_pvm_environment()");
00023 }
00024 
00025 //-----------------------------------------------------------------------------
00026 
00027 void
00028 BCP_pvm_environment::check_error(const int code, const char* str) const
00029 {
00030     if (code < 0){
00031         printf("%s returned error code %i.\n", str, code);
00032         throw BCP_fatal_error(" ERROR in PVM -- exiting.\n");
00033     }
00034 }
00035 
00036 //-----------------------------------------------------------------------------
00037 
00038 int
00039 BCP_pvm_environment::register_process(USER_initialize* user_init)
00040 {
00041     int pid = pvm_mytid();
00042     check_error(pid, "pvm_mytid()");
00043     int parent = parent_process();
00044     if (parent < 0)
00045         check_error(pvm_catchout(stdout),"register_process -- pvm_catchout\n");
00046     /* set stdout to be line buffered so that pvm_catchout will work faster */
00047     setvbuf(stdout, (char *)NULL, _IOLBF, 0);
00048     return pid;
00049 }
00050 
00051 int
00052 BCP_pvm_environment::parent_process()
00053 {
00054     int pid = pvm_parent();
00055     if (pid == PvmNoParent)
00056         return -1;
00057     check_error(pid, "pvm_parent()");
00058     return pid;
00059 }
00060 
00061 bool
00062 BCP_pvm_environment::alive(const int pid)
00063 {
00064     return pvm_pstat(pid) == PvmOk;
00065 }
00066 
00067 BCP_vec<int>::const_iterator
00068 BCP_pvm_environment::alive(const BCP_proc_array& parray)
00069 {
00070     BCP_vec<int>::const_iterator first = parray.procs().begin();
00071     BCP_vec<int>::const_iterator last = parray.procs().end();
00072     while (first != last) {
00073         if (! alive(*first))
00074             break;
00075         ++first;
00076     }
00077     return BCP_vec<int>::const_iterator>(first);
00078 }
00079 
00080 //-----------------------------------------------------------------------------
00081 
00082 void
00083 BCP_pvm_environment::send(const int target,
00084                           const BCP_message_tag tag)
00085 {
00086     // create an empty buffer and send it with the tag
00087     check_error( pvm_initsend(PvmDataInPlace), "send() - initsend");
00088     check_error( pvm_send(target, tag), "send() - send");
00089 }
00090 
00091 void
00092 BCP_pvm_environment::send(const int target,
00093                           const BCP_message_tag tag, const BCP_buffer& buf)
00094 {
00095     check_error( pvm_initsend(PvmDataInPlace), "send() - initsend");
00096     check_error( pvm_pkbyte(const_cast<char*>(buf.data()), buf.size(), 1),
00097                  "send() - pkbyte");
00098     check_error( pvm_send(target, tag), "send() - send");
00099 }
00100    
00101 //-----------------------------------------------------------------------------
00102 
00103 void
00104 BCP_pvm_environment::multicast(const BCP_proc_array* const target,
00105                                const BCP_message_tag tag)
00106 {
00107     check_error( pvm_initsend(PvmDataInPlace), "multicast() - initsend");
00108     check_error( pvm_mcast(&target->procs()[0], target->size(), tag),
00109                  "multicast() - send");
00110 }
00111 
00112 void
00113 BCP_pvm_environment::multicast(const BCP_proc_array* const target,
00114                                const BCP_message_tag tag,
00115                                const BCP_buffer& buf)
00116 {
00117     check_error( pvm_initsend(PvmDataInPlace), "multicast() - initsend");
00118     check_error( pvm_pkbyte(const_cast<char*>(buf.data()), buf.size(), 1),
00119                  "multicast() - pkbyte");
00120     check_error( pvm_mcast(&target->procs()[0], target->size(), tag),
00121                  "multicast() - send");
00122 }
00123 
00124 void
00125 BCP_pvm_environment::multicast(BCP_vec<int>::const_iterator beg,
00126                                BCP_vec<int>::const_iterator end,
00127                                const BCP_message_tag tag) {
00128     int* pids = BCP_process_vec_2_int(beg, end, "multicast() - parray_2_int");
00129     check_error( pvm_initsend(PvmDataInPlace), "multicast() - initsend");
00130     check_error( pvm_mcast(pids, end - beg, tag), "multicast() - send");
00131     delete[] pids;
00132 }
00133 
00134 void
00135 BCP_pvm_environment::multicast(BCP_vec<int>::const_iterator beg,
00136                                BCP_vec<int>::const_iterator end,
00137                                const BCP_message_tag tag,
00138                                const BCP_buffer& buf) {
00139     int* pids = BCP_process_vec_2_int(beg, end, "multicast() - parray_2_int");
00140     check_error( pvm_initsend(PvmDataInPlace), "multicast() - initsend");
00141     check_error( pvm_pkbyte(const_cast<char*>(buf.data()), buf.size(), 1),
00142                  "multicast() - pkbyte");
00143     check_error( pvm_mcast(pids, end - beg, tag), "multicast() - send");
00144     delete[] pids;
00145 }
00146 
00147 //-----------------------------------------------------------------------------
00148 
00149 void
00150 BCP_pvm_environment::receive(const int const source,
00151                              const BCP_message_tag tag, BCP_buffer& buf,
00152                              const double timeout) {
00153     buf.clear();
00154     delete buf._sender;   buf._sender = 0;
00155 
00156     int pid = source == BCP_AnyProcess? -1 : BCP_is_pvm_id(source, "receive()");
00157     struct timeval tout;
00158     int bufid = 0;
00159     int msgtag = tag == BCP_Msg_AnyMessage ? -1 : tag;
00160     if (timeout < 0) {
00161         if (source != BCP_AnyProcess) {
00162             // waiting for a particular process. check from time to time that
00163             // it's still alive
00164             tout.tv_sec = 10; tout.tv_usec = 0;
00165             do {
00166                 check_error( bufid = pvm_trecv(pid, msgtag, &tout),
00167                              "receive() - trecv");
00168                 if (pvm_pstat(pid) != PvmOk)
00169                     throw BCP_fatal_error("receive() - source died.\n");
00170             } while (! bufid);
00171         }else{
00172             // waiting for anyone
00173             check_error( bufid = pvm_recv(pid, msgtag), "receive() - recv");
00174         }
00175     } else {
00176         tout.tv_sec = static_cast<int>(floor(timeout));
00177         tout.tv_usec = static_cast<int>(floor((timeout - tout.tv_sec)*1e6));
00178         check_error( bufid = pvm_trecv(pid, msgtag, &tout), "receive() - trecv");
00179     }
00180     if (! bufid) {
00181         buf._msgtag = BCP_Msg_NoMessage;
00182         return;
00183     }
00184 
00185     int bytes;
00186     check_error( pvm_bufinfo(bufid, &bytes, &msgtag, &pid),
00187                  "receive() - bufinfo");
00188     buf.make_fit(bytes);
00189     buf._msgtag = static_cast<BCP_message_tag>(msgtag);
00190     delete buf._sender;
00191     buf._sender = new BCP_pvm_id(pid);
00192     buf._size = bytes;
00193     check_error( pvm_upkbyte(buf._data, bytes, 1), "receive() - upkbyte");
00194 }
00195 
00196 //-----------------------------------------------------------------------------
00197 
00198 bool
00199 BCP_pvm_environment::probe(const int const source,
00200                            const BCP_message_tag tag) {
00201     int pid = source == BCP_AnyProcess? -1 : BCP_is_pvm_id(source, "probe()");
00202     if (source != BCP_AnyProcess) {
00203         // probing for message from a particular process. check that it's still
00204         // alive
00205         if (pvm_pstat(pid) != PvmOk)
00206             throw BCP_fatal_error("probe() - source died.\n");
00207     }
00208     // check if we have a matching message
00209     int msgtag = tag == BCP_Msg_AnyMessage ? -1 : tag;
00210     int probed = pvm_probe(pid, msgtag);
00211     if (probed < 0)
00212         throw BCP_fatal_error("probe() - pvm error :-(\n");
00213     return probed > 0;
00214 }
00215 
00216 //-----------------------------------------------------------------------------
00217 
00218 int
00219 BCP_pvm_environment::unpack_proc_id(BCP_buffer& buf) {
00220     int pid;
00221     buf.unpack(pid);
00222     return new BCP_pvm_id(pid);
00223 }
00224 
00225 void
00226 BCP_pvm_environment::pack_proc_id(BCP_buffer& buf, const int pid) {
00227     buf.pack(BCP_is_pvm_id(pid, "pack_proc_id()"));
00228 }
00229             
00230 //-----------------------------------------------------------------------------
00231 
00232 static inline char*
00233 BCP_get_next_word(const char*& ctmp, const char* last)
00234 {
00235     for ( ; ctmp != last && !isgraph(*ctmp); ++ctmp);
00236     const char* word = ctmp;
00237     for ( ; ctmp != last && !isspace(*ctmp); ++ctmp);
00238     if (word == ctmp)
00239         return 0;
00240     const int len = ctmp - word;
00241     char* new_word = new char[len + 1];
00242     memcpy(new_word, word, len);
00243     new_word[len] = 0;
00244     return new_word;
00245 }
00246 
00247 static void
00248 BCP_pvm_split_exe(const BCP_string& exe, char*& exe_name, char**& exe_args)
00249 {
00250     const char* ctmp = exe.c_str();
00251     const char* last = ctmp + exe.length();
00252     std::vector<char*> arglist;
00253     exe_name = BCP_get_next_word(ctmp, last);
00254     while (ctmp != last) {
00255         char* word = BCP_get_next_word(ctmp, last);
00256         if (word)
00257             arglist.push_back(word);
00258     }
00259     if (arglist.size() == 0) {
00260         exe_args = 0;
00261     } else {
00262         exe_args = new char*[arglist.size() + 1];
00263         std::copy(arglist.begin(), arglist.end(), exe_args);
00264         exe_args[arglist.size()] = 0;
00265     }
00266 }
00267 
00268 int
00269 BCP_pvm_environment::start_process(const BCP_string& exe, const bool debug) {
00270     int flag = debug ? PvmTaskDebug : 0;
00271     int pid;
00272     char* exe_name;
00273     char** exe_args;
00274     BCP_pvm_split_exe(exe, exe_name, exe_args);
00275     pvm_spawn(exe_name, exe_args, flag, 0, 1, &pid);
00276     delete[] exe_name;
00277     if (exe_args != 0) {
00278         while (*exe_args != 0) {
00279             delete[] *exe_args;
00280             ++exe_args;
00281         }
00282         delete[] exe_args;
00283     }
00284     check_error(pid, "start_process() - spawn");
00285     return new BCP_pvm_id(pid);
00286 }
00287 
00288 int
00289 BCP_pvm_environment::start_process(const BCP_string& exe,
00290                                    const BCP_string& machine,
00291                                    const bool debug) {
00292     int flag = PvmTaskHost | (debug ? PvmTaskDebug : 0);
00293     int pid;
00294     char* exe_name;
00295     char** exe_args;
00296     BCP_pvm_split_exe(exe, exe_name, exe_args);
00297     pvm_spawn(exe_name, exe_args, flag,
00298               const_cast<char*>(machine.c_str()), 1, &pid);
00299     delete[] exe_name;
00300     if (exe_args != 0) {
00301         while (*exe_args != 0) {
00302             delete[] *exe_args;
00303             ++exe_args;
00304         }
00305         delete[] exe_args;
00306     }
00307     check_error(pid, "start_process() - spawn");
00308     return new BCP_pvm_id(pid);
00309 }
00310 
00311 BCP_proc_array*
00312 BCP_pvm_environment::start_processes(const BCP_string& exe,
00313                                      const int proc_num,
00314                                      const bool debug) {
00315     BCP_vec<int> procs;
00316     procs.reserve(proc_num);
00317 
00318     int flag = debug ? PvmTaskDebug : 0;
00319     int* pids = new int[proc_num];
00320     char* exe_name;
00321     char** exe_args;
00322     BCP_pvm_split_exe(exe, exe_name, exe_args);
00323     pvm_spawn(exe_name, exe_args, flag, 0, proc_num, pids);
00324     delete[] exe_name;
00325     if (exe_args != 0) {
00326         while (*exe_args != 0) {
00327             delete[] *exe_args;
00328             ++exe_args;
00329         }
00330         delete[] exe_args;
00331     }
00332     for (int i = 0; i != proc_num; ++i)
00333         check_error(pids[i], "start_processes() - spawn");
00334     for (int i = 0; i != proc_num; ++i)
00335         procs.push_back(new BCP_pvm_id(pids[i]));
00336     delete[] pids;
00337     BCP_proc_array* pa = new BCP_proc_array;
00338     pa->add_procs(procs.begin(), procs.end());
00339     return pa;
00340 }
00341 
00342 BCP_proc_array*
00343 BCP_pvm_environment::start_processes(const BCP_string& exe,
00344                                      const int proc_num,
00345                                      const BCP_vec<BCP_string>& machines,
00346                                      const bool debug){
00347     BCP_vec<int> procs;
00348     procs.reserve(proc_num);
00349     // spawn the jobs one-by-one on the specified machines
00350     for (int i = 0; i != proc_num; ++i)
00351         procs.push_back(start_process(exe, machines[i%machines.size()], debug));
00352     BCP_proc_array* pa = new BCP_proc_array;
00353     pa->add_procs(procs.begin(), procs.end());
00354     return pa;
00355 }
00356 
00357 //-----------------------------------------------------------------------------
00358 
00359 // void BCP_pvm_environment::stop_process(const int process) {
00360 //    check_error( pvm_kill(process, "stop_process()"), "stop_process()");
00361 // }
00362 
00363 // void BCP_pvm_environment::stop_processes(const BCP_proc_array* processes){
00364 //    BCP_vec<int>::const_iterator first = processes.procs.begin();
00365 //    BCP_vec<int>::const_iterator last = pprocesses.procs.end();
00366 //    while (first != last) {
00367 //       check_error( pvm_kill(*first, "stop_processes()"),"stop_processes()");
00368 //       ++first;
00369 //    }
00370 // }
00371 
00372 #endif /* COIN_HAS_PVM */

Generated on Thu Sep 22 03:05:52 2011 by  doxygen 1.4.7