tokio/macros/try_join.rs
// Attaches the shared `try_join!` rustdoc, `#[macro_export]`, and the docs.rs
// feature badge to whatever item is passed in. It is invoked twice in this
// file: once for the `cfg(doc)` stub and once for the real implementation, so
// both carry exactly the same documentation.
macro_rules! doc {
    ($item:item) => {
        /// Drives several fallible async expressions concurrently, finishing once
        /// **every** branch has completed with `Ok(_)` or as soon as **any** branch
        /// completes with `Err(_)`.
        ///
        /// `try_join!` can only be used inside of async functions, closures, and
        /// blocks.
        ///
        /// Like [`join!`], `try_join!` accepts a list of async expressions and
        /// evaluates them concurrently on the same task. Every expression
        /// evaluates to a future, and those futures are multiplexed on the current
        /// task. The macro returns when **all** branches have returned `Ok`, or
        /// when the **first** branch returns `Err`.
        ///
        /// [`join!`]: macro@join
        ///
        /// # Notes
        ///
        /// The futures handed to the macro are stored inline, so no `Vec` has to
        /// be allocated.
        ///
        /// ## Runtime characteristics
        ///
        /// Because every async expression runs on the current task, the
        /// expressions execute **concurrently** but never in **parallel**. They
        /// all share one thread, so a branch that blocks the thread prevents every
        /// other expression from making progress. When parallelism is needed,
        /// spawn each async expression with [`tokio::spawn`] and pass the
        /// resulting join handles to `try_join!`.
        ///
        /// [`tokio::spawn`]: crate::spawn
        ///
        /// ## Fairness
        ///
        /// Unless told otherwise, the future generated by `try_join!` rotates
        /// which contained future is polled first each time it is woken.
        ///
        /// Put `biased;` at the start of the macro invocation to turn this off
        /// (see the examples below). `try_join!` will then poll the futures in the
        /// order in which they are written, from top to bottom.
        ///
        /// This is useful when the futures interact in a way that makes a known
        /// polling order significant.
        ///
        /// Biased mode comes with an important caveat: keeping the polling order
        /// fair becomes your responsibility. Suppose you are joining a stream and
        /// a shutdown future, and the stream carries a huge volume of messages
        /// that take a long time to process per poll. In that case, list the
        /// shutdown future earlier in the `try_join!` invocation so that it is
        /// always polled and is not delayed by the stream future taking a long
        /// time to return `Poll::Pending`.
        ///
        /// # Examples
        ///
        /// Basic `try_join` with two branches.
        ///
        /// ```
        /// async fn do_stuff_async() -> Result<(), &'static str> {
        ///     // async work
        /// # Ok(())
        /// }
        ///
        /// async fn more_async_work() -> Result<(), &'static str> {
        ///     // more here
        /// # Ok(())
        /// }
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let res = tokio::try_join!(
        ///         do_stuff_async(),
        ///         more_async_work());
        ///
        ///     match res {
        ///          Ok((first, second)) => {
        ///              // do something with the values
        ///          }
        ///          Err(err) => {
        ///             println!("processing failed; error = {}", err);
        ///          }
        ///     }
        /// }
        /// ```
        ///
        /// Using `try_join!` with spawned tasks.
        ///
        /// ```
        /// use tokio::task::JoinHandle;
        ///
        /// async fn do_stuff_async() -> Result<(), &'static str> {
        ///     // async work
        /// # Err("failed")
        /// }
        ///
        /// async fn more_async_work() -> Result<(), &'static str> {
        ///     // more here
        /// # Ok(())
        /// }
        ///
        /// async fn flatten<T>(handle: JoinHandle<Result<T, &'static str>>) -> Result<T, &'static str> {
        ///     match handle.await {
        ///         Ok(Ok(result)) => Ok(result),
        ///         Ok(Err(err)) => Err(err),
        ///         Err(err) => Err("handling failed"),
        ///     }
        /// }
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let handle1 = tokio::spawn(do_stuff_async());
        ///     let handle2 = tokio::spawn(more_async_work());
        ///     match tokio::try_join!(flatten(handle1), flatten(handle2)) {
        ///         Ok(val) => {
        ///             // do something with the values
        ///         }
        ///         Err(err) => {
        ///             println!("Failed with {}.", err);
        ///             # assert_eq!(err, "failed");
        ///         }
        ///     }
        /// }
        /// ```
        ///
        /// Using the `biased;` mode to control polling order.
        ///
        /// ```
        /// async fn do_stuff_async() -> Result<(), &'static str> {
        ///     // async work
        /// # Ok(())
        /// }
        ///
        /// async fn more_async_work() -> Result<(), &'static str> {
        ///     // more here
        /// # Ok(())
        /// }
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let res = tokio::try_join!(
        ///         biased;
        ///         do_stuff_async(),
        ///         more_async_work()
        ///     );
        ///
        ///     match res {
        ///          Ok((first, second)) => {
        ///              // do something with the values
        ///          }
        ///          Err(err) => {
        ///             println!("processing failed; error = {}", err);
        ///          }
        ///     }
        /// }
        /// ```
        #[macro_export]
        #[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
        $item
    };
}
158
// Documentation-only definition. It is compiled only under `cfg(doc)` and
// exposes a single compact rule in place of the internal normalization rules
// of the `cfg(not(doc))` definition. The body is a placeholder
// (`unimplemented!()`); this rule is not the one that generates the real code.
#[cfg(doc)]
doc! {macro_rules! try_join {
    ($(biased;)? $($future:expr),*) => { unimplemented!() }
}}
163
// Real implementation, compiled whenever `cfg(doc)` is not set.
//
// Expansion goes through three kinds of rules. They are listed here in the
// order in which an invocation passes through them, which differs from the
// order in which they are written below:
//
// 1. Entry point: selects the rotator type and seeds the internal state
//    `@{ rotator=...; () (0) }`, followed by every branch as `$e,`.
// 2. Normalize: each recursion step consumes one `$e,` from the input. It
//    appends one `_` to the first list, appends `+ 1` to the second list, and
//    records the branch as `( <underscores so far> ) $e,`.
// 3. Code generation (the first rule): matches once no input is left after
//    the `@{ ... }` state and emits the polling code.
//
// Invocations without any future expand directly to `async { Ok(()) }.await`.
#[cfg(not(doc))]
doc! {macro_rules! try_join {
    (@ {
        // Type of rotator that controls which inner future to start with
        // when polling our output future.
        rotator=$rotator:ty;

        // One `_` for each branch in the `try_join!` macro. This is not used once
        // normalization is complete.
        ( $($count:tt)* )

        // The expression `0+1+1+ ... +1` equal to the number of branches.
        ( $($total:tt)* )

        // Normalized try_join! branches. `$skip` holds one `_` for every branch
        // that precedes this one; it is used below as a run of wildcard
        // patterns to reach this branch's field in the tuple.
        $( ( $($skip:tt)* ) $e:expr, )*

    }) => {{
        use $crate::macros::support::{maybe_done, poll_fn, Future, Pin};
        use $crate::macros::support::Poll::{Ready, Pending};

        // SAFETY: nothing must be moved out of `futures`. This is to satisfy
        // the requirement of `Pin::new_unchecked` called below.
        //
        // We can't use the `pin!` macro for this because `futures` is a tuple
        // and the standard library provides no way to pin-project to the fields
        // of a tuple.
        let mut futures = ( $( maybe_done($e), )* );

        // This assignment makes sure that the `poll_fn` closure only has a
        // reference to the futures, instead of taking ownership of them. This
        // mitigates the issue described in
        // <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
        let mut futures = &mut futures;

        // Number of branches, evaluated from the `0 + 1 + 1 + ...` expression
        // built during normalization. The default entry point also refers to
        // this constant by name in `Rotator<COUNT>`.
        const COUNT: u32 = $($total)*;

        // Each time the future created by poll_fn is polled, if not using biased mode,
        // a different future is polled first to ensure every future passed to try_join!
        // can make progress even if one of the futures consumes the whole budget.
        let mut rotator = <$rotator>::default();

        poll_fn(move |cx| {
            // Set as soon as any branch polled in this call returns `Pending`.
            let mut is_pending = false;
            // Number of branches that still have to be polled in this call.
            let mut to_run = COUNT;

            // The number of futures that will be skipped in the first loop iteration.
            let mut skip = rotator.num_skip();

            // This loop runs twice and the first `skip` futures
            // are not polled in the first iteration.
            loop {
            $(
                if skip == 0 {
                    if to_run == 0 {
                        // Every future has been polled
                        break;
                    }
                    to_run -= 1;

                    // Extract the future for this branch from the tuple.
                    let ( $($skip,)* fut, .. ) = &mut *futures;

                    // SAFETY: future is stored on the stack above
                    // and never moved.
                    let mut fut = unsafe { Pin::new_unchecked(fut) };

                    // Try polling. A branch that has completed with `Err`
                    // ends the whole `try_join!` immediately with that
                    // error; the remaining branches are not polled again.
                    if fut.as_mut().poll(cx).is_pending() {
                        is_pending = true;
                    } else if fut.as_mut().output_mut().expect("expected completed future").is_err() {
                        return Ready(Err(fut.take_output().expect("expected completed future").err().unwrap()))
                    }
                } else {
                    // Future skipped, one less future to skip in the next iteration
                    skip -= 1;
                }
            )*
            }

            if is_pending {
                Pending
            } else {
                // No branch is pending and none returned early with `Err`, so
                // collect the `Ok` values into a tuple in declaration order.
                Ready(Ok(($({
                    // Extract the future for this branch from the tuple.
                    // `futures` is itself a `&mut` to the tuple, so the
                    // pattern binds through both reference layers.
                    let ( $($skip,)* fut, .. ) = &mut futures;

                    // SAFETY: future is stored on the stack above
                    // and never moved.
                    let mut fut = unsafe { Pin::new_unchecked(fut) };

                    fut
                        .take_output()
                        .expect("expected completed future")
                        .ok()
                        .expect("expected Ok(_)")
                },)*)))
            }
        }).await
    }};

    // ===== Normalize =====

    // Moves one branch from the unprocessed input into the normalized state:
    // `$s` gains one `_`, `$n` gains `+ 1`, and the branch is stored together
    // with the underscores accumulated *before* it. Recurses on the rest `$r`.
    (@ { rotator=$rotator:ty; ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
      $crate::try_join!(@{ rotator=$rotator; ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
    };

    // ===== Entry point =====

    // `biased;` prefix: at least one future, optional trailing comma; uses
    // `BiasedRotator` as the rotator type.
    ( biased; $($e:expr),+ $(,)?) => {
        $crate::try_join!(@{ rotator=$crate::macros::support::BiasedRotator; () (0) } $($e,)*)
    };

    // Default mode: at least one future, optional trailing comma; uses
    // `Rotator<COUNT>` as the rotator type.
    ( $($e:expr),+ $(,)?) => {
        $crate::try_join!(@{ rotator=$crate::macros::support::Rotator<COUNT>; () (0) } $($e,)*)
    };

    // No futures at all (with or without `biased;`): resolves to `Ok(())`.
    (biased;) => { async { Ok(()) }.await };

    () => { async { Ok(()) }.await }
}}