mpi_process_group.ipp
// -*- C++ -*-

// Copyright (C) 2004-2008 The Trustees of Indiana University.
// Copyright (C) 2007 Douglas Gregor <doug.gregor@gmail.com>
// Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com>

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Andrew Lumsdaine
//           Matthias Troyer

#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif

#include <boost/assert.hpp>
#include <algorithm>
#include <boost/graph/parallel/detail/untracked_pair.hpp>
#include <numeric>
#include <iterator>
#include <functional>
#include <vector>
#include <queue>
#include <stack>
#include <list>
#include <map>
#include <boost/graph/distributed/detail/tag_allocator.hpp>
#include <stdio.h>
#include <stdlib.h>

// #define PBGL_PROCESS_GROUP_DEBUG

#ifdef PBGL_PROCESS_GROUP_DEBUG
#  include <iostream>
#endif
namespace boost { namespace graph { namespace distributed {

struct mpi_process_group::impl
{
  typedef mpi_process_group::message_header message_header;
  typedef mpi_process_group::outgoing_messages outgoing_messages;

  /**
   * Stores the incoming messages from a particular processor.
   *
   * @todo Evaluate whether we should use a deque instead, which could
   * reduce the cost of "receiving" messages and allow us to deallocate
   * memory earlier, but would increase the time spent in the
   * synchronization step.
   */
  struct incoming_messages {
    incoming_messages();
    ~incoming_messages() {}

    std::vector<message_header> headers;
    buffer_type                 buffer;
    std::vector<std::vector<message_header>::iterator> next_header;
  };

  struct batch_request {
    MPI_Request request;
    buffer_type buffer;
  };

  // Send once we have a certain number of messages or bytes in the buffer.
  // These numbers need to be tuned; we keep them small at first for testing.
  std::size_t batch_header_number;
  std::size_t batch_buffer_size;
  std::size_t batch_message_size;

  /**
   * The actual MPI communicator used to transmit data.
   */
  boost::mpi::communicator comm;

  /**
   * The MPI communicator used to transmit out-of-band replies.
   */
  boost::mpi::communicator oob_reply_comm;

  /// Outgoing message information, indexed by destination processor.
  std::vector<outgoing_messages> outgoing;

  /// Incoming message information, indexed by source processor.
  std::vector<incoming_messages> incoming;

  /// The number of processors that have entered each synchronization stage.
  std::vector<int> processors_synchronizing_stage;

  /// The synchronization stage of each processor.
  std::vector<int> synchronizing_stage;

  /// Number of processors still sending messages.
  std::vector<int> synchronizing_unfinished;

  /// Number of batches sent since the last synchronization stage.
  std::vector<int> number_sent_batches;

  /// Number of batches received minus number of expected batches.
  std::vector<int> number_received_batches;

  /// The context of the currently-executing trigger, or @c trc_none
  /// if no trigger is executing.
  trigger_receive_context trigger_context;

  /// Non-zero indicates that we're processing batches.
  /// Increment this when processing batches,
  /// decrement it when you're done.
  int processing_batches;

  /**
   * Contains all of the active blocks corresponding to attached
   * distributed data structures.
   */
  blocks_type blocks;

  /// Whether we are currently synchronizing.
  bool synchronizing;

  /// The MPI requests for posted sends of oob messages.
  std::vector<MPI_Request> requests;

  /// The MPI buffers for posted irecvs of oob messages.
  std::map<int, buffer_type> buffers;

  /// Queue for message batches received while already processing messages.
  std::queue<std::pair<int, outgoing_messages> > new_batches;

  /// Maximum encountered size of the new_batches queue.
  std::size_t max_received;

  /// The MPI requests and buffers for batches being sent.
  std::list<batch_request> sent_batches;

  /// Maximum encountered size of the sent_batches list.
  std::size_t max_sent;

  /// Pre-allocated requests in a pool.
  std::vector<batch_request> batch_pool;

  /// A stack controlling which batches are available.
  std::stack<std::size_t> free_batches;

  void free_sent_batches();

  // Tag allocator
  detail::tag_allocator allocated_tags;

  impl(std::size_t num_headers, std::size_t buffers_size,
       communicator_type parent_comm);
  ~impl();

private:
  void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
};
inline trigger_receive_context mpi_process_group::trigger_context() const
{
  return impl_->trigger_context;
}
template<typename T>
void
mpi_process_group::send_impl(int dest, int tag, const T& value,
                             mpl::true_ /*is_mpi_datatype*/) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);
  oa << value;

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif

  maybe_send_batch(dest);
}
template<typename T>
void
mpi_process_group::send_impl(int dest, int tag, const T& value,
                             mpl::false_ /*is_mpi_datatype*/) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  // Serialize into the buffer
  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  out << value;

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}
template<typename T>
inline void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T& value)
{
  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,
               boost::mpi::is_mpi_datatype<T>());
}
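
// Illustrative sketch of the typical send/receive cycle (the process group,
// tag, and payload below are hypothetical). Sends are buffered and batched
// locally; delivery is only guaranteed after the collective synchronize()
// step, after which messages can be read out of the local buffers:
//
//   mpi_process_group pg;
//   int rank = process_id(pg);
//   int size = num_processes(pg);
//   send(pg, (rank + 1) % size, /* tag = */ 0, rank);    // enqueue locally
//   synchronize(pg);                                     // flush batches
//   int received;
//   receive(pg, (rank + size - 1) % size, 0, received);  // read from buffer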
template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T values[], std::size_t n)
{
  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
               boost::serialization::make_array(values, n),
               boost::mpl::true_());
}
template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
mpi_process_group::
array_send_impl(int dest, int tag, const T values[], std::size_t n) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  // Serialize into the buffer
  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  out << n;
  for (std::size_t i = 0; i < n; ++i)
    out << values[i];

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}
template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T values[], std::size_t n)
{
  pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
                     values, n);
}
template<typename InputIterator>
void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, InputIterator first, InputIterator last)
{
  typedef typename std::iterator_traits<InputIterator>::value_type value_type;
  std::vector<value_type> values(first, last);
  if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);
  else send(pg, dest, tag, &values[0], values.size());
}
template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::true_ /*is_mpi_datatype*/) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
            << tag << std::endl;
#endif
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Unpack the data
  if (header->bytes > 0) {
    boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,
                                   archive::no_header, header->offset);
    ia >> value;
  }

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}
template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::false_ /*is_mpi_datatype*/) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  in >> value;

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}
template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
mpi_process_group::
array_receive_impl(int source, int tag, T* values, std::size_t& n) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  std::size_t num_sent;
  in >> num_sent;
  if (num_sent > n)
    std::cerr << "ERROR: Have " << num_sent << " items but only space for "
              << n << " items\n";

  for (std::size_t i = 0; i < num_sent; ++i)
    in >> values[i];
  n = num_sent;

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}
// Construct triggers
template<typename Type, typename Handler>
void mpi_process_group::trigger(int tag, const Handler& handler)
{
  BOOST_ASSERT(block_num);
  install_trigger(tag, my_block_number(), shared_ptr<trigger_base>(
    new trigger_launcher<Type, Handler>(*this, tag, handler)));
}

template<typename Type, typename Handler>
void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
{
  BOOST_ASSERT(block_num);
  install_trigger(tag, my_block_number(), shared_ptr<trigger_base>(
    new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
}

template<typename Type, typename Handler>
void mpi_process_group::global_trigger(int tag, const Handler& handler,
                                       std::size_t sz)
{
  if (sz == 0) // normal trigger
    install_trigger(tag, 0, shared_ptr<trigger_base>(
      new global_trigger_launcher<Type, Handler>(*this, tag, handler)));
  else // trigger with irecv
    install_trigger(tag, 0, shared_ptr<trigger_base>(
      new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler, sz)));
}
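
// Illustrative sketch of registering a trigger (the tag value and the
// handler are hypothetical; registration assumes the process group is
// attached to a block, per the assertion above). The handler is invoked
// with the source, tag, data, and receive context whenever a matching
// message arrives:
//
//   struct my_handler {
//     void operator()(int source, int tag, const int& data,
//                     trigger_receive_context context) const
//     { /* react to the incoming value */ }
//   };
//
//   mpi_process_group pg;
//   pg.trigger<int>(/* tag = */ 5, my_handler());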
namespace detail {

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data,
                    mpl::true_ /*is_mpi_datatype*/)
{
  using boost::mpi::get_mpi_datatype;

  //self.impl_->comm.recv(source,tag,data);
  MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag,
           self.impl_->comm, MPI_STATUS_IGNORE);
}

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data,
                    mpl::false_ /*is_mpi_datatype*/)
{
  //self.impl_->comm.recv(source,tag,data);

  // Receive the size of the data packet
  boost::mpi::status status;
  status = self.impl_->comm.probe(source, tag);

#if BOOST_VERSION >= 103600
  int size = status.count<boost::mpi::packed>().get();
#else
  int size;
  MPI_Status& mpi_status = status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
#endif

  // Receive the packed data itself
  boost::mpi::packed_iarchive in(self.impl_->comm);
  in.resize(size);
  MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);

  // Deserialize the data
  in >> data;
}

template<typename Type>
void do_oob_receive(mpi_process_group const& self, int source, int tag,
                    Type& data)
{
  do_oob_receive(self, source, tag, data,
                 boost::mpi::is_mpi_datatype<Type>());
}

} // namespace detail
template<typename Type, typename Handler>
void
mpi_process_group::trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band ? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self, source, realtag, data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(source, tag, data, context);
}
template<typename Type, typename Handler>
void
mpi_process_group::reply_trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band ? "OOB reply trigger" : "Reply trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif
  BOOST_ASSERT(context == trc_out_of_band);

  boost::parallel::detail::untracked_pair<int, Type> data;

  // Receive the message directly off the wire
  int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block,
                                tag);
  detail::do_oob_receive(self, source, realtag, data);

  // Pass the message off to the handler and send the result back to
  // the source.
  send_oob(self, source, data.first,
           handler(source, tag, data.second, context), -2);
}
template<typename Type, typename Handler>
void
mpi_process_group::global_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band ? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self, source, realtag, data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}
template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band ? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block)
            << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    return;
  }
  BOOST_ASSERT(context == trc_irecv_out_of_band);

  // Unpack the message from the buffer filled by the posted MPI_Irecv
  boost::mpi::packed_iarchive ia(self.impl_->comm, self.impl_->buffers[tag]);
  ia >> data;

  // Start a new receive; force posting of a new MPI_Irecv, even though
  // the buffer is already allocated
  prepare_receive(self, tag, true);

  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}
template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
prepare_receive(mpi_process_group const& self, int tag, bool force) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "Posting Irecv for trigger"
            << " receive with tag " << tag << std::endl;
#endif
  if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {
    self.impl_->buffers[tag].resize(buffer_size);
    force = true;
  }
  BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size())
               >= buffer_size);

  //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);
  if (force) {
    self.impl_->requests.push_back(MPI_Request());
    MPI_Request* request = &self.impl_->requests.back();
    MPI_Irecv(&self.impl_->buffers[tag].front(), buffer_size,
              MPI_PACKED, MPI_ANY_SOURCE, tag, self.impl_->comm, request);
  }
}
template<typename T>
inline mpi_process_group::process_id_type
receive(const mpi_process_group& pg, int tag, T& value)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        value, boost::mpi::is_mpi_datatype<T>()))
      return source;
  }
  // No matching message was found in any incoming buffer
  BOOST_ASSERT(false);
  abort();
}
template<typename T>
typename
  enable_if<boost::mpi::is_mpi_datatype<T>,
            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    bool result =
      pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      boost::serialization::make_array(values, n),
                      boost::mpl::true_());
    if (result)
      return std::make_pair(source, n);
  }
  // No matching message was found in any incoming buffer
  BOOST_ASSERT(false);
  abort();
}
template<typename T>
typename
  disable_if<boost::mpi::is_mpi_datatype<T>,
             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                              values, n))
      return std::make_pair(source, n);
  }
  // No matching message was found in any incoming buffer
  BOOST_ASSERT(false);
  abort();
}
template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg,
        mpi_process_group::process_id_type source, int tag, T& value)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      value, boost::mpi::is_mpi_datatype<T>()))
    return source;
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());
    BOOST_ASSERT(false);
    abort();
  }
}
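
// Illustrative sketch of receiving from a known source (the process group
// pg, source, and tag are hypothetical). Unlike the any-source overloads
// above, this overload asserts and aborts when no matching message is
// buffered, so it should only be called when a message is known to be
// pending, e.g. after synchronize():
//
//   synchronize(pg);
//   double x;
//   receive(pg, /* source = */ 0, /* tag = */ 1, x);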
template<typename T>
typename
  enable_if<boost::mpi::is_mpi_datatype<T>,
            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[],
        std::size_t n)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      boost::serialization::make_array(values, n),
                      boost::mpl::true_()))
    return std::make_pair(source, n);
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());
    BOOST_ASSERT(false);
    abort();
  }
}
template<typename T>
typename
  disable_if<boost::mpi::is_mpi_datatype<T>,
             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[],
        std::size_t n)
{
  pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        values, n);
  return std::make_pair(source, n);
}
template<typename T, typename BinaryOperation>
T*
all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;
  if (inplace) out = new T [last - first];

  boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
                                                  boost::mpi::comm_attach),
                         first, last - first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last - first), first);
    delete [] out;
    return last;
  }

  return out;
}
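
// Illustrative sketch: summing one value per process, in place (the value
// is hypothetical). Passing the same pointer for input and output selects
// the in-place path above, which reduces into a temporary and copies back:
//
//   int local_count = 1;
//   all_reduce(pg, &local_count, &local_count + 1, &local_count,
//              std::plus<int>());
//   // local_count now holds the sum over all processes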
template<typename T>
void
broadcast(const mpi_process_group& pg, T& val,
          mpi_process_group::process_id_type root)
{
  // Broadcast the value from the root to all other processes
  boost::mpi::communicator comm(communicator(pg), boost::mpi::comm_attach);
  boost::mpi::broadcast(comm, val, root);
}
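
// Illustrative sketch: broadcasting a value chosen on the root process
// (the value and root rank are hypothetical):
//
//   unsigned seed = 0;
//   if (process_id(pg) == 0) seed = 42;
//   broadcast(pg, seed, /* root = */ 0);  // every process now sees 42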
template<typename T, typename BinaryOperation>
T*
scan(const mpi_process_group& pg, T* first, T* last, T* out,
     BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;
  if (inplace) out = new T [last - first];

  boost::mpi::scan(communicator(pg), first, last - first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last - first), first);
    delete [] out;
    return last;
  }

  return out;
}
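
// Illustrative sketch: an inclusive prefix sum over one value per process
// (the value is hypothetical), so process i ends up with the sum of the
// values held by processes 0..i:
//
//   int value = 1;
//   scan(pg, &value, &value + 1, &value, std::plus<int>());
//   // on process i, value == i + 1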
template<typename InputIterator, typename T>
void
all_gather(const mpi_process_group& pg, InputIterator first,
           InputIterator last, std::vector<T>& out)
{
  synchronize(pg);

  // Stick a copy of the local values into a vector, so we can broadcast it
  std::vector<T> local_values(first, last);

  // Collect the number of values stored in each process
  int size = local_values.size();
  std::vector<int> sizes(num_processes(pg));
  int result = MPI_Allgather(&size, 1, MPI_INT,
                             &sizes[0], 1, MPI_INT,
                             communicator(pg));
  BOOST_ASSERT(result == MPI_SUCCESS);
  (void)result;

  // Adjust sizes based on the number of bytes
  //
  //   std::transform(sizes.begin(), sizes.end(), sizes.begin(),
  //                  std::bind2nd(std::multiplies<int>(), sizeof(T)));
  //
  // std::bind2nd has been removed from C++17
  for (std::size_t i = 0, n = sizes.size(); i < n; ++i)
  {
    sizes[i] *= sizeof(T);
  }

  // Compute displacements
  std::vector<int> displacements;
  displacements.reserve(sizes.size() + 1);
  displacements.push_back(0);
  std::partial_sum(sizes.begin(), sizes.end(),
                   std::back_inserter(displacements));

  // Gather all of the values
  out.resize(displacements.back() / sizeof(T));
  if (!out.empty()) {
    result = MPI_Allgatherv(local_values.empty() ? (void*)&local_values
                            /* local results */ : (void*)&local_values[0],
                            local_values.size() * sizeof(T),
                            MPI_BYTE,
                            &out[0], &sizes[0], &displacements[0], MPI_BYTE,
                            communicator(pg));
  }
  BOOST_ASSERT(result == MPI_SUCCESS);
}
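
// Illustrative sketch: gathering a variable-length sequence from every
// process (the local contents are hypothetical). Afterwards, `all` holds
// every process's elements, concatenated in rank order, on all processes:
//
//   std::vector<int> local(process_id(pg) + 1, process_id(pg));
//   std::vector<int> all;
//   all_gather(pg, local.begin(), local.end(), all);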
template<typename InputIterator>
mpi_process_group
process_subgroup(const mpi_process_group& pg,
                 InputIterator first, InputIterator last)
{
/*
  boost::mpi::group current_group = communicator(pg).group();
  boost::mpi::group new_group = current_group.include(first,last);
  boost::mpi::communicator new_comm(communicator(pg),new_group);
  return mpi_process_group(new_comm);
*/
  std::vector<int> ranks(first, last);

  MPI_Group current_group;
  int result = MPI_Comm_group(communicator(pg), &current_group);
  BOOST_ASSERT(result == MPI_SUCCESS);
  (void)result;

  MPI_Group new_group;
  result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  MPI_Comm new_comm;
  result = MPI_Comm_create(communicator(pg), new_group, &new_comm);
  BOOST_ASSERT(result == MPI_SUCCESS);

  result = MPI_Group_free(&new_group);
  BOOST_ASSERT(result == MPI_SUCCESS);
  result = MPI_Group_free(&current_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  if (new_comm != MPI_COMM_NULL) {
    mpi_process_group result_pg(boost::mpi::communicator(new_comm,
                                                         boost::mpi::comm_attach));
    result = MPI_Comm_free(&new_comm);
    BOOST_ASSERT(result == MPI_SUCCESS);
    return result_pg;
  } else {
    return mpi_process_group(mpi_process_group::create_empty());
  }
}
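
// Illustrative sketch: carving out a subgroup of the even-ranked processes
// (the rank selection is hypothetical). The call is collective over pg;
// processes outside the given ranks receive an empty process group:
//
//   std::vector<int> even_ranks;
//   for (int r = 0; r < num_processes(pg); r += 2)
//     even_ranks.push_back(r);
//   mpi_process_group sub =
//     process_subgroup(pg, even_ranks.begin(), even_ranks.end());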
template<typename Receiver>
Receiver* mpi_process_group::get_receiver()
{
  return impl_->blocks[my_block_number()]->on_receive
           .template target<Receiver>();
}
template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value,
            int block)
{
  using boost::mpi::get_mpi_datatype;

  // Determine the actual message we expect to receive, and which
  // communicator it will arrive on.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  // Post a non-blocking receive, then poll until it completes.
  MPI_Request request;
  MPI_Irecv(&value, 1, get_mpi_datatype<T>(value),
            source, actual.second, actual.first, &request);

  int done = 0;
  do {
    MPI_Test(&request, &done, MPI_STATUS_IGNORE);
    if (!done)
      pg.poll(/*wait=*/false, block);
  } while (!done);
}
template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value,
            int block)
{
  // Determine the actual message we expect to receive, and which
  // communicator it will arrive on.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  boost::optional<boost::mpi::status> status;
  do {
    status = actual.first.iprobe(source, actual.second);
    if (!status)
      pg.poll();
  } while (!status);

  //actual.first.recv(status->source(), status->tag(),value);

  // Allocate the receive buffer
  boost::mpi::packed_iarchive in(actual.first);

#if BOOST_VERSION >= 103600
  in.resize(status->count<boost::mpi::packed>().get());
#else
  int size;
  MPI_Status mpi_status = *status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
  in.resize(size);
#endif

  // Receive the message data
  MPI_Recv(in.address(), in.size(), MPI_PACKED,
           status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);

  // Unpack the message data
  in >> value;
}
template<typename SendT, typename ReplyT>
typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block)
{
  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(
             (int)reply_tag, send_value), block);
  receive_oob(pg, dest, reply_tag, reply_value);
}

template<typename SendT, typename ReplyT>
typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block)
{
  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  send_oob(pg, dest, tag,
           boost::parallel::detail::make_untracked_pair((int)reply_tag,
                                                        send_value), block);
  receive_oob(pg, dest, reply_tag, reply_value);
}
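
// Illustrative sketch: a blocking remote query over the out-of-band
// channel (destination, tag, and values are hypothetical). This assumes
// the destination process has installed a matching trigger_with_reply
// handler for the tag; that handler's return value becomes the reply:
//
//   int query = 7, answer = 0;
//   send_oob_with_reply(pg, /* dest = */ 1, /* tag = */ 3, query, answer);
//   // answer now holds the handler's result from process 1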
} } } // end namespace boost::graph::distributed