context_spmc_queue.hpp

// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
#define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include <boost/assert.hpp>
#include <boost/config.hpp>

#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/context.hpp>
// David Chase and Yossi Lev. Dynamic circular work-stealing deque.
// In SPAA ’05: Proceedings of the seventeenth annual ACM symposium
// on Parallelism in algorithms and architectures, pages 21–28,
// New York, NY, USA, 2005. ACM.
//
// Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013.
// Correct and efficient work-stealing for weak memory models.
// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.
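//
// Chase-Lev work-stealing deque, single-producer/multiple-consumer:
// the owning thread pushes and pops at the bottom end (LIFO); stealing
// threads take from the top end (FIFO). Indices top_ and bottom_ only
// grow; the ring buffer (class array) is grown on demand, and retired
// buffers are kept alive in old_arrays_ because a thief may still be
// reading from one.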

#if BOOST_COMP_CLANG
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-private-field"
#endif

namespace boost {
namespace fibers {
namespace detail {

class context_spmc_queue {
private:
    class array {
    private:
        typedef std::atomic< context * >    atomic_type;
        typedef atomic_type                 storage_type;

        std::size_t     capacity_;
        storage_type *  storage_;

    public:
        array( std::size_t capacity) :
            capacity_{ capacity },
            storage_{ new storage_type[capacity_] } {
            // the atomics are default-constructed by new[];
            // initialize every slot to nullptr
            for ( std::size_t i = 0; i < capacity_; ++i) {
                storage_[i].store( nullptr, std::memory_order_relaxed);
            }
        }

        ~array() {
            // delete[] runs the atomics' destructors; no explicit
            // destructor calls required (they would run twice)
            delete [] storage_;
        }

        std::size_t capacity() const noexcept {
            return capacity_;
        }

        void push( std::size_t bottom, context * ctx) noexcept {
            storage_[bottom % capacity_].store( ctx, std::memory_order_relaxed);
        }

        context * pop( std::size_t top) noexcept {
            return storage_[top % capacity_].load( std::memory_order_relaxed);
        }

        array * resize( std::size_t bottom, std::size_t top) {
            std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
            for ( std::size_t i = top; i != bottom; ++i) {
                tmp->push( i, pop( i) );
            }
            return tmp.release();
        }
    };
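
    // top_ is advanced only by successful steals (CAS); bottom_ only by the
    // owning thread; retired ring buffers remain in old_arrays_ until the
    // queue itself is destroyed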
    std::atomic< std::size_t >  top_{ 0 };
    std::atomic< std::size_t >  bottom_{ 0 };
    std::atomic< array * >      array_;
    std::vector< array * >      old_arrays_{};
    char                        padding_[cacheline_length];

public:
    context_spmc_queue( std::size_t capacity = 4096) :
        array_{ new array{ capacity } } {
        old_arrays_.reserve( 32);
    }

    ~context_spmc_queue() {
        for ( array * a : old_arrays_) {
            delete a;
        }
        delete array_.load();
    }

    context_spmc_queue( context_spmc_queue const&) = delete;
    context_spmc_queue & operator=( context_spmc_queue const&) = delete;

    // relaxed snapshot: the result may already be stale under
    // concurrent push()/steal()
    bool empty() const noexcept {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed);
        std::size_t top = top_.load( std::memory_order_relaxed);
        return bottom <= top;
    }
    void push( context * ctx) {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed);
        std::size_t top = top_.load( std::memory_order_acquire);
        array * a = array_.load( std::memory_order_relaxed);
        if ( (a->capacity() - 1) < (bottom - top) ) {
            // queue is full: grow the ring buffer and retire the old one
            // (a thief might still be reading from it)
            array * tmp = a->resize( bottom, top);
            old_arrays_.push_back( a);
            std::swap( a, tmp);
            array_.store( a, std::memory_order_relaxed);
        }
        a->push( bottom, ctx);
        // release fence: the stored context must be visible to thieves
        // that observe the new bottom_
        std::atomic_thread_fence( std::memory_order_release);
        bottom_.store( bottom + 1, std::memory_order_relaxed);
    }
    context * pop() {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
        array * a = array_.load( std::memory_order_relaxed);
        bottom_.store( bottom, std::memory_order_relaxed);
        // seq_cst fence: the speculative decrement of bottom_ must be
        // ordered before the load of top_ (see Lê et al., PPoPP '13)
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t top = top_.load( std::memory_order_relaxed);
        context * ctx = nullptr;
        if ( top <= bottom) {
            // queue is not empty
            ctx = a->pop( bottom);
            BOOST_ASSERT( nullptr != ctx);
            if ( top == bottom) {
                // last element: race against thieves for it
                if ( ! top_.compare_exchange_strong( top, top + 1,
                                                     std::memory_order_seq_cst,
                                                     std::memory_order_relaxed) ) {
                    // lost the race
                    ctx = nullptr;
                }
                bottom_.store( bottom + 1, std::memory_order_relaxed);
            }
        } else {
            // queue is empty; restore bottom_
            bottom_.store( bottom + 1, std::memory_order_relaxed);
        }
        return ctx;
    }
    context * steal() {
        std::size_t top = top_.load( std::memory_order_acquire);
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t bottom = bottom_.load( std::memory_order_acquire);
        context * ctx = nullptr;
        if ( top < bottom) {
            // queue is not empty
            array * a = array_.load( std::memory_order_consume);
            ctx = a->pop( top);
            BOOST_ASSERT( nullptr != ctx);
            // do not steal a pinned context (e.g. main-/dispatcher-context);
            // top_ is left unchanged, so the element stays for the owner
            if ( ctx->is_context( type::pinned_context) ) {
                return nullptr;
            }
            if ( ! top_.compare_exchange_strong( top, top + 1,
                                                 std::memory_order_seq_cst,
                                                 std::memory_order_relaxed) ) {
                // lost the race
                return nullptr;
            }
        }
        return ctx;
    }
};

}}}

#if BOOST_COMP_CLANG
#pragma clang diagnostic pop
#endif

#endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
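
// Usage sketch (illustrative, not part of the header): one scheduler thread
// owns the queue and calls push()/pop(); other scheduler threads call
// steal(). `worker_ctx` and the surrounding loop are hypothetical.
//
//   boost::fibers::detail::context_spmc_queue rqueue;
//
//   // owner thread:
//   rqueue.push( worker_ctx);           // enqueue a ready context
//   context * ctx = rqueue.pop();       // LIFO pop; nullptr if empty or
//                                       // the last element was stolen
//
//   // thief threads:
//   context * stolen = rqueue.steal();  // FIFO steal; nullptr if empty,
//                                       // pinned, or the CAS lost a race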