util_ipc_reliable_mq.cpp 17 KB


  1. /*
  2. * Copyright Lingxi Li 2015.
  3. * Copyright Andrey Semashev 2016.
  4. * Distributed under the Boost Software License, Version 1.0.
  5. * (See accompanying file LICENSE_1_0.txt or copy at
  6. * http://www.boost.org/LICENSE_1_0.txt)
  7. */
  8. /*!
  9. * \file util_ipc_reliable_mq.cpp
  10. * \author Lingxi Li
  11. * \author Andrey Semashev
  12. * \date 19.10.2015
  13. *
  14. * \brief The test verifies that \c ipc::reliable_message_queue works.
  15. */
  16. #if !defined(BOOST_LOG_WITHOUT_IPC)
  17. #define BOOST_TEST_MODULE util_ipc_reliable_mq
  18. #include <boost/log/utility/ipc/reliable_message_queue.hpp>
  19. #include <boost/log/utility/ipc/object_name.hpp>
  20. #include <boost/log/utility/permissions.hpp>
  21. #include <boost/log/utility/open_mode.hpp>
  22. #include <boost/log/exceptions.hpp>
  23. #include <boost/test/unit_test.hpp>
  24. #include <cstddef>
  25. #include <cstring>
  26. #include <string>
  27. #include <vector>
  28. #include <iostream>
  29. #include <stdexcept>
  30. #include <boost/move/utility_core.hpp>
  31. #if !defined(BOOST_LOG_NO_THREADS)
  32. #include <algorithm>
  33. #include <boost/ref.hpp>
  34. #include <boost/atomic/fences.hpp>
  35. #include <boost/thread/thread.hpp>
  36. #include <boost/chrono/duration.hpp>
  37. #endif
  38. #include "char_definitions.hpp"
  39. typedef boost::log::ipc::reliable_message_queue queue_t;
  40. typedef queue_t::size_type size_type;
  41. const boost::log::ipc::object_name ipc_queue_name(boost::log::ipc::object_name::session, "boost_log_test_ipc_reliable_mq");
  42. const unsigned int capacity = 512;
  43. const size_type block_size = 1024;
  44. const char message1[] = "Hello, world!";
  45. const char message2[] = "Hello, the brand new world!";
  46. BOOST_AUTO_TEST_CASE(basic_functionality)
  47. {
  48. // Default constructor.
  49. {
  50. queue_t queue;
  51. BOOST_CHECK(!queue.is_open());
  52. }
  53. // Do a remove in case if a previous test failed
  54. queue_t::remove(ipc_queue_name);
  55. // Opening a non-existing queue
  56. try
  57. {
  58. queue_t queue(boost::log::open_mode::open_only, ipc_queue_name);
  59. BOOST_FAIL("Non-existing queue open succeeded, although it shouldn't have");
  60. }
  61. catch (std::exception&)
  62. {
  63. BOOST_TEST_PASSPOINT();
  64. }
  65. // Create constructor and destructor.
  66. {
  67. queue_t queue(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
  68. BOOST_CHECK(equal_strings(queue.name().c_str(), ipc_queue_name.c_str()));
  69. BOOST_CHECK(queue.is_open());
  70. BOOST_CHECK_EQUAL(queue.capacity(), capacity);
  71. BOOST_CHECK_EQUAL(queue.block_size(), block_size);
  72. }
  73. // Creating a duplicate queue
  74. try
  75. {
  76. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
  77. queue_t queue_b(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
  78. BOOST_FAIL("Creating a duplicate queue succeeded, although it shouldn't have");
  79. }
  80. catch (std::exception&)
  81. {
  82. BOOST_TEST_PASSPOINT();
  83. }
  84. // Opening an existing queue
  85. {
  86. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
  87. BOOST_CHECK(queue_a.is_open());
  88. queue_t queue_b(boost::log::open_mode::open_or_create, ipc_queue_name, capacity * 2u, block_size * 2u); // queue geometry differs from the existing queue
  89. BOOST_CHECK(queue_b.is_open());
  90. BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
  91. BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
  92. BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
  93. queue_t queue_c(boost::log::open_mode::open_only, ipc_queue_name);
  94. BOOST_CHECK(queue_c.is_open());
  95. BOOST_CHECK(equal_strings(queue_c.name().c_str(), ipc_queue_name.c_str()));
  96. BOOST_CHECK_EQUAL(queue_c.capacity(), capacity);
  97. BOOST_CHECK_EQUAL(queue_c.block_size(), block_size);
  98. }
  99. // Closing a queue
  100. {
  101. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
  102. BOOST_CHECK(queue_a.is_open());
  103. queue_a.close();
  104. BOOST_CHECK(!queue_a.is_open());
  105. // Duplicate close()
  106. queue_a.close();
  107. BOOST_CHECK(!queue_a.is_open());
  108. }
  109. // Move constructor.
  110. {
  111. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
  112. queue_t queue_b(boost::move(queue_a));
  113. BOOST_CHECK(!queue_a.is_open());
  114. BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
  115. BOOST_CHECK(queue_b.is_open());
  116. BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
  117. BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
  118. }
  119. // Move assignment operator.
  120. {
  121. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
  122. queue_t queue_b;
  123. queue_b = boost::move(queue_a);
  124. BOOST_CHECK(!queue_a.is_open());
  125. BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
  126. BOOST_CHECK(queue_b.is_open());
  127. BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
  128. BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
  129. }
  130. // Member and non-member swaps.
  131. {
  132. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, capacity, block_size);
  133. queue_a.swap(queue_a);
  134. BOOST_CHECK(queue_a.is_open());
  135. BOOST_CHECK(equal_strings(queue_a.name().c_str(), ipc_queue_name.c_str()));
  136. BOOST_CHECK_EQUAL(queue_a.capacity(), capacity);
  137. BOOST_CHECK_EQUAL(queue_a.block_size(), block_size);
  138. queue_t queue_b;
  139. swap(queue_a, queue_b);
  140. BOOST_CHECK(!queue_a.is_open());
  141. BOOST_CHECK(queue_b.is_open());
  142. BOOST_CHECK(equal_strings(queue_b.name().c_str(), ipc_queue_name.c_str()));
  143. BOOST_CHECK_EQUAL(queue_b.capacity(), capacity);
  144. BOOST_CHECK_EQUAL(queue_b.block_size(), block_size);
  145. }
  146. }
  147. BOOST_AUTO_TEST_CASE(message_passing)
  148. {
  149. // try_send() and try_receive()
  150. {
  151. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
  152. queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
  153. BOOST_CHECK(queue_a.try_send(message1, sizeof(message1) - 1u));
  154. BOOST_CHECK(!queue_a.try_send(message2, sizeof(message2) - 1u));
  155. char buffer[block_size] = {};
  156. size_type message_size = 0u;
  157. BOOST_CHECK(queue_b.try_receive(buffer, sizeof(buffer), message_size));
  158. BOOST_CHECK_EQUAL(message_size, sizeof(message1) - 1u);
  159. BOOST_CHECK(std::memcmp(buffer, message1, message_size) == 0);
  160. BOOST_CHECK(!queue_b.try_receive(buffer, sizeof(buffer), message_size));
  161. BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
  162. std::string msg;
  163. BOOST_CHECK(queue_b.try_receive(msg));
  164. BOOST_CHECK_EQUAL(msg.size(), sizeof(message2) - 1u);
  165. BOOST_CHECK_EQUAL(msg, message2);
  166. BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
  167. std::vector< unsigned char > buf;
  168. BOOST_CHECK(queue_b.try_receive(buf));
  169. BOOST_CHECK_EQUAL(buf.size(), sizeof(message2) - 1u);
  170. BOOST_CHECK(std::memcmp(&buf[0], message2, buf.size()) == 0);
  171. }
  172. // send() and receive() without blocking
  173. {
  174. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
  175. queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
  176. BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
  177. char buffer[block_size] = {};
  178. size_type message_size = 0u;
  179. BOOST_CHECK(queue_b.receive(buffer, sizeof(buffer), message_size) == queue_t::succeeded);
  180. BOOST_CHECK_EQUAL(message_size, sizeof(message1) - 1u);
  181. BOOST_CHECK(std::memcmp(buffer, message1, message_size) == 0);
  182. BOOST_CHECK(queue_a.send(message2, sizeof(message2) - 1u) == queue_t::succeeded);
  183. std::string msg;
  184. BOOST_CHECK(queue_b.receive(msg) == queue_t::succeeded);
  185. BOOST_CHECK_EQUAL(msg.size(), sizeof(message2) - 1u);
  186. BOOST_CHECK_EQUAL(msg, message2);
  187. BOOST_CHECK(queue_a.send(message2, sizeof(message2) - 1u) == queue_t::succeeded);
  188. std::vector< unsigned char > buf;
  189. BOOST_CHECK(queue_b.receive(buf) == queue_t::succeeded);
  190. BOOST_CHECK_EQUAL(buf.size(), sizeof(message2) - 1u);
  191. BOOST_CHECK(std::memcmp(&buf[0], message2, buf.size()) == 0);
  192. }
  193. // send() with an error code on overflow
  194. {
  195. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size, queue_t::fail_on_overflow);
  196. BOOST_TEST_PASSPOINT();
  197. BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
  198. BOOST_TEST_PASSPOINT();
  199. queue_t::operation_result res = queue_a.send(message1, sizeof(message1) - 1u);
  200. BOOST_CHECK_EQUAL(res, queue_t::no_space);
  201. }
  202. // send() with an exception on overflow
  203. {
  204. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size, queue_t::throw_on_overflow);
  205. BOOST_TEST_PASSPOINT();
  206. BOOST_CHECK(queue_a.send(message1, sizeof(message1) - 1u) == queue_t::succeeded);
  207. BOOST_TEST_PASSPOINT();
  208. try
  209. {
  210. queue_a.send(message1, sizeof(message1) - 1u);
  211. BOOST_FAIL("Owerflowing the queue succeeded, although it shouldn't have");
  212. }
  213. catch (boost::log::capacity_limit_reached&)
  214. {
  215. BOOST_TEST_PASSPOINT();
  216. }
  217. }
  218. // send() and receive() for messages larger than block_size. The message size and queue capacity below are such
  219. // that the last enqueued message is expected to be split in the queue storage.
  220. {
  221. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 5u, block_size);
  222. queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
  223. const size_type message_size = block_size * 3u / 2u;
  224. std::vector< unsigned char > send_data;
  225. send_data.resize(message_size);
  226. for (unsigned int i = 0; i < message_size; ++i)
  227. send_data[i] = static_cast< unsigned char >(i & 0xFF);
  228. BOOST_CHECK(queue_a.send(&send_data[0], static_cast< size_type >(send_data.size())) == queue_t::succeeded);
  229. for (unsigned int i = 0; i < 3; ++i)
  230. {
  231. BOOST_CHECK(queue_a.send(&send_data[0], static_cast< size_type >(send_data.size())) == queue_t::succeeded);
  232. std::vector< unsigned char > receive_data;
  233. BOOST_CHECK(queue_b.receive(receive_data) == queue_t::succeeded);
  234. BOOST_CHECK_EQUAL_COLLECTIONS(send_data.begin(), send_data.end(), receive_data.begin(), receive_data.end());
  235. }
  236. std::vector< unsigned char > receive_data;
  237. BOOST_CHECK(queue_b.receive(receive_data) == queue_t::succeeded);
  238. BOOST_CHECK_EQUAL_COLLECTIONS(send_data.begin(), send_data.end(), receive_data.begin(), receive_data.end());
  239. }
  240. // clear()
  241. {
  242. queue_t queue_a(boost::log::open_mode::create_only, ipc_queue_name, 1u, block_size);
  243. queue_t queue_b(boost::log::open_mode::open_only, ipc_queue_name);
  244. BOOST_CHECK(queue_a.try_send(message1, sizeof(message1) - 1u));
  245. BOOST_CHECK(!queue_a.try_send(message2, sizeof(message2) - 1u));
  246. queue_a.clear();
  247. BOOST_CHECK(queue_a.try_send(message2, sizeof(message2) - 1u));
  248. char buffer[block_size] = {};
  249. size_type message_size = 0u;
  250. BOOST_CHECK(queue_b.try_receive(buffer, sizeof(buffer), message_size));
  251. BOOST_CHECK_EQUAL(message_size, sizeof(message2) - 1u);
  252. BOOST_CHECK(std::memcmp(buffer, message2, message_size) == 0);
  253. }
  254. }
  255. #if !defined(BOOST_LOG_NO_THREADS)
  256. namespace {
  257. const unsigned int message_count = 100000;
  258. void multithreaded_message_passing_feeding_thread(const char* message, unsigned int& failure_count)
  259. {
  260. const size_type len = static_cast< size_type >(std::strlen(message));
  261. queue_t queue(boost::log::open_mode::open_or_create, ipc_queue_name, capacity, block_size);
  262. for (unsigned int i = 0; i < message_count; ++i)
  263. {
  264. failure_count += queue.send(message, len) != queue_t::succeeded;
  265. }
  266. boost::atomic_thread_fence(boost::memory_order_release);
  267. }
  268. } // namespace
  269. BOOST_AUTO_TEST_CASE(multithreaded_message_passing)
  270. {
  271. unsigned int failure_count1 = 0, failure_count2 = 0, failure_count3 = 0;
  272. boost::atomic_thread_fence(boost::memory_order_release);
  273. boost::thread thread1(&multithreaded_message_passing_feeding_thread, "Thread 1", boost::ref(failure_count1));
  274. boost::thread thread2(&multithreaded_message_passing_feeding_thread, "Thread 2", boost::ref(failure_count2));
  275. boost::thread thread3(&multithreaded_message_passing_feeding_thread, "Thread 3", boost::ref(failure_count3));
  276. BOOST_TEST_PASSPOINT();
  277. queue_t queue(boost::log::open_mode::open_or_create, ipc_queue_name, capacity, block_size);
  278. unsigned int receive_failures = 0, receive_corruptions = 0;
  279. unsigned int message_count1 = 0, message_count2 = 0, message_count3 = 0;
  280. std::string msg;
  281. BOOST_TEST_PASSPOINT();
  282. for (unsigned int i = 0; i < message_count * 3u; ++i)
  283. {
  284. msg.clear();
  285. if (queue.receive(msg) == queue_t::succeeded)
  286. {
  287. if (msg == "Thread 1")
  288. ++message_count1;
  289. else if (msg == "Thread 2")
  290. ++message_count2;
  291. else if (msg == "Thread 3")
  292. ++message_count3;
  293. else
  294. ++receive_corruptions;
  295. }
  296. else
  297. ++receive_failures;
  298. }
  299. BOOST_TEST_PASSPOINT();
  300. thread1.join();
  301. BOOST_TEST_PASSPOINT();
  302. thread2.join();
  303. BOOST_TEST_PASSPOINT();
  304. thread3.join();
  305. boost::atomic_thread_fence(boost::memory_order_acquire);
  306. BOOST_CHECK_EQUAL(failure_count1, 0u);
  307. BOOST_CHECK_EQUAL(message_count1, message_count);
  308. BOOST_CHECK_EQUAL(failure_count2, 0u);
  309. BOOST_CHECK_EQUAL(message_count2, message_count);
  310. BOOST_CHECK_EQUAL(failure_count3, 0u);
  311. BOOST_CHECK_EQUAL(message_count3, message_count);
  312. BOOST_CHECK_EQUAL(receive_failures, 0u);
  313. BOOST_CHECK_EQUAL(receive_corruptions, 0u);
  314. }
  315. namespace {
  316. void stop_reset_feeding_thread(queue_t& queue, queue_t::operation_result* results, unsigned int count)
  317. {
  318. for (unsigned int i = 0; i < count; ++i)
  319. {
  320. results[i] = queue.send(message1, sizeof(message1) - 1u);
  321. if (results[i] != queue_t::succeeded)
  322. break;
  323. }
  324. boost::atomic_thread_fence(boost::memory_order_release);
  325. }
  326. void stop_reset_reading_thread(queue_t& queue, queue_t::operation_result* results, unsigned int count)
  327. {
  328. std::string msg;
  329. for (unsigned int i = 0; i < count; ++i)
  330. {
  331. msg.clear();
  332. results[i] = queue.receive(msg);
  333. if (results[i] != queue_t::succeeded)
  334. break;
  335. }
  336. boost::atomic_thread_fence(boost::memory_order_release);
  337. }
  338. } // namespace
  339. BOOST_AUTO_TEST_CASE(stop_reset_local)
  340. {
  341. queue_t feeder_queue(boost::log::open_mode::open_or_create, ipc_queue_name, 1u, block_size);
  342. queue_t::operation_result feeder_results[3];
  343. queue_t reader_queue(boost::log::open_mode::open_only, ipc_queue_name);
  344. queue_t::operation_result reader_results[3];
  345. std::fill_n(feeder_results, sizeof(feeder_results) / sizeof(*feeder_results), queue_t::succeeded);
  346. std::fill_n(reader_results, sizeof(reader_results) / sizeof(*reader_results), queue_t::succeeded);
  347. boost::atomic_thread_fence(boost::memory_order_release);
  348. BOOST_TEST_PASSPOINT();
  349. // Case 1: Let the feeder block and then we unblock it with stop_local()
  350. boost::thread feeder_thread(&stop_reset_feeding_thread, boost::ref(feeder_queue), feeder_results, 3);
  351. boost::thread reader_thread(&stop_reset_reading_thread, boost::ref(reader_queue), reader_results, 1);
  352. BOOST_TEST_PASSPOINT();
  353. reader_thread.join();
  354. BOOST_TEST_PASSPOINT();
  355. boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
  356. BOOST_TEST_PASSPOINT();
  357. feeder_queue.stop_local();
  358. BOOST_TEST_PASSPOINT();
  359. feeder_thread.join();
  360. boost::atomic_thread_fence(boost::memory_order_acquire);
  361. BOOST_CHECK_EQUAL(feeder_results[0], queue_t::succeeded);
  362. BOOST_CHECK_EQUAL(feeder_results[1], queue_t::succeeded);
  363. BOOST_CHECK_EQUAL(feeder_results[2], queue_t::aborted);
  364. BOOST_CHECK_EQUAL(reader_results[0], queue_t::succeeded);
  365. // Reset the aborted queue
  366. feeder_queue.reset_local();
  367. feeder_queue.clear();
  368. std::fill_n(feeder_results, sizeof(feeder_results) / sizeof(*feeder_results), queue_t::succeeded);
  369. std::fill_n(reader_results, sizeof(reader_results) / sizeof(*reader_results), queue_t::succeeded);
  370. boost::atomic_thread_fence(boost::memory_order_release);
  371. BOOST_TEST_PASSPOINT();
  372. // Case 2: Let the reader block and then we unblock it with stop_local()
  373. boost::thread(&stop_reset_feeding_thread, boost::ref(feeder_queue), feeder_results, 1).swap(feeder_thread);
  374. boost::thread(&stop_reset_reading_thread, boost::ref(reader_queue), reader_results, 2).swap(reader_thread);
  375. BOOST_TEST_PASSPOINT();
  376. feeder_thread.join();
  377. BOOST_TEST_PASSPOINT();
  378. boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
  379. BOOST_TEST_PASSPOINT();
  380. reader_queue.stop_local();
  381. BOOST_TEST_PASSPOINT();
  382. reader_thread.join();
  383. boost::atomic_thread_fence(boost::memory_order_acquire);
  384. BOOST_CHECK_EQUAL(feeder_results[0], queue_t::succeeded);
  385. BOOST_CHECK_EQUAL(feeder_results[1], queue_t::succeeded);
  386. BOOST_CHECK_EQUAL(reader_results[0], queue_t::succeeded);
  387. BOOST_CHECK_EQUAL(reader_results[1], queue_t::aborted);
  388. }
  389. #endif // !defined(BOOST_LOG_NO_THREADS)
  390. #else // !defined(BOOST_LOG_WITHOUT_IPC)
  391. int main()
  392. {
  393. return 0;
  394. }
  395. #endif // !defined(BOOST_LOG_WITHOUT_IPC)