stream.hpp 11 KB


  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_TEST_IMPL_STREAM_HPP
  10. #define BOOST_BEAST_TEST_IMPL_STREAM_HPP
  11. #include <boost/beast/core/bind_handler.hpp>
  12. #include <boost/beast/core/buffer_traits.hpp>
  13. #include <boost/beast/core/detail/service_base.hpp>
  14. #include <boost/beast/core/detail/is_invocable.hpp>
  15. #include <mutex>
  16. #include <stdexcept>
  17. #include <vector>
  18. namespace boost {
  19. namespace beast {
  20. namespace test {
  21. //------------------------------------------------------------------------------
  22. struct stream::service_impl
  23. {
  24. std::mutex m_;
  25. std::vector<state*> v_;
  26. BOOST_BEAST_DECL
  27. void
  28. remove(state& impl);
  29. };
  30. class stream::service
  31. : public beast::detail::service_base<service>
  32. {
  33. boost::shared_ptr<service_impl> sp_;
  34. BOOST_BEAST_DECL
  35. void
  36. shutdown() override;
  37. public:
  38. BOOST_BEAST_DECL
  39. explicit
  40. service(net::execution_context& ctx);
  41. BOOST_BEAST_DECL
  42. static
  43. auto
  44. make_impl(
  45. net::io_context& ctx,
  46. test::fail_count* fc) ->
  47. boost::shared_ptr<state>;
  48. };
  49. //------------------------------------------------------------------------------
  50. template<class Handler, class Buffers>
  51. class stream::read_op : public stream::read_op_base
  52. {
  53. using ex1_type =
  54. net::io_context::executor_type;
  55. using ex2_type
  56. = net::associated_executor_t<Handler, ex1_type>;
  57. struct lambda
  58. {
  59. Handler h_;
  60. boost::weak_ptr<state> wp_;
  61. Buffers b_;
  62. net::executor_work_guard<ex2_type> wg2_;
  63. lambda(lambda&&) = default;
  64. lambda(lambda const&) = default;
  65. template<class Handler_>
  66. lambda(
  67. Handler_&& h,
  68. boost::shared_ptr<state> const& s,
  69. Buffers const& b)
  70. : h_(std::forward<Handler_>(h))
  71. , wp_(s)
  72. , b_(b)
  73. , wg2_(net::get_associated_executor(
  74. h_, s->ioc.get_executor()))
  75. {
  76. }
  77. void
  78. operator()(error_code ec)
  79. {
  80. std::size_t bytes_transferred = 0;
  81. auto sp = wp_.lock();
  82. if(! sp)
  83. ec = net::error::operation_aborted;
  84. if(! ec)
  85. {
  86. std::lock_guard<std::mutex> lock(sp->m);
  87. BOOST_ASSERT(! sp->op);
  88. if(sp->b.size() > 0)
  89. {
  90. bytes_transferred =
  91. net::buffer_copy(
  92. b_, sp->b.data(), sp->read_max);
  93. sp->b.consume(bytes_transferred);
  94. sp->nread_bytes += bytes_transferred;
  95. }
  96. else if (buffer_bytes(b_) > 0)
  97. {
  98. ec = net::error::eof;
  99. }
  100. }
  101. auto alloc = net::get_associated_allocator(h_);
  102. wg2_.get_executor().dispatch(
  103. beast::bind_front_handler(std::move(h_),
  104. ec, bytes_transferred), alloc);
  105. wg2_.reset();
  106. }
  107. };
  108. lambda fn_;
  109. net::executor_work_guard<ex1_type> wg1_;
  110. public:
  111. template<class Handler_>
  112. read_op(
  113. Handler_&& h,
  114. boost::shared_ptr<state> const& s,
  115. Buffers const& b)
  116. : fn_(std::forward<Handler_>(h), s, b)
  117. , wg1_(s->ioc.get_executor())
  118. {
  119. }
  120. void
  121. operator()(error_code ec) override
  122. {
  123. auto alloc = net::get_associated_allocator(fn_.h_);
  124. wg1_.get_executor().post(
  125. beast::bind_front_handler(std::move(fn_), ec), alloc);
  126. wg1_.reset();
  127. }
  128. };
  129. struct stream::run_read_op
  130. {
  131. template<
  132. class ReadHandler,
  133. class MutableBufferSequence>
  134. void
  135. operator()(
  136. ReadHandler&& h,
  137. boost::shared_ptr<state> const& in,
  138. MutableBufferSequence const& buffers)
  139. {
  140. // If you get an error on the following line it means
  141. // that your handler does not meet the documented type
  142. // requirements for the handler.
  143. static_assert(
  144. beast::detail::is_invocable<ReadHandler,
  145. void(error_code, std::size_t)>::value,
  146. "ReadHandler type requirements not met");
  147. initiate_read(
  148. in,
  149. std::unique_ptr<read_op_base>{
  150. new read_op<
  151. typename std::decay<ReadHandler>::type,
  152. MutableBufferSequence>(
  153. std::move(h),
  154. in,
  155. buffers)},
  156. buffer_bytes(buffers));
  157. }
  158. };
  159. struct stream::run_write_op
  160. {
  161. template<
  162. class WriteHandler,
  163. class ConstBufferSequence>
  164. void
  165. operator()(
  166. WriteHandler&& h,
  167. boost::shared_ptr<state> in_,
  168. boost::weak_ptr<state> out_,
  169. ConstBufferSequence const& buffers)
  170. {
  171. // If you get an error on the following line it means
  172. // that your handler does not meet the documented type
  173. // requirements for the handler.
  174. static_assert(
  175. beast::detail::is_invocable<WriteHandler,
  176. void(error_code, std::size_t)>::value,
  177. "WriteHandler type requirements not met");
  178. ++in_->nwrite;
  179. auto const upcall = [&](error_code ec, std::size_t n)
  180. {
  181. net::post(
  182. in_->ioc.get_executor(),
  183. beast::bind_front_handler(std::move(h), ec, n));
  184. };
  185. // test failure
  186. error_code ec;
  187. std::size_t n = 0;
  188. if(in_->fc && in_->fc->fail(ec))
  189. return upcall(ec, n);
  190. // A request to write 0 bytes to a stream is a no-op.
  191. if(buffer_bytes(buffers) == 0)
  192. return upcall(ec, n);
  193. // connection closed
  194. auto out = out_.lock();
  195. if(! out)
  196. return upcall(net::error::connection_reset, n);
  197. // copy buffers
  198. n = std::min<std::size_t>(
  199. buffer_bytes(buffers), in_->write_max);
  200. {
  201. std::lock_guard<std::mutex> lock(out->m);
  202. n = net::buffer_copy(out->b.prepare(n), buffers);
  203. out->b.commit(n);
  204. out->nwrite_bytes += n;
  205. out->notify_read();
  206. }
  207. BOOST_ASSERT(! ec);
  208. upcall(ec, n);
  209. }
  210. };
  211. //------------------------------------------------------------------------------
  212. template<class MutableBufferSequence>
  213. std::size_t
  214. stream::
  215. read_some(MutableBufferSequence const& buffers)
  216. {
  217. static_assert(net::is_mutable_buffer_sequence<
  218. MutableBufferSequence>::value,
  219. "MutableBufferSequence type requirements not met");
  220. error_code ec;
  221. auto const n = read_some(buffers, ec);
  222. if(ec)
  223. BOOST_THROW_EXCEPTION(system_error{ec});
  224. return n;
  225. }
  226. template<class MutableBufferSequence>
  227. std::size_t
  228. stream::
  229. read_some(MutableBufferSequence const& buffers,
  230. error_code& ec)
  231. {
  232. static_assert(net::is_mutable_buffer_sequence<
  233. MutableBufferSequence>::value,
  234. "MutableBufferSequence type requirements not met");
  235. ++in_->nread;
  236. // test failure
  237. if(in_->fc && in_->fc->fail(ec))
  238. return 0;
  239. // A request to read 0 bytes from a stream is a no-op.
  240. if(buffer_bytes(buffers) == 0)
  241. {
  242. ec = {};
  243. return 0;
  244. }
  245. std::unique_lock<std::mutex> lock{in_->m};
  246. BOOST_ASSERT(! in_->op);
  247. in_->cv.wait(lock,
  248. [&]()
  249. {
  250. return
  251. in_->b.size() > 0 ||
  252. in_->code != status::ok;
  253. });
  254. // deliver bytes before eof
  255. if(in_->b.size() > 0)
  256. {
  257. auto const n = net::buffer_copy(
  258. buffers, in_->b.data(), in_->read_max);
  259. in_->b.consume(n);
  260. in_->nread_bytes += n;
  261. return n;
  262. }
  263. // deliver error
  264. BOOST_ASSERT(in_->code != status::ok);
  265. ec = net::error::eof;
  266. return 0;
  267. }
  268. template<class MutableBufferSequence, class ReadHandler>
  269. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  270. stream::
  271. async_read_some(
  272. MutableBufferSequence const& buffers,
  273. ReadHandler&& handler)
  274. {
  275. static_assert(net::is_mutable_buffer_sequence<
  276. MutableBufferSequence>::value,
  277. "MutableBufferSequence type requirements not met");
  278. return net::async_initiate<
  279. ReadHandler,
  280. void(error_code, std::size_t)>(
  281. run_read_op{},
  282. handler,
  283. in_,
  284. buffers);
  285. }
  286. template<class ConstBufferSequence>
  287. std::size_t
  288. stream::
  289. write_some(ConstBufferSequence const& buffers)
  290. {
  291. static_assert(net::is_const_buffer_sequence<
  292. ConstBufferSequence>::value,
  293. "ConstBufferSequence type requirements not met");
  294. error_code ec;
  295. auto const bytes_transferred =
  296. write_some(buffers, ec);
  297. if(ec)
  298. BOOST_THROW_EXCEPTION(system_error{ec});
  299. return bytes_transferred;
  300. }
  301. template<class ConstBufferSequence>
  302. std::size_t
  303. stream::
  304. write_some(
  305. ConstBufferSequence const& buffers, error_code& ec)
  306. {
  307. static_assert(net::is_const_buffer_sequence<
  308. ConstBufferSequence>::value,
  309. "ConstBufferSequence type requirements not met");
  310. ++in_->nwrite;
  311. // test failure
  312. if(in_->fc && in_->fc->fail(ec))
  313. return 0;
  314. // A request to write 0 bytes to a stream is a no-op.
  315. if(buffer_bytes(buffers) == 0)
  316. {
  317. ec = {};
  318. return 0;
  319. }
  320. // connection closed
  321. auto out = out_.lock();
  322. if(! out)
  323. {
  324. ec = net::error::connection_reset;
  325. return 0;
  326. }
  327. // copy buffers
  328. auto n = std::min<std::size_t>(
  329. buffer_bytes(buffers), in_->write_max);
  330. {
  331. std::lock_guard<std::mutex> lock(out->m);
  332. n = net::buffer_copy(out->b.prepare(n), buffers);
  333. out->b.commit(n);
  334. out->nwrite_bytes += n;
  335. out->notify_read();
  336. }
  337. return n;
  338. }
  339. template<class ConstBufferSequence, class WriteHandler>
  340. BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
  341. stream::
  342. async_write_some(
  343. ConstBufferSequence const& buffers,
  344. WriteHandler&& handler)
  345. {
  346. static_assert(net::is_const_buffer_sequence<
  347. ConstBufferSequence>::value,
  348. "ConstBufferSequence type requirements not met");
  349. return net::async_initiate<
  350. WriteHandler,
  351. void(error_code, std::size_t)>(
  352. run_write_op{},
  353. handler,
  354. in_,
  355. out_,
  356. buffers);
  357. }
  358. //------------------------------------------------------------------------------
  359. template<class TeardownHandler>
  360. void
  361. async_teardown(
  362. role_type,
  363. stream& s,
  364. TeardownHandler&& handler)
  365. {
  366. error_code ec;
  367. if( s.in_->fc &&
  368. s.in_->fc->fail(ec))
  369. return net::post(
  370. s.get_executor(),
  371. beast::bind_front_handler(
  372. std::move(handler), ec));
  373. s.close();
  374. if( s.in_->fc &&
  375. s.in_->fc->fail(ec))
  376. ec = net::error::eof;
  377. else
  378. ec = {};
  379. net::post(
  380. s.get_executor(),
  381. beast::bind_front_handler(
  382. std::move(handler), ec));
  383. }
  384. //------------------------------------------------------------------------------
  385. template<class Arg1, class... ArgN>
  386. stream
  387. connect(stream& to, Arg1&& arg1, ArgN&&... argn)
  388. {
  389. stream from{
  390. std::forward<Arg1>(arg1),
  391. std::forward<ArgN>(argn)...};
  392. from.connect(to);
  393. return from;
  394. }
  395. } // test
  396. } // beast
  397. } // boost
  398. #endif