sync_priority_queue.hpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. // Copyright (C) 2014 Ian Forbed
  2. // Copyright (C) 2014-2017 Vicente J. Botet Escriba
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE
  8. #define BOOST_THREAD_SYNC_PRIORITY_QUEUE
  #include <boost/thread/detail/config.hpp>
  #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
  #include <boost/thread/concurrent_queues/queue_op_status.hpp>
  #include <boost/thread/condition_variable.hpp>
  #include <boost/thread/csbl/vector.hpp>
  #include <boost/thread/detail/move.hpp>
  #include <boost/thread/mutex.hpp>
  #include <boost/atomic.hpp>
  #include <boost/chrono/duration.hpp>
  #include <boost/chrono/time_point.hpp>
  #include <algorithm>
  #include <exception>
  #include <functional>
  #include <queue>
  #include <utility>
  #include <boost/config/abi_prefix.hpp>
  23. namespace boost
  24. {
  25. namespace detail {
  26. template <
  27. class Type,
  28. class Container = csbl::vector<Type>,
  29. class Compare = std::less<Type>
  30. >
  31. class priority_queue
  32. {
  33. private:
  34. Container _elements;
  35. Compare _compare;
  36. public:
  37. typedef Type value_type;
  38. typedef typename Container::size_type size_type;
  39. explicit priority_queue(const Compare& compare = Compare())
  40. : _elements(), _compare(compare)
  41. { }
  42. size_type size() const
  43. {
  44. return _elements.size();
  45. }
  46. bool empty() const
  47. {
  48. return _elements.empty();
  49. }
  50. void push(Type const& element)
  51. {
  52. _elements.push_back(element);
  53. std::push_heap(_elements.begin(), _elements.end(), _compare);
  54. }
  55. void push(BOOST_RV_REF(Type) element)
  56. {
  57. _elements.push_back(boost::move(element));
  58. std::push_heap(_elements.begin(), _elements.end(), _compare);
  59. }
  60. void pop()
  61. {
  62. std::pop_heap(_elements.begin(), _elements.end(), _compare);
  63. _elements.pop_back();
  64. }
  65. Type pull()
  66. {
  67. Type result = boost::move(_elements.front());
  68. pop();
  69. return boost::move(result);
  70. }
  71. Type const& top() const
  72. {
  73. return _elements.front();
  74. }
  75. };
  76. }
  77. namespace concurrent
  78. {
  79. template <class ValueType,
  80. class Container = csbl::vector<ValueType>,
  81. class Compare = std::less<typename Container::value_type> >
  82. class sync_priority_queue
  83. : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >
  84. {
  85. typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > super;
  86. public:
  87. typedef ValueType value_type;
  88. //typedef typename super::value_type value_type; // fixme
  89. typedef typename super::underlying_queue_type underlying_queue_type;
  90. typedef typename super::size_type size_type;
  91. typedef typename super::op_status op_status;
  92. typedef chrono::steady_clock clock;
  93. protected:
  94. public:
  95. sync_priority_queue() {}
  96. ~sync_priority_queue()
  97. {
  98. if(!super::closed())
  99. {
  100. super::close();
  101. }
  102. }
  103. void push(const ValueType& elem);
  104. void push(BOOST_THREAD_RV_REF(ValueType) elem);
  105. queue_op_status try_push(const ValueType& elem);
  106. queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem);
  107. ValueType pull();
  108. void pull(ValueType&);
  109. template <class WClock, class Duration>
  110. queue_op_status pull_until(const chrono::time_point<WClock,Duration>&, ValueType&);
  111. template <class Rep, class Period>
  112. queue_op_status pull_for(const chrono::duration<Rep,Period>&, ValueType&);
  113. queue_op_status try_pull(ValueType& elem);
  114. queue_op_status wait_pull(ValueType& elem);
  115. queue_op_status nonblocking_pull(ValueType&);
  116. private:
  117. void push(unique_lock<mutex>&, const ValueType& elem);
  118. void push(lock_guard<mutex>&, const ValueType& elem);
  119. void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
  120. void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
  121. queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem);
  122. queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
  123. ValueType pull(unique_lock<mutex>&);
  124. ValueType pull(lock_guard<mutex>&);
  125. void pull(unique_lock<mutex>&, ValueType&);
  126. void pull(lock_guard<mutex>&, ValueType&);
  127. queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem);
  128. queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem);
  129. queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem);
  130. queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&);
  131. sync_priority_queue(const sync_priority_queue&);
  132. sync_priority_queue& operator= (const sync_priority_queue&);
  133. sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue));
  134. sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue));
  135. }; //end class
  136. //////////////////////
  137. template <class T, class Container,class Cmp>
  138. void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem)
  139. {
  140. super::throw_if_closed(lk);
  141. super::data_.push(elem);
  142. super::notify_elem_added(lk);
  143. }
  144. template <class T, class Container,class Cmp>
  145. void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem)
  146. {
  147. super::throw_if_closed(lk);
  148. super::data_.push(elem);
  149. super::notify_elem_added(lk);
  150. }
  151. template <class T, class Container,class Cmp>
  152. void sync_priority_queue<T,Container,Cmp>::push(const T& elem)
  153. {
  154. lock_guard<mutex> lk(super::mtx_);
  155. push(lk, elem);
  156. }
  157. //////////////////////
  158. template <class T, class Container,class Cmp>
  159. void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
  160. {
  161. super::throw_if_closed(lk);
  162. super::data_.push(boost::move(elem));
  163. super::notify_elem_added(lk);
  164. }
  165. template <class T, class Container,class Cmp>
  166. void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
  167. {
  168. super::throw_if_closed(lk);
  169. super::data_.push(boost::move(elem));
  170. super::notify_elem_added(lk);
  171. }
  172. template <class T, class Container,class Cmp>
  173. void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem)
  174. {
  175. lock_guard<mutex> lk(super::mtx_);
  176. push(lk, boost::move(elem));
  177. }
  178. //////////////////////
  179. template <class T, class Container,class Cmp>
  180. queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem)
  181. {
  182. lock_guard<mutex> lk(super::mtx_);
  183. if (super::closed(lk)) return queue_op_status::closed;
  184. push(lk, elem);
  185. return queue_op_status::success;
  186. }
  187. //////////////////////
  188. template <class T, class Container,class Cmp>
  189. queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem)
  190. {
  191. lock_guard<mutex> lk(super::mtx_);
  192. if (super::closed(lk)) return queue_op_status::closed;
  193. push(lk, boost::move(elem));
  194. return queue_op_status::success;
  195. }
  196. //////////////////////
  197. template <class T,class Container, class Cmp>
  198. T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&)
  199. {
  200. return super::data_.pull();
  201. }
  202. template <class T,class Container, class Cmp>
  203. T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&)
  204. {
  205. return super::data_.pull();
  206. }
  207. template <class T,class Container, class Cmp>
  208. T sync_priority_queue<T,Container,Cmp>::pull()
  209. {
  210. unique_lock<mutex> lk(super::mtx_);
  211. const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
  212. if (has_been_closed) super::throw_if_closed(lk);
  213. return pull(lk);
  214. }
  215. //////////////////////
  216. template <class T,class Container, class Cmp>
  217. void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem)
  218. {
  219. elem = super::data_.pull();
  220. }
  221. template <class T,class Container, class Cmp>
  222. void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem)
  223. {
  224. elem = super::data_.pull();
  225. }
  226. template <class T,class Container, class Cmp>
  227. void sync_priority_queue<T,Container,Cmp>::pull(T& elem)
  228. {
  229. unique_lock<mutex> lk(super::mtx_);
  230. const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
  231. if (has_been_closed) super::throw_if_closed(lk);
  232. pull(lk, elem);
  233. }
  234. //////////////////////
  235. template <class T, class Cont,class Cmp>
  236. template <class WClock, class Duration>
  237. queue_op_status
  238. sync_priority_queue<T,Cont,Cmp>::pull_until(const chrono::time_point<WClock,Duration>& tp, T& elem)
  239. {
  240. unique_lock<mutex> lk(super::mtx_);
  241. const queue_op_status rc = super::wait_until_not_empty_or_closed_until(lk, tp);
  242. if (rc == queue_op_status::success) pull(lk, elem);
  243. return rc;
  244. }
  245. //////////////////////
  246. template <class T, class Cont,class Cmp>
  247. template <class Rep, class Period>
  248. queue_op_status
  249. sync_priority_queue<T,Cont,Cmp>::pull_for(const chrono::duration<Rep,Period>& dura, T& elem)
  250. {
  251. return pull_until(chrono::steady_clock::now() + dura, elem);
  252. }
  253. //////////////////////
  254. template <class T, class Container,class Cmp>
  255. queue_op_status
  256. sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem)
  257. {
  258. if (super::empty(lk))
  259. {
  260. if (super::closed(lk)) return queue_op_status::closed;
  261. return queue_op_status::empty;
  262. }
  263. pull(lk, elem);
  264. return queue_op_status::success;
  265. }
  266. template <class T, class Container,class Cmp>
  267. queue_op_status
  268. sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem)
  269. {
  270. if (super::empty(lk))
  271. {
  272. if (super::closed(lk)) return queue_op_status::closed;
  273. return queue_op_status::empty;
  274. }
  275. pull(lk, elem);
  276. return queue_op_status::success;
  277. }
  278. template <class T, class Container,class Cmp>
  279. queue_op_status
  280. sync_priority_queue<T,Container,Cmp>::try_pull(T& elem)
  281. {
  282. lock_guard<mutex> lk(super::mtx_);
  283. return try_pull(lk, elem);
  284. }
  285. //////////////////////
  286. template <class T,class Container, class Cmp>
  287. queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem)
  288. {
  289. const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
  290. if (has_been_closed) return queue_op_status::closed;
  291. pull(lk, elem);
  292. return queue_op_status::success;
  293. }
  294. template <class T,class Container, class Cmp>
  295. queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem)
  296. {
  297. unique_lock<mutex> lk(super::mtx_);
  298. return wait_pull(lk, elem);
  299. }
  300. //////////////////////
  301. template <class T,class Container, class Cmp>
  302. queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem)
  303. {
  304. unique_lock<mutex> lk(super::mtx_, try_to_lock);
  305. if (!lk.owns_lock()) return queue_op_status::busy;
  306. return try_pull(lk, elem);
  307. }
  308. } //end concurrent namespace
  309. using concurrent::sync_priority_queue;
  310. } //end boost namespace
  311. #include <boost/config/abi_suffix.hpp>
  312. #endif