// sync_queue_base.hpp (5.8 KB)
  1. #ifndef BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP
  2. #define BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP
  3. //////////////////////////////////////////////////////////////////////////////
  4. //
  5. // (C) Copyright Vicente J. Botet Escriba 2013-2017. Distributed under the Boost
  6. // Software License, Version 1.0. (See accompanying file
  7. // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  8. //
  9. // See http://www.boost.org/libs/thread for documentation.
  10. //
  11. //////////////////////////////////////////////////////////////////////////////
  12. #include <boost/bind.hpp>
  13. #include <boost/thread/detail/config.hpp>
  14. #include <boost/thread/condition_variable.hpp>
  15. #include <boost/thread/detail/move.hpp>
  16. #include <boost/thread/mutex.hpp>
  17. #include <boost/thread/concurrent_queues/queue_op_status.hpp>
  18. #include <boost/chrono/time_point.hpp>
  19. #include <boost/throw_exception.hpp>
  20. #include <boost/config/abi_prefix.hpp>
  21. namespace boost
  22. {
  23. namespace concurrent
  24. {
  25. namespace detail
  26. {
  27. template <class ValueType, class Queue>
  28. class sync_queue_base
  29. {
  30. public:
  31. typedef ValueType value_type;
  32. typedef Queue underlying_queue_type;
  33. typedef typename Queue::size_type size_type;
  34. typedef queue_op_status op_status;
  35. // Constructors/Assignment/Destructors
  36. BOOST_THREAD_NO_COPYABLE(sync_queue_base)
  37. inline sync_queue_base();
  38. //template <typename Range>
  39. //inline explicit sync_queue(Range range);
  40. inline ~sync_queue_base();
  41. // Observers
  42. inline bool empty() const;
  43. inline bool full() const;
  44. inline size_type size() const;
  45. inline bool closed() const;
  46. // Modifiers
  47. inline void close();
  48. inline underlying_queue_type underlying_queue() {
  49. lock_guard<mutex> lk(mtx_);
  50. return boost::move(data_);
  51. }
  52. protected:
  53. mutable mutex mtx_;
  54. condition_variable cond_;
  55. underlying_queue_type data_;
  56. bool closed_;
  57. inline bool empty(unique_lock<mutex>& ) const BOOST_NOEXCEPT
  58. {
  59. return data_.empty();
  60. }
  61. inline bool empty(lock_guard<mutex>& ) const BOOST_NOEXCEPT
  62. {
  63. return data_.empty();
  64. }
  65. inline size_type size(lock_guard<mutex>& ) const BOOST_NOEXCEPT
  66. {
  67. return data_.size();
  68. }
  69. inline bool closed(unique_lock<mutex>& lk) const;
  70. inline bool closed(lock_guard<mutex>& lk) const;
  71. inline void throw_if_closed(unique_lock<mutex>&);
  72. inline void throw_if_closed(lock_guard<mutex>&);
  73. inline bool not_empty_or_closed(unique_lock<mutex>& ) const;
  74. inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk);
  75. template <class WClock, class Duration>
  76. queue_op_status wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp);
  77. inline void notify_elem_added(unique_lock<mutex>& )
  78. {
  79. cond_.notify_all();
  80. }
  81. inline void notify_elem_added(lock_guard<mutex>& )
  82. {
  83. cond_.notify_all();
  84. }
  85. };
  86. template <class ValueType, class Queue>
  87. sync_queue_base<ValueType, Queue>::sync_queue_base() :
  88. data_(), closed_(false)
  89. {
  90. BOOST_ASSERT(data_.empty());
  91. }
  92. template <class ValueType, class Queue>
  93. sync_queue_base<ValueType, Queue>::~sync_queue_base()
  94. {
  95. }
  96. template <class ValueType, class Queue>
  97. void sync_queue_base<ValueType, Queue>::close()
  98. {
  99. {
  100. lock_guard<mutex> lk(mtx_);
  101. closed_ = true;
  102. }
  103. cond_.notify_all();
  104. }
  105. template <class ValueType, class Queue>
  106. bool sync_queue_base<ValueType, Queue>::closed() const
  107. {
  108. lock_guard<mutex> lk(mtx_);
  109. return closed(lk);
  110. }
  111. template <class ValueType, class Queue>
  112. bool sync_queue_base<ValueType, Queue>::closed(unique_lock<mutex>&) const
  113. {
  114. return closed_;
  115. }
  116. template <class ValueType, class Queue>
  117. bool sync_queue_base<ValueType, Queue>::closed(lock_guard<mutex>&) const
  118. {
  119. return closed_;
  120. }
  121. template <class ValueType, class Queue>
  122. bool sync_queue_base<ValueType, Queue>::empty() const
  123. {
  124. lock_guard<mutex> lk(mtx_);
  125. return empty(lk);
  126. }
  127. template <class ValueType, class Queue>
  128. bool sync_queue_base<ValueType, Queue>::full() const
  129. {
  130. return false;
  131. }
  132. template <class ValueType, class Queue>
  133. typename sync_queue_base<ValueType, Queue>::size_type sync_queue_base<ValueType, Queue>::size() const
  134. {
  135. lock_guard<mutex> lk(mtx_);
  136. return size(lk);
  137. }
  138. template <class ValueType, class Queue>
  139. void sync_queue_base<ValueType, Queue>::throw_if_closed(unique_lock<mutex>& lk)
  140. {
  141. if (closed(lk))
  142. {
  143. BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
  144. }
  145. }
  146. template <class ValueType, class Queue>
  147. void sync_queue_base<ValueType, Queue>::throw_if_closed(lock_guard<mutex>& lk)
  148. {
  149. if (closed(lk))
  150. {
  151. BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
  152. }
  153. }
  154. template <class ValueType, class Queue>
  155. bool sync_queue_base<ValueType, Queue>::not_empty_or_closed(unique_lock<mutex>& ) const
  156. {
  157. return ! data_.empty() || closed_;
  158. }
  159. template <class ValueType, class Queue>
  160. bool sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk)
  161. {
  162. cond_.wait(lk, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)));
  163. if (! empty(lk)) return false; // success
  164. return true; // closed
  165. }
  166. template <class ValueType, class Queue>
  167. template <class WClock, class Duration>
  168. queue_op_status sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
  169. {
  170. if (! cond_.wait_until(lk, tp, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))))
  171. return queue_op_status::timeout;
  172. if (! empty(lk)) return queue_op_status::success;
  173. return queue_op_status::closed;
  174. }
  175. } // detail
  176. } // concurrent
  177. } // boost
  178. #include <boost/config/abi_suffix.hpp>
  179. #endif