buffered_channel.hpp

// Copyright Oliver Kowalke 2016.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
#define BOOST_FIBERS_BUFFERED_CHANNEL_H

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iterator>     // std::input_iterator_tag, used by buffered_channel::iterator
#include <memory>
#include <type_traits>

#include <boost/assert.hpp>     // BOOST_ASSERT, used by iterator::increment_()
#include <boost/config.hpp>

#include <boost/fiber/channel_op_status.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/exceptions.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {

template< typename T >
class buffered_channel {
public:
    typedef typename std::remove_reference< T >::type   value_type;

private:
    typedef context::wait_queue_t   wait_queue_type;
    typedef value_type              slot_type;

    mutable detail::spinlock    splk_{};
    wait_queue_type             waiting_producers_{};
    wait_queue_type             waiting_consumers_{};
    slot_type               *   slots_;
    std::size_t                 pidx_{ 0 };
    std::size_t                 cidx_{ 0 };
    std::size_t                 capacity_;
    bool                        closed_{ false };
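
    // Ring-buffer bookkeeping: pidx_ is the next slot to write, cidx_ the next
    // slot to read. One slot is always left unused so that a full buffer
    // (cidx_ == pidx_ + 1 mod capacity_) can be distinguished from an empty one
    // (cidx_ == pidx_); the usable capacity is therefore capacity_ - 1.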
    bool is_full_() const noexcept {
        return cidx_ == ((pidx_ + 1) % capacity_);
    }

    bool is_empty_() const noexcept {
        return cidx_ == pidx_;
    }

    bool is_closed_() const noexcept {
        return closed_;
    }

public:
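    // Preconditions enforced by the constructor: capacity must be at least 2
    // and a power of two, otherwise a fiber_error carrying
    // std::errc::invalid_argument is thrown. Because one slot always stays
    // unused (see is_full_ above), a channel constructed with capacity N
    // stores at most N - 1 values at a time.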
    explicit buffered_channel( std::size_t capacity) :
        capacity_{ capacity } {
        if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & ( capacity_ - 1) ) ) ) {
            throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
                               "boost fiber: buffer capacity is invalid" };
        }
        slots_ = new slot_type[capacity_];
    }

    ~buffered_channel() {
        close();
        delete [] slots_;
    }

    buffered_channel( buffered_channel const&) = delete;
    buffered_channel & operator=( buffered_channel const&) = delete;

    bool is_closed() const noexcept {
        detail::spinlock_lock lk{ splk_ };
        return is_closed_();
    }
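
    // Wake-up protocol used throughout this class: a timed waiter stores
    // `this` in its twstatus field before suspending, a plain waiter stores 0.
    // Before scheduling a waiter, the notifier tries to CAS twstatus from
    // `this` to -1; success claims a timed waiter, while an observed value of
    // 0 identifies a plain (untimed) waiter. Any other observed value means
    // the timed wait has already been resolved elsewhere, so the context must
    // not be rescheduled here. close() runs this protocol for every queued
    // producer and consumer.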
    void close() noexcept {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( ! closed_) {
            closed_ = true;
            // notify all waiting producers
            while ( ! waiting_producers_.empty() ) {
                context * producer_ctx = & waiting_producers_.front();
                waiting_producers_.pop_front();
                std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    // notify context
                    active_ctx->schedule( producer_ctx);
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( producer_ctx);
                }
            }
            // notify all waiting consumers
            while ( ! waiting_consumers_.empty() ) {
                context * consumer_ctx = & waiting_consumers_.front();
                waiting_consumers_.pop_front();
                std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                }
            }
        }
    }
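
    // try_push never suspends the calling fiber: it reports `closed` or
    // `full` immediately instead of waiting for a free slot.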
    channel_op_status try_push( value_type const& value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( BOOST_UNLIKELY( is_closed_() ) ) {
            return channel_op_status::closed;
        } else if ( is_full_() ) {
            return channel_op_status::full;
        } else {
            slots_[pidx_] = value;
            pidx_ = (pidx_ + 1) % capacity_;
            // notify one waiting consumer
            while ( ! waiting_consumers_.empty() ) {
                context * consumer_ctx = & waiting_consumers_.front();
                waiting_consumers_.pop_front();
                std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    lk.unlock();
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                    break;
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    lk.unlock();
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                    break;
                }
            }
            return channel_op_status::success;
        }
    }

    channel_op_status try_push( value_type && value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( BOOST_UNLIKELY( is_closed_() ) ) {
            return channel_op_status::closed;
        } else if ( is_full_() ) {
            return channel_op_status::full;
        } else {
            slots_[pidx_] = std::move( value);
            pidx_ = (pidx_ + 1) % capacity_;
            // notify one waiting consumer
            while ( ! waiting_consumers_.empty() ) {
                context * consumer_ctx = & waiting_consumers_.front();
                waiting_consumers_.pop_front();
                std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    lk.unlock();
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                    break;
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    lk.unlock();
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                    break;
                }
            }
            return channel_op_status::success;
        }
    }
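
    // push blocks: when the buffer is full, the calling fiber links itself
    // onto waiting_producers_ and suspends; after being rescheduled it loops
    // and re-checks the buffer (and the closed flag) from scratch.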
    channel_op_status push( value_type const& value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            } else if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this producer
                active_ctx->suspend( lk);
            } else {
                slots_[pidx_] = value;
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    channel_op_status push( value_type && value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            } else if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this producer
                active_ctx->suspend( lk);
            } else {
                slots_[pidx_] = std::move( value);
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type const& value,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( value,
                                std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type && value,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( std::forward< value_type >( value),
                                std::chrono::steady_clock::now() + timeout_duration);
    }
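
    // The timed variants store `this` (instead of 0) in twstatus before
    // suspending, which is what lets a notifier recognize them, and suspend
    // via wait_until(); on timeout the fiber re-acquires the lock, unlinks
    // itself from waiting_producers_ and reports channel_op_status::timeout.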
    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type const& value,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            } else if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                // suspend this producer
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // relock local lk
                    lk.lock();
                    // remove from waiting-queue
                    waiting_producers_.remove( * active_ctx);
                    return channel_op_status::timeout;
                }
            } else {
                slots_[pidx_] = value;
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type && value,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            } else if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                // suspend this producer
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // relock local lk
                    lk.lock();
                    // remove from waiting-queue
                    waiting_producers_.remove( * active_ctx);
                    return channel_op_status::timeout;
                }
            } else {
                slots_[pidx_] = std::move( value);
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    channel_op_status try_pop( value_type & value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( is_empty_() ) {
            return is_closed_()
                ? channel_op_status::closed
                : channel_op_status::empty;
        } else {
            value = std::move( slots_[cidx_]);
            cidx_ = (cidx_ + 1) % capacity_;
            // notify one waiting producer
            while ( ! waiting_producers_.empty() ) {
                context * producer_ctx = & waiting_producers_.front();
                waiting_producers_.pop_front();
                std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    lk.unlock();
                    // notify context
                    active_ctx->schedule( producer_ctx);
                    break;
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    lk.unlock();
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( producer_ctx);
                    break;
                }
            }
            return channel_op_status::success;
        }
    }
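
    // pop blocks until a value is available; it returns
    // channel_op_status::closed only once the channel is closed *and*
    // drained, because the empty check runs before the closed check.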
    channel_op_status pop( value_type & value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    return channel_op_status::closed;
                } else {
                    active_ctx->wait_link( waiting_consumers_);
                    active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                    // suspend this consumer
                    active_ctx->suspend( lk);
                }
            } else {
                value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }
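
    // value_pop returns the value directly instead of via an out-parameter;
    // once the channel is closed and drained it throws fiber_error
    // (std::errc::operation_not_permitted). This is the operation the
    // iterator below relies on.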
    value_type value_pop() {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    throw fiber_error{
                        std::make_error_code( std::errc::operation_not_permitted),
                        "boost fiber: channel is closed" };
                } else {
                    active_ctx->wait_link( waiting_consumers_);
                    active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                    // suspend this consumer
                    active_ctx->suspend( lk);
                }
            } else {
                value_type value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                }
                return std::move( value);
            }
        }
    }

    template< typename Rep, typename Period >
    channel_op_status pop_wait_for( value_type & value,
                                    std::chrono::duration< Rep, Period > const& timeout_duration) {
        return pop_wait_until( value,
                               std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Clock, typename Duration >
    channel_op_status pop_wait_until( value_type & value,
                                      std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    return channel_op_status::closed;
                } else {
                    active_ctx->wait_link( waiting_consumers_);
                    active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                    // suspend this consumer
                    if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                        // relock local lk
                        lk.lock();
                        // remove from waiting-queue
                        waiting_consumers_.remove( * active_ctx);
                        return channel_op_status::timeout;
                    }
                }
            } else {
                value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }
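
    // An input iterator that pops values from the channel. Dereferencing
    // yields the most recently popped element (held in aligned storage);
    // incrementing pops the next one. When value_pop() throws because the
    // channel was closed, chan_ is reset to nullptr, which makes the iterator
    // compare equal to a default-constructed end iterator.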
    class iterator {
    private:
        typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type   storage_type;

        buffered_channel    *   chan_{ nullptr };
        storage_type            storage_;

        void increment_() {
            BOOST_ASSERT( nullptr != chan_);
            try {
                ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
            } catch ( fiber_error const&) {
                chan_ = nullptr;
            }
        }

    public:
        typedef std::input_iterator_tag     iterator_category;
        typedef std::ptrdiff_t              difference_type;
        typedef value_type              *   pointer;
        typedef value_type              &   reference;

        typedef pointer     pointer_t;
        typedef reference   reference_t;

        iterator() noexcept = default;

        explicit iterator( buffered_channel< T > * chan) noexcept :
            chan_{ chan } {
            increment_();
        }

        iterator( iterator const& other) noexcept :
            chan_{ other.chan_ } {
        }

        iterator & operator=( iterator const& other) noexcept {
            if ( BOOST_LIKELY( this != & other) ) {
                chan_ = other.chan_;
            }
            return * this;
        }

        bool operator==( iterator const& other) const noexcept {
            return other.chan_ == chan_;
        }

        bool operator!=( iterator const& other) const noexcept {
            return other.chan_ != chan_;
        }

        iterator & operator++() {
            reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
            increment_();
            return * this;
        }

        iterator operator++( int) = delete;

        reference_t operator*() noexcept {
            return * reinterpret_cast< value_type * >( std::addressof( storage_) );
        }

        pointer_t operator->() noexcept {
            return reinterpret_cast< value_type * >( std::addressof( storage_) );
        }
    };

    friend class iterator;
};
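
// begin()/end() make the channel usable in a range-based for loop: the loop
// consumes values until the channel is closed and drained.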
template< typename T >
typename buffered_channel< T >::iterator
begin( buffered_channel< T > & chan) {
    return typename buffered_channel< T >::iterator( & chan);
}

template< typename T >
typename buffered_channel< T >::iterator
end( buffered_channel< T > &) {
    return typename buffered_channel< T >::iterator();
}

}}

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif

#endif // BOOST_FIBERS_BUFFERED_CHANNEL_H
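
// A minimal usage sketch (illustrative only, not part of this header; it
// assumes a program linked against Boost.Fiber). A producer fiber pushes ten
// values and closes the channel; the main fiber drains it with the
// range-based for loop enabled by begin()/end() above:
//
//   #include <boost/fiber/all.hpp>
//   #include <iostream>
//
//   int main() {
//       boost::fibers::buffered_channel< int > chan{ 8 };   // holds at most 7 values
//       boost::fibers::fiber producer{ [&chan]{
//           for ( int i = 0; i < 10; ++i) {
//               chan.push( i);          // suspends while the buffer is full
//           }
//           chan.close();               // lets the range-for below terminate
//       } };
//       for ( int v : chan) {           // pops via value_pop() until closed
//           std::cout << v << '\n';
//       }
//       producer.join();
//       return 0;
//   }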