123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984 |
- // Copyright Nat Goodspeed 2015.
- // Distributed under the Boost Software License, Version 1.0.
- // (See accompanying file LICENSE_1_0.txt or copy at
- // http://www.boost.org/LICENSE_1_0.txt)
- //
- #include <algorithm>
- #include <cassert>
- #include <chrono>
- #include <iostream>
- #include <memory>
- #include <sstream>
- #include <string>
- #include <type_traits>
- #include <utility>
- #include <vector>
- #include <boost/fiber/all.hpp>
- #include <boost/variant/variant.hpp>
- #include <boost/variant/get.hpp>
- // These are wait_something() functions rather than when_something()
- // functions. A big part of the point of the Fiber library is to model
- // sequencing using the processor's instruction pointer rather than chains of
- // callbacks. The future-oriented when_all() / when_any() functions are still
- // based on chains of callbacks. With Fiber, we can do better.
- /*****************************************************************************
- * Verbose
- *****************************************************************************/
- class Verbose {
- public:
- Verbose( std::string const& d):
- desc( d) {
- std::cout << desc << " start" << std::endl;
- }
- ~Verbose() {
- std::cout << desc << " stop" << std::endl;
- }
- Verbose( Verbose const&) = delete;
- Verbose & operator=( Verbose const&) = delete;
- private:
- const std::string desc;
- };
- /*****************************************************************************
- * Runner and Example
- *****************************************************************************/
- // collect and ultimately run every Example
- class Runner {
- typedef std::vector< std::pair< std::string, std::function< void() > > > function_list;
- public:
- void add( std::string const& desc, std::function< void() > const& func) {
- functions_.push_back( function_list::value_type( desc, func) );
- }
- void run() {
- for ( function_list::value_type const& pair : functions_) {
- Verbose v( pair.first);
- pair.second();
- }
- }
- private:
- function_list functions_;
- };
- Runner runner;
- // Example allows us to embed Runner::add() calls at module scope
- struct Example {
- Example( Runner & runner, std::string const& desc, std::function< void() > const& func) {
- runner.add( desc, func);
- }
- };
- /*****************************************************************************
- * example task functions
- *****************************************************************************/
- //[wait_sleeper
- template< typename T >
- T sleeper_impl( T item, int ms, bool thrw = false) {
- std::ostringstream descb, funcb;
- descb << item;
- std::string desc( descb.str() );
- funcb << " sleeper(" << item << ")";
- Verbose v( funcb.str() );
- boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) );
- if ( thrw) {
- throw std::runtime_error( desc);
- }
- return item;
- }
- //]
- inline
- std::string sleeper( std::string const& item, int ms, bool thrw = false) {
- return sleeper_impl( item, ms, thrw);
- }
- inline
- double sleeper( double item, int ms, bool thrw = false) {
- return sleeper_impl( item, ms, thrw);
- }
- inline
- int sleeper(int item, int ms, bool thrw = false) {
- return sleeper_impl( item, ms, thrw);
- }
- /*****************************************************************************
- * Done
- *****************************************************************************/
- //[wait_done
- // Wrap canonical pattern for condition_variable + bool flag
- struct Done {
- private:
- boost::fibers::condition_variable cond;
- boost::fibers::mutex mutex;
- bool ready = false;
- public:
- typedef std::shared_ptr< Done > ptr;
- void wait() {
- std::unique_lock< boost::fibers::mutex > lock( mutex);
- cond.wait( lock, [this](){ return ready; });
- }
- void notify() {
- {
- std::unique_lock< boost::fibers::mutex > lock( mutex);
- ready = true;
- } // release mutex
- cond.notify_one();
- }
- };
- //]
- /*****************************************************************************
- * when_any, simple completion
- *****************************************************************************/
- //[wait_first_simple_impl
- // Degenerate case: when there are no functions to wait for, return
- // immediately.
- void wait_first_simple_impl( Done::ptr) {
- }
- // When there's at least one function to wait for, launch it and recur to
- // process the rest.
- template< typename Fn, typename ... Fns >
- void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) {
- boost::fibers::fiber( [done, function](){
- function();
- done->notify();
- }).detach();
- wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
- }
- //]
- // interface function: instantiate Done, launch tasks, wait for Done
- //[wait_first_simple
- template< typename ... Fns >
- void wait_first_simple( Fns && ... functions) {
- // Use shared_ptr because each function's fiber will bind it separately,
- // and we're going to return before the last of them completes.
- auto done( std::make_shared< Done >() );
- wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
- done->wait();
- }
- //]
- // example usage
- Example wfs( runner, "wait_first_simple()", [](){
- //[wait_first_simple_ex
- wait_first_simple(
- [](){ sleeper("wfs_long", 150); },
- [](){ sleeper("wfs_medium", 100); },
- [](){ sleeper("wfs_short", 50); });
- //]
- });
- /*****************************************************************************
- * when_any, return value
- *****************************************************************************/
- // When there's only one function, call this overload
- //[wait_first_value_impl
- template< typename T, typename Fn >
- void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
- Fn && function) {
- boost::fibers::fiber( [chan, function](){
- // Ignore channel_op_status returned by push():
- // might be closed; we simply don't care.
- chan->push( function() );
- }).detach();
- }
- //]
- // When there are two or more functions, call this overload
- template< typename T, typename Fn0, typename Fn1, typename ... Fns >
- void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
- Fn0 && function0,
- Fn1 && function1,
- Fns && ... functions) {
- // process the first function using the single-function overload
- wait_first_value_impl< T >( chan,
- std::forward< Fn0 >( function0) );
- // then recur to process the rest
- wait_first_value_impl< T >( chan,
- std::forward< Fn1 >( function1),
- std::forward< Fns >( functions) ... );
- }
- //[wait_first_value
- // Assume that all passed functions have the same return type. The return type
- // of wait_first_value() is the return type of the first passed function. It is
- // simply invalid to pass NO functions.
- template< typename Fn, typename ... Fns >
- typename std::result_of< Fn() >::type
- wait_first_value( Fn && function, Fns && ... functions) {
- typedef typename std::result_of< Fn() >::type return_t;
- typedef boost::fibers::buffered_channel< return_t > channel_t;
- auto chanp( std::make_shared< channel_t >( 64) );
- // launch all the relevant fibers
- wait_first_value_impl< return_t >( chanp,
- std::forward< Fn >( function),
- std::forward< Fns >( functions) ... );
- // retrieve the first value
- return_t value( chanp->value_pop() );
- // close the channel: no subsequent push() has to succeed
- chanp->close();
- return value;
- }
- //]
- // example usage
- Example wfv( runner, "wait_first_value()", [](){
- //[wait_first_value_ex
- std::string result = wait_first_value(
- [](){ return sleeper("wfv_third", 150); },
- [](){ return sleeper("wfv_second", 100); },
- [](){ return sleeper("wfv_first", 50); });
- std::cout << "wait_first_value() => " << result << std::endl;
- assert(result == "wfv_first");
- //]
- });
- /*****************************************************************************
- * when_any, produce first outcome, whether result or exception
- *****************************************************************************/
- // When there's only one function, call this overload.
- //[wait_first_outcome_impl
- template< typename T, typename CHANP, typename Fn >
- void wait_first_outcome_impl( CHANP chan, Fn && function) {
- boost::fibers::fiber(
- // Use std::bind() here for C++11 compatibility. C++11 lambda capture
- // can't move a move-only Fn type, but bind() can. Let bind() move the
- // channel pointer and the function into the bound object, passing
- // references into the lambda.
- std::bind(
- []( CHANP & chan,
- typename std::decay< Fn >::type & function) {
- // Instantiate a packaged_task to capture any exception thrown by
- // function.
- boost::fibers::packaged_task< T() > task( function);
- // Immediately run this packaged_task on same fiber. We want
- // function() to have completed BEFORE we push the future.
- task();
- // Pass the corresponding future to consumer. Ignore
- // channel_op_status returned by push(): might be closed; we
- // simply don't care.
- chan->push( task.get_future() );
- },
- chan,
- std::forward< Fn >( function)
- )).detach();
- }
- //]
- // When there are two or more functions, call this overload
- template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns >
- void wait_first_outcome_impl( CHANP chan,
- Fn0 && function0,
- Fn1 && function1,
- Fns && ... functions) {
- // process the first function using the single-function overload
- wait_first_outcome_impl< T >( chan,
- std::forward< Fn0 >( function0) );
- // then recur to process the rest
- wait_first_outcome_impl< T >( chan,
- std::forward< Fn1 >( function1),
- std::forward< Fns >( functions) ... );
- }
- // Assume that all passed functions have the same return type. The return type
- // of wait_first_outcome() is the return type of the first passed function. It is
- // simply invalid to pass NO functions.
- //[wait_first_outcome
- template< typename Fn, typename ... Fns >
- typename std::result_of< Fn() >::type
- wait_first_outcome( Fn && function, Fns && ... functions) {
- // In this case, the value we pass through the channel is actually a
- // future -- which is already ready. future can carry either a value or an
- // exception.
- typedef typename std::result_of< Fn() >::type return_t;
- typedef boost::fibers::future< return_t > future_t;
- typedef boost::fibers::buffered_channel< future_t > channel_t;
- auto chanp(std::make_shared< channel_t >( 64) );
- // launch all the relevant fibers
- wait_first_outcome_impl< return_t >( chanp,
- std::forward< Fn >( function),
- std::forward< Fns >( functions) ... );
- // retrieve the first future
- future_t future( chanp->value_pop() );
- // close the channel: no subsequent push() has to succeed
- chanp->close();
- // either return value or throw exception
- return future.get();
- }
- //]
- // example usage
- Example wfo( runner, "wait_first_outcome()", [](){
- //[wait_first_outcome_ex
- std::string result = wait_first_outcome(
- [](){ return sleeper("wfos_first", 50); },
- [](){ return sleeper("wfos_second", 100); },
- [](){ return sleeper("wfos_third", 150); });
- std::cout << "wait_first_outcome(success) => " << result << std::endl;
- assert(result == "wfos_first");
- std::string thrown;
- try {
- result = wait_first_outcome(
- [](){ return sleeper("wfof_first", 50, true); },
- [](){ return sleeper("wfof_second", 100); },
- [](){ return sleeper("wfof_third", 150); });
- } catch ( std::exception const& e) {
- thrown = e.what();
- }
- std::cout << "wait_first_outcome(fail) threw '" << thrown
- << "'" << std::endl;
- assert(thrown == "wfof_first");
- //]
- });
- /*****************************************************************************
- * when_any, collect exceptions until success; throw exception_list if no
- * success
- *****************************************************************************/
- // define an exception to aggregate exception_ptrs; prefer
- // std::exception_list (N4407 et al.) once that becomes available
- //[exception_list
- class exception_list : public std::runtime_error {
- public:
- exception_list( std::string const& what) :
- std::runtime_error( what) {
- }
- typedef std::vector< std::exception_ptr > bundle_t;
- // N4407 proposed std::exception_list API
- typedef bundle_t::const_iterator iterator;
- std::size_t size() const noexcept {
- return bundle_.size();
- }
- iterator begin() const noexcept {
- return bundle_.begin();
- }
- iterator end() const noexcept {
- return bundle_.end();
- }
- // extension to populate
- void add( std::exception_ptr ep) {
- bundle_.push_back( ep);
- }
- private:
- bundle_t bundle_;
- };
- //]
- // Assume that all passed functions have the same return type. The return type
- // of wait_first_success() is the return type of the first passed function. It is
- // simply invalid to pass NO functions.
- //[wait_first_success
- template< typename Fn, typename ... Fns >
- typename std::result_of< Fn() >::type
- wait_first_success( Fn && function, Fns && ... functions) {
- std::size_t count( 1 + sizeof ... ( functions) );
- // In this case, the value we pass through the channel is actually a
- // future -- which is already ready. future can carry either a value or an
- // exception.
- typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
- typedef boost::fibers::future< return_t > future_t;
- typedef boost::fibers::buffered_channel< future_t > channel_t;
- auto chanp( std::make_shared< channel_t >( 64) );
- // launch all the relevant fibers
- wait_first_outcome_impl< return_t >( chanp,
- std::forward< Fn >( function),
- std::forward< Fns >( functions) ... );
- // instantiate exception_list, just in case
- exception_list exceptions("wait_first_success() produced only errors");
- // retrieve up to 'count' results -- but stop there!
- for ( std::size_t i = 0; i < count; ++i) {
- // retrieve the next future
- future_t future( chanp->value_pop() );
- // retrieve exception_ptr if any
- std::exception_ptr error( future.get_exception_ptr() );
- // if no error, then yay, return value
- if ( ! error) {
- // close the channel: no subsequent push() has to succeed
- chanp->close();
- // show caller the value we got
- return future.get();
- }
- // error is non-null: collect
- exceptions.add( error);
- }
- // We only arrive here when every passed function threw an exception.
- // Throw our collection to inform caller.
- throw exceptions;
- }
- //]
- // example usage
- Example wfss( runner, "wait_first_success()", [](){
- //[wait_first_success_ex
- std::string result = wait_first_success(
- [](){ return sleeper("wfss_first", 50, true); },
- [](){ return sleeper("wfss_second", 100); },
- [](){ return sleeper("wfss_third", 150); });
- std::cout << "wait_first_success(success) => " << result << std::endl;
- assert(result == "wfss_second");
- //]
- std::string thrown;
- std::size_t count = 0;
- try {
- result = wait_first_success(
- [](){ return sleeper("wfsf_first", 50, true); },
- [](){ return sleeper("wfsf_second", 100, true); },
- [](){ return sleeper("wfsf_third", 150, true); });
- } catch ( exception_list const& e) {
- thrown = e.what();
- count = e.size();
- } catch ( std::exception const& e) {
- thrown = e.what();
- }
- std::cout << "wait_first_success(fail) threw '" << thrown << "': "
- << count << " errors" << std::endl;
- assert(thrown == "wait_first_success() produced only errors");
- assert(count == 3);
- });
- /*****************************************************************************
- * when_any, heterogeneous
- *****************************************************************************/
- //[wait_first_value_het
- // No need to break out the first Fn for interface function: let the compiler
- // complain if empty.
- // Our functions have different return types, and we might have to return any
- // of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in
- // parameter pack.
- template< typename ... Fns >
- boost::variant< typename std::result_of< Fns() >::type ... >
- wait_first_value_het( Fns && ... functions) {
- // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above.
- typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t;
- typedef boost::fibers::buffered_channel< return_t > channel_t;
- auto chanp( std::make_shared< channel_t >( 64) );
- // launch all the relevant fibers
- wait_first_value_impl< return_t >( chanp,
- std::forward< Fns >( functions) ... );
- // retrieve the first value
- return_t value( chanp->value_pop() );
- // close the channel: no subsequent push() has to succeed
- chanp->close();
- return value;
- }
- //]
- // example usage
- Example wfvh( runner, "wait_first_value_het()", [](){
- //[wait_first_value_het_ex
- boost::variant< std::string, double, int > result =
- wait_first_value_het(
- [](){ return sleeper("wfvh_third", 150); },
- [](){ return sleeper(3.14, 100); },
- [](){ return sleeper(17, 50); });
- std::cout << "wait_first_value_het() => " << result << std::endl;
- assert(boost::get< int >( result) == 17);
- //]
- });
- /*****************************************************************************
- * when_all, simple completion
- *****************************************************************************/
- // Degenerate case: when there are no functions to wait for, return
- // immediately.
- void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) {
- }
- // When there's at least one function to wait for, launch it and recur to
- // process the rest.
- //[wait_all_simple_impl
- template< typename Fn, typename ... Fns >
- void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier,
- Fn && function, Fns && ... functions) {
- boost::fibers::fiber(
- std::bind(
- []( std::shared_ptr< boost::fibers::barrier > & barrier,
- typename std::decay< Fn >::type & function) mutable {
- function();
- barrier->wait();
- },
- barrier,
- std::forward< Fn >( function)
- )).detach();
- wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
- }
- //]
- // interface function: instantiate barrier, launch tasks, wait for barrier
- //[wait_all_simple
- template< typename ... Fns >
- void wait_all_simple( Fns && ... functions) {
- std::size_t count( sizeof ... ( functions) );
- // Initialize a barrier(count+1) because we'll immediately wait on it. We
- // don't want to wake up until 'count' more fibers wait on it. Even though
- // we'll stick around until the last of them completes, use shared_ptr
- // anyway because it's easier to be confident about lifespan issues.
- auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) );
- wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
- barrier->wait();
- }
- //]
- // example usage
- Example was( runner, "wait_all_simple()", [](){
- //[wait_all_simple_ex
- wait_all_simple(
- [](){ sleeper("was_long", 150); },
- [](){ sleeper("was_medium", 100); },
- [](){ sleeper("was_short", 50); });
- //]
- });
- /*****************************************************************************
- * when_all, return values
- *****************************************************************************/
- //[wait_nchannel
- // Introduce a channel facade that closes the channel once a specific number
- // of items has been pushed. This allows an arbitrary consumer to read until
- // 'closed' without itself having to count items.
- template< typename T >
- class nchannel {
- public:
- nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
- std::size_t lm):
- chan_( chan),
- limit_( lm) {
- assert(chan_);
- if ( 0 == limit_) {
- chan_->close();
- }
- }
- boost::fibers::channel_op_status push( T && va) {
- boost::fibers::channel_op_status ok =
- chan_->push( std::forward< T >( va) );
- if ( ok == boost::fibers::channel_op_status::success &&
- --limit_ == 0) {
- // after the 'limit_'th successful push, close the channel
- chan_->close();
- }
- return ok;
- }
- private:
- std::shared_ptr< boost::fibers::buffered_channel< T > > chan_;
- std::size_t limit_;
- };
- //]
- // When there's only one function, call this overload
- //[wait_all_values_impl
- template< typename T, typename Fn >
- void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
- Fn && function) {
- boost::fibers::fiber( [chan, function](){
- chan->push(function());
- }).detach();
- }
- //]
- // When there are two or more functions, call this overload
- template< typename T, typename Fn0, typename Fn1, typename ... Fns >
- void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
- Fn0 && function0,
- Fn1 && function1,
- Fns && ... functions) {
- // process the first function using the single-function overload
- wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) );
- // then recur to process the rest
- wait_all_values_impl< T >( chan,
- std::forward< Fn1 >( function1),
- std::forward< Fns >( functions) ... );
- }
- //[wait_all_values_source
- // Return a shared_ptr<buffered_channel<T>> from which the caller can
- // retrieve each new result as it arrives, until 'closed'.
- template< typename Fn, typename ... Fns >
- std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > >
- wait_all_values_source( Fn && function, Fns && ... functions) {
- std::size_t count( 1 + sizeof ... ( functions) );
- typedef typename std::result_of< Fn() >::type return_t;
- typedef boost::fibers::buffered_channel< return_t > channel_t;
- // make the channel
- auto chanp( std::make_shared< channel_t >( 64) );
- // and make an nchannel facade to close it after 'count' items
- auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) );
- // pass that nchannel facade to all the relevant fibers
- wait_all_values_impl< return_t >( ncp,
- std::forward< Fn >( function),
- std::forward< Fns >( functions) ... );
- // then return the channel for consumer
- return chanp;
- }
- //]
- // When all passed functions have completed, return vector<T> containing
- // collected results. Assume that all passed functions have the same return
- // type. It is simply invalid to pass NO functions.
- //[wait_all_values
- template< typename Fn, typename ... Fns >
- std::vector< typename std::result_of< Fn() >::type >
- wait_all_values( Fn && function, Fns && ... functions) {
- std::size_t count( 1 + sizeof ... ( functions) );
- typedef typename std::result_of< Fn() >::type return_t;
- typedef std::vector< return_t > vector_t;
- vector_t results;
- results.reserve( count);
- // get channel
- std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan =
- wait_all_values_source( std::forward< Fn >( function),
- std::forward< Fns >( functions) ... );
- // fill results vector
- return_t value;
- while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
- results.push_back( value);
- }
- // return vector to caller
- return results;
- }
- //]
- Example wav( runner, "wait_all_values()", [](){
- //[wait_all_values_source_ex
- std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan =
- wait_all_values_source(
- [](){ return sleeper("wavs_third", 150); },
- [](){ return sleeper("wavs_second", 100); },
- [](){ return sleeper("wavs_first", 50); });
- std::string value;
- while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
- std::cout << "wait_all_values_source() => '" << value
- << "'" << std::endl;
- }
- //]
- //[wait_all_values_ex
- std::vector< std::string > values =
- wait_all_values(
- [](){ return sleeper("wav_late", 150); },
- [](){ return sleeper("wav_middle", 100); },
- [](){ return sleeper("wav_early", 50); });
- //]
- std::cout << "wait_all_values() =>";
- for ( std::string const& v : values) {
- std::cout << " '" << v << "'";
- }
- std::cout << std::endl;
- });
- /*****************************************************************************
- * when_all, throw first exception
- *****************************************************************************/
- //[wait_all_until_error_source
- // Return a shared_ptr<buffered_channel<future<T>>> from which the caller can
- // get() each new result as it arrives, until 'closed'.
- template< typename Fn, typename ... Fns >
- std::shared_ptr<
- boost::fibers::buffered_channel<
- boost::fibers::future<
- typename std::result_of< Fn() >::type > > >
- wait_all_until_error_source( Fn && function, Fns && ... functions) {
- std::size_t count( 1 + sizeof ... ( functions) );
- typedef typename std::result_of< Fn() >::type return_t;
- typedef boost::fibers::future< return_t > future_t;
- typedef boost::fibers::buffered_channel< future_t > channel_t;
- // make the channel
- auto chanp( std::make_shared< channel_t >( 64) );
- // and make an nchannel facade to close it after 'count' items
- auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) );
- // pass that nchannel facade to all the relevant fibers
- wait_first_outcome_impl< return_t >( ncp,
- std::forward< Fn >( function),
- std::forward< Fns >( functions) ... );
- // then return the channel for consumer
- return chanp;
- }
- //]
- // When all passed functions have completed, return vector<T> containing
- // collected results, or throw the first exception thrown by any of the passed
- // functions. Assume that all passed functions have the same return type. It
- // is simply invalid to pass NO functions.
- //[wait_all_until_error
- template< typename Fn, typename ... Fns >
- std::vector< typename std::result_of< Fn() >::type >
- wait_all_until_error( Fn && function, Fns && ... functions) {
- std::size_t count( 1 + sizeof ... ( functions) );
- typedef typename std::result_of< Fn() >::type return_t;
- typedef typename boost::fibers::future< return_t > future_t;
- typedef std::vector< return_t > vector_t;
- vector_t results;
- results.reserve( count);
- // get channel
- std::shared_ptr<
- boost::fibers::buffered_channel< future_t > > chan(
- wait_all_until_error_source( std::forward< Fn >( function),
- std::forward< Fns >( functions) ... ) );
- // fill results vector
- future_t future;
- while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
- results.push_back( future.get() );
- }
- // return vector to caller
- return results;
- }
- //]
- Example waue( runner, "wait_all_until_error()", [](){
- //[wait_all_until_error_source_ex
- typedef boost::fibers::future< std::string > future_t;
- std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan =
- wait_all_until_error_source(
- [](){ return sleeper("wauess_third", 150); },
- [](){ return sleeper("wauess_second", 100); },
- [](){ return sleeper("wauess_first", 50); });
- future_t future;
- while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
- std::string value( future.get() );
- std::cout << "wait_all_until_error_source(success) => '" << value
- << "'" << std::endl;
- }
- //]
- chan = wait_all_until_error_source(
- [](){ return sleeper("wauesf_third", 150); },
- [](){ return sleeper("wauesf_second", 100, true); },
- [](){ return sleeper("wauesf_first", 50); });
- //[wait_all_until_error_ex
- std::string thrown;
- //<-
- try {
- while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
- std::string value( future.get() );
- std::cout << "wait_all_until_error_source(fail) => '" << value
- << "'" << std::endl;
- }
- } catch ( std::exception const& e) {
- thrown = e.what();
- }
- std::cout << "wait_all_until_error_source(fail) threw '" << thrown
- << "'" << std::endl;
- thrown.clear();
- //->
- try {
- std::vector< std::string > values = wait_all_until_error(
- [](){ return sleeper("waue_late", 150); },
- [](){ return sleeper("waue_middle", 100, true); },
- [](){ return sleeper("waue_early", 50); });
- //<-
- std::cout << "wait_all_until_error(fail) =>";
- for ( std::string const& v : values) {
- std::cout << " '" << v << "'";
- }
- std::cout << std::endl;
- //->
- } catch ( std::exception const& e) {
- thrown = e.what();
- }
- std::cout << "wait_all_until_error(fail) threw '" << thrown
- << "'" << std::endl;
- //]
- });
- /*****************************************************************************
- * when_all, collect exceptions
- *****************************************************************************/
- // When all passed functions have succeeded, return vector<T> containing
- // collected results, or throw exception_list containing all exceptions thrown
- // by any of the passed functions. Assume that all passed functions have the
- // same return type. It is simply invalid to pass NO functions.
- //[wait_all_collect_errors
- template< typename Fn, typename ... Fns >
- std::vector< typename std::result_of< Fn() >::type >
- wait_all_collect_errors( Fn && function, Fns && ... functions) {
- std::size_t count( 1 + sizeof ... ( functions) );
- typedef typename std::result_of< Fn() >::type return_t;
- typedef typename boost::fibers::future< return_t > future_t;
- typedef std::vector< return_t > vector_t;
- vector_t results;
- results.reserve( count);
- exception_list exceptions("wait_all_collect_errors() exceptions");
- // get channel
- std::shared_ptr<
- boost::fibers::buffered_channel< future_t > > chan(
- wait_all_until_error_source( std::forward< Fn >( function),
- std::forward< Fns >( functions) ... ) );
- // fill results and/or exceptions vectors
- future_t future;
- while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
- std::exception_ptr exp = future.get_exception_ptr();
- if ( ! exp) {
- results.push_back( future.get() );
- } else {
- exceptions.add( exp);
- }
- }
- // if there were any exceptions, throw
- if ( exceptions.size() ) {
- throw exceptions;
- }
- // no exceptions: return vector to caller
- return results;
- }
- //]
- Example wace( runner, "wait_all_collect_errors()", [](){
- std::vector< std::string > values = wait_all_collect_errors(
- [](){ return sleeper("waces_late", 150); },
- [](){ return sleeper("waces_middle", 100); },
- [](){ return sleeper("waces_early", 50); });
- std::cout << "wait_all_collect_errors(success) =>";
- for ( std::string const& v : values) {
- std::cout << " '" << v << "'";
- }
- std::cout << std::endl;
- std::string thrown;
- std::size_t errors = 0;
- try {
- values = wait_all_collect_errors(
- [](){ return sleeper("wacef_late", 150, true); },
- [](){ return sleeper("wacef_middle", 100, true); },
- [](){ return sleeper("wacef_early", 50); });
- std::cout << "wait_all_collect_errors(fail) =>";
- for ( std::string const& v : values) {
- std::cout << " '" << v << "'";
- }
- std::cout << std::endl;
- } catch ( exception_list const& e) {
- thrown = e.what();
- errors = e.size();
- } catch ( std::exception const& e) {
- thrown = e.what();
- }
- std::cout << "wait_all_collect_errors(fail) threw '" << thrown
- << "': " << errors << " errors" << std::endl;
- });
- /*****************************************************************************
- * when_all, heterogeneous
- *****************************************************************************/
- //[wait_all_members_get
- template< typename Result, typename ... Futures >
- Result wait_all_members_get( Futures && ... futures) {
- // Fetch the results from the passed futures into Result's initializer
- // list. It's true that the get() calls here will block the implicit
- // iteration over futures -- but that doesn't matter because we won't be
- // done until the slowest of them finishes anyway. As results are
- // processed in argument-list order rather than order of completion, the
- // leftmost get() to throw an exception will cause that exception to
- // propagate to the caller.
- return Result{ futures.get() ... };
- }
- //]
- //[wait_all_members
- // Explicitly pass Result. This can be any type capable of being initialized
- // from the results of the passed functions, such as a struct.
- template< typename Result, typename ... Fns >
- Result wait_all_members( Fns && ... functions) {
- // Run each of the passed functions on a separate fiber, passing all their
- // futures to helper function for processing.
- return wait_all_members_get< Result >(
- boost::fibers::async( std::forward< Fns >( functions) ) ... );
- }
- //]
- // used by following example
- //[wait_Data
- struct Data {
- std::string str;
- double inexact;
- int exact;
- friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=;
- ...*/
- //<-
- {
- return out << "Data{str='" << data.str << "', inexact=" << data.inexact
- << ", exact=" << data.exact << "}";
- }
- //->
- };
- //]
- // example usage
- Example wam( runner, "wait_all_members()", [](){
- //[wait_all_members_data_ex
- Data data = wait_all_members< Data >(
- [](){ return sleeper("wams_left", 100); },
- [](){ return sleeper(3.14, 150); },
- [](){ return sleeper(17, 50); });
- std::cout << "wait_all_members<Data>(success) => " << data << std::endl;
- //]
- std::string thrown;
- try {
- data = wait_all_members< Data >(
- [](){ return sleeper("wamf_left", 100, true); },
- [](){ return sleeper(3.14, 150); },
- [](){ return sleeper(17, 50, true); });
- std::cout << "wait_all_members<Data>(fail) => " << data << std::endl;
- } catch ( std::exception const& e) {
- thrown = e.what();
- }
- std::cout << "wait_all_members<Data>(fail) threw '" << thrown
- << '"' << std::endl;
- //[wait_all_members_vector_ex
- // If we don't care about obtaining results as soon as they arrive, and we
- // prefer a result vector in passed argument order rather than completion
- // order, wait_all_members() is another possible implementation of
- // wait_all_until_error().
- auto strings = wait_all_members< std::vector< std::string > >(
- [](){ return sleeper("wamv_left", 150); },
- [](){ return sleeper("wamv_middle", 100); },
- [](){ return sleeper("wamv_right", 50); });
- std::cout << "wait_all_members<vector>() =>";
- for ( std::string const& str : strings) {
- std::cout << " '" << str << "'";
- }
- std::cout << std::endl;
- //]
- });
- /*****************************************************************************
- * main()
- *****************************************************************************/
- int main( int argc, char *argv[]) {
- runner.run();
- std::cout << "done." << std::endl;
- return EXIT_SUCCESS;
- }
|