remote_update_set.hpp 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. // Copyright (C) 2005-2006 The Trustees of Indiana University.
  2. // Use, modification and distribution is subject to the Boost Software
  3. // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. // Authors: Douglas Gregor
  6. // Andrew Lumsdaine
  7. #ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
  8. #define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
  9. #ifndef BOOST_GRAPH_USE_MPI
  10. #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
  11. #endif
  12. #include <boost/graph/parallel/process_group.hpp>
  13. #include <boost/type_traits/is_convertible.hpp>
  14. #include <vector>
  15. #include <boost/assert.hpp>
  16. #include <boost/optional.hpp>
  17. #include <queue>
  18. namespace boost { namespace graph { namespace detail {
  19. template<typename ProcessGroup>
  20. void do_synchronize(ProcessGroup& pg)
  21. {
  22. using boost::parallel::synchronize;
  23. synchronize(pg);
  24. }
  25. struct remote_set_queued {};
  26. struct remote_set_immediate {};
  27. template<typename ProcessGroup>
  28. class remote_set_semantics
  29. {
  30. BOOST_STATIC_CONSTANT
  31. (bool,
  32. queued = (is_convertible<
  33. typename ProcessGroup::communication_category,
  34. boost::parallel::bsp_process_group_tag>::value));
  35. public:
  36. typedef typename mpl::if_c<queued,
  37. remote_set_queued,
  38. remote_set_immediate>::type type;
  39. };
  40. template<typename Derived, typename ProcessGroup, typename Value,
  41. typename OwnerMap,
  42. typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
  43. class remote_update_set;
  44. /**********************************************************************
  45. * Remote updating set that queues messages until synchronization *
  46. **********************************************************************/
  47. template<typename Derived, typename ProcessGroup, typename Value,
  48. typename OwnerMap>
  49. class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
  50. remote_set_queued>
  51. {
  52. typedef typename property_traits<OwnerMap>::key_type Key;
  53. typedef std::vector<std::pair<Key, Value> > Updates;
  54. typedef typename Updates::size_type updates_size_type;
  55. typedef typename Updates::value_type updates_pair_type;
  56. public:
  57. private:
  58. typedef typename ProcessGroup::process_id_type process_id_type;
  59. enum message_kind {
  60. /** Message containing the number of updates that will be sent in
  61. * a msg_updates message that will immediately follow. This
  62. * message will contain a single value of type
  63. * updates_size_type.
  64. */
  65. msg_num_updates,
  66. /** Contains (key, value) pairs with all of the updates from a
  67. * particular source. The number of updates is variable, but will
  68. * be provided in a msg_num_updates message that immediately
  69. * preceeds this message.
  70. *
  71. */
  72. msg_updates
  73. };
  74. struct handle_messages
  75. {
  76. explicit
  77. handle_messages(remote_update_set* self, const ProcessGroup& pg)
  78. : self(self), update_sizes(num_processes(pg), 0) { }
  79. void operator()(process_id_type source, int tag)
  80. {
  81. switch(tag) {
  82. case msg_num_updates:
  83. {
  84. // Receive the # of updates
  85. updates_size_type num_updates;
  86. receive(self->process_group, source, tag, num_updates);
  87. update_sizes[source] = num_updates;
  88. }
  89. break;
  90. case msg_updates:
  91. {
  92. updates_size_type num_updates = update_sizes[source];
  93. BOOST_ASSERT(num_updates);
  94. // Receive the actual updates
  95. std::vector<updates_pair_type> updates(num_updates);
  96. receive(self->process_group, source, msg_updates, &updates[0],
  97. num_updates);
  98. // Send updates to derived "receive_update" member
  99. Derived* derived = static_cast<Derived*>(self);
  100. for (updates_size_type u = 0; u < num_updates; ++u)
  101. derived->receive_update(source, updates[u].first, updates[u].second);
  102. update_sizes[source] = 0;
  103. }
  104. break;
  105. };
  106. }
  107. private:
  108. remote_update_set* self;
  109. std::vector<updates_size_type> update_sizes;
  110. };
  111. friend struct handle_messages;
  112. protected:
  113. remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
  114. : process_group(pg, handle_messages(this, pg)),
  115. updates(num_processes(pg)), owner(owner) {
  116. }
  117. void update(const Key& key, const Value& value)
  118. {
  119. if (get(owner, key) == process_id(process_group)) {
  120. Derived* derived = static_cast<Derived*>(this);
  121. derived->receive_update(get(owner, key), key, value);
  122. }
  123. else {
  124. updates[get(owner, key)].push_back(std::make_pair(key, value));
  125. }
  126. }
  127. void collect() { }
  128. void synchronize()
  129. {
  130. // Emit all updates and then remove them
  131. process_id_type num_processes = updates.size();
  132. for (process_id_type p = 0; p < num_processes; ++p) {
  133. if (!updates[p].empty()) {
  134. send(process_group, p, msg_num_updates, updates[p].size());
  135. send(process_group, p, msg_updates,
  136. &updates[p].front(), updates[p].size());
  137. updates[p].clear();
  138. }
  139. }
  140. do_synchronize(process_group);
  141. }
  142. ProcessGroup process_group;
  143. private:
  144. std::vector<Updates> updates;
  145. OwnerMap owner;
  146. };
  147. /**********************************************************************
  148. * Remote updating set that sends messages immediately *
  149. **********************************************************************/
  150. template<typename Derived, typename ProcessGroup, typename Value,
  151. typename OwnerMap>
  152. class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
  153. remote_set_immediate>
  154. {
  155. typedef typename property_traits<OwnerMap>::key_type Key;
  156. typedef std::pair<Key, Value> update_pair_type;
  157. typedef typename std::vector<update_pair_type>::size_type updates_size_type;
  158. public:
  159. typedef typename ProcessGroup::process_id_type process_id_type;
  160. private:
  161. enum message_kind {
  162. /** Contains a (key, value) pair that will be updated. */
  163. msg_update
  164. };
  165. struct handle_messages
  166. {
  167. explicit handle_messages(remote_update_set* self, const ProcessGroup& pg)
  168. : self(self)
  169. { update_sizes.resize(num_processes(pg), 0); }
  170. void operator()(process_id_type source, int tag)
  171. {
  172. // Receive the # of updates
  173. BOOST_ASSERT(tag == msg_update);
  174. update_pair_type update;
  175. receive(self->process_group, source, tag, update);
  176. // Send update to derived "receive_update" member
  177. Derived* derived = static_cast<Derived*>(self);
  178. derived->receive_update(source, update.first, update.second);
  179. }
  180. private:
  181. std::vector<updates_size_type> update_sizes;
  182. remote_update_set* self;
  183. };
  184. friend struct handle_messages;
  185. protected:
  186. remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
  187. : process_group(pg, handle_messages(this, pg)), owner(owner) { }
  188. void update(const Key& key, const Value& value)
  189. {
  190. if (get(owner, key) == process_id(process_group)) {
  191. Derived* derived = static_cast<Derived*>(this);
  192. derived->receive_update(get(owner, key), key, value);
  193. }
  194. else
  195. send(process_group, get(owner, key), msg_update,
  196. update_pair_type(key, value));
  197. }
  198. void collect()
  199. {
  200. typedef std::pair<process_id_type, int> probe_type;
  201. handle_messages handler(this, process_group);
  202. while (optional<probe_type> stp = probe(process_group))
  203. if (stp->second == msg_update) handler(stp->first, stp->second);
  204. }
  205. void synchronize()
  206. {
  207. do_synchronize(process_group);
  208. }
  209. ProcessGroup process_group;
  210. OwnerMap owner;
  211. };
  212. } } } // end namespace boost::graph::detail
  213. #endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP