// unbuffered_channel.hpp
  1. // Copyright Oliver Kowalke 2016.
  2. // Distributed under the Boost Software License, Version 1.0.
  3. // (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
  6. #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <new>
#include <type_traits>
#include <vector>
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/fiber/channel_op_status.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
#include <boost/fiber/detail/exchange.hpp>
#endif
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/exceptions.hpp>
  23. #ifdef BOOST_HAS_ABI_HEADERS
  24. # include BOOST_ABI_PREFIX
  25. #endif
  26. namespace boost {
  27. namespace fibers {
  28. template< typename T >
  29. class unbuffered_channel {
  30. public:
  31. typedef typename std::remove_reference< T >::type value_type;
  32. private:
  33. typedef context::wait_queue_t wait_queue_type;
  34. struct slot {
  35. value_type value;
  36. context * ctx;
  37. slot( value_type const& value_, context * ctx_) :
  38. value{ value_ },
  39. ctx{ ctx_ } {
  40. }
  41. slot( value_type && value_, context * ctx_) :
  42. value{ std::move( value_) },
  43. ctx{ ctx_ } {
  44. }
  45. };
  46. // shared cacheline
  47. std::atomic< slot * > slot_{ nullptr };
  48. // shared cacheline
  49. std::atomic_bool closed_{ false };
  50. mutable detail::spinlock splk_producers_{};
  51. wait_queue_type waiting_producers_{};
  52. mutable detail::spinlock splk_consumers_{};
  53. wait_queue_type waiting_consumers_{};
  54. char pad_[cacheline_length];
  55. bool is_empty_() {
  56. return nullptr == slot_.load( std::memory_order_acquire);
  57. }
  58. bool try_push_( slot * own_slot) {
  59. for (;;) {
  60. slot * s = slot_.load( std::memory_order_acquire);
  61. if ( nullptr == s) {
  62. if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
  63. continue;
  64. }
  65. return true;
  66. } else {
  67. return false;
  68. }
  69. }
  70. }
  71. slot * try_pop_() {
  72. slot * nil_slot = nullptr;
  73. for (;;) {
  74. slot * s = slot_.load( std::memory_order_acquire);
  75. if ( nullptr != s) {
  76. if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
  77. continue;}
  78. }
  79. return s;
  80. }
  81. }
  82. public:
  83. unbuffered_channel() {
  84. }
  85. ~unbuffered_channel() {
  86. close();
  87. }
  88. unbuffered_channel( unbuffered_channel const&) = delete;
  89. unbuffered_channel & operator=( unbuffered_channel const&) = delete;
  90. bool is_closed() const noexcept {
  91. return closed_.load( std::memory_order_acquire);
  92. }
  93. void close() noexcept {
  94. context * active_ctx = context::active();
  95. // set flag
  96. if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
  97. // notify current waiting
  98. slot * s = slot_.load( std::memory_order_acquire);
  99. if ( nullptr != s) {
  100. // notify context
  101. active_ctx->schedule( s->ctx);
  102. }
  103. // notify all waiting producers
  104. detail::spinlock_lock lk1{ splk_producers_ };
  105. while ( ! waiting_producers_.empty() ) {
  106. context * producer_ctx = & waiting_producers_.front();
  107. waiting_producers_.pop_front();
  108. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  109. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  110. // notify context
  111. active_ctx->schedule( producer_ctx);
  112. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  113. // no timed-wait op.
  114. // notify context
  115. active_ctx->schedule( producer_ctx);
  116. }
  117. }
  118. // notify all waiting consumers
  119. detail::spinlock_lock lk2{ splk_consumers_ };
  120. while ( ! waiting_consumers_.empty() ) {
  121. context * consumer_ctx = & waiting_consumers_.front();
  122. waiting_consumers_.pop_front();
  123. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  124. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  125. // notify context
  126. active_ctx->schedule( consumer_ctx);
  127. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  128. // no timed-wait op.
  129. // notify context
  130. active_ctx->schedule( consumer_ctx);
  131. }
  132. }
  133. }
  134. }
  135. channel_op_status push( value_type const& value) {
  136. context * active_ctx = context::active();
  137. slot s{ value, active_ctx };
  138. for (;;) {
  139. if ( BOOST_UNLIKELY( is_closed() ) ) {
  140. return channel_op_status::closed;
  141. }
  142. if ( try_push_( & s) ) {
  143. detail::spinlock_lock lk{ splk_consumers_ };
  144. // notify one waiting consumer
  145. while ( ! waiting_consumers_.empty() ) {
  146. context * consumer_ctx = & waiting_consumers_.front();
  147. waiting_consumers_.pop_front();
  148. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  149. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  150. // notify context
  151. active_ctx->schedule( consumer_ctx);
  152. break;
  153. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  154. // no timed-wait op.
  155. // notify context
  156. active_ctx->schedule( consumer_ctx);
  157. break;
  158. }
  159. }
  160. // suspend till value has been consumed
  161. active_ctx->suspend( lk);
  162. // resumed
  163. if ( nullptr == s.ctx) {
  164. // value has been consumed
  165. return channel_op_status::success;
  166. } else {
  167. // channel was closed before value was consumed
  168. return channel_op_status::closed;
  169. }
  170. } else {
  171. detail::spinlock_lock lk{ splk_producers_ };
  172. if ( BOOST_UNLIKELY( is_closed() ) ) {
  173. return channel_op_status::closed;
  174. }
  175. if ( is_empty_() ) {
  176. continue;
  177. }
  178. active_ctx->wait_link( waiting_producers_);
  179. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  180. // suspend this producer
  181. active_ctx->suspend( lk);
  182. // resumed, slot mabye free
  183. }
  184. }
  185. }
  186. channel_op_status push( value_type && value) {
  187. context * active_ctx = context::active();
  188. slot s{ std::move( value), active_ctx };
  189. for (;;) {
  190. if ( BOOST_UNLIKELY( is_closed() ) ) {
  191. return channel_op_status::closed;
  192. }
  193. if ( try_push_( & s) ) {
  194. detail::spinlock_lock lk{ splk_consumers_ };
  195. // notify one waiting consumer
  196. while ( ! waiting_consumers_.empty() ) {
  197. context * consumer_ctx = & waiting_consumers_.front();
  198. waiting_consumers_.pop_front();
  199. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  200. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  201. // notify context
  202. active_ctx->schedule( consumer_ctx);
  203. break;
  204. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  205. // no timed-wait op.
  206. // notify context
  207. active_ctx->schedule( consumer_ctx);
  208. break;
  209. }
  210. }
  211. // suspend till value has been consumed
  212. active_ctx->suspend( lk);
  213. // resumed
  214. if ( nullptr == s.ctx) {
  215. // value has been consumed
  216. return channel_op_status::success;
  217. } else {
  218. // channel was closed before value was consumed
  219. return channel_op_status::closed;
  220. }
  221. } else {
  222. detail::spinlock_lock lk{ splk_producers_ };
  223. if ( BOOST_UNLIKELY( is_closed() ) ) {
  224. return channel_op_status::closed;
  225. }
  226. if ( is_empty_() ) {
  227. continue;
  228. }
  229. active_ctx->wait_link( waiting_producers_);
  230. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  231. // suspend this producer
  232. active_ctx->suspend( lk);
  233. // resumed, slot mabye free
  234. }
  235. }
  236. }
  237. template< typename Rep, typename Period >
  238. channel_op_status push_wait_for( value_type const& value,
  239. std::chrono::duration< Rep, Period > const& timeout_duration) {
  240. return push_wait_until( value,
  241. std::chrono::steady_clock::now() + timeout_duration);
  242. }
  243. template< typename Rep, typename Period >
  244. channel_op_status push_wait_for( value_type && value,
  245. std::chrono::duration< Rep, Period > const& timeout_duration) {
  246. return push_wait_until( std::forward< value_type >( value),
  247. std::chrono::steady_clock::now() + timeout_duration);
  248. }
  249. template< typename Clock, typename Duration >
  250. channel_op_status push_wait_until( value_type const& value,
  251. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  252. context * active_ctx = context::active();
  253. slot s{ value, active_ctx };
  254. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  255. for (;;) {
  256. if ( BOOST_UNLIKELY( is_closed() ) ) {
  257. return channel_op_status::closed;
  258. }
  259. if ( try_push_( & s) ) {
  260. detail::spinlock_lock lk{ splk_consumers_ };
  261. // notify one waiting consumer
  262. while ( ! waiting_consumers_.empty() ) {
  263. context * consumer_ctx = & waiting_consumers_.front();
  264. waiting_consumers_.pop_front();
  265. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  266. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  267. // notify context
  268. active_ctx->schedule( consumer_ctx);
  269. break;
  270. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  271. // no timed-wait op.
  272. // notify context
  273. active_ctx->schedule( consumer_ctx);
  274. break;
  275. }
  276. }
  277. // suspend this producer
  278. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  279. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  280. // clear slot
  281. slot * nil_slot = nullptr, * own_slot = & s;
  282. slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
  283. // resumed, value has not been consumed
  284. return channel_op_status::timeout;
  285. }
  286. // resumed
  287. if ( nullptr == s.ctx) {
  288. // value has been consumed
  289. return channel_op_status::success;
  290. } else {
  291. // channel was closed before value was consumed
  292. return channel_op_status::closed;
  293. }
  294. } else {
  295. detail::spinlock_lock lk{ splk_producers_ };
  296. if ( BOOST_UNLIKELY( is_closed() ) ) {
  297. return channel_op_status::closed;
  298. }
  299. if ( is_empty_() ) {
  300. continue;
  301. }
  302. active_ctx->wait_link( waiting_producers_);
  303. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  304. // suspend this producer
  305. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  306. // relock local lk
  307. lk.lock();
  308. // remove from waiting-queue
  309. waiting_producers_.remove( * active_ctx);
  310. return channel_op_status::timeout;
  311. }
  312. // resumed, slot maybe free
  313. }
  314. }
  315. }
  316. template< typename Clock, typename Duration >
  317. channel_op_status push_wait_until( value_type && value,
  318. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  319. context * active_ctx = context::active();
  320. slot s{ std::move( value), active_ctx };
  321. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  322. for (;;) {
  323. if ( BOOST_UNLIKELY( is_closed() ) ) {
  324. return channel_op_status::closed;
  325. }
  326. if ( try_push_( & s) ) {
  327. detail::spinlock_lock lk{ splk_consumers_ };
  328. // notify one waiting consumer
  329. while ( ! waiting_consumers_.empty() ) {
  330. context * consumer_ctx = & waiting_consumers_.front();
  331. waiting_consumers_.pop_front();
  332. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  333. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  334. // notify context
  335. active_ctx->schedule( consumer_ctx);
  336. break;
  337. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  338. // no timed-wait op.
  339. // notify context
  340. active_ctx->schedule( consumer_ctx);
  341. break;
  342. }
  343. }
  344. // suspend this producer
  345. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  346. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  347. // clear slot
  348. slot * nil_slot = nullptr, * own_slot = & s;
  349. slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
  350. // resumed, value has not been consumed
  351. return channel_op_status::timeout;
  352. }
  353. // resumed
  354. if ( nullptr == s.ctx) {
  355. // value has been consumed
  356. return channel_op_status::success;
  357. } else {
  358. // channel was closed before value was consumed
  359. return channel_op_status::closed;
  360. }
  361. } else {
  362. detail::spinlock_lock lk{ splk_producers_ };
  363. if ( BOOST_UNLIKELY( is_closed() ) ) {
  364. return channel_op_status::closed;
  365. }
  366. if ( is_empty_() ) {
  367. continue;
  368. }
  369. active_ctx->wait_link( waiting_producers_);
  370. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  371. // suspend this producer
  372. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  373. // relock local lk
  374. lk.lock();
  375. // remove from waiting-queue
  376. waiting_producers_.remove( * active_ctx);
  377. return channel_op_status::timeout;
  378. }
  379. // resumed, slot maybe free
  380. }
  381. }
  382. }
  383. channel_op_status pop( value_type & value) {
  384. context * active_ctx = context::active();
  385. slot * s = nullptr;
  386. for (;;) {
  387. if ( nullptr != ( s = try_pop_() ) ) {
  388. {
  389. detail::spinlock_lock lk{ splk_producers_ };
  390. // notify one waiting producer
  391. while ( ! waiting_producers_.empty() ) {
  392. context * producer_ctx = & waiting_producers_.front();
  393. waiting_producers_.pop_front();
  394. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  395. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  396. lk.unlock();
  397. // notify context
  398. active_ctx->schedule( producer_ctx);
  399. break;
  400. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  401. lk.unlock();
  402. // no timed-wait op.
  403. // notify context
  404. active_ctx->schedule( producer_ctx);
  405. break;
  406. }
  407. }
  408. }
  409. value = std::move( s->value);
  410. // notify context
  411. #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
  412. active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
  413. #else
  414. active_ctx->schedule( std::exchange( s->ctx, nullptr) );
  415. #endif
  416. return channel_op_status::success;
  417. } else {
  418. detail::spinlock_lock lk{ splk_consumers_ };
  419. if ( BOOST_UNLIKELY( is_closed() ) ) {
  420. return channel_op_status::closed;
  421. }
  422. if ( ! is_empty_() ) {
  423. continue;
  424. }
  425. active_ctx->wait_link( waiting_consumers_);
  426. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  427. // suspend this consumer
  428. active_ctx->suspend( lk);
  429. // resumed, slot mabye set
  430. }
  431. }
  432. }
  433. value_type value_pop() {
  434. context * active_ctx = context::active();
  435. slot * s = nullptr;
  436. for (;;) {
  437. if ( nullptr != ( s = try_pop_() ) ) {
  438. {
  439. detail::spinlock_lock lk{ splk_producers_ };
  440. // notify one waiting producer
  441. while ( ! waiting_producers_.empty() ) {
  442. context * producer_ctx = & waiting_producers_.front();
  443. waiting_producers_.pop_front();
  444. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  445. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  446. lk.unlock();
  447. // notify context
  448. active_ctx->schedule( producer_ctx);
  449. break;
  450. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  451. lk.unlock();
  452. // no timed-wait op.
  453. // notify context
  454. active_ctx->schedule( producer_ctx);
  455. break;
  456. }
  457. }
  458. }
  459. // consume value
  460. value_type value = std::move( s->value);
  461. // notify context
  462. #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
  463. active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
  464. #else
  465. active_ctx->schedule( std::exchange( s->ctx, nullptr) );
  466. #endif
  467. return std::move( value);
  468. } else {
  469. detail::spinlock_lock lk{ splk_consumers_ };
  470. if ( BOOST_UNLIKELY( is_closed() ) ) {
  471. throw fiber_error{
  472. std::make_error_code( std::errc::operation_not_permitted),
  473. "boost fiber: channel is closed" };
  474. }
  475. if ( ! is_empty_() ) {
  476. continue;
  477. }
  478. active_ctx->wait_link( waiting_consumers_);
  479. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  480. // suspend this consumer
  481. active_ctx->suspend( lk);
  482. // resumed, slot mabye set
  483. }
  484. }
  485. }
  486. template< typename Rep, typename Period >
  487. channel_op_status pop_wait_for( value_type & value,
  488. std::chrono::duration< Rep, Period > const& timeout_duration) {
  489. return pop_wait_until( value,
  490. std::chrono::steady_clock::now() + timeout_duration);
  491. }
  492. template< typename Clock, typename Duration >
  493. channel_op_status pop_wait_until( value_type & value,
  494. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  495. context * active_ctx = context::active();
  496. slot * s = nullptr;
  497. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  498. for (;;) {
  499. if ( nullptr != ( s = try_pop_() ) ) {
  500. {
  501. detail::spinlock_lock lk{ splk_producers_ };
  502. // notify one waiting producer
  503. while ( ! waiting_producers_.empty() ) {
  504. context * producer_ctx = & waiting_producers_.front();
  505. waiting_producers_.pop_front();
  506. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  507. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  508. lk.unlock();
  509. // notify context
  510. active_ctx->schedule( producer_ctx);
  511. break;
  512. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  513. lk.unlock();
  514. // no timed-wait op.
  515. // notify context
  516. active_ctx->schedule( producer_ctx);
  517. break;
  518. }
  519. }
  520. }
  521. // consume value
  522. value = std::move( s->value);
  523. // notify context
  524. #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
  525. active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
  526. #else
  527. active_ctx->schedule( std::exchange( s->ctx, nullptr) );
  528. #endif
  529. return channel_op_status::success;
  530. } else {
  531. detail::spinlock_lock lk{ splk_consumers_ };
  532. if ( BOOST_UNLIKELY( is_closed() ) ) {
  533. return channel_op_status::closed;
  534. }
  535. if ( ! is_empty_() ) {
  536. continue;
  537. }
  538. active_ctx->wait_link( waiting_consumers_);
  539. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  540. // suspend this consumer
  541. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  542. // relock local lk
  543. lk.lock();
  544. // remove from waiting-queue
  545. waiting_consumers_.remove( * active_ctx);
  546. return channel_op_status::timeout;
  547. }
  548. }
  549. }
  550. }
  551. class iterator {
  552. private:
  553. typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
  554. unbuffered_channel * chan_{ nullptr };
  555. storage_type storage_;
  556. void increment_() {
  557. BOOST_ASSERT( nullptr != chan_);
  558. try {
  559. ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
  560. } catch ( fiber_error const&) {
  561. chan_ = nullptr;
  562. }
  563. }
  564. public:
  565. typedef std::input_iterator_tag iterator_category;
  566. typedef std::ptrdiff_t difference_type;
  567. typedef value_type * pointer;
  568. typedef value_type & reference;
  569. typedef pointer pointer_t;
  570. typedef reference reference_t;
  571. iterator() noexcept = default;
  572. explicit iterator( unbuffered_channel< T > * chan) noexcept :
  573. chan_{ chan } {
  574. increment_();
  575. }
  576. iterator( iterator const& other) noexcept :
  577. chan_{ other.chan_ } {
  578. }
  579. iterator & operator=( iterator const& other) noexcept {
  580. if ( this == & other) return * this;
  581. chan_ = other.chan_;
  582. return * this;
  583. }
  584. bool operator==( iterator const& other) const noexcept {
  585. return other.chan_ == chan_;
  586. }
  587. bool operator!=( iterator const& other) const noexcept {
  588. return other.chan_ != chan_;
  589. }
  590. iterator & operator++() {
  591. reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
  592. increment_();
  593. return * this;
  594. }
  595. iterator operator++( int) = delete;
  596. reference_t operator*() noexcept {
  597. return * reinterpret_cast< value_type * >( std::addressof( storage_) );
  598. }
  599. pointer_t operator->() noexcept {
  600. return reinterpret_cast< value_type * >( std::addressof( storage_) );
  601. }
  602. };
  603. friend class iterator;
  604. };
  605. template< typename T >
  606. typename unbuffered_channel< T >::iterator
  607. begin( unbuffered_channel< T > & chan) {
  608. return typename unbuffered_channel< T >::iterator( & chan);
  609. }
  610. template< typename T >
  611. typename unbuffered_channel< T >::iterator
  612. end( unbuffered_channel< T > &) {
  613. return typename unbuffered_channel< T >::iterator();
  614. }
  615. }}
  616. #ifdef BOOST_HAS_ABI_HEADERS
  617. # include BOOST_ABI_SUFFIX
  618. #endif
  619. #endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H