123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- // Copyright Nat Goodspeed 2014.
- // Distributed under the Boost Software License, Version 1.0.
- // (See accompanying file LICENSE_1_0.txt or copy at
- // http://www.boost.org/LICENSE_1_0.txt)
- #include <chrono>
- #include <condition_variable>
- #include <iostream>
- #include <mutex>
- #include <algorithm> // std::find_if()
- #include <boost/fiber/all.hpp>
- #include <boost/fiber/scheduler.hpp>
- class Verbose {
- public:
- Verbose( std::string const& d, std::string const& s="stop") :
- desc( d),
- stop( s) {
- std::cout << desc << " start" << std::endl;
- }
- ~Verbose() {
- std::cout << desc << ' ' << stop << std::endl;
- }
- Verbose( Verbose const&) = delete;
- Verbose & operator=( Verbose const&) = delete;
- private:
- std::string desc;
- std::string stop;
- };
- //[priority_props
- class priority_props : public boost::fibers::fiber_properties {
- public:
- priority_props( boost::fibers::context * ctx):
- fiber_properties( ctx), /*< Your subclass constructor must accept a
- [^[class_link context]*] and pass it to
- the `fiber_properties` constructor. >*/
- priority_( 0) {
- }
- int get_priority() const {
- return priority_; /*< Provide read access methods at your own discretion. >*/
- }
- // Call this method to alter priority, because we must notify
- // priority_scheduler of any change.
- void set_priority( int p) { /*<
- It's important to call `notify()` on any
- change in a property that can affect the
- scheduler's behavior. Therefore, such
- modifications should only be performed
- through an access method. >*/
- // Of course, it's only worth reshuffling the queue and all if we're
- // actually changing the priority.
- if ( p != priority_) {
- priority_ = p;
- notify();
- }
- }
- // The fiber name of course is solely for purposes of this example
- // program; it has nothing to do with implementing scheduler priority.
- // This is a public data member -- not requiring set/get access methods --
- // because we need not inform the scheduler of any change.
- std::string name; /*< A property that does not affect the scheduler does
- not need access methods. >*/
- private:
- int priority_;
- };
- //]
- //[priority_scheduler
- class priority_scheduler :
- public boost::fibers::algo::algorithm_with_properties< priority_props > {
- private:
- typedef boost::fibers::scheduler::ready_queue_type/*< See [link ready_queue_t]. >*/ rqueue_t;
- rqueue_t rqueue_;
- std::mutex mtx_{};
- std::condition_variable cnd_{};
- bool flag_{ false };
- public:
- priority_scheduler() :
- rqueue_() {
- }
- // For a subclass of algorithm_with_properties<>, it's important to
- // override the correct awakened() overload.
- /*<< You must override the [member_link algorithm_with_properties..awakened]
- method. This is how your scheduler receives notification of a
- fiber that has become ready to run. >>*/
- virtual void awakened( boost::fibers::context * ctx, priority_props & props) noexcept {
- int ctx_priority = props.get_priority(); /*< `props` is the instance of
- priority_props associated
- with the passed fiber `ctx`. >*/
- // With this scheduler, fibers with higher priority values are
- // preferred over fibers with lower priority values. But fibers with
- // equal priority values are processed in round-robin fashion. So when
- // we're handed a new context*, put it at the end of the fibers
- // with that same priority. In other words: search for the first fiber
- // in the queue with LOWER priority, and insert before that one.
- rqueue_t::iterator i( std::find_if( rqueue_.begin(), rqueue_.end(),
- [ctx_priority,this]( boost::fibers::context & c)
- { return properties( &c ).get_priority() < ctx_priority; }));
- // Now, whether or not we found a fiber with lower priority,
- // insert this new fiber here.
- rqueue_.insert( i, * ctx);
- //<-
- std::cout << "awakened(" << props.name << "): ";
- describe_ready_queue();
- //->
- }
- /*<< You must override the [member_link algorithm_with_properties..pick_next]
- method. This is how your scheduler actually advises the fiber manager
- of the next fiber to run. >>*/
- virtual boost::fibers::context * pick_next() noexcept {
- // if ready queue is empty, just tell caller
- if ( rqueue_.empty() ) {
- return nullptr;
- }
- boost::fibers::context * ctx( & rqueue_.front() );
- rqueue_.pop_front();
- //<-
- std::cout << "pick_next() resuming " << properties( ctx).name << ": ";
- describe_ready_queue();
- //->
- return ctx;
- }
- /*<< You must override [member_link algorithm_with_properties..has_ready_fibers]
- to inform the fiber manager of the state of your ready queue. >>*/
- virtual bool has_ready_fibers() const noexcept {
- return ! rqueue_.empty();
- }
- /*<< Overriding [member_link algorithm_with_properties..property_change]
- is optional. This override handles the case in which the running
- fiber changes the priority of another ready fiber: a fiber already in
- our queue. In that case, move the updated fiber within the queue. >>*/
- virtual void property_change( boost::fibers::context * ctx, priority_props & props) noexcept {
- // Although our priority_props class defines multiple properties, only
- // one of them (priority) actually calls notify() when changed. The
- // point of a property_change() override is to reshuffle the ready
- // queue according to the updated priority value.
- //<-
- std::cout << "property_change(" << props.name << '(' << props.get_priority()
- << ")): ";
- //->
- // 'ctx' might not be in our queue at all, if caller is changing the
- // priority of (say) the running fiber. If it's not there, no need to
- // move it: we'll handle it next time it hits awakened().
- if ( ! ctx->ready_is_linked()) { /*<
- Your `property_change()` override must be able to
- handle the case in which the passed `ctx` is not in
- your ready queue. It might be running, or it might be
- blocked. >*/
- //<-
- // hopefully user will distinguish this case by noticing that
- // the fiber with which we were called does not appear in the
- // ready queue at all
- describe_ready_queue();
- //->
- return;
- }
- // Found ctx: unlink it
- ctx->ready_unlink();
- // Here we know that ctx was in our ready queue, but we've unlinked
- // it. We happen to have a method that will (re-)add a context* to the
- // right place in the ready queue.
- awakened( ctx, props);
- }
- //<-
- void describe_ready_queue() {
- if ( rqueue_.empty() ) {
- std::cout << "[empty]";
- } else {
- const char * delim = "";
- for ( boost::fibers::context & ctx : rqueue_) {
- priority_props & props( properties( & ctx) );
- std::cout << delim << props.name << '(' << props.get_priority() << ')';
- delim = ", ";
- }
- }
- std::cout << std::endl;
- }
- //->
- void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
- if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
- std::unique_lock< std::mutex > lk( mtx_);
- cnd_.wait( lk, [this](){ return flag_; });
- flag_ = false;
- } else {
- std::unique_lock< std::mutex > lk( mtx_);
- cnd_.wait_until( lk, time_point, [this](){ return flag_; });
- flag_ = false;
- }
- }
- void notify() noexcept {
- std::unique_lock< std::mutex > lk( mtx_);
- flag_ = true;
- lk.unlock();
- cnd_.notify_all();
- }
- };
- //]
- //[launch
- template< typename Fn >
- boost::fibers::fiber launch( Fn && func, std::string const& name, int priority) {
- boost::fibers::fiber fiber( func);
- priority_props & props( fiber.properties< priority_props >() );
- props.name = name;
- props.set_priority( priority);
- return fiber;
- }
- //]
- void yield_fn() {
- std::string name( boost::this_fiber::properties< priority_props >().name);
- Verbose v( std::string("fiber ") + name);
- for ( int i = 0; i < 3; ++i) {
- std::cout << "fiber " << name << " yielding" << std::endl;
- boost::this_fiber::yield();
- }
- }
- void barrier_fn( boost::fibers::barrier & barrier) {
- std::string name( boost::this_fiber::properties< priority_props >().name);
- Verbose v( std::string("fiber ") + name);
- std::cout << "fiber " << name << " waiting on barrier" << std::endl;
- barrier.wait();
- std::cout << "fiber " << name << " yielding" << std::endl;
- boost::this_fiber::yield();
- }
- //[change_fn
- void change_fn( boost::fibers::fiber & other,
- int other_priority,
- boost::fibers::barrier& barrier) {
- std::string name( boost::this_fiber::properties< priority_props >().name);
- Verbose v( std::string("fiber ") + name);
- //<-
- std::cout << "fiber " << name << " waiting on barrier" << std::endl;
- //->
- barrier.wait();
- // We assume a couple things about 'other':
- // - that it was also waiting on the same barrier
- // - that it has lower priority than this fiber.
- // If both are true, 'other' is now ready to run but is sitting in
- // priority_scheduler's ready queue. Change its priority.
- priority_props & other_props(
- other.properties< priority_props >() );
- //<-
- std::cout << "fiber " << name << " changing priority of " << other_props.name
- << " to " << other_priority << std::endl;
- //->
- other_props.set_priority( other_priority);
- }
- //]
- //[main
- int main( int argc, char *argv[]) {
- // make sure we use our priority_scheduler rather than default round_robin
- boost::fibers::use_scheduling_algorithm< priority_scheduler >();
- /*= ...*/
- /*=}*/
- //]
- Verbose v("main()");
- // for clarity
- std::cout << "main() setting name" << std::endl;
- //[main_name
- boost::this_fiber::properties< priority_props >().name = "main";
- //]
- std::cout << "main() running tests" << std::endl;
- {
- Verbose v("high-priority first", "stop\n");
- // verify that high-priority fiber always gets scheduled first
- boost::fibers::fiber low( launch( yield_fn, "low", 1) );
- boost::fibers::fiber med( launch( yield_fn, "medium", 2) );
- boost::fibers::fiber hi( launch( yield_fn, "high", 3) );
- std::cout << "main: high.join()" << std::endl;
- hi.join();
- std::cout << "main: medium.join()" << std::endl;
- med.join();
- std::cout << "main: low.join()" << std::endl;
- low.join();
- }
- {
- Verbose v("same priority round-robin", "stop\n");
- // fibers of same priority are scheduled in round-robin order
- boost::fibers::fiber a( launch( yield_fn, "a", 0) );
- boost::fibers::fiber b( launch( yield_fn, "b", 0) );
- boost::fibers::fiber c( launch( yield_fn, "c", 0) );
- std::cout << "main: a.join()" << std::endl;
- a.join();
- std::cout << "main: b.join()" << std::endl;
- b.join();
- std::cout << "main: c.join()" << std::endl;
- c.join();
- }
- {
- Verbose v("barrier wakes up all", "stop\n");
- // using a barrier wakes up all waiting fibers at the same time
- boost::fibers::barrier barrier( 3);
- boost::fibers::fiber low( launch( [&barrier](){ barrier_fn( barrier); }, "low", 1) );
- boost::fibers::fiber med( launch( [&barrier](){ barrier_fn( barrier); }, "medium", 2) );
- boost::fibers::fiber hi( launch( [&barrier](){ barrier_fn( barrier); }, "high", 3) );
- std::cout << "main: low.join()" << std::endl;
- low.join();
- std::cout << "main: medium.join()" << std::endl;
- med.join();
- std::cout << "main: high.join()" << std::endl;
- hi.join();
- }
- {
- Verbose v("change priority", "stop\n");
- // change priority of a fiber in priority_scheduler's ready queue
- boost::fibers::barrier barrier( 3);
- boost::fibers::fiber c( launch( [&barrier](){ barrier_fn( barrier); }, "c", 1) );
- boost::fibers::fiber a( launch( [&c,&barrier]() { change_fn( c, 3, barrier); }, "a", 3) );
- boost::fibers::fiber b( launch( [&barrier](){ barrier_fn( barrier); }, "b", 2) );
- std::cout << "main: a.join()" << std::endl;
- std::cout << "main: a.join()" << std::endl;
- a.join();
- std::cout << "main: b.join()" << std::endl;
- b.join();
- std::cout << "main: c.join()" << std::endl;
- c.join();
- }
- std::cout << "done." << std::endl;
- return EXIT_SUCCESS;
- }
|