AlpsKnowledgeBrokerMPI.h
Go to the documentation of this file.
1 /*===========================================================================*
2  * This file is part of the Abstract Library for Parallel Search (ALPS). *
3  * *
4  * ALPS is distributed under the Eclipse Public License as part of the *
5  * COIN-OR repository (http://www.coin-or.org). *
6  * *
7  * Authors: *
8  * *
9  * Yan Xu, Lehigh University *
10  * Ted Ralphs, Lehigh University *
11  * *
12  * Conceptual Design: *
13  * *
14  * Yan Xu, Lehigh University *
15  * Ted Ralphs, Lehigh University *
16  * Laszlo Ladanyi, IBM T.J. Watson Research Center *
17  * Matthew Saltzman, Clemson University *
18  * *
19  * *
20  * Copyright (C) 2001-2013, Lehigh University, Yan Xu, and Ted Ralphs. *
21  *===========================================================================*/
22 
23 #ifndef AlpsKnowledgeBrokerMPI_h_
24 #define AlpsKnowledgeBrokerMPI_h_
25 
26 #include <cmath>
27 #include <iosfwd>
28 
29 #undef SEEK_SET
30 #undef SEEK_END
31 #undef SEEK_CUR
32 #include "mpi.h"
33 
34 #include "AlpsEnumProcessT.h"
35 #include "AlpsKnowledge.h"
36 #include "AlpsKnowledgeBroker.h"
37 #include "AlpsParams.h"
38 
39 //#############################################################################
40 
42 
43  private:
47 
48  protected:
49 
56 
58  int hubNum_;
59 
62 
64  MPI_Comm clusterComm_;
65 
67  MPI_Comm hubComm_;
68 
70  MPI_Group hubGroup_;
71 
74 
77 
80 
82  int* hubRanks_;
83 
86 
89 
92 
95 
97  bool hubWork_;
98 
100  MPI_Request subTreeRequest_;
101 
103  MPI_Request solRequestL_;
104  MPI_Request solRequestR_;
105 
107  MPI_Request modelKnowRequestL_;
108  MPI_Request modelKnowRequestR_;
109 
111  MPI_Request forwardRequestL_;
112  MPI_Request forwardRequestR_;
114 
121 
124 
129 
135  double workQuality_;
136 
139 
142 
145 
149 
152 
155 
158 
161 
164 
168 
171 
174 
177 
180 
183 
186 
189 
193 
200 
203 
207 
211 
214 
218 
225 
232 
235 
238 
240  double rampUpTime_;
241 
244 
246  double idleTime_;
247 
249  double msgTime_;
250 
254 
257 
260 
263 
266 
269 
272 
275 
278 
281 
285 
289 
292 
295 
298 
301 
304 
305  protected:
306 
308  void init();
309 
316  void masterMain(AlpsTreeNode* root);
317 
320  void hubMain();
321 
324  void workerMain();
326 
328  // The same subtree will be explored next time if it still have
329  // unexplored nodes.
330  AlpsReturnStatus doOneUnitWork(int unitWork,
331  double unitTime,
332  AlpsExitStatus & exitStatus,
333  int & numNodesProcessed,
334  int & numNodesBranched,
335  int & numNodesDiscarded,
336  int & numNodesPartial,
337  int & depth,
338  bool & betterSolution);
339 
341  void processMessages(char *&buffer,
342  MPI_Status &status,
343  MPI_Request &request);
344 
346  void rootInitMaster(AlpsTreeNode* root);
347  void rootInitHub();
348  void rootInitWorker();
349 
351  void spiralMaster(AlpsTreeNode* root);
352  void spiralHub();
353  void spiralWorker();
354 
355  //------------------------------------------------------
356 
361  void masterAskHubDonate(int donorID,
362  int receiverID,
363  double receiverWorkload);
364 
366  void hubAskWorkerDonate(int donorID,
367  int receiverID,
368  double receiverWorkload);
369 
371  void updateWorkloadInfo();
372 
373  virtual int getNumNodeLeftSystem()
374  { return static_cast<int>(systemWorkQuantity_); }
375 
377  void donateWork(char*& buf,
378  int tag,
379  MPI_Status* status,
380  int recvID = -1,
381  double recvWL = 0.0);
382 
384  void hubAllocateDonation(char*& buf, MPI_Status* status);
385 
387  void hubBalanceWorkers();
388 
390  void hubSatisfyWorkerRequest(char*& buf, MPI_Status* status);
391 
393  // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
394  void hubReportStatus(int tag, MPI_Comm comm);
395 
397  // NOTE: comm is clusterComm_ or MPI_COMM_WORLD.
398  void hubUpdateCluStatus(char*& buf, MPI_Status* status, MPI_Comm comm);
399 
401  void hubsShareWork(char*& buf, MPI_Status* status);
402 
404  void masterBalanceHubs();
405 
407  // NOTE: comm is hubComm or MPI_COMM_WORLD.
408  void masterUpdateSysStatus(char*& buf, MPI_Status* status, MPI_Comm comm);
409 
411  void refreshSysStatus();
412 
414  void refreshClusterStatus();
415 
417  // NOTE: comm is clusterComm_ or MPI_COMM_WORLD.
418  void workerReportStatus(int tag, MPI_Comm comm);
420 
421  //------------------------------------------------------
422 
428  void workerAskIndices();
429 
431  void workerRecvIndices(char *&bufLarge);
432 
434  void masterSendIndices(char *&bufLarge);
436 
437  //------------------------------------------------------
438 
444  void broadcastModel(const int id, const int source);
445 
447  void sendIncumbent();
448 
451  bool unpackSetIncumbent(char*& buf, MPI_Status* status);
452 
454  void collectBestSolution(int destination);
455 
458  void tellMasterRecv();
459 
462  // Not used
463  void tellHubRecv();
464 
469  void packEncoded(AlpsEncoded* enc,
470  char*& buf,
471  int& size,
472  int& position,
473  MPI_Comm comm);
474 
476  AlpsEncoded* unpackEncoded(char*& buf,
477  int& position,
478  MPI_Comm comm,
479  int size = -1);
480 
483  // NOTE: comm is hubComm_ or clusterComm_
484  void receiveSizeBuf(char*& buf,
485  int sender,
486  int tag,
487  MPI_Comm comm,
488  MPI_Status* status);
489 
492  // NOTE: comm is hubComm_ or clusterComm_
493  void receiveRampUpNode(int sender,
494  MPI_Comm comm,
495  MPI_Status* status);
496 
499  void receiveSubTree(char*& buf, int sender, MPI_Status* status);
500 
502  // NOTE: comm is hubComm_ or clusterComm_.
503  void sendSizeBuf(char*& buf,
504  int size,
505  int position,
506  const int target,
507  const int tag,
508  MPI_Comm comm);
509 
512  // NOTE: comm is hubComm_ or clusterComm_.
513  void sendRampUpNode(const int target, MPI_Comm comm);
514 
517  void sendNodeModelGen(int receiver, int doUnitWork);
518 
520  bool sendSubTree(const int target, AlpsSubTree*& st, int tag);
521 
523  // NOTE: comm is hubComm_ or clusterComm_.
524  void sendFinishInit(const int target, MPI_Comm comm);
526 
528  void deleteSubTrees();
529 
530 
531  void forwardModelKnowledge();
532 
534  // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
535  void sendModelKnowledge(MPI_Comm comm, int receiver=-1);
536 
538  // NOTE: comm is hubComm_ or MPI_COMM_WORLD.
539  void receiveModelKnowledge(MPI_Comm comm);
540 
545  void incSendCount(const char* how, int s = 1);
547  void decSendCount(const char* how, int s = 1);
549  void incRecvCount(const char* how, int s = 1);
551  void decRecvCount(const char* how, int s = 1);
553 
555  void masterForceHubTerm();
556 
558  void hubForceWorkerTerm();
559 
561  void changeWorkingSubTree(double & changeWorkThreshold);
562 
564  void sendErrorCodeToMaster(int errorCode);
565 
567  void recvErrorCode(char *& bufLarge);
568 
570  void spiralRecvProcessNode();
571 
573  void spiralDonateNode();
574 
575  public:
576 
580  :
582  {
583  init();
584  }
585 
588  char* argv[],
589  AlpsModel& model)
590  :
592  {
593  init();
594  initializeSearch(argc, argv, model);
595  }
596 
599 
601  virtual int getProcRank() const { return globalRank_; }
602 
604  virtual int getMasterRank() const { return masterRank_; }
605 
607  virtual AlpsProcessType getProcType() const { return processType_; }
608 
621  void initializeSearch(int argc, char* argv[], AlpsModel& model);
622 
624  void search(AlpsModel *model);
625 
633  void rootSearch(AlpsTreeNode* root);
634 
638  virtual double getIncumbentValue() const {
639  double bestObj = ALPS_OBJ_MAX;
642  if (incumbentValue_ > bestObj) {
643  return bestObj;
644  }
645  }
646  return incumbentValue_;
647  }
648 
650  virtual double getBestQuality() const {
651  if (globalRank_ == masterRank_) {
654  }
655  else {
656  return ALPS_OBJ_MAX;
657  }
658  }
659  else {
660  return ALPS_OBJ_MAX;
661  }
662  }
663 
665  virtual double getBestEstimateQuality() { return systemWorkQuality_; }
666 
668  virtual void printBestSolution(char* outputFile = 0) const;
669 
671  virtual void searchLog();
673 
674  //------------------------------------------------------
675 
680  void sendKnowledge(AlpsKnowledgeType type,
681  int sender,
682  int receiver,
683  char *& msgBuffer,
684  int msgSize,
685  int msgTag,
686  MPI_Comm comm,
687  bool blocking);
688 
691  int sender,
692  int receiver,
693  char *& msgBuffer,
694  int msgSize,
695  int msgTag,
696  MPI_Comm comm,
697  MPI_Status* status,
698  bool blocking);
699 
702  int sender,
703  int receiver,
704  char *& msgBuffer,
705  int msgSize,
706  int msgTag,
707  MPI_Comm comm,
708  bool blocking);
710 
711 };
712 #endif
713 
714 //#############################################################################
AlpsTimer masterTimer_
Master timer.
double * workerWorkQualities_
The workload qualities of workers in the cluster to which this proces belongs.
void init()
Initialize member data.
void masterBalanceHubs()
Master balance the workload of hubs.
The base class of knowledge broker class.
void tellMasterRecv()
Inform master that a proc has received workload during a load balance initialized by master...
void initializeSearch(int argc, char *argv[], AlpsModel &model)
This function.
void workerMain()
Worker first receive subtrees, then start to explore them.
bool allHubReported_
Indicate whether all hubs have reported status to master at least once.
double clusterWorkQuantity_
The workload quantity of the cluster to which the process belongs.
bool sendSubTree(const int target, AlpsSubTree *&st, int tag)
Send a given subtree to the target process.
void workerAskIndices()
A worker ask for node index from master.
void incSendCount(const char *how, int s=1)
Increment the number of sent message.
void requestKnowledge(AlpsKnowledgeType type, int sender, int receiver, char *&msgBuffer, int msgSize, int msgTag, MPI_Comm comm, bool blocking)
Request knowlege.
virtual int getNumNodeLeftSystem()
Master asks a hub to donate its workload to another hub.
int * workerNodeProcesseds_
To record how many nodes processed for each worker in a cluster.
int clusterSendCount_
The number of new messages sent by the processes in clusterComm_ after last survey.
double msgTime_
The time spent processing messages (include idle).
double hubReportPeriod_
The period that a hub load balancing and report cluster status.
void sendKnowledge(AlpsKnowledgeType type, int sender, int receiver, char *&msgBuffer, int msgSize, int msgTag, MPI_Comm comm, bool blocking)
Set knowlege.
void hubMain()
Hub generates subtrees and sends them to workers in Round-Robin way.
void hubUpdateCluStatus(char *&buf, MPI_Status *status, MPI_Comm comm)
A hub unpacks the status of a worker from buffer.
void tellHubRecv()
Inform hub that a proc has received workload during a load balance initialized by a hub...
bool blockWorkerReport_
Indicate whether a worker need to report state to its hub.
AlpsKnowledgeBrokerMPI()
Default construtor.
char * largeBuffer_
Large message buffer.
char * attachBuffer_
Buffer attached to MPI when sharing generated knowledge.
void sendSizeBuf(char *&buf, int size, int position, const int target, const int tag, MPI_Comm comm)
Send the size and content of a buffer to the target process.
AlpsReturnStatus
Definition: Alps.h:118
virtual AlpsProcessType getProcType() const
Query the type (master, hub, or worker) of the process.
int clusterNodeProcessed_
To record how many nodes by a cluster.
void receiveSizeBuf(char *&buf, int sender, int tag, MPI_Comm comm, MPI_Status *status)
Receive the size of buffer, allocate memory for buffer, then receive the message and put it in buffer...
int clusterRank_
The local rank of the process in clusterComm_.
AlpsProcessType processType_
The AlpsProcessType of this process.
void hubForceWorkerTerm()
Hub tell workers to terminate due to reaching limits or other reason.
void sendErrorCodeToMaster(int errorCode)
Send error code to master.
void masterForceHubTerm()
Master tell hubs to terminate due to reaching limits or other reason.
bool blockAskForWork_
Indicate whether a worker need to as for work from its hub.
int recvCount_
The number of new messages received by the process after last survey.
int processNum_
The Number of processes launched.
void receiveKnowledge(AlpsKnowledgeType type, int sender, int receiver, char *&msgBuffer, int msgSize, int msgTag, MPI_Comm comm, MPI_Status *status, bool blocking)
Receive knowlege.
void search(AlpsModel *model)
Search best solution for a given model.
double clusterWorkQuality_
The workload quality of the cluster to which the process belong.
This class contains the data pertaining to a particular subtree in the search tree.
Definition: AlpsSubTree.h:47
void hubAllocateDonation(char *&buf, MPI_Status *status)
Hub allocates the donated workload to its workers.
char * smallBuffer_
Small message buffer.
void receiveModelKnowledge(MPI_Comm comm)
Receive generated knowlege (related to model) from sender.
AlpsEncoded * unpackEncoded(char *&buf, int &position, MPI_Comm comm, int size=-1)
Unpack the given buffer into an AlpsEncoded instance.
double systemWorkQuantityForce_
The workload quantity of the whole system before forcing termination.
double incumbentValue_
Incumbent value.
MPI_Request solRequestL_
Send model knoledge request.
void hubSatisfyWorkerRequest(char *&buf, MPI_Status *status)
Hub satisfies the workload rquest from a worker.
void decSendCount(const char *how, int s=1)
Decrement the number of sent message.
void changeWorkingSubTree(double &changeWorkThreshold)
Change subtree to be explored if it is too worse.
double systemWorkQuality_
The workload quality of the whole system.
This data structure is to contain the packed form of an encodable knowledge.
Definition: AlpsEncoded.h:25
void masterUpdateSysStatus(char *&buf, MPI_Status *status, MPI_Comm comm)
Master unpack the status of a hub from buf and update system status.
MPI_Request modelKnowRequestR_
The Number of processes launched.
char * largeBuffer2_
Large message buffer.
void receiveSubTree(char *&buf, int sender, MPI_Status *status)
Receive a subtree from the sender process and add it into the subtree pool.
int clusterSize_
The actual size of the cluster to which the process belongs.
MPI_Request modelKnowRequestL_
Send model knoledge request.
void masterSendIndices(char *&bufLarge)
Master send a batch of node indices to the receiving worker.
void spiralMaster(AlpsTreeNode *root)
Static load balancing: spiral.
int masterRank_
The global rank of the master.
int hubDoBalance_
Whether a hub do load balance.
bool blockHubReport_
Indicate whether a hub need to report state to master.
void sendFinishInit(const int target, MPI_Comm comm)
Send finish initialization signal to the target process.
AlpsSubTree * rampUpSubTree_
A subtree used in during up.
void decRecvCount(const char *how, int s=1)
Decrement the number of sent message.
double rampUpTime_
The time spent in ramp up.
void hubBalanceWorkers()
Hub balances the workloads of its workers.
MPI_Request subTreeRequest_
Send subtree request.
MPI_Comm clusterComm_
Communicator of the cluster to which the process belongs.
int systemSendCount_
The total number of messages sent by the all processes.
void sendRampUpNode(const int target, MPI_Comm comm)
Send the size and the content of the best node of a given subtree to the target process.
AlpsKnowledgeBrokerMPI(int argc, char *argv[], AlpsModel &model)
Useful construtor.
AlpsProcessType
This enumerative constant describes the various process types.
int modelGenPos_
Size of the shared knowledge.
This class holds one node of the search tree.
Definition: AlpsTreeNode.h:50
virtual int getMasterRank() const
Query the global rank of the Master.
void hubAskWorkerDonate(int donorID, int receiverID, double receiverWorkload)
Hub asks a worker to donate its workload to another worker.
AlpsKnowledgeType
Definition: Alps.h:88
int modelGenID_
The global rank of the process that share generated model knowledge.
bool * workerReported_
Indicate which worker has been reported its work.
void broadcastModel(const int id, const int source)
Broadcast the model from source to other processes.
AlpsExitStatus
Definition: Alps.h:101
double * hubWorkQualities_
The workload qualities of hubs.
int haltSearch_
Temporily halt search.
AlpsProcessType * processTypeList_
The AlpsProcessType of all process.
void hubReportStatus(int tag, MPI_Comm comm)
A hub reports its status (workload and msg counts) to the master.
double * workerWorkQuantities_
The workload quantities of workers in the cluster to which this proces belongs.
int systemRecvCount_
The total number of messages sent by the all processes.
void rootInitMaster(AlpsTreeNode *root)
Static load balancing: Root Initialization.
void collectBestSolution(int destination)
Send the best solution from the process having it to destination.
int myHubRank_
The global rank of its hub for a worker.
AlpsPsStats psStats_
More statistics.
#define ALPS_OBJ_MAX
Definition: Alps.h:145
double workQuantity_
The workload quantity of the workload on the process.
void recvErrorCode(char *&bufLarge)
Receive error code and set solution status.
int sendCount_
The number of new messages sent by the process after last survey.
virtual void searchLog()
Log search statistics.
void workerRecvIndices(char *&bufLarge)
A worker receive node index from master.
int incumbentID_
The process id that store the incumbent.
MPI_Request solRequestR_
The Number of processes launched.
int masterDoBalance_
Whether master do load balance.
MPI_Group hubGroup_
MPI_Group consists of all hubs.
void sendModelKnowledge(MPI_Comm comm, int receiver=-1)
Set generated knowlege (related to model) to receiver.
void packEncoded(AlpsEncoded *enc, char *&buf, int &size, int &position, MPI_Comm comm)
Pack an AlpsEncoded instance into buf.
void refreshSysStatus()
The master re-calculate the system status.
double workQuality_
The workload quality of the process.
int userClusterSize_
The user reqested size of a cluster.
AlpsTimer workerTimer_
Worker timer.
bool unpackSetIncumbent(char *&buf, MPI_Status *status)
unpack the incumbent value, then store it and the id of the process having the incumbent in AlpsDataP...
AlpsReturnStatus doOneUnitWork(int unitWork, double unitTime, AlpsExitStatus &exitStatus, int &numNodesProcessed, int &numNodesBranched, int &numNodesDiscarded, int &numNodesPartial, int &depth, bool &betterSolution)
Explore a subtree from subtree pool for certain units of work/time.
void receiveRampUpNode(int sender, MPI_Comm comm, MPI_Status *status)
First receive the size and the contend of a node, then construct a subtree with this received node...
int * hubNodeProcesseds_
To record how many nodes processed for each hub.
virtual std::pair< AlpsKnowledge *, double > getBestKnowledge(AlpsKnowledgeType kt) const
Get the best knowledge in the given type of knowledge pools.
AlpsTimer hubTimer_
Hub timer.
bool updateIncumbent_
Indicate whether the incumbent value is updated between two checking point.
void masterAskHubDonate(int donorID, int receiverID, double receiverWorkload)
Master asks a hub to donate its workload to another hub.
AlpsKnowledgeBrokerMPI & operator=(const AlpsKnowledgeBrokerMPI &)
void sendNodeModelGen(int receiver, int doUnitWork)
Send a node from rampUpSubTree&#39;s node pool and generated model knowledge.
virtual void printBestSolution(char *outputFile=0) const
Master prints out the best solution that it knows.
bool * hubReported_
Indicate which hub has been reported its work.
virtual int getProcRank() const
Query the global rank of the process.
MPI_Comm hubComm_
Communicator consists of all hubs.
void updateWorkloadInfo()
Calculate the work quality and quantity on this process.
double * hubWorkQuantities_
The workload quantities of all clusters/hubs.
bool forceTerminate_
Terminate due to reaching limits (time and node) or other reason.
double idleTime_
The time spent waiting for work.
void hubsShareWork(char *&buf, MPI_Status *status)
Two hubs share their workload.
MPI_Request forwardRequestL_
Forward model knoledge request.
void sendIncumbent()
Sent the incumbent value and rank to its two child if eixt.
virtual double getBestQuality() const
The master queries the quality of the best solution it knowns.
virtual bool hasKnowledge(AlpsKnowledgeType kt) const
Query whether there are knowledges in the given type of knowledge pools.
void donateWork(char *&buf, int tag, MPI_Status *status, int recvID=-1, double recvWL=0.0)
A worker donate its workload to the specified worker.
bool hubWork_
Whether hub should also work as a worker.
int clusterRecvCount_
The number of new messages received by the processes in clusterComm_ after last survey.
void spiralDonateNode()
Unpack msg and donate a node.
int globalRank_
The rank of the process in MPI_COMM_WORLD.
void rootSearch(AlpsTreeNode *root)
This function.
bool blockTermCheck_
Indicate whether do termination check.
void incRecvCount(const char *how, int s=1)
Increment the number of received message.
MPI_Request forwardRequestR_
The Number of processes launched.
double rampDownTime_
The time spent in ramp down.
int hubNum_
The Number of hubs.
void refreshClusterStatus()
A hub adds its status to the cluster&#39;s status.
void deleteSubTrees()
Delete subTrees in pools and the active subtree.
void masterMain(AlpsTreeNode *root)
Master generates subtrees and sends them to hubs in Round-Robin way.
virtual double getBestEstimateQuality()
Get best estimalted quality in system.
virtual double getIncumbentValue() const
The process queries the quality of the incumbent this process stores.
virtual int getNumKnowledges(AlpsKnowledgeType kt) const
Query the number of knowledge in the given type of a knowledge pool.
~AlpsKnowledgeBrokerMPI()
Destructor.
void processMessages(char *&buffer, MPI_Status &status, MPI_Request &request)
Processing messages.
int unitWorkNodes_
Number of nodes in one unit of work.
void spiralRecvProcessNode()
Unpack the node, explore it and send load info to master.
double masterBalancePeriod_
The period that master do load balancing.
double systemWorkQuantity_
The workload quantity of the whole system.
int * hubRanks_
The global ranks of the hubs.
void workerReportStatus(int tag, MPI_Comm comm)
A worker report its status (workload and msg counts) to its hub.