// Copyright (C) 2011 Tim Blechmann // // Distributed under the Boost Software License, Version 1.0. (See // accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) #include #include #include "test_helpers.hpp" #include #include namespace impl { using boost::array; using namespace boost; using namespace std; template struct queue_stress_tester { static const unsigned int buckets = 1<<13; #ifndef BOOST_LOCKFREE_STRESS_TEST static const long node_count = 5000; #else static const long node_count = 500000; #endif const int reader_threads; const int writer_threads; boost::lockfree::detail::atomic writers_finished; static_hashed_set data; static_hashed_set dequeued; array, buckets> returned; boost::lockfree::detail::atomic push_count, pop_count; queue_stress_tester(int reader, int writer): reader_threads(reader), writer_threads(writer), push_count(0), pop_count(0) {} template void add_items(queue & stk) { for (long i = 0; i != node_count; ++i) { long id = generate_id(); bool inserted = data.insert(id); assert(inserted); if (Bounded) while(stk.bounded_push(id) == false) { #ifdef __VXWORKS__ thread::yield(); #endif } else while(stk.push(id) == false) { #ifdef __VXWORKS__ thread::yield(); #endif } ++push_count; } writers_finished += 1; } boost::lockfree::detail::atomic running; template bool consume_element(queue & q) { long id; bool ret = q.pop(id); if (!ret) return false; bool erased = data.erase(id); bool inserted = dequeued.insert(id); assert(erased); assert(inserted); ++pop_count; return true; } template void get_items(queue & q) { for (;;) { bool received_element = consume_element(q); if (received_element) continue; if ( writers_finished.load() == writer_threads ) break; #ifdef __VXWORKS__ thread::yield(); #endif } while (consume_element(q)); } template void run(queue & stk) { BOOST_WARN(stk.is_lock_free()); writers_finished.store(0); thread_group writer; thread_group reader; BOOST_REQUIRE(stk.empty()); for (int i = 0; i != reader_threads; ++i) reader.create_thread(boost::bind(&queue_stress_tester::template get_items, this, boost::ref(stk))); for (int i = 0; i != writer_threads; ++i) writer.create_thread(boost::bind(&queue_stress_tester::template add_items, this, boost::ref(stk))); std::cout << "threads created" << std::endl; writer.join_all(); std::cout << "writer threads joined, waiting for readers" << std::endl; reader.join_all(); std::cout << "reader threads joined" << std::endl; BOOST_REQUIRE_EQUAL(data.count_nodes(), (size_t)0); BOOST_REQUIRE(stk.empty()); BOOST_REQUIRE_EQUAL(push_count, pop_count); BOOST_REQUIRE_EQUAL(push_count, writer_threads * node_count); } }; } using impl::queue_stress_tester;