[/ Copyright Oliver Kowalke, Nat Goodspeed 2015. Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt ]

[/ import path is relative to this .qbk file]
[import ../examples/wait_stuff.cpp]

[#when_any]
[section:when_any when_any / when_all functionality]

[heading Overview]

A bit of wisdom from the early days of computing still holds true today: prefer to model program state using the instruction pointer rather than with Boolean flags. In other words, if the program must ["do something] and then do something almost the same, but with minor changes... perhaps parts of that something should be broken out as smaller separate functions, rather than introducing flags to alter the internal behavior of a monolithic function.

To that we would add: prefer to describe control flow using C++ native constructs such as function calls, `if`, `while`, `for`, `do` et al. rather than as chains of callbacks.

One of the great strengths of __boost_fiber__ is the flexibility it confers on the coder to restructure an application from chains of callbacks to straightforward C++ statement sequences, even when code in that fiber is in fact interleaved with code running in other fibers.

There has been much recent discussion about the benefits of when_any and when_all functionality. When dealing with asynchronous and possibly unreliable services, these are valuable idioms. But of course when_any and when_all are closely tied to the use of chains of callbacks.

This section presents recipes for achieving the same ends, in the context of a fiber that wants to ["do something] when one or more other independent activities have completed. Accordingly, these are `wait_something()` functions rather than `when_something()` functions. The expectation is that the calling fiber asks to launch those independent activities, then waits for them, then sequentially proceeds with whatever processing depends on those results.

The function names shown (e.g. [link wait_first_simple `wait_first_simple()`]) are for illustrative purposes only, because all these functions have been bundled into a single source file. Presumably, if (say) [link wait_first_success `wait_first_success()`] best suits your application needs, you could introduce that variant with the name `wait_any()`.

[note The functions presented in this section accept variadic argument lists of task functions. Corresponding `wait_something()` functions accepting a container of task functions are left as an exercise for the interested reader. Those should actually be simpler. Most of the complexity would arise from overloading the same name for both purposes.]

[/ @path link is relative to (eventual) doc/html/index.html, hence ../..]
All the source code for this section is found in [@../../examples/wait_stuff.cpp wait_stuff.cpp].

[heading Example Task Function]

[#wait_sleeper]
We found it convenient to model an asynchronous task using this function:

[wait_sleeper]

with type-specific `sleeper()` ["front ends] for `std::string`, `double` and `int`. `Verbose` simply prints a message to `std::cout` on construction and destruction (a sketch of such a helper appears at the end of this section). Basically:

# `sleeper()` prints a start message;
# sleeps for the specified number of milliseconds;
# if `thrw` is passed as `true`, throws a string description of the passed `item`;
# else returns the passed `item`.
# On the way out, `sleeper()` produces a stop message.

This function will feature in the example calls to the various functions presented below.
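`Verbose` itself is not reproduced in this document. A minimal sketch of such a tracer might look like this (an illustration only [mdash] the class in [@../../examples/wait_stuff.cpp wait_stuff.cpp] may differ in detail):

    #include <iostream>
    #include <string>

    // RAII tracer: announce construction and destruction on std::cout
    class Verbose {
    public:
        explicit Verbose( std::string const& d) :
            desc( d) {
            std::cout << desc << " start" << std::endl;
        }
        ~Verbose() {
            std::cout << desc << " stop" << std::endl;
        }
    private:
        const std::string desc;
    };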
[section when_any]

[#wait_first_simple_section]
[section when_any, simple completion]

The simplest case is when you only need to know that the first of a set of asynchronous tasks has completed [mdash] but you don't need to obtain a return value, and you're confident that they will not throw exceptions.

[#wait_done]
For this we introduce a `Done` class to wrap a `bool` variable with a [class_link condition_variable] and a [class_link mutex]:

[wait_done]

The pattern we follow throughout this section is to pass the various tasks' fiber functions a [@http://www.cplusplus.com/reference/memory/shared_ptr/ `std::shared_ptr<>`] to the relevant synchronization object. This eliminates nagging questions about the lifespan of the synchronization object relative to the last of the fibers.

[#wait_first_simple]
`wait_first_simple()` uses that tactic for [link wait_done `Done`]:

[wait_first_simple]

[#wait_first_simple_impl]
`wait_first_simple_impl()` is an ordinary recursion over the argument pack, capturing `Done::ptr` for each new fiber:

[wait_first_simple_impl]

The body of the fiber's lambda is extremely simple, as promised: call the function, notify [link wait_done `Done`] when it returns. The first fiber to do so allows `wait_first_simple()` to return [mdash] which is why it's useful to have `std::shared_ptr` manage the lifespan of our `Done` object rather than declaring it as a stack variable in `wait_first_simple()`.

This is how you might call it:

[wait_first_simple_ex]

In this example, control resumes after `wait_first_simple()` when [link wait_sleeper `sleeper("wfs_short", 50)`] completes [mdash] even though the other two `sleeper()` fibers are still running.

[endsect]

[section when_any, return value]

It seems more useful to add the ability to capture the return value from the first of the task functions to complete. Again, we assume that none will throw an exception.

One tactic would be to adapt our [link wait_done `Done`] class to store the first of the return values, rather than a simple `bool`. However, we choose instead to use a [template_link buffered_channel]. We'll only need to enqueue the first value, so we'll [member_link buffered_channel..close] it once we've retrieved that value. Subsequent `push()` calls will return `closed`.

[#wait_first_value]
[wait_first_value]

[#wait_first_value_impl]
The meat of the `wait_first_value_impl()` function is as you might expect:

[wait_first_value_impl]

It calls the passed function, pushes its return value and ignores the `push()` result. You might call it like this:

[wait_first_value_ex]

[endsect]

[section when_any, produce first outcome, whether result or exception]

We may not be running in an environment in which we can guarantee no exception will be thrown by any of our task functions. In that case, the above implementations of `wait_first_something()` would be naïve: as mentioned in [link exceptions the section on Fiber Management], an uncaught exception in one of our task fibers would cause `std::terminate()` to be called.

Let's at least ensure that such an exception would propagate to the fiber awaiting the first result. We can use [template_link future] to transport either a return value or an exception. Therefore, we will change [link wait_first_value `wait_first_value()`]'s [template_link buffered_channel] to hold `future< T >` items instead of simply `T`. Once we have a `future<>` in hand, all we need do is call [member_link future..get], which will either return the value or rethrow the exception.
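Before we lean on that behavior, it may help to see it in isolation. This sketch is not part of wait_stuff.cpp; it assumes `#include <boost/fiber/all.hpp>` plus `<iostream>` and `<stdexcept>`:

    // a future<> carries either a value or an exception through get()
    boost::fibers::promise< std::string > ok, bad;
    ok.set_value( "hello");
    bad.set_exception(
        std::make_exception_ptr( std::runtime_error("oops")));
    std::string value( ok.get_future().get()); // returns "hello"
    try {
        bad.get_future().get();                // rethrows the exception
    } catch ( std::runtime_error const& e) {
        std::cout << e.what() << std::endl;    // prints "oops"
    }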
[#wait_first_outcome]
[wait_first_outcome]

So far so good [mdash] but there's a timing issue. How should we obtain the `future<>` to [member_link buffered_channel..push] on the queue?

We could call [ns_function_link fibers..async]. That would certainly produce a `future<>` for the task function. The trouble is that it would return too quickly! We only want `future<>` items for ['completed] tasks on our `queue<>`. In fact, we only want the `future<>` for the one that completes first. If each fiber launched by `wait_first_outcome()` were to `push()` the result of calling `async()`, the queue would only ever report the result of the leftmost task item [mdash] ['not] the one that completes most quickly.

Calling [member_link future..get] on the future returned by `async()` wouldn't be right. You can only call `get()` once per `future<>` instance! And if there were an exception, it would be rethrown inside the helper fiber at the producer end of the queue, rather than propagated to the consumer end.

We could call [member_link future..wait]. That would block the helper fiber until the `future<>` became ready, at which point we could `push()` it to be retrieved by `wait_first_outcome()`.

That would work [mdash] but there's a simpler tactic that avoids creating an extra fiber. We can wrap the task function in a [template_link packaged_task]. While one naturally thinks of passing a `packaged_task<>` to a new fiber [mdash] that is, in fact, what `async()` does [mdash] in this case, we're already running in the helper fiber at the producer end of the queue! We can simply ['call] the `packaged_task<>`. On return from that call, the task function has completed, meaning that the `future<>` obtained from the `packaged_task<>` is certain to be ready. At that point we can simply `push()` it to the queue.

[#wait_first_outcome_impl]
[wait_first_outcome_impl]

Calling it might look like this:

[wait_first_outcome_ex]

[endsect]

[section when_any, produce first success]

One scenario for ["when_any] functionality is when we're redundantly contacting some number of possibly-unreliable web services. Not only might they be slow [mdash] any one of them might produce a failure rather than the desired result. In such a case, [link wait_first_outcome `wait_first_outcome()`] isn't the right approach. If one of the services produces an error quickly, while another follows up with a real answer, we don't want to prefer the error just because it arrived first!

Given the `queue< future< T > >` we already constructed for `wait_first_outcome()`, though, we can readily recast the interface function to deliver the first ['successful] result.

That raises the question: what if ['all] the task functions throw an exception? In that case we'd probably better know about it.

[#exception_list]
The [@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/n4407.html#parallel.exceptions.synopsis C++ Parallelism Draft Technical Specification] proposes a `std::exception_list` exception capable of delivering a collection of `std::exception_ptr`s. Until that becomes universally available, let's fake up an `exception_list` of our own:

[exception_list]

Now we can build `wait_first_success()`, using [link wait_first_outcome_impl `wait_first_outcome_impl()`]. Instead of retrieving only the first `future<>` from the queue, we must now loop over `future<>` items. Of course we must limit that iteration! If we launch only `count` producer fibers, the `(count+1)`[superscript st] [member_link buffered_channel..pop] call would block forever.
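In outline, then, the consumer side of `wait_first_success()` is a loop bounded by `count`. This is a sketch only, with placeholder names; the real code appears below:

    // hedged outline: 'T' and 'chanp' stand in for wait_stuff.cpp's actual names
    for ( std::size_t i = 0; i < count; ++i) {
        boost::fibers::future< T > future;
        chanp->pop( future);    // blocks until some task completes
        // inspect 'future': return its value on success,
        // else collect its exception and keep looping, as described next
    }
    // falling out of the loop means every task threw; see below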
Given a ready `future<>`, we can distinguish failure by calling [member_link future..get_exception_ptr]. If the `future<>` in fact contains a result rather than an exception, `get_exception_ptr()` returns `nullptr`. In that case, we can confidently call [member_link future..get] to return that result to our caller.

If the `std::exception_ptr` is ['not] `nullptr`, though, we collect it into our pending `exception_list` and loop back for the next `future<>` from the queue.

If we fall out of the loop [mdash] if every single task fiber threw an exception [mdash] we throw the `exception_list` exception into which we've been collecting those `std::exception_ptr`s.

[#wait_first_success]
[wait_first_success]

A call might look like this:

[wait_first_success_ex]

[endsect]

[section when_any, heterogeneous types]

We would be remiss to ignore the case in which the various task functions have distinct return types. That means that the value returned by the first of them might have any one of those types. We can express that with [@http://www.boost.org/doc/libs/release/doc/html/variant.html Boost.Variant].

To keep the example simple, we'll revert to pretending that none of them can throw an exception. That makes `wait_first_value_het()` strongly resemble [link wait_first_value `wait_first_value()`]. We can actually reuse [link wait_first_value_impl `wait_first_value_impl()`], merely passing `boost::variant` as the queue's value type rather than the common `T`!

Naturally this could be extended to use [link wait_first_success `wait_first_success()`] semantics instead.

[wait_first_value_het]

It might be called like this:

[wait_first_value_het_ex]

[endsect]

[section when_any, a dubious alternative]

Certain topics in C++ can arouse strong passions, and exceptions are no exception. We cannot resist mentioning [mdash] for purely informational purposes [mdash] that when you need only the ['first] result from some number of concurrently-running fibers, it would be possible to pass a [^shared_ptr<[template_link promise]>] to the participating fibers, then cause the initiating fiber to call [member_link future..get] on its [template_link future]. The first fiber to call [member_link promise..set_value] on that shared `promise` will succeed; subsequent `set_value()` calls on the same `promise` instance will throw `future_error`.

Use this information at your own discretion. Beware the dark side.

[endsect]
[endsect][/ when_any]

[section when_all functionality]

[section when_all, simple completion]

For the case in which we must wait for ['all] task functions to complete [mdash] but we don't need results (or expect exceptions) from any of them [mdash] we can write a `wait_all_simple()` that looks remarkably like [link wait_first_simple `wait_first_simple()`]. The difference is that instead of our [link wait_done `Done`] class, we instantiate a [class_link barrier] and call its [member_link barrier..wait].

We initialize the `barrier` with `(count+1)` because we are launching `count` fibers, plus the `wait()` call within `wait_all_simple()` itself.

[wait_all_simple]

As stated above, the only difference between `wait_all_simple_impl()` and [link wait_first_simple_impl `wait_first_simple_impl()`] is that the former calls `barrier::wait()` rather than `Done::notify()`:

[wait_all_simple_impl]

You might call it like this:

[wait_all_simple_ex]

Control will not return from the `wait_all_simple()` call until the last of its task functions has completed.
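To see the `(count+1)` arithmetic in miniature, here is the same pattern with two hard-coded tasks. This sketch is not taken from wait_stuff.cpp; it follows this section's `std::shared_ptr` convention for the synchronization object's lifespan:

    // 2 task fibers + the initiating fiber's own wait() => barrier of 3
    auto barrier( std::make_shared< boost::fibers::barrier >( 2 + 1));
    boost::fibers::fiber(
        [barrier](){ sleeper("task_a", 50); barrier->wait(); }).detach();
    boost::fibers::fiber(
        [barrier](){ sleeper("task_b", 100); barrier->wait(); }).detach();
    barrier->wait();    // released only when both tasks have arrived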
[endsect]

[section when_all, return values]

As soon as we want to collect return values from all the task functions, we can see right away how to reuse [link wait_first_value `wait_first_value()`]'s queue for the purpose. All we have to do is avoid closing it after the first value!

But in fact, collecting multiple values raises an interesting question: do we ['really] want to wait until the slowest of them has arrived? Wouldn't we rather process each result as soon as it becomes available? Fortunately we can present both APIs. Let's define `wait_all_values_source()` to return `shared_ptr< buffered_channel< T > >`.

[#wait_all_values]
Given `wait_all_values_source()`, it's straightforward to implement `wait_all_values()`:

[wait_all_values]

It might be called like this:

[wait_all_values_ex]

As you can see from the loop in `wait_all_values()`, instead of requiring its caller to count values, we define `wait_all_values_source()` to [member_link buffered_channel..close] the queue when done. But how do we do that? Each producer fiber is independent. It has no idea whether it is the last one to [member_link buffered_channel..push] a value.

[#wait_nqueue]
We can address that problem with a counting façade for the `queue<>`. In fact, our façade need only support the producer end of the queue.

[wait_nqueue]

[#wait_all_values_source]
Armed with `nqueue<>`, we can implement `wait_all_values_source()`. It starts just like [link wait_first_value `wait_first_value()`]. The difference is that we wrap the `queue` with an `nqueue` to pass to the producer fibers. Then, of course, instead of popping the first value, closing the queue and returning it, we simply return the `shared_ptr< buffered_channel< T > >`.

[wait_all_values_source]

For example:

[wait_all_values_source_ex]

[#wait_all_values_impl]
`wait_all_values_impl()` really is just like [link wait_first_value_impl `wait_first_value_impl()`] except for the use of `nqueue` rather than `queue`:

[wait_all_values_impl]

[endsect]

[section when_all, until first exception]

Naturally, just as with [link wait_first_outcome `wait_first_outcome()`], we can elaborate [link wait_all_values `wait_all_values()`] and [link wait_all_values_source `wait_all_values_source()`] by passing `future< T >` instead of plain `T`.

[#wait_all_until_error]
`wait_all_until_error()` pops that `future< T >` and calls its [member_link future..get]:

[wait_all_until_error]

For example:

[wait_all_until_error_ex]

[#wait_all_until_error_source]
Naturally this complicates the API for `wait_all_until_error_source()`. The caller must both retrieve a `future< T >` and call its `get()` method. It would, of course, be possible to return a façade over the consumer end of the queue that would implicitly perform the `get()` and return a simple `T` (or throw).

The implementation is just as you would expect. Notice, however, that we can reuse [link wait_first_outcome_impl `wait_first_outcome_impl()`], passing the `nqueue` rather than `queue`.

[wait_all_until_error_source]

For example:

[wait_all_until_error_source_ex]

[endsect]

[section when_all, collecting all exceptions]

[#wait_all_collect_errors]
Given [link wait_all_until_error_source `wait_all_until_error_source()`], it might be more reasonable to make a `wait_all_...()` that collects ['all] errors instead of presenting only the first:

[wait_all_collect_errors]

The implementation is a simple variation on [link wait_first_success `wait_first_success()`], using the same [link exception_list `exception_list`] exception class.
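A caller might unpack the collected errors along these lines. This is a sketch: it assumes our faked `exception_list` mirrors the proposal's `begin()`/`end()` iteration over `std::exception_ptr`, and that the tasks' exceptions derive from `std::exception`:

    try {
        wait_all_collect_errors(
            [](){ return sleeper("waces_1", 50, true); },
            [](){ return sleeper("waces_2", 100, true); });
    } catch ( exception_list & errors) {
        for ( std::exception_ptr ep : errors) {
            try {
                std::rethrow_exception( ep);
            } catch ( std::exception const& e) {
                std::cout << "error: " << e.what() << std::endl;
            }
        }
    }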
[endsect]

[section when_all, heterogeneous types]

But what about the case when we must wait for all results of different types? We can present an API that is frankly quite cool. Consider a sample struct:

[wait_Data]

Let's fill its members from task functions all running concurrently:

[wait_all_members_data_ex]

Note that for this case, we abandon the notion of capturing the earliest result first, and so on: we must fill exactly the passed struct in left-to-right order. That permits a beautifully simple implementation:

[wait_all_members]
[wait_all_members_get]

It is tempting to try to implement `wait_all_members()` as a one-liner like this:

    return Result{ boost::fibers::async(functions).get()... };

The trouble with this tactic is that it would serialize all the task functions. The runtime makes a single pass through `functions`, calling [ns_function_link fibers..async] for each and then immediately calling [member_link future..get] on its returned `future<>`. That blocks the implicit loop. The above is almost equivalent to writing:

    return Result{ functions()... };

in which, of course, there is no concurrency at all.

Passing the argument pack through a function-call boundary (`wait_all_members_get()`) forces the runtime to make ['two] passes: one in `wait_all_members()` to collect the `future<>`s from all the `async()` calls, the second in `wait_all_members_get()` to fetch each of the results.

As noted in comments, within the `wait_all_members_get()` parameter pack expansion pass, the blocking behavior of `get()` becomes irrelevant. Along the way, we will hit the `get()` for the slowest task function; after that every subsequent `get()` will complete in trivial time.

By the way, we could also use this same API to fill a vector or other collection:

[wait_all_members_vector_ex]

[endsect]
[endsect][/ when_all]
[endsect][/ outermost]