// Copyright Nat Goodspeed 2014. // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) #include #include #include #include #include // std::find_if() #include #include class Verbose { public: Verbose( std::string const& d, std::string const& s="stop") : desc( d), stop( s) { std::cout << desc << " start" << std::endl; } ~Verbose() { std::cout << desc << ' ' << stop << std::endl; } Verbose( Verbose const&) = delete; Verbose & operator=( Verbose const&) = delete; private: std::string desc; std::string stop; }; //[priority_props class priority_props : public boost::fibers::fiber_properties { public: priority_props( boost::fibers::context * ctx): fiber_properties( ctx), /*< Your subclass constructor must accept a [^[class_link context]*] and pass it to the `fiber_properties` constructor. >*/ priority_( 0) { } int get_priority() const { return priority_; /*< Provide read access methods at your own discretion. >*/ } // Call this method to alter priority, because we must notify // priority_scheduler of any change. void set_priority( int p) { /*< It's important to call `notify()` on any change in a property that can affect the scheduler's behavior. Therefore, such modifications should only be performed through an access method. >*/ // Of course, it's only worth reshuffling the queue and all if we're // actually changing the priority. if ( p != priority_) { priority_ = p; notify(); } } // The fiber name of course is solely for purposes of this example // program; it has nothing to do with implementing scheduler priority. // This is a public data member -- not requiring set/get access methods -- // because we need not inform the scheduler of any change. std::string name; /*< A property that does not affect the scheduler does not need access methods. >*/ private: int priority_; }; //] //[priority_scheduler class priority_scheduler : public boost::fibers::algo::algorithm_with_properties< priority_props > { private: typedef boost::fibers::scheduler::ready_queue_type/*< See [link ready_queue_t]. >*/ rqueue_t; rqueue_t rqueue_; std::mutex mtx_{}; std::condition_variable cnd_{}; bool flag_{ false }; public: priority_scheduler() : rqueue_() { } // For a subclass of algorithm_with_properties<>, it's important to // override the correct awakened() overload. /*<< You must override the [member_link algorithm_with_properties..awakened] method. This is how your scheduler receives notification of a fiber that has become ready to run. >>*/ virtual void awakened( boost::fibers::context * ctx, priority_props & props) noexcept { int ctx_priority = props.get_priority(); /*< `props` is the instance of priority_props associated with the passed fiber `ctx`. >*/ // With this scheduler, fibers with higher priority values are // preferred over fibers with lower priority values. But fibers with // equal priority values are processed in round-robin fashion. So when // we're handed a new context*, put it at the end of the fibers // with that same priority. In other words: search for the first fiber // in the queue with LOWER priority, and insert before that one. rqueue_t::iterator i( std::find_if( rqueue_.begin(), rqueue_.end(), [ctx_priority,this]( boost::fibers::context & c) { return properties( &c ).get_priority() < ctx_priority; })); // Now, whether or not we found a fiber with lower priority, // insert this new fiber here. rqueue_.insert( i, * ctx); //<- std::cout << "awakened(" << props.name << "): "; describe_ready_queue(); //-> } /*<< You must override the [member_link algorithm_with_properties..pick_next] method. This is how your scheduler actually advises the fiber manager of the next fiber to run. >>*/ virtual boost::fibers::context * pick_next() noexcept { // if ready queue is empty, just tell caller if ( rqueue_.empty() ) { return nullptr; } boost::fibers::context * ctx( & rqueue_.front() ); rqueue_.pop_front(); //<- std::cout << "pick_next() resuming " << properties( ctx).name << ": "; describe_ready_queue(); //-> return ctx; } /*<< You must override [member_link algorithm_with_properties..has_ready_fibers] to inform the fiber manager of the state of your ready queue. >>*/ virtual bool has_ready_fibers() const noexcept { return ! rqueue_.empty(); } /*<< Overriding [member_link algorithm_with_properties..property_change] is optional. This override handles the case in which the running fiber changes the priority of another ready fiber: a fiber already in our queue. In that case, move the updated fiber within the queue. >>*/ virtual void property_change( boost::fibers::context * ctx, priority_props & props) noexcept { // Although our priority_props class defines multiple properties, only // one of them (priority) actually calls notify() when changed. The // point of a property_change() override is to reshuffle the ready // queue according to the updated priority value. //<- std::cout << "property_change(" << props.name << '(' << props.get_priority() << ")): "; //-> // 'ctx' might not be in our queue at all, if caller is changing the // priority of (say) the running fiber. If it's not there, no need to // move it: we'll handle it next time it hits awakened(). if ( ! ctx->ready_is_linked()) { /*< Your `property_change()` override must be able to handle the case in which the passed `ctx` is not in your ready queue. It might be running, or it might be blocked. >*/ //<- // hopefully user will distinguish this case by noticing that // the fiber with which we were called does not appear in the // ready queue at all describe_ready_queue(); //-> return; } // Found ctx: unlink it ctx->ready_unlink(); // Here we know that ctx was in our ready queue, but we've unlinked // it. We happen to have a method that will (re-)add a context* to the // right place in the ready queue. awakened( ctx, props); } //<- void describe_ready_queue() { if ( rqueue_.empty() ) { std::cout << "[empty]"; } else { const char * delim = ""; for ( boost::fibers::context & ctx : rqueue_) { priority_props & props( properties( & ctx) ); std::cout << delim << props.name << '(' << props.get_priority() << ')'; delim = ", "; } } std::cout << std::endl; } //-> void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept { if ( (std::chrono::steady_clock::time_point::max)() == time_point) { std::unique_lock< std::mutex > lk( mtx_); cnd_.wait( lk, [this](){ return flag_; }); flag_ = false; } else { std::unique_lock< std::mutex > lk( mtx_); cnd_.wait_until( lk, time_point, [this](){ return flag_; }); flag_ = false; } } void notify() noexcept { std::unique_lock< std::mutex > lk( mtx_); flag_ = true; lk.unlock(); cnd_.notify_all(); } }; //] //[launch template< typename Fn > boost::fibers::fiber launch( Fn && func, std::string const& name, int priority) { boost::fibers::fiber fiber( func); priority_props & props( fiber.properties< priority_props >() ); props.name = name; props.set_priority( priority); return fiber; } //] void yield_fn() { std::string name( boost::this_fiber::properties< priority_props >().name); Verbose v( std::string("fiber ") + name); for ( int i = 0; i < 3; ++i) { std::cout << "fiber " << name << " yielding" << std::endl; boost::this_fiber::yield(); } } void barrier_fn( boost::fibers::barrier & barrier) { std::string name( boost::this_fiber::properties< priority_props >().name); Verbose v( std::string("fiber ") + name); std::cout << "fiber " << name << " waiting on barrier" << std::endl; barrier.wait(); std::cout << "fiber " << name << " yielding" << std::endl; boost::this_fiber::yield(); } //[change_fn void change_fn( boost::fibers::fiber & other, int other_priority, boost::fibers::barrier& barrier) { std::string name( boost::this_fiber::properties< priority_props >().name); Verbose v( std::string("fiber ") + name); //<- std::cout << "fiber " << name << " waiting on barrier" << std::endl; //-> barrier.wait(); // We assume a couple things about 'other': // - that it was also waiting on the same barrier // - that it has lower priority than this fiber. // If both are true, 'other' is now ready to run but is sitting in // priority_scheduler's ready queue. Change its priority. priority_props & other_props( other.properties< priority_props >() ); //<- std::cout << "fiber " << name << " changing priority of " << other_props.name << " to " << other_priority << std::endl; //-> other_props.set_priority( other_priority); } //] //[main int main( int argc, char *argv[]) { // make sure we use our priority_scheduler rather than default round_robin boost::fibers::use_scheduling_algorithm< priority_scheduler >(); /*= ...*/ /*=}*/ //] Verbose v("main()"); // for clarity std::cout << "main() setting name" << std::endl; //[main_name boost::this_fiber::properties< priority_props >().name = "main"; //] std::cout << "main() running tests" << std::endl; { Verbose v("high-priority first", "stop\n"); // verify that high-priority fiber always gets scheduled first boost::fibers::fiber low( launch( yield_fn, "low", 1) ); boost::fibers::fiber med( launch( yield_fn, "medium", 2) ); boost::fibers::fiber hi( launch( yield_fn, "high", 3) ); std::cout << "main: high.join()" << std::endl; hi.join(); std::cout << "main: medium.join()" << std::endl; med.join(); std::cout << "main: low.join()" << std::endl; low.join(); } { Verbose v("same priority round-robin", "stop\n"); // fibers of same priority are scheduled in round-robin order boost::fibers::fiber a( launch( yield_fn, "a", 0) ); boost::fibers::fiber b( launch( yield_fn, "b", 0) ); boost::fibers::fiber c( launch( yield_fn, "c", 0) ); std::cout << "main: a.join()" << std::endl; a.join(); std::cout << "main: b.join()" << std::endl; b.join(); std::cout << "main: c.join()" << std::endl; c.join(); } { Verbose v("barrier wakes up all", "stop\n"); // using a barrier wakes up all waiting fibers at the same time boost::fibers::barrier barrier( 3); boost::fibers::fiber low( launch( [&barrier](){ barrier_fn( barrier); }, "low", 1) ); boost::fibers::fiber med( launch( [&barrier](){ barrier_fn( barrier); }, "medium", 2) ); boost::fibers::fiber hi( launch( [&barrier](){ barrier_fn( barrier); }, "high", 3) ); std::cout << "main: low.join()" << std::endl; low.join(); std::cout << "main: medium.join()" << std::endl; med.join(); std::cout << "main: high.join()" << std::endl; hi.join(); } { Verbose v("change priority", "stop\n"); // change priority of a fiber in priority_scheduler's ready queue boost::fibers::barrier barrier( 3); boost::fibers::fiber c( launch( [&barrier](){ barrier_fn( barrier); }, "c", 1) ); boost::fibers::fiber a( launch( [&c,&barrier]() { change_fn( c, 3, barrier); }, "a", 3) ); boost::fibers::fiber b( launch( [&barrier](){ barrier_fn( barrier); }, "b", 2) ); std::cout << "main: a.join()" << std::endl; std::cout << "main: a.join()" << std::endl; a.join(); std::cout << "main: b.join()" << std::endl; b.join(); std::cout << "main: c.join()" << std::endl; c.join(); } std::cout << "done." << std::endl; return EXIT_SUCCESS; }