distributed_property_map.ipp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. // Copyright (C) 2004-2006 The Trustees of Indiana University.
  2. // Use, modification and distribution is subject to the Boost Software
  3. // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. // Authors: Douglas Gregor
  6. // Nick Edmonds
  7. // Andrew Lumsdaine
  8. #include <boost/assert.hpp>
  9. #include <boost/property_map/parallel/distributed_property_map.hpp>
  10. #include <boost/property_map/parallel/detail/untracked_pair.hpp>
  11. #include <boost/type_traits/is_base_and_derived.hpp>
  12. #include <boost/bind.hpp>
  13. #include <boost/property_map/parallel/simple_trigger.hpp>
  14. namespace boost { namespace parallel {
  15. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  16. template<typename Reduce>
  17. PBGL_DISTRIB_PMAP
  18. ::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global,
  19. const StorageMap& pm, const Reduce& reduce)
  20. : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver))
  21. {
  22. typedef handle_message<Reduce> Handler;
  23. data->ghost_cells.reset(new ghost_cells_type());
  24. data->reset = &data_t::template do_reset<Reduce>;
  25. data->process_group.replace_handler(Handler(data, reduce));
  26. data->process_group.template get_receiver<Handler>()
  27. ->setup_triggers(data->process_group);
  28. }
  29. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  30. PBGL_DISTRIB_PMAP::~distributed_property_map() { }
  31. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  32. template<typename Reduce>
  33. void
  34. PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce)
  35. {
  36. typedef handle_message<Reduce> Handler;
  37. data->process_group.replace_handler(Handler(data, reduce));
  38. Handler* handler = data->process_group.template get_receiver<Handler>();
  39. BOOST_ASSERT(handler);
  40. handler->setup_triggers(data->process_group);
  41. data->get_default_value = reduce;
  42. data->has_default_resolver = Reduce::non_default_resolver;
  43. int model = data->model;
  44. data->reset = &data_t::template do_reset<Reduce>;
  45. set_consistency_model(model);
  46. }
  47. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  48. void PBGL_DISTRIB_PMAP::prune_ghost_cells() const
  49. {
  50. if (data->max_ghost_cells == 0)
  51. return;
  52. while (data->ghost_cells->size() > data->max_ghost_cells) {
  53. // Evict the last ghost cell
  54. if (data->model & cm_flush) {
  55. // We need to flush values when we evict them.
  56. boost::parallel::detail::untracked_pair<key_type, value_type> const& victim
  57. = data->ghost_cells->back();
  58. send(data->process_group, get(data->global, victim.first).first,
  59. property_map_put, victim);
  60. }
  61. // Actually remove the ghost cell
  62. data->ghost_cells->pop_back();
  63. }
  64. }
  65. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  66. typename PBGL_DISTRIB_PMAP::value_type&
  67. PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const
  68. {
  69. // Index by key
  70. ghost_cells_key_index_type const& key_index
  71. = data->ghost_cells->template get<1>();
  72. // Search for the ghost cell by key, and project back to the sequence
  73. iterator ghost_cell
  74. = data->ghost_cells->template project<0>(key_index.find(key));
  75. if (ghost_cell == data->ghost_cells->end()) {
  76. value_type value;
  77. if (data->has_default_resolver)
  78. // Since we have a default resolver, use it to create a default
  79. // value for this ghost cell.
  80. value = data->get_default_value(key);
  81. else if (request_if_missing)
  82. // Request the actual value of this key from its owner
  83. send_oob_with_reply(data->process_group, get(data->global, key).first,
  84. property_map_get, key, value);
  85. else
  86. value = value_type();
  87. // Create a ghost cell containing the new value
  88. ghost_cell
  89. = data->ghost_cells->push_front(std::make_pair(key, value)).first;
  90. // If we need to, prune the ghost cells
  91. if (data->max_ghost_cells > 0)
  92. prune_ghost_cells();
  93. } else if (data->max_ghost_cells > 0)
  94. // Put this cell at the beginning of the MRU list
  95. data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell);
  96. return const_cast<value_type&>(ghost_cell->second);
  97. }
  98. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  99. template<typename Reduce>
  100. void
  101. PBGL_DISTRIB_PMAP
  102. ::handle_message<Reduce>::operator()(process_id_type source, int tag)
  103. {
  104. BOOST_ASSERT(false);
  105. }
  106. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  107. template<typename Reduce>
  108. void
  109. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  110. handle_put(int /*source*/, int /*tag*/,
  111. const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context)
  112. {
  113. using boost::get;
  114. shared_ptr<data_t> data(data_ptr);
  115. owner_local_pair p = get(data->global, req.first);
  116. BOOST_ASSERT(p.first == process_id(data->process_group));
  117. detail::maybe_put(data->storage, p.second,
  118. reduce(req.first,
  119. get(data->storage, p.second),
  120. req.second));
  121. }
  122. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  123. template<typename Reduce>
  124. typename PBGL_DISTRIB_PMAP::value_type
  125. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  126. handle_get(int source, int /*tag*/, const key_type& key,
  127. trigger_receive_context)
  128. {
  129. using boost::get;
  130. shared_ptr<data_t> data(data_ptr);
  131. BOOST_ASSERT(data);
  132. owner_local_pair p = get(data->global, key);
  133. return get(data->storage, p.second);
  134. }
  135. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  136. template<typename Reduce>
  137. void
  138. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  139. handle_multiget(int source, int tag, const std::vector<key_type>& keys,
  140. trigger_receive_context)
  141. {
  142. shared_ptr<data_t> data(data_ptr);
  143. BOOST_ASSERT(data);
  144. typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value;
  145. std::vector<key_value> results;
  146. std::size_t n = keys.size();
  147. results.reserve(n);
  148. using boost::get;
  149. for (std::size_t i = 0; i < n; ++i) {
  150. local_key_type local_key = get(data->global, keys[i]).second;
  151. results.push_back(key_value(keys[i], get(data->storage, local_key)));
  152. }
  153. send(data->process_group, source, property_map_multiget_reply, results);
  154. }
  155. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  156. template<typename Reduce>
  157. void
  158. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  159. handle_multiget_reply
  160. (int source, int tag,
  161. const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg,
  162. trigger_receive_context)
  163. {
  164. shared_ptr<data_t> data(data_ptr);
  165. BOOST_ASSERT(data);
  166. // Index by key
  167. ghost_cells_key_index_type const& key_index
  168. = data->ghost_cells->template get<1>();
  169. std::size_t n = msg.size();
  170. for (std::size_t i = 0; i < n; ++i) {
  171. // Search for the ghost cell by key, and project back to the sequence
  172. iterator position
  173. = data->ghost_cells->template project<0>(key_index.find(msg[i].first));
  174. if (position != data->ghost_cells->end())
  175. const_cast<value_type&>(position->second) = msg[i].second;
  176. }
  177. }
  178. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  179. template<typename Reduce>
  180. void
  181. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  182. handle_multiput
  183. (int source, int tag,
  184. const std::vector<unsafe_pair<local_key_type, value_type> >& values,
  185. trigger_receive_context)
  186. {
  187. using boost::get;
  188. shared_ptr<data_t> data(data_ptr);
  189. BOOST_ASSERT(data);
  190. std::size_t n = values.size();
  191. for (std::size_t i = 0; i < n; ++i) {
  192. local_key_type local_key = values[i].first;
  193. value_type local_value = get(data->storage, local_key);
  194. detail::maybe_put(data->storage, values[i].first,
  195. reduce(values[i].first,
  196. local_value,
  197. values[i].second));
  198. }
  199. }
  200. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  201. template<typename Reduce>
  202. void
  203. PBGL_DISTRIB_PMAP::handle_message<Reduce>::
  204. setup_triggers(process_group_type& pg)
  205. {
  206. using boost::parallel::simple_trigger;
  207. simple_trigger(pg, property_map_put, this, &handle_message::handle_put);
  208. simple_trigger(pg, property_map_get, this, &handle_message::handle_get);
  209. simple_trigger(pg, property_map_multiget, this,
  210. &handle_message::handle_multiget);
  211. simple_trigger(pg, property_map_multiget_reply, this,
  212. &handle_message::handle_multiget_reply);
  213. simple_trigger(pg, property_map_multiput, this,
  214. &handle_message::handle_multiput);
  215. }
  216. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  217. void
  218. PBGL_DISTRIB_PMAP
  219. ::on_synchronize::operator()()
  220. {
  221. int stage=0; // we only get called at the start now
  222. shared_ptr<data_t> data(data_ptr);
  223. BOOST_ASSERT(data);
  224. // Determine in which stage backward consistency messages should be sent.
  225. int backward_stage = -1;
  226. if (data->model & cm_backward) {
  227. if (data->model & cm_flush) backward_stage = 1;
  228. else backward_stage = 0;
  229. }
  230. // Flush results in first stage
  231. if (stage == 0 && data->model & cm_flush)
  232. data->flush();
  233. // Backward consistency
  234. if (stage == backward_stage && !(data->model & (cm_clear | cm_reset)))
  235. data->refresh_ghost_cells();
  236. // Optionally clear results
  237. if (data->model & cm_clear)
  238. data->clear();
  239. // Optionally reset results
  240. if (data->model & cm_reset) {
  241. if (data->reset) ((*data).*data->reset)();
  242. }
  243. }
  244. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  245. void
  246. PBGL_DISTRIB_PMAP::set_consistency_model(int model)
  247. {
  248. data->model = model;
  249. bool need_on_synchronize = (model != cm_forward);
  250. // Backward consistency is a two-stage process.
  251. if (model & cm_backward) {
  252. // For backward consistency to work, we absolutely cannot throw
  253. // away any ghost cells.
  254. data->max_ghost_cells = 0;
  255. }
  256. // attach the on_synchronize handler.
  257. if (need_on_synchronize)
  258. data->process_group.replace_on_synchronize_handler(on_synchronize(data));
  259. }
  260. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  261. void
  262. PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells)
  263. {
  264. if ((data->model & cm_backward) && max_ghost_cells > 0)
  265. boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: "
  266. "cannot limit ghost-cell usage with a backward "
  267. "consistency model"));
  268. if (max_ghost_cells == 1)
  269. // It is not safe to have only 1 ghost cell; the cell() method
  270. // will fail.
  271. max_ghost_cells = 2;
  272. data->max_ghost_cells = max_ghost_cells;
  273. prune_ghost_cells();
  274. }
  275. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  276. void PBGL_DISTRIB_PMAP::clear()
  277. {
  278. data->clear();
  279. }
  280. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  281. void PBGL_DISTRIB_PMAP::data_t::clear()
  282. {
  283. ghost_cells->clear();
  284. }
  285. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  286. void PBGL_DISTRIB_PMAP::reset()
  287. {
  288. if (data->reset) ((*data).*data->reset)();
  289. }
  290. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  291. void PBGL_DISTRIB_PMAP::flush()
  292. {
  293. data->flush();
  294. }
  295. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  296. void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells()
  297. {
  298. using boost::get;
  299. std::vector<std::vector<key_type> > keys;
  300. keys.resize(num_processes(process_group));
  301. // Collect the set of keys for which we will request values
  302. for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
  303. keys[get(global, i->first).first].push_back(i->first);
  304. // Send multiget requests to each of the other processors
  305. typedef typename ProcessGroup::process_size_type process_size_type;
  306. process_size_type n = num_processes(process_group);
  307. process_id_type id = process_id(process_group);
  308. for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) {
  309. if (!keys[p].empty())
  310. send(process_group, p, property_map_multiget, keys[p]);
  311. }
  312. }
  313. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  314. void PBGL_DISTRIB_PMAP::data_t::flush()
  315. {
  316. using boost::get;
  317. int n = num_processes(process_group);
  318. std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values;
  319. values.resize(n);
  320. // Collect all of the flushed values
  321. for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) {
  322. std::pair<int, local_key_type> g = get(global, i->first);
  323. values[g.first].push_back(std::make_pair(g.second, i->second));
  324. }
  325. // Transmit flushed values
  326. for (int p = 0; p < n; ++p) {
  327. if (!values[p].empty())
  328. send(process_group, p, property_map_multiput, values[p]);
  329. }
  330. }
  331. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  332. void PBGL_DISTRIB_PMAP::do_synchronize()
  333. {
  334. if (data->model & cm_backward) {
  335. synchronize(data->process_group);
  336. return;
  337. }
  338. // Request refreshes of the values of our ghost cells
  339. data->refresh_ghost_cells();
  340. // Allows all of the multigets to get to their destinations
  341. synchronize(data->process_group);
  342. // Allows all of the multiget responses to get to their destinations
  343. synchronize(data->process_group);
  344. }
  345. template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
  346. template<typename Resolver>
  347. void PBGL_DISTRIB_PMAP::data_t::do_reset()
  348. {
  349. Resolver* resolver = get_default_value.template target<Resolver>();
  350. BOOST_ASSERT(resolver);
  351. for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
  352. const_cast<value_type&>(i->second) = (*resolver)(i->first);
  353. }
  354. } } // end namespace boost::parallel