[/ Copyright Oliver Kowalke 2013. Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt ] [section:buffered_channel Buffered Channel] __boost_fiber__ provides a bounded, buffered channel (MPMC queue) suitable to synchonize fibers (running on same or different threads) via asynchronouss message passing. typedef boost::fibers::buffered_channel< int > channel_t; void send( channel_t & chan) { for ( int i = 0; i < 5; ++i) { chan.push( i); } chan.close(); } void recv( channel_t & chan) { int i; while ( boost::fibers::channel_op_status::success == chan.pop(i) ) { std::cout << "received " << i << std::endl; } } channel_t chan{ 2 }; boost::fibers::fiber f1( std::bind( send, std::ref( chan) ) ); boost::fibers::fiber f2( std::bind( recv, std::ref( chan) ) ); f1.join(); f2.join(); Class `buffered_channel` supports range-for syntax: typedef boost::fibers::buffered_channel< int > channel_t; void foo( channel_t & chan) { chan.push( 1); chan.push( 1); chan.push( 2); chan.push( 3); chan.push( 5); chan.push( 8); chan.push( 12); chan.close(); } void bar( channel_t & chan) { for ( unsigned int value : chan) { std::cout << value << " "; } std::cout << std::endl; } [template_heading buffered_channel] #include namespace boost { namespace fibers { template< typename T > class buffered_channel { public: typedef T value_type; class iterator; explicit buffered_channel( std::size_t capacity); buffered_channel( buffered_channel const& other) = delete; buffered_channel & operator=( buffered_channel const& other) = delete; void close() noexcept; channel_op_status push( value_type const& va); channel_op_status push( value_type && va); template< typename Rep, typename Period > channel_op_status push_wait_for( value_type const& va, std::chrono::duration< Rep, Period > const& timeout_duration); channel_op_status push_wait_for( value_type && va, std::chrono::duration< Rep, Period > const& timeout_duration); template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type const& va, std::chrono::time_point< Clock, Duration > const& timeout_time); template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type && va, std::chrono::time_point< Clock, Duration > const& timeout_time); channel_op_status try_push( value_type const& va); channel_op_status try_push( value_type && va); channel_op_status pop( value_type & va); value_type value_pop(); template< typename Rep, typename Period > channel_op_status pop_wait_for( value_type & va, std::chrono::duration< Rep, Period > const& timeout_duration); template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & va, std::chrono::time_point< Clock, Duration > const& timeout_time); channel_op_status try_pop( value_type & va); }; template< typename T > buffered_channel< T >::iterator begin( buffered_channel< T > & chan); template< typename T > buffered_channel< T >::iterator end( buffered_channel< T > & chan); }} [heading Constructor] explicit buffered_channel( std::size_t capacity); [variablelist [[Preconditions:] [`2<=capacity && 0==(capacity & (capacity-1))`]] [[Effects:] [The constructor constructs an object of class `buffered_channel` with an internal buffer of size `capacity`.]] [[Throws:] [`fiber_error`]] [[Error Conditions:] [ [*invalid_argument]: if `0==capacity || 0!=(capacity & (capacity-1))`.]] [[Notes:] [A `push()`, `push_wait_for()` or `push_wait_until()` will not block until the number of values in the channel becomes equal to `capacity`. The channel can hold only `capacity - 1` elements, otherwise it is considered to be full.]] ] [member_heading buffered_channel..close] void close() noexcept; [variablelist [[Effects:] [Deactivates the channel. No values can be put after calling `this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` will return `closed`. Fibers blocked in `this->value_pop()` will receive an exception.]] [[Throws:] [Nothing.]] [[Note:] [`close()` is like closing a pipe. It informs waiting consumers that no more values will arrive.]] ] [template buffered_channel_push_effects[enqueues] If channel is closed, returns `closed`. [enqueues] the value in the channel, wakes up a fiber blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`. If the channel is full, the fiber is blocked.] [member_heading buffered_channel..push] channel_op_status push( value_type const& va); channel_op_status push( value_type && va); [variablelist [[Effects:] [[buffered_channel_push_effects Otherwise enqueues]]] [[Throws:] [Exceptions thrown by copy- or move-operations.]] ] [template buffered_channel_try_push_effects[enqueues] If channel is closed, returns `closed`. [enqueues] the value in the channel, wakes up a fiber blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`. If the channel is full, it doesn't block and returns `full`.] [member_heading buffered_channel..try_push] channel_op_status try_push( value_type const& va); channel_op_status try_push( value_type && va); [variablelist [[Effects:] [[buffered_channel_try_push_effects Otherwise enqueues]]] [[Throws:] [Exceptions thrown by copy- or move-operations.]] ] [template buffered_channel_pop[cls unblocking] [member_heading [cls]..pop] channel_op_status pop( value_type & va); [variablelist [[Effects:] [Dequeues a value from the channel. If the channel is empty, the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains dequeued value) or the channel gets `close()`d (return value `closed`)[unblocking]]] [[Throws:] [Exceptions thrown by copy- or move-operations.]] ] ] [buffered_channel_pop buffered_channel .] [template buffered_channel_value_pop[cls unblocking] [member_heading [cls]..value_pop] value_type value_pop(); [variablelist [[Effects:] [Dequeues a value from the channel. If the channel is empty, the fiber gets suspended until at least one new item is `push()`ed or the channel gets `close()`d (which throws an exception)[unblocking]]] [[Throws:] [`fiber_error` if `*this` is closed or by copy- or move-operations.]] [[Error conditions:] [`std::errc::operation_not_permitted`]] ] ] [buffered_channel_value_pop buffered_channel .] [template buffered_channel_try_pop[cls unblocking] [member_heading [cls]..try_pop] channel_op_status try_pop( value_type & va); [variablelist [[Effects:] [If channel is empty, returns `empty`. If channel is closed, returns `closed`. Otherwise it returns `success` and `va` contains the dequeued value[unblocking]]] [[Throws:] [Exceptions thrown by copy- or move-operations.]] ] ] [buffered_channel_try_pop buffered_channel .] [template buffered_channel_pop_wait_until_effects[endtime unblocking] If channel is not empty, immediately dequeues a value from the channel. Otherwise the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains dequeued value), or the channel gets `close()`d (return value `closed`), or the system time reaches [endtime] (return value `timeout`)[unblocking]] [template buffered_channel_pop_wait_for[cls unblocking] [member_heading [cls]..pop_wait_for] template< typename Rep, typename Period > channel_op_status pop_wait_for( value_type & va, std::chrono::duration< Rep, Period > const& timeout_duration) [variablelist [[Effects:] [Accepts `std::chrono::duration` and internally computes a timeout time as (system time + `timeout_duration`). [buffered_channel_pop_wait_until_effects the computed timeout time..[unblocking]]]] [[Throws:] [timeout-related exceptions or by copy- or move-operations.]] ] ] [buffered_channel_pop_wait_for buffered_channel .] [template buffered_channel_pop_wait_until[cls unblocking] [member_heading [cls]..pop_wait_until] template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & va, std::chrono::time_point< Clock, Duration > const& timeout_time) [variablelist [[Effects:] [Accepts a `std::chrono::time_point< Clock, Duration >`. [buffered_channel_pop_wait_until_effects the passed `time_point`..[unblocking]]]] [[Throws:] [timeout-related exceptions or by copy- or move-operations.]] ] ] [buffered_channel_pop_wait_until buffered_channel .] [heading Non-member function `begin( buffered_channel< T > &)`] template< typename T > buffered_channel< T >::iterator begin( buffered_channel< T > &); [variablelist [[Returns:] [Returns a range-iterator (input-iterator).]] ] [heading Non-member function `end( buffered_channel< T > &)`] template< typename T > buffered_channel< R >::iterator end( buffered_channel< T > &); [variablelist [[Returns:] [Returns an end range-iterator (input-iterator).]] ] [endsect]