test_common.hpp 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. // Copyright (C) 2011 Tim Blechmann
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See
  4. // accompanying file LICENSE_1_0.txt or copy at
  5. // http://www.boost.org/LICENSE_1_0.txt)
  6. #include <cassert>
  7. #include <iostream>
  8. #include "test_helpers.hpp"
  9. #include <boost/array.hpp>
  10. #include <boost/thread.hpp>
  11. namespace impl {
  12. using boost::array;
  13. using namespace boost;
  14. using namespace std;
  15. template <bool Bounded = false>
  16. struct queue_stress_tester
  17. {
  18. static const unsigned int buckets = 1<<13;
  19. #ifndef BOOST_LOCKFREE_STRESS_TEST
  20. static const long node_count = 5000;
  21. #else
  22. static const long node_count = 500000;
  23. #endif
  24. const int reader_threads;
  25. const int writer_threads;
  26. boost::lockfree::detail::atomic<int> writers_finished;
  27. static_hashed_set<long, buckets> data;
  28. static_hashed_set<long, buckets> dequeued;
  29. array<std::set<long>, buckets> returned;
  30. boost::lockfree::detail::atomic<int> push_count, pop_count;
  31. queue_stress_tester(int reader, int writer):
  32. reader_threads(reader), writer_threads(writer), push_count(0), pop_count(0)
  33. {}
  34. template <typename queue>
  35. void add_items(queue & stk)
  36. {
  37. for (long i = 0; i != node_count; ++i) {
  38. long id = generate_id<long>();
  39. bool inserted = data.insert(id);
  40. assert(inserted);
  41. if (Bounded)
  42. while(stk.bounded_push(id) == false) {
  43. #ifdef __VXWORKS__
  44. thread::yield();
  45. #endif
  46. }
  47. else
  48. while(stk.push(id) == false) {
  49. #ifdef __VXWORKS__
  50. thread::yield();
  51. #endif
  52. }
  53. ++push_count;
  54. }
  55. writers_finished += 1;
  56. }
  57. boost::lockfree::detail::atomic<bool> running;
  58. template <typename queue>
  59. bool consume_element(queue & q)
  60. {
  61. long id;
  62. bool ret = q.pop(id);
  63. if (!ret)
  64. return false;
  65. bool erased = data.erase(id);
  66. bool inserted = dequeued.insert(id);
  67. assert(erased);
  68. assert(inserted);
  69. ++pop_count;
  70. return true;
  71. }
  72. template <typename queue>
  73. void get_items(queue & q)
  74. {
  75. for (;;) {
  76. bool received_element = consume_element(q);
  77. if (received_element)
  78. continue;
  79. if ( writers_finished.load() == writer_threads )
  80. break;
  81. #ifdef __VXWORKS__
  82. thread::yield();
  83. #endif
  84. }
  85. while (consume_element(q));
  86. }
  87. template <typename queue>
  88. void run(queue & stk)
  89. {
  90. BOOST_WARN(stk.is_lock_free());
  91. writers_finished.store(0);
  92. thread_group writer;
  93. thread_group reader;
  94. BOOST_REQUIRE(stk.empty());
  95. for (int i = 0; i != reader_threads; ++i)
  96. reader.create_thread(boost::bind(&queue_stress_tester::template get_items<queue>, this, boost::ref(stk)));
  97. for (int i = 0; i != writer_threads; ++i)
  98. writer.create_thread(boost::bind(&queue_stress_tester::template add_items<queue>, this, boost::ref(stk)));
  99. std::cout << "threads created" << std::endl;
  100. writer.join_all();
  101. std::cout << "writer threads joined, waiting for readers" << std::endl;
  102. reader.join_all();
  103. std::cout << "reader threads joined" << std::endl;
  104. BOOST_REQUIRE_EQUAL(data.count_nodes(), (size_t)0);
  105. BOOST_REQUIRE(stk.empty());
  106. BOOST_REQUIRE_EQUAL(push_count, pop_count);
  107. BOOST_REQUIRE_EQUAL(push_count, writer_threads * node_count);
  108. }
  109. };
  110. }
  111. using impl::queue_stress_tester;