priority.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. // Copyright Nat Goodspeed 2014.
  2. // Distributed under the Boost Software License, Version 1.0.
  3. // (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  #include <algorithm> // std::find_if()
  #include <chrono>
  #include <condition_variable>
  #include <cstdlib> // EXIT_SUCCESS
  #include <iostream>
  #include <mutex>
  #include <string>
  #include <utility> // std::forward()

  #include <boost/fiber/all.hpp>
  #include <boost/fiber/scheduler.hpp>
  // Scope tracer for this example program: prints "<desc> start" to std::cout
  // when constructed and "<desc> <stop>" when destroyed.
  class Verbose {
  public:
      // d: label printed at the beginning of both messages.
      // s: final word of the destruction message; defaults to "stop".
      // The constructor is explicit so that a std::string never converts
      // silently into a tracer whose whole purpose is its console output.
      explicit Verbose( std::string const& d, std::string const& s = "stop") :
          desc( d),
          stop( s) {
          std::cout << desc << " start" << std::endl;
      }

      ~Verbose() {
          std::cout << desc << ' ' << stop << std::endl;
      }

      // Not copyable: a copy would print the closing message a second time.
      Verbose( Verbose const&) = delete;
      Verbose & operator=( Verbose const&) = delete;

  private:
      std::string desc;   // label shared by both messages
      std::string stop;   // closing word printed by the destructor
  };
  28. //[priority_props
  29. class priority_props : public boost::fibers::fiber_properties {
  30. public:
  31. priority_props( boost::fibers::context * ctx):
  32. fiber_properties( ctx), /*< Your subclass constructor must accept a
  33. [^[class_link context]*] and pass it to
  34. the `fiber_properties` constructor. >*/
  35. priority_( 0) {
  36. }
  37. int get_priority() const {
  38. return priority_; /*< Provide read access methods at your own discretion. >*/
  39. }
  40. // Call this method to alter priority, because we must notify
  41. // priority_scheduler of any change.
  42. void set_priority( int p) { /*<
  43. It's important to call `notify()` on any
  44. change in a property that can affect the
  45. scheduler's behavior. Therefore, such
  46. modifications should only be performed
  47. through an access method. >*/
  48. // Of course, it's only worth reshuffling the queue and all if we're
  49. // actually changing the priority.
  50. if ( p != priority_) {
  51. priority_ = p;
  52. notify();
  53. }
  54. }
  55. // The fiber name of course is solely for purposes of this example
  56. // program; it has nothing to do with implementing scheduler priority.
  57. // This is a public data member -- not requiring set/get access methods --
  58. // because we need not inform the scheduler of any change.
  59. std::string name; /*< A property that does not affect the scheduler does
  60. not need access methods. >*/
  61. private:
  62. int priority_;
  63. };
  64. //]
  65. //[priority_scheduler
  66. class priority_scheduler :
  67. public boost::fibers::algo::algorithm_with_properties< priority_props > {
  68. private:
  69. typedef boost::fibers::scheduler::ready_queue_type/*< See [link ready_queue_t]. >*/ rqueue_t;
  70. rqueue_t rqueue_;
  71. std::mutex mtx_{};
  72. std::condition_variable cnd_{};
  73. bool flag_{ false };
  74. public:
  75. priority_scheduler() :
  76. rqueue_() {
  77. }
  78. // For a subclass of algorithm_with_properties<>, it's important to
  79. // override the correct awakened() overload.
  80. /*<< You must override the [member_link algorithm_with_properties..awakened]
  81. method. This is how your scheduler receives notification of a
  82. fiber that has become ready to run. >>*/
  83. virtual void awakened( boost::fibers::context * ctx, priority_props & props) noexcept {
  84. int ctx_priority = props.get_priority(); /*< `props` is the instance of
  85. priority_props associated
  86. with the passed fiber `ctx`. >*/
  87. // With this scheduler, fibers with higher priority values are
  88. // preferred over fibers with lower priority values. But fibers with
  89. // equal priority values are processed in round-robin fashion. So when
  90. // we're handed a new context*, put it at the end of the fibers
  91. // with that same priority. In other words: search for the first fiber
  92. // in the queue with LOWER priority, and insert before that one.
  93. rqueue_t::iterator i( std::find_if( rqueue_.begin(), rqueue_.end(),
  94. [ctx_priority,this]( boost::fibers::context & c)
  95. { return properties( &c ).get_priority() < ctx_priority; }));
  96. // Now, whether or not we found a fiber with lower priority,
  97. // insert this new fiber here.
  98. rqueue_.insert( i, * ctx);
  99. //<-
  100. std::cout << "awakened(" << props.name << "): ";
  101. describe_ready_queue();
  102. //->
  103. }
  104. /*<< You must override the [member_link algorithm_with_properties..pick_next]
  105. method. This is how your scheduler actually advises the fiber manager
  106. of the next fiber to run. >>*/
  107. virtual boost::fibers::context * pick_next() noexcept {
  108. // if ready queue is empty, just tell caller
  109. if ( rqueue_.empty() ) {
  110. return nullptr;
  111. }
  112. boost::fibers::context * ctx( & rqueue_.front() );
  113. rqueue_.pop_front();
  114. //<-
  115. std::cout << "pick_next() resuming " << properties( ctx).name << ": ";
  116. describe_ready_queue();
  117. //->
  118. return ctx;
  119. }
  120. /*<< You must override [member_link algorithm_with_properties..has_ready_fibers]
  121. to inform the fiber manager of the state of your ready queue. >>*/
  122. virtual bool has_ready_fibers() const noexcept {
  123. return ! rqueue_.empty();
  124. }
  125. /*<< Overriding [member_link algorithm_with_properties..property_change]
  126. is optional. This override handles the case in which the running
  127. fiber changes the priority of another ready fiber: a fiber already in
  128. our queue. In that case, move the updated fiber within the queue. >>*/
  129. virtual void property_change( boost::fibers::context * ctx, priority_props & props) noexcept {
  130. // Although our priority_props class defines multiple properties, only
  131. // one of them (priority) actually calls notify() when changed. The
  132. // point of a property_change() override is to reshuffle the ready
  133. // queue according to the updated priority value.
  134. //<-
  135. std::cout << "property_change(" << props.name << '(' << props.get_priority()
  136. << ")): ";
  137. //->
  138. // 'ctx' might not be in our queue at all, if caller is changing the
  139. // priority of (say) the running fiber. If it's not there, no need to
  140. // move it: we'll handle it next time it hits awakened().
  141. if ( ! ctx->ready_is_linked()) { /*<
  142. Your `property_change()` override must be able to
  143. handle the case in which the passed `ctx` is not in
  144. your ready queue. It might be running, or it might be
  145. blocked. >*/
  146. //<-
  147. // hopefully user will distinguish this case by noticing that
  148. // the fiber with which we were called does not appear in the
  149. // ready queue at all
  150. describe_ready_queue();
  151. //->
  152. return;
  153. }
  154. // Found ctx: unlink it
  155. ctx->ready_unlink();
  156. // Here we know that ctx was in our ready queue, but we've unlinked
  157. // it. We happen to have a method that will (re-)add a context* to the
  158. // right place in the ready queue.
  159. awakened( ctx, props);
  160. }
  161. //<-
  162. void describe_ready_queue() {
  163. if ( rqueue_.empty() ) {
  164. std::cout << "[empty]";
  165. } else {
  166. const char * delim = "";
  167. for ( boost::fibers::context & ctx : rqueue_) {
  168. priority_props & props( properties( & ctx) );
  169. std::cout << delim << props.name << '(' << props.get_priority() << ')';
  170. delim = ", ";
  171. }
  172. }
  173. std::cout << std::endl;
  174. }
  175. //->
  176. void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
  177. if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
  178. std::unique_lock< std::mutex > lk( mtx_);
  179. cnd_.wait( lk, [this](){ return flag_; });
  180. flag_ = false;
  181. } else {
  182. std::unique_lock< std::mutex > lk( mtx_);
  183. cnd_.wait_until( lk, time_point, [this](){ return flag_; });
  184. flag_ = false;
  185. }
  186. }
  187. void notify() noexcept {
  188. std::unique_lock< std::mutex > lk( mtx_);
  189. flag_ = true;
  190. lk.unlock();
  191. cnd_.notify_all();
  192. }
  193. };
  194. //]
  195. //[launch
  196. template< typename Fn >
  197. boost::fibers::fiber launch( Fn && func, std::string const& name, int priority) {
  198. boost::fibers::fiber fiber( func);
  199. priority_props & props( fiber.properties< priority_props >() );
  200. props.name = name;
  201. props.set_priority( priority);
  202. return fiber;
  203. }
  204. //]
  205. void yield_fn() {
  206. std::string name( boost::this_fiber::properties< priority_props >().name);
  207. Verbose v( std::string("fiber ") + name);
  208. for ( int i = 0; i < 3; ++i) {
  209. std::cout << "fiber " << name << " yielding" << std::endl;
  210. boost::this_fiber::yield();
  211. }
  212. }
  213. void barrier_fn( boost::fibers::barrier & barrier) {
  214. std::string name( boost::this_fiber::properties< priority_props >().name);
  215. Verbose v( std::string("fiber ") + name);
  216. std::cout << "fiber " << name << " waiting on barrier" << std::endl;
  217. barrier.wait();
  218. std::cout << "fiber " << name << " yielding" << std::endl;
  219. boost::this_fiber::yield();
  220. }
  221. //[change_fn
  222. void change_fn( boost::fibers::fiber & other,
  223. int other_priority,
  224. boost::fibers::barrier& barrier) {
  225. std::string name( boost::this_fiber::properties< priority_props >().name);
  226. Verbose v( std::string("fiber ") + name);
  227. //<-
  228. std::cout << "fiber " << name << " waiting on barrier" << std::endl;
  229. //->
  230. barrier.wait();
  231. // We assume a couple things about 'other':
  232. // - that it was also waiting on the same barrier
  233. // - that it has lower priority than this fiber.
  234. // If both are true, 'other' is now ready to run but is sitting in
  235. // priority_scheduler's ready queue. Change its priority.
  236. priority_props & other_props(
  237. other.properties< priority_props >() );
  238. //<-
  239. std::cout << "fiber " << name << " changing priority of " << other_props.name
  240. << " to " << other_priority << std::endl;
  241. //->
  242. other_props.set_priority( other_priority);
  243. }
  244. //]
  245. //[main
  246. int main( int argc, char *argv[]) {
  247. // make sure we use our priority_scheduler rather than default round_robin
  248. boost::fibers::use_scheduling_algorithm< priority_scheduler >();
  249. /*= ...*/
  250. /*=}*/
  251. //]
  252. Verbose v("main()");
  253. // for clarity
  254. std::cout << "main() setting name" << std::endl;
  255. //[main_name
  256. boost::this_fiber::properties< priority_props >().name = "main";
  257. //]
  258. std::cout << "main() running tests" << std::endl;
  259. {
  260. Verbose v("high-priority first", "stop\n");
  261. // verify that high-priority fiber always gets scheduled first
  262. boost::fibers::fiber low( launch( yield_fn, "low", 1) );
  263. boost::fibers::fiber med( launch( yield_fn, "medium", 2) );
  264. boost::fibers::fiber hi( launch( yield_fn, "high", 3) );
  265. std::cout << "main: high.join()" << std::endl;
  266. hi.join();
  267. std::cout << "main: medium.join()" << std::endl;
  268. med.join();
  269. std::cout << "main: low.join()" << std::endl;
  270. low.join();
  271. }
  272. {
  273. Verbose v("same priority round-robin", "stop\n");
  274. // fibers of same priority are scheduled in round-robin order
  275. boost::fibers::fiber a( launch( yield_fn, "a", 0) );
  276. boost::fibers::fiber b( launch( yield_fn, "b", 0) );
  277. boost::fibers::fiber c( launch( yield_fn, "c", 0) );
  278. std::cout << "main: a.join()" << std::endl;
  279. a.join();
  280. std::cout << "main: b.join()" << std::endl;
  281. b.join();
  282. std::cout << "main: c.join()" << std::endl;
  283. c.join();
  284. }
  285. {
  286. Verbose v("barrier wakes up all", "stop\n");
  287. // using a barrier wakes up all waiting fibers at the same time
  288. boost::fibers::barrier barrier( 3);
  289. boost::fibers::fiber low( launch( [&barrier](){ barrier_fn( barrier); }, "low", 1) );
  290. boost::fibers::fiber med( launch( [&barrier](){ barrier_fn( barrier); }, "medium", 2) );
  291. boost::fibers::fiber hi( launch( [&barrier](){ barrier_fn( barrier); }, "high", 3) );
  292. std::cout << "main: low.join()" << std::endl;
  293. low.join();
  294. std::cout << "main: medium.join()" << std::endl;
  295. med.join();
  296. std::cout << "main: high.join()" << std::endl;
  297. hi.join();
  298. }
  299. {
  300. Verbose v("change priority", "stop\n");
  301. // change priority of a fiber in priority_scheduler's ready queue
  302. boost::fibers::barrier barrier( 3);
  303. boost::fibers::fiber c( launch( [&barrier](){ barrier_fn( barrier); }, "c", 1) );
  304. boost::fibers::fiber a( launch( [&c,&barrier]() { change_fn( c, 3, barrier); }, "a", 3) );
  305. boost::fibers::fiber b( launch( [&barrier](){ barrier_fn( barrier); }, "b", 2) );
  306. std::cout << "main: a.join()" << std::endl;
  307. std::cout << "main: a.join()" << std::endl;
  308. a.join();
  309. std::cout << "main: b.join()" << std::endl;
  310. b.join();
  311. std::cout << "main: c.join()" << std::endl;
  312. c.join();
  313. }
  314. std::cout << "done." << std::endl;
  315. return EXIT_SUCCESS;
  316. }