00001
00002
00003
00004 #include "BCP_os.hpp"
00005 #include "BCP_tm.hpp"
00006 #include "BCP_tm_node.hpp"
00007 #include "BCP_tm_user.hpp"
00008 #include "BCP_tm_functions.hpp"
00009
00010 #define NUMNODES_BASED_ON_BUFSIZE
00011
00012
00013
00014 static bool BCP_tm_scan_children(BCP_tm_prob& p, BCP_tm_node* node,
00015 std::vector<BCP_tm_node*>& nodes_to_send,
00016 const long bufsize)
00017 {
00018 #ifndef NUMNODES_BASED_ON_BUFSIZE
00019 const size_t send_size = 100;
00020 #endif
00021 BCP_vec<BCP_tm_node*>& children = node->_children;
00022 for (int i = children.size() - 1; i >= 0; --i) {
00023 BCP_tm_node* s = children[i];
00024 if (s == NULL)
00025 continue;
00026 if (BCP_tm_scan_children(p, s, nodes_to_send, bufsize))
00027 return true;
00028 }
00029 if (node->_data._desc.IsValid()) {
00030 nodes_to_send.push_back(node);
00031 const bool def = p.param(BCP_tm_par::ReportWhenDefaultIsExecuted);
00032 p.msg_buf.pack(node->_index);
00033 node->_data._desc->pack(p.packer, def, p.msg_buf);
00034 bool has_user_data = node->_data._user.IsValid();
00035 p.msg_buf.pack(has_user_data);
00036 if (has_user_data) {
00037 p.packer->pack_user_data(node->_data._user.GetRawPtr(), p.msg_buf);
00038 }
00039 }
00040 #ifndef NUMNODES_BASED_ON_BUFSIZE
00041 return (nodes_to_send.size() >= send_size);
00042 #else
00043 return (p.msg_buf.size() > bufsize);
00044 #endif
00045 }
00046
00047
00048
00049 static bool BCP_tm_scan_siblings(BCP_tm_prob& p, BCP_tm_node* node,
00050 std::vector<BCP_tm_node*>& nodes_to_send,
00051 const long bufsize)
00052 {
00053 #ifndef NUMNODES_BASED_ON_BUFSIZE
00054 const size_t send_size = 100;
00055 #endif
00056 if (node->_data._desc.IsValid()) {
00057 nodes_to_send.push_back(node);
00058 const bool def = p.param(BCP_tm_par::ReportWhenDefaultIsExecuted);
00059 p.msg_buf.pack(node->_index);
00060 node->_data._desc->pack(p.packer, def, p.msg_buf);
00061 bool has_user_data = node->_data._user.IsValid();
00062 p.msg_buf.pack(has_user_data);
00063 if (has_user_data) {
00064 p.packer->pack_user_data(node->_data._user.GetRawPtr(), p.msg_buf);
00065 }
00066 #ifndef NUMNODES_BASED_ON_BUFSIZE
00067 if (nodes_to_send.size() >= send_size)
00068 #else
00069 if (p.msg_buf.size() > bufsize)
00070 #endif
00071 {
00072 return true;
00073 }
00074 }
00075 BCP_tm_node* parent = node->_parent;
00076 if (parent == NULL)
00077 return false;
00078 BCP_vec<BCP_tm_node*>& siblings = parent->_children;
00079 for (int i = siblings.size() - 1; i >= 0; --i) {
00080 BCP_tm_node* s = siblings[i];
00081 if (s == node || s == NULL)
00082 continue;
00083 if (BCP_tm_scan_children(p, s, nodes_to_send, bufsize))
00084 return true;
00085 }
00086 return BCP_tm_scan_siblings(p, parent, nodes_to_send, bufsize);
00087 }
00088
00089
00090
00098 bool BCP_tm_is_data_balanced(BCP_tm_prob& p)
00099 {
00100 const int maxheap = p.param(BCP_tm_par::MaxHeapSize);
00101 if (maxheap == -1) {
00102 return true;
00103 }
00104 #if 0
00105
00106 printf("local nodes: %i\n", BCP_tm_node::num_local_nodes);
00107 printf("remote nodes: %i\n", BCP_tm_node::num_remote_nodes);
00108 #endif
00109
00110
00111 const double usedheap = BCP_used_heap();
00112 if (usedheap == -1)
00113 return true;
00114 const double freeheap = maxheap - usedheap;
00115 printf("free: %f used: %f free/max: %f\n",
00116 freeheap, usedheap, freeheap/maxheap);
00117 return (freeheap > 1<<24 && freeheap / maxheap > 0.15);
00118 }
00119
00120
00121
00138 bool BCP_tm_balance_data(BCP_tm_prob& p)
00139 {
00140 if (! p.need_a_TS)
00141 return false;
00142
00143 int pid = -1;
00144
00145
00146 int max_space = 0;
00147 for (std::map<int,int>::const_iterator tsi = p.ts_space.begin();
00148 tsi != p.ts_space.end(); ++tsi) {
00149 if (tsi->second > max_space) {
00150 max_space = tsi->second;
00151 pid = tsi->first;
00152 }
00153 }
00154 if (max_space < 1 << 22 ) {
00155 pid = -1;
00156 }
00157
00158 BCP_buffer& buf = p.msg_buf;
00159
00160 if (pid == -1) {
00161
00162 if (p.lp_scheduler.maxNodeIds() > 1) {
00163 pid = p.lp_scheduler.request_node_id();
00164 if (pid == -1) {
00165 return true;
00166 }
00167 std::vector<int>::iterator ipid =
00168 std::find(p.lp_procs.begin(), p.lp_procs.end(), pid);
00169 if (ipid == p.lp_procs.end()) {
00170 throw BCP_fatal_error("\
00171 Trying to convert an LP into TS, but its pid is not in lp_procs!\n");
00172 }
00173 p.lp_procs.erase(ipid);
00174 p.ts_procs.push_back(pid);
00175 printf("Turning LP (#%i) into a TS\n", pid);
00176 BCP_tm_notify_process_type(p, BCP_ProcessType_TS, 1, &pid);
00177 }
00178 }
00179
00180 if (pid == -1)
00181 return true;
00182
00183
00184
00185
00186
00187
00188
00189 std::vector<BCP_tm_node*> nodes_to_send;
00190 const std::vector<CoinTreeSiblings*>& candList =
00191 p.candidate_list.getTree()->getCandidates();
00192 BCP_tm_node* node =
00193 dynamic_cast<BCP_tm_node*>(candList.back()->currentNode());
00194
00195
00196
00197
00198 int usedheap = BCP_used_heap();
00199 assert(usedheap > 0);
00200 const int maxheap = p.param(BCP_tm_par::MaxHeapSize);
00201 assert(maxheap > 0);
00202 int freeheap = maxheap - usedheap;
00203 printf("Before sending off: freeheap: %i usedheap: %i\n",
00204 freeheap, usedheap);
00205 buf.clear();
00206 BCP_tm_scan_siblings(p, node, nodes_to_send, freeheap >> 2);
00207 int num = nodes_to_send.size();
00208 if (num == 0) {
00209
00210 throw BCP_fatal_error("No memory left in TM\n");
00211 }
00212 int fake_index = -1;
00213 buf.pack(fake_index);
00214 p.msg_env->send(pid, BCP_Msg_NodeList, buf);
00215 buf.clear();
00216 int saved = 0, fm = 0;
00217 p.msg_env->receive(pid, BCP_Msg_NodeListReply, buf, -1);
00218 buf.unpack(saved);
00219 buf.unpack(fm);
00220 p.ts_space[pid] = fm;
00221 for (int i = 0; i < saved; ++i) {
00222 node = nodes_to_send[i];
00223 node->_locally_stored = false;
00224 node->_data._desc = NULL;
00225 node->_data._user = NULL;
00226 node->_data_location = pid;
00227 }
00228 nodes_to_send.clear();
00229
00230 usedheap = BCP_used_heap();
00231 freeheap = maxheap - usedheap;
00232 printf("After sending off: freeheap: %i usedheap: %i\n",
00233 freeheap, usedheap);
00234
00235 if (saved == 0) {
00236
00237
00238 throw BCP_fatal_error("TS did not accept anything\n");
00239 }
00240
00241 BCP_tm_node::num_local_nodes -= saved;
00242 BCP_tm_node::num_remote_nodes += saved;
00243
00244 #if 0
00245
00246 for (size_t k = 0; k < p.search_tree.size(); ++k) {
00247 BCP_tm_node* n = p.search_tree[k];
00248 if (n && n->_locally_stored) {
00249 assert(n->_data._desc.IsNull() && n->_data._user.IsNull());
00250 }
00251 }
00252 #endif
00253
00254 return false;
00255 }
00256