condition.hpp 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. //////////////////////////////////////////////////////////////////////////////
  2. //
  3. // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
  4. // Software License, Version 1.0. (See accompanying file
  5. // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // See http://www.boost.org/libs/interprocess for documentation.
  8. //
  9. //////////////////////////////////////////////////////////////////////////////
  10. #ifndef BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
  11. #define BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
  12. #ifndef BOOST_CONFIG_HPP
  13. # include <boost/config.hpp>
  14. #endif
  15. #
  16. #if defined(BOOST_HAS_PRAGMA_ONCE)
  17. # pragma once
  18. #endif
  19. #include <boost/interprocess/detail/config_begin.hpp>
  20. #include <boost/interprocess/detail/workaround.hpp>
  21. #include <boost/interprocess/sync/spin/mutex.hpp>
  22. #include <boost/interprocess/detail/posix_time_types_wrk.hpp>
  23. #include <boost/interprocess/detail/atomic.hpp>
  24. #include <boost/interprocess/sync/scoped_lock.hpp>
  25. #include <boost/interprocess/exceptions.hpp>
  26. #include <boost/interprocess/detail/os_thread_functions.hpp>
  27. #include <boost/interprocess/sync/spin/wait.hpp>
  28. #include <boost/move/utility_core.hpp>
  29. #include <boost/cstdint.hpp>
  30. namespace boost {
  31. namespace interprocess {
  32. namespace ipcdetail {
  33. class spin_condition
  34. {
  35. spin_condition(const spin_condition &);
  36. spin_condition &operator=(const spin_condition &);
  37. public:
  38. spin_condition();
  39. ~spin_condition();
  40. void notify_one();
  41. void notify_all();
  42. template <typename L>
  43. bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time)
  44. {
  45. if (!lock)
  46. throw lock_exception();
  47. //Handle infinity absolute time here to avoid complications in do_timed_wait
  48. if(abs_time == boost::posix_time::pos_infin){
  49. this->wait(lock);
  50. return true;
  51. }
  52. return this->do_timed_wait(abs_time, *lock.mutex());
  53. }
  54. template <typename L, typename Pr>
  55. bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time, Pr pred)
  56. {
  57. if (!lock)
  58. throw lock_exception();
  59. //Handle infinity absolute time here to avoid complications in do_timed_wait
  60. if(abs_time == boost::posix_time::pos_infin){
  61. this->wait(lock, pred);
  62. return true;
  63. }
  64. while (!pred()){
  65. if (!this->do_timed_wait(abs_time, *lock.mutex()))
  66. return pred();
  67. }
  68. return true;
  69. }
  70. template <typename L>
  71. void wait(L& lock)
  72. {
  73. if (!lock)
  74. throw lock_exception();
  75. do_wait(*lock.mutex());
  76. }
  77. template <typename L, typename Pr>
  78. void wait(L& lock, Pr pred)
  79. {
  80. if (!lock)
  81. throw lock_exception();
  82. while (!pred())
  83. do_wait(*lock.mutex());
  84. }
  85. template<class InterprocessMutex>
  86. void do_wait(InterprocessMutex &mut);
  87. template<class InterprocessMutex>
  88. bool do_timed_wait(const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
  89. private:
  90. template<class InterprocessMutex>
  91. bool do_timed_wait(bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
  92. enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL };
  93. spin_mutex m_enter_mut;
  94. volatile boost::uint32_t m_command;
  95. volatile boost::uint32_t m_num_waiters;
  96. void notify(boost::uint32_t command);
  97. };
  98. inline spin_condition::spin_condition()
  99. {
  100. //Note that this class is initialized to zero.
  101. //So zeroed memory can be interpreted as an initialized
  102. //condition variable
  103. m_command = SLEEP;
  104. m_num_waiters = 0;
  105. }
  106. inline spin_condition::~spin_condition()
  107. {
  108. //Notify all waiting threads
  109. //to allow POSIX semantics on condition destruction
  110. this->notify_all();
  111. }
  112. inline void spin_condition::notify_one()
  113. {
  114. this->notify(NOTIFY_ONE);
  115. }
  116. inline void spin_condition::notify_all()
  117. {
  118. this->notify(NOTIFY_ALL);
  119. }
  120. inline void spin_condition::notify(boost::uint32_t command)
  121. {
  122. //This mutex guarantees that no other thread can enter to the
  123. //do_timed_wait method logic, so that thread count will be
  124. //constant until the function writes a NOTIFY_ALL command.
  125. //It also guarantees that no other notification can be signaled
  126. //on this spin_condition before this one ends
  127. m_enter_mut.lock();
  128. //Return if there are no waiters
  129. if(!atomic_read32(&m_num_waiters)) {
  130. m_enter_mut.unlock();
  131. return;
  132. }
  133. //Notify that all threads should execute wait logic
  134. spin_wait swait;
  135. while(SLEEP != atomic_cas32(const_cast<boost::uint32_t*>(&m_command), command, SLEEP)){
  136. swait.yield();
  137. }
  138. //The enter mutex will rest locked until the last waiting thread unlocks it
  139. }
  140. template<class InterprocessMutex>
  141. inline void spin_condition::do_wait(InterprocessMutex &mut)
  142. {
  143. this->do_timed_wait(false, boost::posix_time::ptime(), mut);
  144. }
  145. template<class InterprocessMutex>
  146. inline bool spin_condition::do_timed_wait
  147. (const boost::posix_time::ptime &abs_time, InterprocessMutex &mut)
  148. {
  149. return this->do_timed_wait(true, abs_time, mut);
  150. }
  //Core wait routine shared by do_wait() and the two-argument do_timed_wait().
  //
  //  tout_enabled: when true, abs_time is honored as a deadline; when false
  //                abs_time is never read by this function.
  //  abs_time:     absolute deadline, compared against
  //                microsec_clock::universal_time().
  //  mut:          external mutex. It is unlocked once this thread has been
  //                registered as a waiter and locked again before returning.
  //
  //Returns false when the deadline has already passed on entry or the enter
  //mutex could not be acquired (in both cases "mut" is never unlocked), and
  //also when the wait timed out. Returns true when the thread was released by
  //a notification.
  //
  //NOTE(review): the comments below assume atomic_cas32(mem, with, cmp) stores
  //"with" only if *mem == cmp and returns the previous value, and that
  //atomic_dec32 returns the value held before the decrement -- confirm against
  //boost/interprocess/detail/atomic.hpp.
  template<class InterprocessMutex>
  inline bool spin_condition::do_timed_wait(bool tout_enabled,
                                       const boost::posix_time::ptime &abs_time,
                                       InterprocessMutex &mut)
  {
     boost::posix_time::ptime now = microsec_clock::universal_time();
     //Nothing to wait for if the deadline is already behind us
     if(tout_enabled){
        if(now >= abs_time) return false;
     }
     typedef boost::interprocess::scoped_lock<spin_mutex> InternalLock;
     //The enter mutex guarantees that while executing a notification,
     //no other thread can execute the do_timed_wait method.
     {
        //---------------------------------------------------------------
        //Acquire the enter mutex: with a deadline when timeouts are
        //enabled, unconditionally otherwise. The lock is built in a
        //temporary and moved into "lock" so both branches share one variable.
        InternalLock lock;
        if(tout_enabled){
           InternalLock dummy(m_enter_mut, abs_time);
           lock = boost::move(dummy);
        }
        else{
           InternalLock dummy(m_enter_mut);
           lock = boost::move(dummy);
        }
        //The enter mutex could not be acquired: report failure. The external
        //mutex has not been touched yet.
        if(!lock)
           return false;
        //---------------------------------------------------------------
        //We increment the waiting thread count protected so that it will be
        //always constant when another thread enters the notification logic.
        //The increment marks this thread as "waiting on spin_condition"
        atomic_inc32(const_cast<boost::uint32_t*>(&m_num_waiters));
        //We unlock the external mutex atomically with the increment
        //(both happen while the enter mutex is still held)
        mut.unlock();
        //"lock" leaves scope here; presumably this releases the enter mutex
     }
     //By default, we suppose that no timeout has happened.
     //unlock_enter_mut records whether this thread is the one that must
     //release the enter mutex before returning.
     bool timed_out = false, unlock_enter_mut= false;
     //Loop until a notification indicates that the thread should
     //exit or timeout occurs
     while(1){
        //The thread sleeps/spins until a notifier writes a command other
        //than SLEEP, or until the deadline check below breaks out of the loop.
        spin_wait swait;
        while(atomic_read32(&m_command) == SLEEP){
           swait.yield();
           //Check for timeout
           if(tout_enabled){
              now = microsec_clock::universal_time();
              if(now >= abs_time){
                 //If we can lock the mutex it means that no notification
                 //is being executed in this spin_condition variable
                 timed_out = m_enter_mut.try_lock();
                 //If locking fails, indicates that another thread is executing
                 //notification, so we play the notification game
                 if(!timed_out){
                    //There is an ongoing notification, we will try again later
                    //(the inner loop re-reads m_command)
                    continue;
                 }
                 //No notification in execution, since enter mutex is locked.
                 //We will execute time-out logic, so we will decrement count,
                 //release the enter mutex and return false.
                 break;
              }
           }
        }
        //If a timeout occurred, this thread skips the command-checking logic
        if(tout_enabled && timed_out){
           //Decrement wait count
           atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
           //The enter mutex was taken by try_lock() above, so release it
           unlock_enter_mut = true;
           break;
        }
        else{
           //Try to consume a NOTIFY_ONE command by swapping it back to SLEEP;
           //"result" is the command value observed by the compare-and-swap
           boost::uint32_t result = atomic_cas32
                          (const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ONE);
           if(result == SLEEP){
              //Other thread has been notified and since it was a NOTIFY one
              //command, this thread must sleep again
              continue;
           }
           else if(result == NOTIFY_ONE){
              //If it was a NOTIFY_ONE command, only this thread should
              //exit. This thread has atomically marked command as sleep before
              //so no other thread will exit.
              //Decrement wait count.
              unlock_enter_mut = true;
              atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
              break;
           }
           else{
              //If it is a NOTIFY_ALL command, all threads should return
              //from do_timed_wait function. Decrement wait count.
              unlock_enter_mut = 1 == atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
              //Check if this is the last thread of notify_all waiters
              //Only the last thread will release the mutex
              if(unlock_enter_mut){
                 //The last waiter also resets the command back to SLEEP
                 atomic_cas32(const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ALL);
              }
              break;
           }
        }
     }
     //Unlock the enter mutex if it is a single notification, if this is
     //the last notified thread in a notify_all or a timeout has occurred
     if(unlock_enter_mut){
        m_enter_mut.unlock();
     }
     //Lock external again before returning from the method
     mut.lock();
     return !timed_out;
  }
  260. } //namespace ipcdetail
  261. } //namespace interprocess
  262. } //namespace boost
  263. #include <boost/interprocess/detail/config_end.hpp>
  264. #endif //BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP