00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef AlpsKnowledgeBrokerMPI_h_
00024 #define AlpsKnowledgeBrokerMPI_h_
00025
00026 #include <cmath>
00027 #include <iosfwd>
00028
00029 #undef SEEK_SET
00030 #undef SEEK_END
00031 #undef SEEK_CUR
00032 #include "mpi.h"
00033
00034 #include "AlpsEnumProcessT.h"
00035 #include "AlpsKnowledge.h"
00036 #include "AlpsKnowledgeBroker.h"
00037 #include "AlpsParams.h"
00038
00039
00040
00041 class AlpsKnowledgeBrokerMPI : public AlpsKnowledgeBroker {
00042
00043 private:
00045 AlpsKnowledgeBrokerMPI(const AlpsKnowledgeBrokerMPI&);
00046 AlpsKnowledgeBrokerMPI& operator=(const AlpsKnowledgeBrokerMPI&);
00047
00048 protected:
00049
00055 int processNum_;
00056
00058 int hubNum_;
00059
00061 int globalRank_;
00062
00064 MPI_Comm clusterComm_;
00065
00067 MPI_Comm hubComm_;
00068
00070 MPI_Group hubGroup_;
00071
00073 int clusterSize_;
00074
00076 int userClusterSize_;
00077
00079 int clusterRank_;
00080
00082 int* hubRanks_;
00083
00085 int myHubRank_;
00086
00088 int masterRank_;
00089
00091 AlpsProcessType processType_;
00092
00094 AlpsProcessType* processTypeList_;
00095
00097 bool hubWork_;
00098
00100 MPI_Request subTreeRequest_;
00101
00103 MPI_Request solRequestL_;
00104 MPI_Request solRequestR_;
00105
00107 MPI_Request modelKnowRequestL_;
00108 MPI_Request modelKnowRequestR_;
00109
00111 MPI_Request forwardRequestL_;
00112 MPI_Request forwardRequestR_;
00114
00120 double incumbentValue_;
00121
00123 int incumbentID_;
00124
00127 bool updateIncumbent_;
00129
00135 double workQuality_;
00136
00138 double clusterWorkQuality_;
00139
00141 double systemWorkQuality_;
00142
00144 double* hubWorkQualities_;
00145
00148 double* workerWorkQualities_;
00149
00151 double workQuantity_;
00152
00154 double clusterWorkQuantity_;
00155
00157 double systemWorkQuantity_;
00158
00160 double systemWorkQuantityForce_;
00161
00163 double* hubWorkQuantities_;
00164
00167 double* workerWorkQuantities_;
00168
00170 bool* workerReported_;
00171
00173 bool* hubReported_;
00174
00176 bool allHubReported_;
00177
00179 int masterDoBalance_;
00180
00182 int hubDoBalance_;
00183
00185 int* workerNodeProcesseds_;
00186
00188 int clusterNodeProcessed_;
00189
00191 int* hubNodeProcesseds_;
00192
00194 int systemNodeProcessed_;
00196
00202 int sendCount_;
00203
00205 int recvCount_;
00206
00209 int clusterSendCount_;
00210
00213 int clusterRecvCount_;
00214
00216 int systemSendCount_;
00217
00219 int systemRecvCount_;
00221
00226 int masterIndexBatch_;
00228
00234 AlpsTimer masterTimer_;
00235
00237 AlpsTimer hubTimer_;
00238
00240 AlpsTimer workerTimer_;
00241
00243 double rampUpTime_;
00244
00246 double rampDownTime_;
00247
00249 double idleTime_;
00250
00252 double msgTime_;
00253
00255 AlpsPsStats psStats_;
00257
00259 bool forceTerminate_;
00260
00262 bool blockTermCheck_;
00263
00265 bool blockHubReport_;
00266
00268 bool blockWorkerReport_;
00269
00271 bool blockAskForWork_;
00272
00274 char *attachBuffer_;
00275
00277 char *largeBuffer_;
00278
00280 char *largeBuffer2_;
00281
00283 char *smallBuffer_;
00284
00287 double masterBalancePeriod_;
00288
00291 double hubReportPeriod_;
00292
00294 int modelGenID_;
00295
00297 int modelGenPos_;
00298
00300 AlpsSubTree* rampUpSubTree_;
00301
00303 int unitWorkNodes_;
00304
00306 int haltSearch_;
00307
00308 protected:
00309
00311 void init();
00312
00319 void masterMain(AlpsTreeNode* root);
00320
00323 void hubMain();
00324
00327 void workerMain();
00329
00331
00332
00333 AlpsReturnStatus doOneUnitWork(int unitWork,
00334 double unitTime,
00335 AlpsExitStatus & exitStatus,
00336 int & numNodesProcessed,
00337 int & depth,
00338 bool & betterSolution);
00339
00341 void processMessages(char *&buffer,
00342 MPI_Status &status,
00343 MPI_Request &request);
00344
00346 void rootInitMaster(AlpsTreeNode* root);
00347 void rootInitHub();
00348 void rootInitWorker();
00349
00351 void spiralMaster(AlpsTreeNode* root);
00352 void spiralHub();
00353 void spiralWorker();
00354
00355
00356
00361 void masterAskHubDonate(int donorID,
00362 int receiverID,
00363 double receiverWorkload);
00364
00366 void hubAskWorkerDonate(int donorID,
00367 int receiverID,
00368 double receiverWorkload);
00369
00371 void updateWorkloadInfo();
00372
00374 void donateWork(char*& buf,
00375 int tag,
00376 MPI_Status* status,
00377 int recvID = -1,
00378 double recvWL = 0.0);
00379
00381 void hubAllocateDonation(char*& buf, MPI_Status* status);
00382
00384 void hubBalanceWorkers();
00385
00387 void hubSatisfyWorkerRequest(char*& buf, MPI_Status* status);
00388
00390
00391 void hubReportStatus(int tag, MPI_Comm comm);
00392
00394
00395 void hubUpdateCluStatus(char*& buf, MPI_Status* status, MPI_Comm comm);
00396
00398 void hubsShareWork(char*& buf, MPI_Status* status);
00399
00401 void masterBalanceHubs();
00402
00404
00405 void masterUpdateSysStatus(char*& buf, MPI_Status* status, MPI_Comm comm);
00406
00408 void refreshSysStatus();
00409
00411 void refreshClusterStatus();
00412
00414
00415 void workerReportStatus(int tag, MPI_Comm comm);
00417
00418
00419
00425 void workerAskIndices();
00426
00428 void workerRecvIndices(char *&bufLarge);
00429
00431 void masterSendIndices(char *&bufLarge);
00433
00434
00435
00441 void broadcastModel(const int id, const int source);
00442
00444 void sendIncumbent();
00445
00448 bool unpackSetIncumbent(char*& buf, MPI_Status* status);
00449
00451 void collectBestSolution(int destination);
00452
00455 void tellMasterRecv();
00456
00459
00460 void tellHubRecv();
00461
00466 void packEncoded(AlpsEncoded* enc,
00467 char*& buf,
00468 int& size,
00469 int& position,
00470 MPI_Comm comm);
00471
00473 AlpsEncoded* unpackEncoded(char*& buf,
00474 int& position,
00475 MPI_Comm comm,
00476 int size = -1);
00477
00480
00481 void receiveSizeBuf(char*& buf,
00482 int sender,
00483 int tag,
00484 MPI_Comm comm,
00485 MPI_Status* status);
00486
00489
00490 void receiveRampUpNode(int sender,
00491 MPI_Comm comm,
00492 MPI_Status* status);
00493
00496 void receiveSubTree(char*& buf, int sender, MPI_Status* status);
00497
00499
00500 void sendSizeBuf(char*& buf,
00501 int size,
00502 int position,
00503 const int target,
00504 const int tag,
00505 MPI_Comm comm);
00506
00509
00510 void sendRampUpNode(const int target, MPI_Comm comm);
00511
00514 void sendNodeModelGen(int receiver, int doUnitWork);
00515
00517 bool sendSubTree(const int target, AlpsSubTree*& st, int tag);
00518
00520
00521 void sendFinishInit(const int target, MPI_Comm comm);
00523
00525 void deleteSubTrees();
00526
00527
00528 void forwardModelKnowledge();
00529
00531
00532 void sendModelKnowledge(MPI_Comm comm, int receiver=-1);
00533
00535
00536 void receiveModelKnowledge(MPI_Comm comm);
00537
00542 void incSendCount(const char* how, int s = 1);
00544 void decSendCount(const char* how, int s = 1);
00546 void incRecvCount(const char* how, int s = 1);
00548 void decRecvCount(const char* how, int s = 1);
00550
00552 void masterForceHubTerm();
00553
00555 void hubForceWorkerTerm();
00556
00558 void changeWorkingSubTree(double & changeWorkThreshold);
00559
00561 void sendErrorCodeToMaster(int errorCode);
00562
00564 void recvErrorCode(char *& bufLarge);
00565
00567 void spiralRecvProcessNode();
00568
00570 void spiralDonateNode();
00571
00572 public:
00573
00576 AlpsKnowledgeBrokerMPI()
00577 :
00578 AlpsKnowledgeBroker()
00579 {
00580 init();
00581 }
00582
00584 AlpsKnowledgeBrokerMPI(int argc,
00585 char* argv[],
00586 AlpsModel& model)
00587 :
00588 AlpsKnowledgeBroker()
00589 {
00590 init();
00591 initializeSearch(argc, argv, model);
00592 }
00593
00595 ~AlpsKnowledgeBrokerMPI();
00596
00598 virtual int getProcRank() const { return globalRank_; }
00599
00601 virtual int getMasterRank() const { return masterRank_; }
00602
00604 virtual AlpsProcessType getProcType() const { return processType_; }
00605
00618 void initializeSearch(int argc, char* argv[], AlpsModel& model);
00619
00621 void search(AlpsModel *model);
00622
00630 void rootSearch(AlpsTreeNode* root);
00631
00635 virtual double getIncumbentValue() const {
00636 return incumbentValue_;
00637 }
00639 virtual double getBestQuality() const {
00640 if (globalRank_ == masterRank_) {
00641 if (getNumKnowledges(AlpsKnowledgeTypeSolution) > 0) {
00642 return getBestKnowledge(AlpsKnowledgeTypeSolution).second;
00643 }
00644 else {
00645 return ALPS_OBJ_MAX;
00646 }
00647 }
00648 else {
00649 return ALPS_OBJ_MAX;
00650 }
00651 }
00652
00654 virtual void printBestSolution(char* outputFile = 0) const;
00655
00657 virtual void searchLog();
00659
00660
00661
00666 void sendKnowledge(AlpsKnowledgeType type,
00667 int sender,
00668 int receiver,
00669 char *& msgBuffer,
00670 int msgSize,
00671 int msgTag,
00672 MPI_Comm comm,
00673 bool blocking);
00674
00676 void receiveKnowledge(AlpsKnowledgeType type,
00677 int sender,
00678 int receiver,
00679 char *& msgBuffer,
00680 int msgSize,
00681 int msgTag,
00682 MPI_Comm comm,
00683 MPI_Status* status,
00684 bool blocking);
00685
00687 void requestKnowledge(AlpsKnowledgeType type,
00688 int sender,
00689 int receiver,
00690 char *& msgBuffer,
00691 int msgSize,
00692 int msgTag,
00693 MPI_Comm comm,
00694 bool blocking);
00696
00697 };
00698 #endif
00699
00700