// core_3_timeouts.cpp

  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #include "snippets.hpp"
  10. #include <boost/beast/_experimental/unit_test/suite.hpp>
  11. #include <boost/beast/_experimental/test/stream.hpp>
  12. #include <boost/beast/core/async_base.hpp>
  13. #include <boost/beast/core/buffers_prefix.hpp>
  14. #include <boost/beast/core/error.hpp>
  15. #include <boost/beast/core/flat_buffer.hpp>
  16. #include <boost/beast/core/stream_traits.hpp>
  17. #include <boost/beast/core/tcp_stream.hpp>
  18. #include <boost/beast/http.hpp>
  19. #include <boost/beast/ssl/ssl_stream.hpp>
  20. #include <boost/asio/buffer.hpp>
  21. #include <boost/asio/read.hpp>
  22. #include <boost/asio/spawn.hpp>
  23. #include <cstdlib>
  24. #include <utility>
  25. #include <string>
  26. namespace boost {
  27. namespace beast {
  28. namespace {
  29. struct handler_type
  30. {
  31. template<class... Args>
  32. void operator()(Args&&...)
  33. {
  34. }
  35. };
  36. void
  37. core_3_timeouts_snippets()
  38. {
  39. handler_type handler;
  40. #include "snippets.ipp"
  41. {
  42. //[code_core_3_timeouts_1
  43. // `ioc` will be used to dispatch completion handlers
  44. tcp_stream stream(ioc);
  45. //]
  46. }
  47. {
  48. //[code_core_3_timeouts_2
  49. // The resolver is used to look up the IP addresses for a domain name
  50. net::ip::tcp::resolver resolver(ioc);
  51. // The stream will use the same executor as the resolver
  52. tcp_stream stream(resolver.get_executor());
  53. //]
  54. }
  55. {
  56. //[code_core_3_timeouts_3
  57. // The strand will be used to invoke all completion handlers
  58. tcp_stream stream(net::make_strand(ioc));
  59. //]
  60. net::ip::tcp::resolver resolver(ioc);
  61. //[code_core_3_timeouts_4
  62. // Set the logical operation timer to 30 seconds
  63. stream.expires_after (std::chrono::seconds(30));
  64. // If the connection is not established within 30 seconds,
  65. // the operation will be canceled and the handler will receive
  66. // error::timeout as the error code.
  67. stream.async_connect(resolver.resolve("www.example.com", "http"),
  68. [](error_code ec, net::ip::tcp::endpoint ep)
  69. {
  70. if(ec == error::timeout)
  71. std::cerr << "async_connect took too long\n";
  72. else if(! ec)
  73. std::cout << "Connected to " << ep << "\n";
  74. }
  75. );
  76. // The timer is still running. If we don't want the next
  77. // operation to time out 30 seconds relative to the previous
  78. // call to `expires_after`, we need to turn it off before
  79. // starting another asynchronous operation.
  80. stream.expires_never();
  81. //]
  82. }
  83. {
  84. //[code_core_3_timeouts_5
  85. // The acceptor is used to listen and accept incoming connections.
  86. // We construct the acceptor to use a new strand, and listen
  87. // on the loopback address with an operating-system assigned port.
  88. net::ip::tcp::acceptor acceptor(net::make_strand(ioc));
  89. acceptor.bind(net::ip::tcp::endpoint(net::ip::make_address_v4("127.0.0.1"), 0));
  90. acceptor.listen(0);
  91. // This blocks until a new incoming connection is established.
  92. // Upon success, the function returns a new socket which is
  93. // connected to the peer. The socket will have its own executor,
  94. // which in the call below is a new strand for the I/O context.
  95. net::ip::tcp::socket s = acceptor.accept(net::make_strand(ioc));
  96. // Construct a new tcp_stream from the connected socket.
  97. // The stream will use the strand created when the connection
  98. // was accepted.
  99. tcp_stream stream(std::move(s));
  100. //]
  101. }
  102. {
  103. tcp_stream stream(ioc);
  104. //[code_core_3_timeouts_6
  105. std::string s;
  106. // Set the logical operation timer to 30 seconds.
  107. stream.expires_after (std::chrono::seconds(30));
  108. // Read a line from the stream into the string.
  109. net::async_read_until(stream, net::dynamic_buffer(s), '\n',
  110. [&s, &stream](error_code ec, std::size_t bytes_transferred)
  111. {
  112. if(ec)
  113. return;
  114. // read_until can read past the '\n', these will end up in
  115. // our buffer but we don't want to echo those extra received
  116. // bytes. `bytes_transferred` will be the number of bytes
  117. // up to and including the '\n'. We use `buffers_prefix` so
  118. // that extra data is not written.
  119. net::async_write(stream, buffers_prefix(bytes_transferred, net::buffer(s)),
  120. [&s](error_code ec, std::size_t bytes_transferred)
  121. {
  122. // Consume the line from the buffer
  123. s.erase(s.begin(), s.begin() + bytes_transferred);
  124. if(ec)
  125. std::cerr << "Error: " << ec.message() << "\n";
  126. });
  127. });
  128. //]
  129. }
  130. {
  131. tcp_stream stream(ioc);
  132. //[code_core_3_timeouts_7
  133. std::string s1;
  134. std::string s2;
  135. // Set the logical operation timer to 15 seconds.
  136. stream.expires_after (std::chrono::seconds(15));
  137. // Read another line from the stream into our dynamic buffer.
  138. // The operation will time out after 15 seconds.
  139. net::async_read_until(stream, net::dynamic_buffer(s1), '\n', handler);
  140. // Set the logical operation timer to 30 seconds.
  141. stream.expires_after (std::chrono::seconds(30));
  142. // Write the contents of the other buffer.
  143. // This operation will time out after 30 seconds.
  144. net::async_write(stream, net::buffer(s2), handler);
  145. //]
  146. }
  147. {
  148. //[code_core_3_timeouts_8
  149. // To declare a stream with a rate policy, it is necessary to
  150. // write out all of the template parameter types.
  151. //
  152. // `simple_rate_policy` is default constructible, but
  153. // if the choice of RatePolicy is not DefaultConstructible,
  154. // an instance of the type may be passed to the constructor.
  155. basic_stream<net::ip::tcp, net::executor, simple_rate_policy> stream(ioc);
  156. // The policy object, which is default constructed, or
  157. // decay-copied upon construction, is attached to the stream
  158. // and may be accessed through the function `rate_policy`.
  159. //
  160. // Here we set individual rate limits for reading and writing
  161. stream.rate_policy().read_limit(10000); // bytes per second
  162. stream.rate_policy().write_limit(850000); // bytes per second
  163. //]
  164. }
  165. }
  166. //[code_core_3_timeouts_1f
  167. /** This function echoes back received lines from a peer, with a timeout.
  168. The algorithm terminates upon any error (including timeout).
  169. */
  170. template <class Protocol, class Executor>
  171. void do_async_echo (basic_stream<Protocol, Executor>& stream)
  172. {
  173. // This object will hold our state when reading the line.
  174. struct echo_line
  175. {
  176. basic_stream<Protocol, Executor>& stream;
  177. // The shared pointer is used to extend the lifetime of the
  178. // string until the last asynchronous operation completes.
  179. std::shared_ptr<std::string> s;
  180. // This starts a new operation to read and echo a line
  181. void operator()()
  182. {
  183. // If a line is not sent and received within 30 seconds, then
  184. // the connection will be closed and this algorithm will terminate.
  185. stream.expires_after(std::chrono::seconds(30));
  186. // Read a line from the stream into our dynamic buffer, with a timeout
  187. net::async_read_until(stream, net::dynamic_buffer(*s), '\n', std::move(*this));
  188. }
  189. // This function is called when the read completes
  190. void operator()(error_code ec, std::size_t bytes_transferred)
  191. {
  192. if(ec)
  193. return;
  194. net::async_write(stream, buffers_prefix(bytes_transferred, net::buffer(*s)),
  195. [this](error_code ec, std::size_t bytes_transferred)
  196. {
  197. s->erase(s->begin(), s->begin() + bytes_transferred);
  198. if(! ec)
  199. {
  200. // Run this algorithm again
  201. echo_line{stream, std::move(s)}();
  202. }
  203. else
  204. {
  205. std::cerr << "Error: " << ec.message() << "\n";
  206. }
  207. });
  208. }
  209. };
  210. // Create the operation and run it
  211. echo_line{stream, std::make_shared<std::string>()}();
  212. }
  213. //]
  214. //[code_core_3_timeouts_2f
  215. /** Request an HTTP resource from a TLS host and return it as a string, with a timeout.
  216. This example uses fibers (stackful coroutines) and its own I/O context.
  217. */
  218. std::string
  219. https_get (std::string const& host, std::string const& target, error_code& ec)
  220. {
  221. // It is the responsibility of the algorithm to clear the error first.
  222. ec = {};
  223. // We use our own I/O context, to make this function blocking.
  224. net::io_context ioc;
  225. // This context is used to hold client and server certificates.
  226. // We do not perform certificate verification in this example.
  227. net::ssl::context ctx(net::ssl::context::tlsv12);
  228. // This string will hold the body of the HTTP response, if any.
  229. std::string result;
  230. // Note that Networking TS does not come with spawn. This function
  231. // launches a "fiber" which is a coroutine that has its own separately
  232. // allocated stack.
  233. boost::asio::spawn(ioc,
  234. [&](boost::asio::yield_context yield)
  235. {
  236. // We use the Beast ssl_stream wrapped around a beast tcp_stream.
  237. ssl_stream<tcp_stream> stream(ioc, ctx);
  238. // The resolver will be used to look up the IP addresses for the host name
  239. net::ip::tcp::resolver resolver(ioc);
  240. // First, look up the name. Networking has its own timeout for this.
  241. // The `yield` object is a CompletionToken which specializes the
  242. // `net::async_result` customization point to make the fiber work.
  243. //
  244. // This call will appear to "block" until the operation completes.
  245. // It isn't really blocking. Instead, the fiber implementation saves
  246. // the call stack and suspends the function until the asynchronous
  247. // operation is complete. Then it restores the call stack, and resumes
  248. // the function to the statement following the async_resolve. This
  249. // allows an asynchronous algorithm to be expressed synchronously.
  250. auto const endpoints = resolver.async_resolve(host, "https", {}, yield[ec]);
  251. if(ec)
  252. return;
  253. // The function `get_lowest_layer` retrieves the "bottom most" object
  254. // in the stack of stream layers. In this case it will be the tcp_stream.
  255. // This timeout will apply to all subsequent operations collectively.
  256. // That is to say, they must all complete within the same 30 second
  257. // window.
  258. get_lowest_layer(stream).expires_after(std::chrono::seconds(30));
  259. // `tcp_stream` range connect algorithms are member functions, unlike net::
  260. get_lowest_layer(stream).async_connect(endpoints, yield[ec]);
  261. if(ec)
  262. return;
  263. // Perform the TLS handshake
  264. stream.async_handshake(net::ssl::stream_base::client, yield[ec]);
  265. if(ec)
  266. return;
  267. // Send an HTTP GET request for the target
  268. {
  269. http::request<http::empty_body> req;
  270. req.method(http::verb::get);
  271. req.target(target);
  272. req.version(11);
  273. req.set(http::field::host, host);
  274. req.set(http::field::user_agent, "Beast");
  275. http::async_write(stream, req, yield[ec]);
  276. if(ec)
  277. return;
  278. }
  279. // Now read the response
  280. flat_buffer buffer;
  281. http::response<http::string_body> res;
  282. http::async_read(stream, buffer, res, yield[ec]);
  283. if(ec)
  284. return;
  285. // Try to perform the TLS shutdown handshake
  286. stream.async_shutdown(yield[ec]);
  287. // `net::ssl::error::stream_truncated`, also known as an SSL "short read",
  288. // indicates the peer closed the connection without performing the
  289. // required closing handshake (for example, Google does this to
  290. // improve performance). Generally this can be a security issue,
  291. // but if your communication protocol is self-terminated (as
  292. // it is with both HTTP and WebSocket) then you may simply
  293. // ignore the lack of close_notify:
  294. //
  295. // https://github.com/boostorg/beast/issues/38
  296. //
  297. // https://security.stackexchange.com/questions/91435/how-to-handle-a-malicious-ssl-tls-shutdown
  298. //
  299. // When a short read would cut off the end of an HTTP message,
  300. // Beast returns the error beast::http::error::partial_message.
  301. // Therefore, if we see a short read here, it has occurred
  302. // after the message has been completed, so it is safe to ignore it.
  303. if(ec == net::ssl::error::stream_truncated)
  304. ec = {};
  305. else if(ec)
  306. return;
  307. // Set the string to return to the caller
  308. result = std::move(res.body());
  309. });
  310. // `run` will dispatch completion handlers, and block until there is
  311. // no more "work" remaining. When this call returns, the operations
  312. // are complete and we can give the caller the result.
  313. ioc.run();
  314. return result;
  315. }
  316. //]
  317. //[code_core_3_timeouts_3f
  318. class window
  319. {
  320. std::size_t value_ = 0;
  321. // The size of the exponential window, in seconds.
  322. // This should be a power of two.
  323. static std::size_t constexpr Window = 4;
  324. public:
  325. /** Returns the number of elapsed seconds since the given time, and adjusts the time.
  326. This function returns the number of elapsed seconds since the
  327. specified time point, rounding down. It also moves the specified
  328. time point forward by the number of elapsed seconds.
  329. @param since The time point from which to calculate elapsed time.
  330. The function will modify the value, by adding the number of elapsed
  331. seconds to it.
  332. @return The number of elapsed seconds.
  333. */
  334. template<class Clock, class Duration>
  335. static
  336. std::chrono::seconds
  337. get_elapsed(std::chrono::time_point<Clock, Duration>& since) noexcept
  338. {
  339. auto const elapsed = std::chrono::duration_cast<
  340. std::chrono::seconds>(Clock::now() - since);
  341. since += elapsed;
  342. return elapsed;
  343. }
  344. /// Returns the current value, after adding the given sample.
  345. std::size_t
  346. update(std::size_t sample, std::chrono::seconds elapsed) noexcept
  347. {
  348. // Apply exponential decay.
  349. //
  350. // This formula is fast (no division or multiplication) but inaccurate.
  351. // It overshoots by `n*(1-a)/(1-a^n), where a=(window-1)/window`.
  352. // Could be good enough for a rough approximation, but if relying
  353. // on this for production please perform tests!
  354. auto count = elapsed.count();
  355. while(count--)
  356. value_ -= (value_ + Window - 1) / Window;
  357. value_ += sample;
  358. return value_ / Window;
  359. }
  360. /// Returns the current value
  361. std::size_t
  362. value() const noexcept
  363. {
  364. return value_ / Window;
  365. }
  366. };
  367. //]
  368. //[code_core_3_timeouts_4f
  369. /** A RatePolicy to measure instantaneous throughput.
  370. This measures the rate of transfer for reading and writing
  371. using a simple exponential decay function.
  372. */
  373. class rate_gauge
  374. {
  375. // The clock used to measure elapsed time
  376. using clock_type = std::chrono::steady_clock;
  377. // This implements an exponential smoothing window function.
  378. // The value `Seconds` is the size of the window in seconds.
  379. clock_type::time_point when_;
  380. std::size_t read_bytes_ = 0;
  381. std::size_t write_bytes_ = 0;
  382. window read_window_;
  383. window write_window_;
  384. // Friending this type allows us to mark the
  385. // member functions required by RatePolicy as private.
  386. friend class rate_policy_access;
  387. // Returns the number of bytes available to read currently
  388. // Required by RatePolicy
  389. std::size_t
  390. available_read_bytes() const noexcept
  391. {
  392. // no limit
  393. return (std::numeric_limits<std::size_t>::max)();
  394. }
  395. // Returns the number of bytes available to write currently
  396. // Required by RatePolicy
  397. std::size_t
  398. available_write_bytes() const noexcept
  399. {
  400. // no limit
  401. return (std::numeric_limits<std::size_t>::max)();
  402. }
  403. // Called every time bytes are read
  404. // Required by RatePolicy
  405. void
  406. transfer_read_bytes(std::size_t n) noexcept
  407. {
  408. // Add this to our running total of bytes read
  409. read_bytes_ += n;
  410. }
  411. // Called every time bytes are written
  412. // Required by RatePolicy
  413. void
  414. transfer_write_bytes(std::size_t n) noexcept
  415. {
  416. // Add this to our running total of bytes written
  417. write_bytes_ += n;
  418. }
  419. // Called approximately once per second
  420. // Required by RatePolicy
  421. void
  422. on_timer()
  423. {
  424. // Calculate elapsed time in seconds, and adjust our time point
  425. auto const elapsed = window::get_elapsed(when_);
  426. // Skip the update when elapsed==0,
  427. // otherwise the measurement will have jitter
  428. if(elapsed.count() == 0)
  429. return;
  430. // Add our samples and apply exponential decay
  431. read_window_.update(read_bytes_, elapsed);
  432. write_window_.update(write_bytes_, elapsed);
  433. // Reset our counts of bytes transferred
  434. read_bytes_ = 0;
  435. write_bytes_ = 0;
  436. }
  437. public:
  438. rate_gauge()
  439. : when_(clock_type::now())
  440. {
  441. }
  442. /// Returns the current rate of reading in bytes per second
  443. std::size_t
  444. read_bytes_per_second() const noexcept
  445. {
  446. return read_window_.value();
  447. }
  448. /// Returns the current rate of writing in bytes per second
  449. std::size_t
  450. write_bytes_per_second() const noexcept
  451. {
  452. return write_window_.value();
  453. }
  454. };
  455. //]
  456. void
  457. core_3_timeouts_snippets2()
  458. {
  459. #include "snippets.ipp"
  460. {
  461. //[code_core_3_timeouts_9
  462. // This stream will use our new rate_gauge policy
  463. basic_stream<net::ip::tcp, net::executor, rate_gauge> stream(ioc);
  464. //...
  465. // Print the current rates
  466. std::cout <<
  467. stream.rate_policy().read_bytes_per_second() << " bytes/second read\n" <<
  468. stream.rate_policy().write_bytes_per_second() << " bytes/second written\n";
  469. //]
  470. }
  471. }
  472. } // (anon)
// Explicit instantiation: compiles every member of basic_stream
// with rate_gauge as the rate policy.
template class basic_stream<net::ip::tcp, net::executor, rate_gauge>;
  474. struct core_3_timeouts_test
  475. : public beast::unit_test::suite
  476. {
  477. void
  478. testWindow()
  479. {
  480. window w;
  481. std::size_t v0 = w.value();
  482. std::size_t const N = 100000;
  483. for(std::size_t n = 1; n <= 2; ++n)
  484. {
  485. for(std::size_t i = 0;;++i)
  486. {
  487. auto const v = w.update(n * N, std::chrono::seconds(n));
  488. if(v == v0)
  489. {
  490. BEAST_PASS();
  491. #if 0
  492. log <<
  493. "update(" << n*N << ", " << n <<
  494. ") converged to " << w.value() <<
  495. " in " << i << std::endl;
  496. #endif
  497. break;
  498. }
  499. if(i > 1000)
  500. {
  501. BEAST_FAIL();
  502. break;
  503. }
  504. v0 = v;
  505. }
  506. }
  507. }
  508. void
  509. run() override
  510. {
  511. testWindow();
  512. BEAST_EXPECT(&core_3_timeouts_snippets);
  513. BEAST_EXPECT(&core_3_timeouts_snippets2);
  514. BEAST_EXPECT((&do_async_echo<net::ip::tcp, net::io_context::executor_type>));
  515. BEAST_EXPECT(&https_get);
  516. }
  517. };
  518. BEAST_DEFINE_TESTSUITE(beast,doc,core_3_timeouts);
  519. } // beast
  520. } // boost