00001
00002
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cctype>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <vector>

#include <pvm3.h>
00010
// Message tags used with pvm_send / pvm_recv / pvm_mcast / pvm_trecv in this
// driver. The tags are consecutive integers starting at 1.
enum messages {
   BCP_CONFIG_CHANGE = 1,    // driver -> TM: packed process-placement lists
   BCP_CONFIG_ERROR,         // TM -> driver: the TM reports a failure
   BCP_CONFIG_OK,            // presumably TM -> driver success reply (not tested explicitly here)
   BCP_ARE_YOU_TREEMANAGER,  // driver -> candidates: TM discovery query
   BCP_I_AM_TREEMANAGER      // candidate -> driver: TM discovery answer
};
00018
00019 static inline bool
00020 str_eq(const char * str0, const char * str1)
00021 {
00022 return strcmp(str0, str1) == 0;
00023 }
00024
// Less-than predicate on NUL-terminated strings in strcmp() order; used as
// the comparator for sorting and the sorted-range algorithms.
// Returns 1 if str0 orders before str1, otherwise 0.
static inline int
str_lt(const char * str0, const char * str1)
{
   if (strcmp(str0, str1) < 0)
      return 1;
   return 0;
}
00030
// Forward declarations; both functions are defined after main().
//   find_tree_manager - determines the tree manager's tid and stores it
//                       into tm_tid.
//   stop              - prints msg, leaves PVM and terminates the program.
static void find_tree_manager(const int my_tid, int &tm_tid);
static void stop(const char * msg);
00036
00037 int main(int argc, char** argv)
00038 {
00039 int pid = pvm_mytid();
00040
00041 if (argc != 2 && argc != 3)
00042 stop("Usage: tm_driver <control_file> [TM_tid]\n");
00043
00044 int tm_tid = 0;
00045 char * control_file = strdup(argv[1]);
00046
00047 if (argc == 3)
00048 sscanf(argv[2], "t%x", &tm_tid);
00049
00050 int info = 0;
00051
00052
00053 int nhost = 0;
00054 int narch = 0;
00055 struct pvmhostinfo *hostp = 0;
00056 info = pvm_config(&nhost, &narch, &hostp);
00057
00058
00059 int to_delete_size = 0;
00060 char ** to_delete = 0;
00061 int to_add_size = 0;
00062 char ** to_add = 0;
00063
00064 int delete_proc_num = 0;
00065 int * tid_delete = 0;
00066
00067
00068 int lp_num = 0;
00069 int cg_num = 0;
00070 int vg_num = 0;
00071 int cp_num = 0;
00072 int vp_num = 0;
00073
00074 char ** lp_mach = 0;
00075 char ** cg_mach = 0;
00076 char ** vg_mach = 0;
00077 char ** cp_mach = 0;
00078 char ** vp_mach = 0;
00079
00080
00081 ifstream ctl(control_file);
00082 if (!ctl)
00083 stop("Cannot open parameter file... Aborting.\n");
00084
00085
00086 const int MAX_PARAM_LINE_LENGTH = 1024;
00087 char line[MAX_PARAM_LINE_LENGTH+1], *end_of_line, *keyword, *value, *ctmp;
00088 char ch;
00089 while (ctl) {
00090 ctl.get(line, MAX_PARAM_LINE_LENGTH);
00091 if (ctl) {
00092 ctl.get(ch);
00093 if (ch != '\n') {
00094 printf("Too long (>= %i chars) line in the parameter file.\n",
00095 MAX_PARAM_LINE_LENGTH);
00096 stop("This is absurd. Aborting.\n");
00097 }
00098 }
00099 end_of_line = line + strlen(line);
00100
00101 keyword = find_if(line, end_of_line, isgraph);
00102 if (keyword == end_of_line)
00103 continue;
00104 ctmp = find_if(keyword, end_of_line, isspace);
00105 if (ctmp == end_of_line)
00106 continue;
00107 *ctmp = 0;
00108 ++ctmp;
00109
00110 value = find_if(ctmp, end_of_line, isgraph);
00111 if (value == end_of_line)
00112 continue;
00113
00114 ctmp = find_if(value, end_of_line, isspace);
00115 *ctmp = 0;
00116
00117
00118 if (str_eq(keyword, "BCP_delete_machine")) {
00119 ++to_delete_size;
00120 } else if (str_eq(keyword, "BCP_add_machine")) {
00121 ++to_add_size;
00122 } else if (str_eq(keyword, "BCP_delete_proc")) {
00123 ++delete_proc_num;
00124 } else if (str_eq(keyword, "BCP_lp_process")) {
00125 ++lp_num;
00126 } else if (str_eq(keyword, "BCP_cg_process")) {
00127 ++cg_num;
00128 } else if (str_eq(keyword, "BCP_vg_process")) {
00129 ++vg_num;
00130 } else if (str_eq(keyword, "BCP_cp_process")) {
00131 ++cp_num;
00132 } else if (str_eq(keyword, "BCP_vp_process")) {
00133 ++vp_num;
00134 }
00135 }
00136 ctl.close();
00137
00138 if (to_delete_size > 0) {
00139 to_delete = new char*[to_delete_size];
00140 to_delete_size = 0;
00141 }
00142 if (to_add_size > 0) {
00143 to_add = new char*[to_add_size];
00144 to_add_size = 0;
00145 }
00146 if (delete_proc_num > 0) {
00147 tid_delete = new int[delete_proc_num];
00148 delete_proc_num = 0;
00149 }
00150 if (lp_num) {
00151 lp_mach = new char*[lp_num];
00152 lp_num = 0;
00153 }
00154 if (cg_num) {
00155 cg_mach = new char*[cg_num];
00156 cg_num = 0;
00157 }
00158 if (vg_num) {
00159 vg_mach = new char*[vg_num];
00160 vg_num = 0;
00161 }
00162 if (cp_num) {
00163 cp_mach = new char*[cp_num];
00164 cp_num = 0;
00165 }
00166 if (vp_num) {
00167 vp_mach = new char*[vp_num];
00168 vp_num = 0;
00169 }
00170
00171 ctl.open(control_file);
00172 while (ctl) {
00173 ctl.get(line, MAX_PARAM_LINE_LENGTH);
00174 if (ctl) {
00175 ctl.get(ch);
00176 if (ch != '\n') {
00177 printf("Too long (>= %i chars) line in the parameter file.\n",
00178 MAX_PARAM_LINE_LENGTH);
00179 stop("This is absurd. Aborting.\n");
00180 }
00181 }
00182 end_of_line = line + strlen(line);
00183
00184 keyword = find_if(line, end_of_line, isgraph);
00185 if (keyword == end_of_line)
00186 continue;
00187 ctmp = find_if(keyword, end_of_line, isspace);
00188 if (ctmp == end_of_line)
00189 continue;
00190 *ctmp = 0;
00191 ++ctmp;
00192
00193 value = find_if(ctmp, end_of_line, isgraph);
00194 if (value == end_of_line)
00195 continue;
00196
00197 ctmp = find_if(value, end_of_line, isspace);
00198 *ctmp = 0;
00199
00200
00201 if (str_eq(keyword, "BCP_delete_machine")) {
00202 to_delete[to_delete_size++] = strdup(value);
00203 } else if (str_eq(keyword, "BCP_add_machine")) {
00204 to_add[to_add_size++] = strdup(value);
00205 } else if (str_eq(keyword, "BCP_delete_proc")) {
00206 sscanf(value, "t%x", &tid_delete[delete_proc_num++]);
00207 } else if (str_eq(keyword, "BCP_lp_process")) {
00208 lp_mach[lp_num++] = strdup(value);
00209 } else if (str_eq(keyword, "BCP_cg_process")) {
00210 cg_mach[cg_num++] = strdup(value);
00211 } else if (str_eq(keyword, "BCP_vg_process")) {
00212 vg_mach[vg_num++] = strdup(value);
00213 } else if (str_eq(keyword, "BCP_cp_process")) {
00214 cp_mach[cp_num++] = strdup(value);
00215 } else if (str_eq(keyword, "BCP_vp_process")) {
00216 vp_mach[vp_num++] = strdup(value);
00217 }
00218 }
00219 ctl.close();
00220
00221
00222
00223 char ** last = 0;
00224
00225
00226 if (to_delete_size > 0) {
00227 sort(to_delete, to_delete + to_delete_size, str_lt);
00228 last = unique(to_delete, to_delete + to_delete_size, str_eq);
00229 if (to_delete_size != last - to_delete)
00230 stop("A machine to be deleted is listed twice... Aborting.\n");
00231 }
00232
00233
00234 if (to_add_size > 0) {
00235 sort(to_add, to_add + to_add_size, str_lt);
00236 last = unique(to_add, to_add + to_add_size, str_eq);
00237 if (to_add_size != last - to_add)
00238 stop("A machine to be added is listed twice... Aborting.\n");
00239 }
00240
00241 int i;
00242 char ** mach_list = new char*[nhost + to_add_size];
00243 for (i = 0; i < nhost; ++i)
00244 mach_list[i] = strdup(hostp[i].hi_name);
00245 sort(mach_list, mach_list + nhost, str_lt);
00246
00247 char ** current_list = new char*[nhost + to_add_size];
00248
00249
00250 if (to_delete_size > 0) {
00251 last = set_difference(to_delete, to_delete + to_delete_size,
00252 mach_list, mach_list + nhost,
00253 current_list, str_lt);
00254 if (last != current_list)
00255 stop("A nonexisting machine is to be deleted... Aborting.\n");
00256 last = set_difference(mach_list, mach_list + nhost,
00257 to_delete, to_delete + to_delete_size,
00258 current_list, str_lt);
00259 ::swap(mach_list, current_list);
00260 }
00261
00262
00263 if (to_add_size > 0) {
00264 last = set_intersection(to_add, to_add + to_add_size,
00265 mach_list, mach_list + nhost,
00266 current_list, str_lt);
00267 if (last != current_list)
00268 stop("A machine to be added is already there... Aborting.\n");
00269 last = merge(to_add, to_add + to_add_size,
00270 mach_list, mach_list + nhost,
00271 current_list, str_lt);
00272 ::swap(mach_list, current_list);
00273 }
00274
00275 const int mach_num = nhost - to_delete_size + to_add_size;
00276
00277
00278
00279
00280 if (lp_num > 0) {
00281 sort(lp_mach, lp_mach + lp_num, str_lt);
00282 if (set_difference(lp_mach, lp_mach + lp_num,
00283 mach_list, mach_list + mach_num,
00284 current_list, str_lt) != current_list)
00285 stop("An lp machine is not in the final machine list... Aborting.\n");
00286 }
00287 if (cg_num > 0) {
00288 sort(cg_mach, cg_mach + cg_num, str_lt);
00289 if (set_difference(cg_mach, cg_mach + cg_num,
00290 mach_list, mach_list + mach_num,
00291 current_list, str_lt) != current_list)
00292 stop("An cg machine is not in the final machine list... Aborting.\n");
00293 }
00294 if (vg_num > 0) {
00295 sort(vg_mach, vg_mach + vg_num, str_lt);
00296 if (set_difference(vg_mach, vg_mach + vg_num,
00297 mach_list, mach_list + mach_num,
00298 current_list, str_lt) != current_list)
00299 stop("An vg machine is not in the final machine list... Aborting.\n");
00300 }
00301 if (cp_num > 0) {
00302 sort(cp_mach, cp_mach + cp_num, str_lt);
00303 if (set_difference(cp_mach, cp_mach + cp_num,
00304 mach_list, mach_list + mach_num,
00305 current_list, str_lt) != current_list)
00306 stop("An cp machine is not in the final machine list... Aborting.\n");
00307 }
00308 if (vp_num > 0) {
00309 sort(vp_mach, vp_mach + vp_num, str_lt);
00310 if (set_difference(vp_mach, vp_mach + vp_num,
00311 mach_list, mach_list + mach_num,
00312 current_list, str_lt) != current_list)
00313 stop("An vp machine is not in the final machine list... Aborting.\n");
00314 }
00315
00316
00317 find_tree_manager(pid, tm_tid);
00318
00319
00320 if (to_delete_size > 0) {
00321 const int dtid = pvm_tidtohost(tm_tid);
00322 for (i = 0; i < nhost; ++i) {
00323 if (hostp[i].hi_tid == dtid)
00324 for (int j = 0; j < to_delete_size; ++j) {
00325 if (str_eq(hostp[i].hi_name, to_delete[j]))
00326 stop("Can't delete the machine the TM is on. Aborting.\n");
00327 }
00328 }
00329 }
00330
00331
00332 if (delete_proc_num > 0) {
00333 if (find(tid_delete, tid_delete + delete_proc_num, tm_tid) !=
00334 tid_delete + delete_proc_num)
00335 stop("Can't delete the TM... Aborting.\n");
00336 }
00337
00338
00339 if (to_delete_size > 0 || to_add_size > 0) {
00340 int * infos = new int[max(to_delete_size, to_add_size)];
00341 if (to_delete_size > 0)
00342 if (pvm_delhosts(to_delete, to_delete_size, infos) < 0) {
00343 printf("Failed to delete all specified machines...\n");
00344 stop("Please check the situation manually... Aborting.\n");
00345 }
00346 if (to_add_size > 0)
00347 if (pvm_addhosts(to_add, to_add_size, infos) < 0) {
00348 printf("Failed to add all specified machines...\n");
00349 stop("Please check the situation manually... Aborting.\n");
00350 }
00351 }
00352
00353
00354 for (i = 0; i < delete_proc_num; ++i)
00355 pvm_kill(tid_delete[i]);
00356
00357
00358
00359 int len = (lp_num + cg_num + vg_num + cp_num + vp_num) * sizeof(int);
00360 if (len > 0) {
00361 len += 5 * sizeof(int);
00362 for (i = 0; i < lp_num; ++i) len += strlen(lp_mach[i]);
00363 for (i = 0; i < cg_num; ++i) len += strlen(cg_mach[i]);
00364 for (i = 0; i < vg_num; ++i) len += strlen(vg_mach[i]);
00365 for (i = 0; i < cp_num; ++i) len += strlen(cp_mach[i]);
00366 for (i = 0; i < vp_num; ++i) len += strlen(vp_mach[i]);
00367
00368 char * buf = new char[len];
00369
00370 memcpy(buf, &lp_num, sizeof(int));
00371 buf += sizeof(int);
00372 for (i = 0; i < lp_num; ++i) {
00373 const int l = strlen(lp_mach[i]);
00374 memcpy(buf, &l, sizeof(int));
00375 buf += sizeof(int);
00376 memcpy(buf, lp_mach[i], l);
00377 buf += l;
00378 }
00379
00380 memcpy(buf, &cg_num, sizeof(int));
00381 buf += sizeof(int);
00382 for (i = 0; i < cg_num; ++i) {
00383 const int l = strlen(cg_mach[i]);
00384 memcpy(buf, &l, sizeof(int));
00385 buf += sizeof(int);
00386 memcpy(buf, cg_mach[i], l);
00387 buf += l;
00388 }
00389
00390 memcpy(buf, &vg_num, sizeof(int));
00391 buf += sizeof(int);
00392 for (i = 0; i < vg_num; ++i) {
00393 const int l = strlen(vg_mach[i]);
00394 memcpy(buf, &l, sizeof(int));
00395 buf += sizeof(int);
00396 memcpy(buf, vg_mach[i], l);
00397 buf += l;
00398 }
00399
00400 memcpy(buf, &cp_num, sizeof(int));
00401 buf += sizeof(int);
00402 for (i = 0; i < cp_num; ++i) {
00403 const int l = strlen(cp_mach[i]);
00404 memcpy(buf, &l, sizeof(int));
00405 buf += sizeof(int);
00406 memcpy(buf, cp_mach[i], l);
00407 buf += l;
00408 }
00409
00410 memcpy(buf, &vp_num, sizeof(int));
00411 buf += sizeof(int);
00412 for (i = 0; i < vp_num; ++i) {
00413 const int l = strlen(vp_mach[i]);
00414 memcpy(buf, &l, sizeof(int));
00415 buf += sizeof(int);
00416 memcpy(buf, vp_mach[i], l);
00417 buf += l;
00418 }
00419
00420 buf -= len;
00421
00422 pvm_initsend(PvmDataRaw);
00423 pvm_pkbyte(buf, len, 1);
00424 pvm_send(tm_tid, BCP_CONFIG_CHANGE);
00425
00426 int bufid = pvm_recv(tm_tid, -1);
00427 int bytes = 0, msgtag = 0;
00428 pvm_bufinfo(bufid, &bytes, &msgtag, &tm_tid);
00429 if (msgtag == BCP_CONFIG_ERROR)
00430 stop("TM had difficulties. Please check the situation manually.\n");
00431 }
00432
00433 pvm_exit();
00434 return 0;
00435 }
00436
00437
00438
00439
00440 static void
00441 find_tree_manager(const int my_tid, int &tm_tid)
00442 {
00443 struct pvmtaskinfo *taskp = 0;
00444 int ntask = 0;
00445 pvm_tasks(0, &ntask, &taskp);
00446 int * tids = new int[ntask];
00447 int i, k;
00448
00449 for (i = 0, k = 0; i < ntask; ++i) {
00450 if (taskp[i].ti_ptid != 0)
00451 continue;
00452 if (taskp[i].ti_tid == my_tid)
00453 continue;
00454
00455 tids[k++] = taskp[i].ti_tid;
00456 }
00457
00458 if (tm_tid != 0) {
00459
00460 for (i = 0; i < k; ++i)
00461 if (tids[i] == tm_tid)
00462 break;
00463 if (i == k)
00464 stop("No TM candidate has the given tid... Aborting.\n");
00465 } else {
00466
00467 pvm_initsend(PvmDataRaw);
00468 pvm_mcast(tids, k, BCP_ARE_YOU_TREEMANAGER);
00469
00470 struct timeval tout = {15, 0};
00471 int bufid = pvm_trecv(-1, BCP_I_AM_TREEMANAGER, &tout);
00472 if (bufid == 0)
00473 stop("No TM candidates replied within 30 seconds... Aborting.\n");
00474 int bytes = 0, msgtag = 0;
00475 pvm_bufinfo(bufid, &bytes, &msgtag, &tm_tid);
00476 }
00477
00478 delete[] tids;
00479 }
00480
00481
00482
00483 static void
00484 stop(const char * msg)
00485 {
00486 printf("%s", msg);
00487 pvm_exit();
00488 abort();
00489 }