user_scheduler.hpp 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. // Copyright (C) 2013 Vicente J. Botet Escriba
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. //
  6. // 2013/11 Vicente J. Botet Escriba
  7. // first implementation of a simple serial scheduler.
  8. #ifndef BOOST_THREAD_USER_SCHEDULER_HPP
  9. #define BOOST_THREAD_USER_SCHEDULER_HPP
  10. #include <boost/thread/detail/config.hpp>
  11. #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION && defined BOOST_THREAD_PROVIDES_EXECUTORS && defined BOOST_THREAD_USES_MOVE
  12. #include <boost/thread/detail/delete.hpp>
  13. #include <boost/thread/detail/move.hpp>
  14. #include <boost/thread/concurrent_queues/sync_queue.hpp>
  15. #include <boost/thread/executors/work.hpp>
  16. #include <boost/config/abi_prefix.hpp>
  17. namespace boost
  18. {
  19. class user_scheduler
  20. {
  21. /// type-erasure to store the works to do
  22. typedef executors::work work;
  23. /// the thread safe work queue
  24. sync_queue<work > work_queue;
  25. public:
  26. /**
  27. * Effects: try to execute one task.
  28. * Returns: whether a task has been executed.
  29. * Throws: whatever the current task constructor throws or the task() throws.
  30. */
  31. bool try_executing_one()
  32. {
  33. work task;
  34. try
  35. {
  36. if (work_queue.try_pull(task) == queue_op_status::success)
  37. {
  38. task();
  39. return true;
  40. }
  41. return false;
  42. }
  43. catch (std::exception& )
  44. {
  45. return false;
  46. }
  47. catch (...)
  48. {
  49. return false;
  50. }
  51. }
  52. private:
  53. /**
  54. * Effects: schedule one task or yields
  55. * Throws: whatever the current task constructor throws or the task() throws.
  56. */
  57. void schedule_one_or_yield()
  58. {
  59. if ( ! try_executing_one())
  60. {
  61. this_thread::yield();
  62. }
  63. }
  64. /**
  65. * The main loop of the worker thread
  66. */
  67. void worker_thread()
  68. {
  69. while (!closed())
  70. {
  71. schedule_one_or_yield();
  72. }
  73. while (try_executing_one())
  74. {
  75. }
  76. }
  77. public:
  78. /// user_scheduler is not copyable.
  79. BOOST_THREAD_NO_COPYABLE(user_scheduler)
  80. /**
  81. * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
  82. *
  83. * \b Throws: Whatever exception is thrown while initializing the needed resources.
  84. */
  85. user_scheduler()
  86. {
  87. }
  88. /**
  89. * \b Effects: Destroys the thread pool.
  90. *
  91. * \b Synchronization: The completion of all the closures happen before the completion of the \c user_scheduler destructor.
  92. */
  93. ~user_scheduler()
  94. {
  95. // signal to all the worker thread that there will be no more submissions.
  96. close();
  97. }
  98. /**
  99. * loop
  100. */
  101. void loop() { worker_thread(); }
  102. /**
  103. * \b Effects: close the \c user_scheduler for submissions.
  104. * The loop will work until there is no more closures to run.
  105. */
  106. void close()
  107. {
  108. work_queue.close();
  109. }
  110. /**
  111. * \b Returns: whether the pool is closed for submissions.
  112. */
  113. bool closed()
  114. {
  115. return work_queue.closed();
  116. }
  117. /**
  118. * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
  119. *
  120. * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
  121. * If invoked closure throws an exception the \c user_scheduler will call \c std::terminate, as is the case with threads.
  122. *
  123. * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
  124. *
  125. * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
  126. * Whatever exception that can be throw while storing the closure.
  127. */
  128. #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
  129. template <typename Closure>
  130. void submit(Closure & closure)
  131. {
  132. work w ((closure));
  133. work_queue.push(boost::move(w));
  134. //work_queue.push(work(closure)); // todo check why this doesn't work
  135. }
  136. #endif
  137. void submit(void (*closure)())
  138. {
  139. work w ((closure));
  140. work_queue.push(boost::move(w));
  141. //work_queue.push(work(closure)); // todo check why this doesn't work
  142. }
  143. template <typename Closure>
  144. void submit(BOOST_THREAD_RV_REF(Closure) closure)
  145. {
  146. work w =boost::move(closure);
  147. work_queue.push(boost::move(w));
  148. //work_queue.push(work(boost::move(closure))); // todo check why this doesn't work
  149. }
  150. /**
  151. * \b Requires: This must be called from an scheduled task.
  152. *
  153. * \b Effects: reschedule functions until pred()
  154. */
  155. template <typename Pred>
  156. bool reschedule_until(Pred const& pred)
  157. {
  158. do {
  159. if ( ! try_executing_one())
  160. {
  161. return false;
  162. }
  163. } while (! pred());
  164. return true;
  165. }
  166. /**
  167. * run queued closures
  168. */
  169. void run_queued_closures()
  170. {
  171. sync_queue<work>::underlying_queue_type q = work_queue.underlying_queue();
  172. while (q.empty())
  173. {
  174. work task = q.front();
  175. q.pop_front();
  176. task();
  177. }
  178. }
  179. };
  180. }
  181. #include <boost/config/abi_suffix.hpp>
  182. #endif
  183. #endif