123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- // (C) Copyright 2012 Howard Hinnant
- // (C) Copyright 2012 Vicente Botet
- //
- // Distributed under the Boost Software License, Version 1.0. (See accompanying
- // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- // adapted from the example given by Howard Hinnant in
- #include <boost/config.hpp>
- #define BOOST_THREAD_VERSION 4
- #define BOOST_THREAD_QUEUE_DEPRECATE_OLD
- #if ! defined BOOST_NO_CXX11_DECLTYPE
- #define BOOST_RESULT_OF_USE_DECLTYPE
- #endif
- //#define XXXX
- #include <iostream>
- #include <boost/thread/scoped_thread.hpp>
- #ifdef XXXX
- #include <boost/thread/externally_locked_stream.hpp>
- typedef boost::externally_locked_stream<std::ostream> the_ostream;
- #else
- typedef std::ostream the_ostream;
- typedef std::istream the_istream;
- #endif
- #include <boost/thread/sync_bounded_queue.hpp>
- void producer(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
- {
- using namespace boost;
- try {
- for(int i=0; ;++i)
- {
- sbq.push_back(i);
- //sbq << i;
- //mos << "push_back(" << i << ") "<< sbq.size()<<"\n";
- this_thread::sleep_for(chrono::milliseconds(200));
- }
- }
- catch(sync_queue_is_closed&)
- {
- //mos << "closed !!!\n";
- }
- catch(...)
- {
- //mos << "exception !!!\n";
- }
- }
- void consumer(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
- {
- using namespace boost;
- try {
- for(int i=0; ;++i)
- {
- int r;
- sbq.pull_front(r);
- //sbq >> r;
- //mos << i << " pull_front(" << r << ") "<< sbq.size()<<"\n";
- this_thread::sleep_for(chrono::milliseconds(250));
- }
- }
- catch(sync_queue_is_closed&)
- {
- //mos << "closed !!!\n";
- }
- catch(...)
- {
- //mos << "exception !!!\n";
- }
- }
- void consumer2(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
- {
- using namespace boost;
- try {
- for(int i=0; ;++i)
- {
- int r;
- queue_op_status st = sbq.try_pull_front(r);
- if (queue_op_status::closed == st) break;
- if (queue_op_status::success == st) {
- //mos << i << " pull(" << r << ")\n";
- }
- this_thread::sleep_for(chrono::milliseconds(250));
- }
- }
- catch(...)
- {
- //mos << "exception !!!\n";
- }
- }
- //void consumer3(the_ostream &mos, boost::sync_bounded_queue<int> & sbq)
- //{
- // using namespace boost;
- // bool closed=false;
- // try {
- // for(int i=0; ;++i)
- // {
- // int r;
- // queue_op_status res = sbq.wait_and_pull(r);
- // if (res==queue_op_status::closed) break;
- // //mos << i << " wait_and_pull(" << r << ")\n";
- // this_thread::sleep_for(chrono::milliseconds(250));
- // }
- // }
- // catch(...)
- // {
- // //mos << "exception !!!\n";
- // }
- //}
- int main()
- {
- using namespace boost;
- #ifdef XXXX
- recursive_mutex terminal_mutex;
- externally_locked_stream<std::ostream> mcerr(std::cerr, terminal_mutex);
- externally_locked_stream<std::ostream> mcout(std::cout, terminal_mutex);
- externally_locked_stream<std::istream> mcin(std::cin, terminal_mutex);
- #else
- the_ostream &mcerr = std::cout;
- the_ostream &mcout = std::cerr;
- //the_istream &mcin = std::cin;
- #endif
- sync_bounded_queue<int> sbq(10);
- {
- mcout << "begin of main" << std::endl;
- scoped_thread<> t11(boost::thread(producer, boost::ref(mcerr), boost::ref(sbq)));
- scoped_thread<> t12(boost::thread(producer, boost::ref(mcerr), boost::ref(sbq)));
- scoped_thread<> t2(boost::thread(consumer, boost::ref(mcout), boost::ref(sbq)));
- this_thread::sleep_for(chrono::seconds(1));
- sbq.close();
- mcout << "closed()" << std::endl;
- } // all threads joined here.
- mcout << "end of main" << std::endl;
- return 0;
- }
|