123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- [/
- Copyright Oliver Kowalke 2013.
- Distributed under the Boost Software License, Version 1.0.
- (See accompanying file LICENSE_1_0.txt or copy at
- http://www.boost.org/LICENSE_1_0.txt
- ]
- [section:buffered_channel Buffered Channel]
- __boost_fiber__ provides a bounded, buffered channel (MPMC queue) suitable to
- synchonize fibers (running on same or different threads) via asynchronouss
- message passing.
- typedef boost::fibers::buffered_channel< int > channel_t;
- void send( channel_t & chan) {
- for ( int i = 0; i < 5; ++i) {
- chan.push( i);
- }
- chan.close();
- }
- void recv( channel_t & chan) {
- int i;
- while ( boost::fibers::channel_op_status::success == chan.pop(i) ) {
- std::cout << "received " << i << std::endl;
- }
- }
- channel_t chan{ 2 };
- boost::fibers::fiber f1( std::bind( send, std::ref( chan) ) );
- boost::fibers::fiber f2( std::bind( recv, std::ref( chan) ) );
- f1.join();
- f2.join();
- Class `buffered_channel` supports range-for syntax:
- typedef boost::fibers::buffered_channel< int > channel_t;
- void foo( channel_t & chan) {
- chan.push( 1);
- chan.push( 1);
- chan.push( 2);
- chan.push( 3);
- chan.push( 5);
- chan.push( 8);
- chan.push( 12);
- chan.close();
- }
- void bar( channel_t & chan) {
- for ( unsigned int value : chan) {
- std::cout << value << " ";
- }
- std::cout << std::endl;
- }
- [template_heading buffered_channel]
- #include <boost/fiber/buffered_channel.hpp>
- namespace boost {
- namespace fibers {
- template< typename T >
- class buffered_channel {
- public:
- typedef T value_type;
- class iterator;
- explicit buffered_channel( std::size_t capacity);
- buffered_channel( buffered_channel const& other) = delete;
- buffered_channel & operator=( buffered_channel const& other) = delete;
- void close() noexcept;
- channel_op_status push( value_type const& va);
- channel_op_status push( value_type && va);
- template< typename Rep, typename Period >
- channel_op_status push_wait_for(
- value_type const& va,
- std::chrono::duration< Rep, Period > const& timeout_duration);
- channel_op_status push_wait_for( value_type && va,
- std::chrono::duration< Rep, Period > const& timeout_duration);
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until(
- value_type const& va,
- std::chrono::time_point< Clock, Duration > const& timeout_time);
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until(
- value_type && va,
- std::chrono::time_point< Clock, Duration > const& timeout_time);
- channel_op_status try_push( value_type const& va);
- channel_op_status try_push( value_type && va);
- channel_op_status pop( value_type & va);
- value_type value_pop();
- template< typename Rep, typename Period >
- channel_op_status pop_wait_for(
- value_type & va,
- std::chrono::duration< Rep, Period > const& timeout_duration);
- template< typename Clock, typename Duration >
- channel_op_status pop_wait_until(
- value_type & va,
- std::chrono::time_point< Clock, Duration > const& timeout_time);
- channel_op_status try_pop( value_type & va);
- };
- template< typename T >
- buffered_channel< T >::iterator begin( buffered_channel< T > & chan);
- template< typename T >
- buffered_channel< T >::iterator end( buffered_channel< T > & chan);
- }}
- [heading Constructor]
- explicit buffered_channel( std::size_t capacity);
- [variablelist
- [[Preconditions:] [`2<=capacity && 0==(capacity & (capacity-1))`]]
- [[Effects:] [The constructor constructs an object of class `buffered_channel`
- with an internal buffer of size `capacity`.]]
- [[Throws:] [`fiber_error`]]
- [[Error Conditions:] [
- [*invalid_argument]: if `0==capacity || 0!=(capacity & (capacity-1))`.]]
- [[Notes:] [A `push()`, `push_wait_for()` or `push_wait_until()` will not block
- until the number of values in the channel becomes equal to `capacity`.
- The channel can hold only `capacity - 1` elements, otherwise it is
- considered to be full.]]
- ]
- [member_heading buffered_channel..close]
- void close() noexcept;
- [variablelist
- [[Effects:] [Deactivates the channel. No values can be put after calling
- `this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()`
- or `this->pop_wait_until()` will return `closed`. Fibers blocked in
- `this->value_pop()` will receive an exception.]]
- [[Throws:] [Nothing.]]
- [[Note:] [`close()` is like closing a pipe. It informs waiting consumers
- that no more values will arrive.]]
- ]
- [template buffered_channel_push_effects[enqueues] If channel is closed, returns
- `closed`. [enqueues] the value in the channel, wakes up a fiber
- blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or
- `this->pop_wait_until()` and returns `success`. If the channel is full,
- the fiber is blocked.]
- [member_heading buffered_channel..push]
- channel_op_status push( value_type const& va);
- channel_op_status push( value_type && va);
- [variablelist
- [[Effects:] [[buffered_channel_push_effects Otherwise enqueues]]]
- [[Throws:] [Exceptions thrown by copy- or move-operations.]]
- ]
- [template buffered_channel_try_push_effects[enqueues] If channel is closed, returns
- `closed`. [enqueues] the value in the channel, wakes up a fiber
- blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or
- `this->pop_wait_until()` and returns `success`. If the channel is full,
- it doesn't block and returns `full`.]
- [member_heading buffered_channel..try_push]
- channel_op_status try_push( value_type const& va);
- channel_op_status try_push( value_type && va);
- [variablelist
- [[Effects:] [[buffered_channel_try_push_effects Otherwise enqueues]]]
- [[Throws:] [Exceptions thrown by copy- or move-operations.]]
- ]
- [template buffered_channel_pop[cls unblocking]
- [member_heading [cls]..pop]
- channel_op_status pop( value_type & va);
- [variablelist
- [[Effects:] [Dequeues a value from the channel. If the channel is empty, the
- fiber gets suspended until at least one new item is `push()`ed (return value
- `success` and `va` contains dequeued value) or the channel gets `close()`d
- (return value `closed`)[unblocking]]]
- [[Throws:] [Exceptions thrown by copy- or move-operations.]]
- ]
- ]
- [buffered_channel_pop buffered_channel .]
- [template buffered_channel_value_pop[cls unblocking]
- [member_heading [cls]..value_pop]
- value_type value_pop();
- [variablelist
- [[Effects:] [Dequeues a value from the channel. If the channel is empty, the
- fiber gets suspended until at least one new item is `push()`ed or the channel
- gets `close()`d (which throws an exception)[unblocking]]]
- [[Throws:] [`fiber_error` if `*this` is closed or by copy- or move-operations.]]
- [[Error conditions:] [`std::errc::operation_not_permitted`]]
- ]
- ]
- [buffered_channel_value_pop buffered_channel .]
- [template buffered_channel_try_pop[cls unblocking]
- [member_heading [cls]..try_pop]
- channel_op_status try_pop( value_type & va);
- [variablelist
- [[Effects:] [If channel is empty, returns `empty`. If channel is closed,
- returns `closed`. Otherwise it returns `success` and `va` contains the
- dequeued value[unblocking]]]
- [[Throws:] [Exceptions thrown by copy- or move-operations.]]
- ]
- ]
- [buffered_channel_try_pop buffered_channel .]
- [template buffered_channel_pop_wait_until_effects[endtime unblocking] If channel
- is not empty, immediately dequeues a value from the channel. Otherwise
- the fiber gets suspended until at least one new item is `push()`ed (return
- value `success` and `va` contains dequeued value), or the channel gets
- `close()`d (return value `closed`), or the system time reaches [endtime]
- (return value `timeout`)[unblocking]]
- [template buffered_channel_pop_wait_for[cls unblocking]
- [member_heading [cls]..pop_wait_for]
- template< typename Rep, typename Period >
- channel_op_status pop_wait_for(
- value_type & va,
- std::chrono::duration< Rep, Period > const& timeout_duration)
- [variablelist
- [[Effects:] [Accepts `std::chrono::duration` and internally computes a timeout
- time as (system time + `timeout_duration`).
- [buffered_channel_pop_wait_until_effects the computed timeout time..[unblocking]]]]
- [[Throws:] [timeout-related exceptions or by copy- or move-operations.]]
- ]
- ]
- [buffered_channel_pop_wait_for buffered_channel .]
- [template buffered_channel_pop_wait_until[cls unblocking]
- [member_heading [cls]..pop_wait_until]
- template< typename Clock, typename Duration >
- channel_op_status pop_wait_until(
- value_type & va,
- std::chrono::time_point< Clock, Duration > const& timeout_time)
- [variablelist
- [[Effects:] [Accepts a `std::chrono::time_point< Clock, Duration >`.
- [buffered_channel_pop_wait_until_effects the passed `time_point`..[unblocking]]]]
- [[Throws:] [timeout-related exceptions or by copy- or move-operations.]]
- ]
- ]
- [buffered_channel_pop_wait_until buffered_channel .]
- [heading Non-member function `begin( buffered_channel< T > &)`]
- template< typename T >
- buffered_channel< T >::iterator begin( buffered_channel< T > &);
- [variablelist
- [[Returns:] [Returns a range-iterator (input-iterator).]]
- ]
- [heading Non-member function `end( buffered_channel< T > &)`]
- template< typename T >
- buffered_channel< R >::iterator end( buffered_channel< T > &);
- [variablelist
- [[Returns:] [Returns an end range-iterator (input-iterator).]]
- ]
- [endsect]
|