tokio/task/coop/
mod.rs

1#![cfg_attr(not(feature = "full"), allow(dead_code))]
2#![cfg_attr(not(feature = "rt"), allow(unreachable_pub))]
3
4//! Utilities for improved cooperative scheduling.
5//!
6//! ### Cooperative scheduling
7//!
8//! A single call to [`poll`] on a top-level task may potentially do a lot of
9//! work before it returns `Poll::Pending`. If a task runs for a long period of
10//! time without yielding back to the executor, it can starve other tasks
11//! waiting on that executor to execute them, or drive underlying resources.
12//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
13//! long-running task. Instead, this module provides an opt-in mechanism for
14//! futures to collaborate with the executor to avoid starvation.
15//!
16//! Consider a future like this one:
17//!
18//! ```
19//! # use tokio_stream::{Stream, StreamExt};
20//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
21//!     while let Some(_) = input.next().await {}
22//! }
23//! ```
24//!
25//! It may look harmless, but consider what happens under heavy load if the
26//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
27//! yield, and will starve other tasks and resources on the same executor.
28//!
29//! To account for this, Tokio has explicit yield points in a number of library
30//! functions, which force tasks to return to the executor periodically.
31//!
32//!
33//! #### unconstrained
34//!
35//! If necessary, [`task::unconstrained`] lets you opt a future out of Tokio's cooperative
36//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to
37//! Tokio. For example:
38//!
39//! ```
40//! # #[tokio::main(flavor = "current_thread")]
41//! # async fn main() {
42//! use tokio::{task, sync::mpsc};
43//!
44//! let fut = async {
45//!     let (tx, mut rx) = mpsc::unbounded_channel();
46//!
47//!     for i in 0..1000 {
48//!         let _ = tx.send(());
49//!         // This will always be ready. If coop was in effect, this code would be forced to yield
50//!         // periodically. However, if left unconstrained, then this code will never yield.
51//!         rx.recv().await;
52//!     }
53//! };
54//!
55//! task::coop::unconstrained(fut).await;
56//! # }
57//! ```
58//! [`poll`]: method@std::future::Future::poll
59//! [`task::unconstrained`]: crate::task::unconstrained()
60
61cfg_rt! {
62    mod consume_budget;
63    pub use consume_budget::consume_budget;
64
65    mod unconstrained;
66    pub use unconstrained::{unconstrained, Unconstrained};
67}
68
69// ```ignore
70// # use tokio_stream::{Stream, StreamExt};
71// async fn drop_all<I: Stream + Unpin>(mut input: I) {
72//     while let Some(_) = input.next().await {
73//         tokio::coop::proceed().await;
74//     }
75// }
76// ```
77//
78// The `proceed` future will coordinate with the executor to make sure that
79// every so often control is yielded back to the executor so it can run other
80// tasks.
81//
82// # Placing yield points
83//
84// Voluntary yield points should be placed _after_ at least some work has been
85// done. If they are not, a future sufficiently deep in the task hierarchy may
86// end up _never_ getting to run because of the number of yield points that
87// inevitably appear before it is reached. In general, you will want yield
88// points to only appear in "leaf" futures -- those that do not themselves poll
89// other futures. By doing this, you avoid double-counting each iteration of
90// the outer future against the cooperating budget.
91
92use crate::runtime::context;
93
/// Opaque counter describing how much "work" a task may still perform before
/// it should yield back to the scheduler.
///
/// `Some(n)` means `n` units are left; `None` means the budget is
/// unconstrained.
#[derive(Debug, Copy, Clone)]
pub(crate) struct Budget(Option<u8>);

/// Outcome of an attempt to consume one unit of a [`Budget`].
pub(crate) struct BudgetDecrement {
    // `true` when the unit could be consumed or the budget is unconstrained.
    success: bool,
    // `true` when this decrement brought a constrained budget down to zero.
    hit_zero: bool,
}

impl Budget {
    /// Budget handed to a task for each poll.
    ///
    /// The concrete number is a somewhat arbitrary trade-off: large enough to
    /// amortize wakeup and scheduling costs and to let deeply nested tasks
    /// get at least some useful work done, yet small enough that other tasks
    /// are not starved for too long.
    ///
    /// As the ecosystem gains more yield points, this value will probably
    /// have to grow as well.
    const fn initial() -> Budget {
        Self(Some(128))
    }

    /// Returns a budget that never runs out; operations are not limited.
    pub(crate) const fn unconstrained() -> Budget {
        Self(None)
    }

    /// Reports whether at least one unit of work may still be performed.
    ///
    /// Only a constrained budget that has reached zero is exhausted.
    fn has_remaining(self) -> bool {
        !matches!(self.0, Some(0))
    }
}
128
129/// Runs the given closure with a cooperative task budget. When the function
130/// returns, the budget is reset to the value prior to calling the function.
131#[inline(always)]
132pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
133    with_budget(Budget::initial(), f)
134}
135
136/// Runs the given closure with an unconstrained task budget. When the function returns, the budget
137/// is reset to the value prior to calling the function.
138#[inline(always)]
139pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
140    with_budget(Budget::unconstrained(), f)
141}
142
143#[inline(always)]
144fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
145    struct ResetGuard {
146        prev: Budget,
147    }
148
149    impl Drop for ResetGuard {
150        fn drop(&mut self) {
151            let _ = context::budget(|cell| {
152                cell.set(self.prev);
153            });
154        }
155    }
156
157    #[allow(unused_variables)]
158    let maybe_guard = context::budget(|cell| {
159        let prev = cell.get();
160        cell.set(budget);
161
162        ResetGuard { prev }
163    });
164
165    // The function is called regardless even if the budget is not successfully
166    // set due to the thread-local being destroyed.
167    f()
168}
169
170/// Returns `true` if there is still budget left on the task.
171///
172/// # Examples
173///
174/// This example defines a `Timeout` future that requires a given `future` to complete before the
175/// specified duration elapses. If it does, its result is returned; otherwise, an error is returned
176/// and the future is canceled.
177///
178/// Note that the future could exhaust the budget before we evaluate the timeout. Using `has_budget_remaining`,
179/// we can detect this scenario and ensure the timeout is always checked.
180///
181/// ```
182/// # use std::future::Future;
183/// # use std::pin::{pin, Pin};
184/// # use std::task::{ready, Context, Poll};
185/// # use tokio::task::coop;
186/// # use tokio::time::Sleep;
187/// pub struct Timeout<T> {
188///     future: T,
189///     delay: Pin<Box<Sleep>>,
190/// }
191///
192/// impl<T> Future for Timeout<T>
193/// where
194///     T: Future + Unpin,
195/// {
196///     type Output = Result<T::Output, ()>;
197///
198///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
199///         let this = Pin::into_inner(self);
200///         let future = Pin::new(&mut this.future);
201///         let delay = Pin::new(&mut this.delay);
202///
203///         // check if the future is ready
204///         let had_budget_before = coop::has_budget_remaining();
205///         if let Poll::Ready(v) = future.poll(cx) {
206///             return Poll::Ready(Ok(v));
207///         }
208///         let has_budget_now = coop::has_budget_remaining();
209///
210///         // evaluate the timeout
211///         if let (true, false) = (had_budget_before, has_budget_now) {
212///             // it is the underlying future that exhausted the budget
213///             ready!(pin!(coop::unconstrained(delay)).poll(cx));
214///         } else {
215///             ready!(delay.poll(cx));
216///         }
217///         return Poll::Ready(Err(()));
218///     }
219/// }
220///```
221#[inline(always)]
222#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
223pub fn has_budget_remaining() -> bool {
224    // If the current budget cannot be accessed due to the thread-local being
225    // shutdown, then we assume there is budget remaining.
226    context::budget(|cell| cell.get().has_remaining()).unwrap_or(true)
227}
228
cfg_rt_multi_thread! {
    /// Sets the current task's budget.
    ///
    /// The write is best effort: if `context::budget` cannot provide the
    /// budget cell, the call has no effect and the failure is discarded.
    pub(crate) fn set(budget: Budget) {
        let _ = context::budget(|cell| cell.set(budget));
    }
}
235
236cfg_rt! {
237    /// Forcibly removes the budgeting constraints early.
238    ///
239    /// Returns the remaining budget
240    pub(crate) fn stop() -> Budget {
241        context::budget(|cell| {
242            let prev = cell.get();
243            cell.set(Budget::unconstrained());
244            prev
245        }).unwrap_or(Budget::unconstrained())
246    }
247}
248
249cfg_coop! {
250    use pin_project_lite::pin_project;
251    use std::cell::Cell;
252    use std::future::Future;
253    use std::marker::PhantomData;
254    use std::pin::Pin;
255    use std::task::{ready, Context, Poll};
256
257    /// Value returned by the [`poll_proceed`] method.
258    #[derive(Debug)]
259    #[must_use]
260    pub struct RestoreOnPending(Cell<Budget>, PhantomData<*mut ()>);
261
262    impl RestoreOnPending {
263        fn new(budget: Budget) -> Self {
264            RestoreOnPending(
265                Cell::new(budget),
266                PhantomData,
267            )
268        }
269
270        /// Signals that the task that obtained this `RestoreOnPending` was able to make
271        /// progress. This prevents the task budget from being restored to the value
272        /// it had prior to obtaining this instance when it is dropped.
273        pub fn made_progress(&self) {
274            self.0.set(Budget::unconstrained());
275        }
276    }
277
278    impl Drop for RestoreOnPending {
279        fn drop(&mut self) {
280            // Don't reset if budget was unconstrained or if we made progress.
281            // They are both represented as the remembered budget being unconstrained.
282            let budget = self.0.get();
283            if !budget.is_unconstrained() {
284                let _ = context::budget(|cell| {
285                    cell.set(budget);
286                });
287            }
288        }
289    }
290
291    /// Decrements the task budget and returns [`Poll::Pending`] if the budget is depleted.
292    /// This indicates that the task should yield to the scheduler. Otherwise, returns
293    /// [`RestoreOnPending`] which can be used to commit the budget consumption.
294    ///
295    /// The returned [`RestoreOnPending`] will revert the budget to its former
296    /// value when dropped unless [`RestoreOnPending::made_progress`]
297    /// is called. It is the caller's responsibility to do so when it _was_ able to
298    /// make progress after the call to [`poll_proceed`].
299    /// Restoring the budget automatically ensures the task can try to make progress in some other
300    /// way.
301    ///
302    /// Note that [`RestoreOnPending`] restores the budget **as it was before [`poll_proceed`]**.
303    /// Therefore, if the budget is _further_ adjusted between when [`poll_proceed`] returns and
304    /// [`RestoreOnPending`] is dropped, those adjustments are erased unless the caller indicates
305    /// that progress was made.
306    ///
307    /// # Examples
308    ///
309    /// This example wraps the `futures::channel::mpsc::UnboundedReceiver` to
310    /// cooperate with the Tokio scheduler. Each time a value is received, task budget
311    /// is consumed. If no budget is available, the task yields to the scheduler.
312    ///
313    /// ```
314    /// use std::pin::Pin;
315    /// use std::task::{ready, Context, Poll};
316    /// use tokio::task::coop;
317    /// use futures::stream::{Stream, StreamExt};
318    /// use futures::channel::mpsc::UnboundedReceiver;
319    ///
320    /// struct CoopUnboundedReceiver<T> {
321    ///    receiver: UnboundedReceiver<T>,
322    /// }
323    ///
324    /// impl<T> Stream for CoopUnboundedReceiver<T> {
325    ///     type Item = T;
326    ///     fn poll_next(
327    ///         mut self: Pin<&mut Self>,
328    ///         cx: &mut Context<'_>
329    ///     ) -> Poll<Option<T>> {
330    ///         let coop = ready!(coop::poll_proceed(cx));
331    ///         match self.receiver.poll_next_unpin(cx) {
332    ///             Poll::Ready(v) => {
333    ///                 // We received a value, so consume budget.
334    ///                 coop.made_progress();
335    ///                 Poll::Ready(v)
336    ///             }
337    ///             Poll::Pending => Poll::Pending,
338    ///        }
339    ///     }
340    /// }
341    /// ```
342    #[inline]
343    pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
344        context::budget(|cell| {
345            let mut budget = cell.get();
346
347            let decrement = budget.decrement();
348
349            if decrement.success {
350                let restore = RestoreOnPending::new(cell.get());
351                cell.set(budget);
352
353                // avoid double counting
354                if decrement.hit_zero {
355                    inc_budget_forced_yield_count();
356                }
357
358                Poll::Ready(restore)
359            } else {
360                register_waker(cx);
361                Poll::Pending
362            }
363        }).unwrap_or(Poll::Ready(RestoreOnPending::new(Budget::unconstrained())))
364    }
365
366    /// Returns `Poll::Ready` if the current task has budget to consume, and `Poll::Pending` otherwise.
367    ///
368    /// Note that in contrast to `poll_proceed`, this method does not consume any budget and is used when
369    /// polling for budget availability.
370    #[inline]
371    pub(crate) fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> {
372        if has_budget_remaining() {
373            Poll::Ready(())
374        } else {
375            register_waker(cx);
376
377            Poll::Pending
378        }
379    }
380
    cfg_rt! {
        cfg_unstable_metrics! {
            /// Records a budget-forced yield on the scheduler metrics of the
            /// current runtime handle. The result of looking up the handle is
            /// discarded, so this does nothing when no handle is available.
            #[inline(always)]
            fn inc_budget_forced_yield_count() {
                let _ = context::with_current(|handle| {
                    handle.scheduler_metrics().inc_budget_forced_yield_count();
                });
            }
        }

        cfg_not_unstable_metrics! {
            /// No-op counterpart used when unstable metrics are not enabled.
            #[inline(always)]
            fn inc_budget_forced_yield_count() {}
        }

        /// Passes the task's waker to `context::defer`.
        // NOTE(review): presumably the runtime wakes deferred wakers once the
        // current poll has finished — confirm against `context::defer`.
        fn register_waker(cx: &mut Context<'_>) {
            context::defer(cx.waker());
        }
    }
400
    cfg_not_rt! {
        /// No-op counterpart used when the `rt` configuration is not enabled.
        #[inline(always)]
        fn inc_budget_forced_yield_count() {}

        /// Wakes the task right away through its own waker, so that it is
        /// scheduled again after returning `Poll::Pending`.
        fn register_waker(cx: &mut Context<'_>) {
            cx.waker().wake_by_ref()
        }
    }
409
410    impl Budget {
411        /// Decrements the budget. Returns `true` if successful. Decrementing fails
412        /// when there is not enough remaining budget.
413        fn decrement(&mut self) -> BudgetDecrement {
414            if let Some(num) = &mut self.0 {
415                if *num > 0 {
416                    *num -= 1;
417
418                    let hit_zero = *num == 0;
419
420                    BudgetDecrement { success: true, hit_zero }
421                } else {
422                    BudgetDecrement { success: false, hit_zero: false }
423                }
424            } else {
425                BudgetDecrement { success: true, hit_zero: false }
426            }
427        }
428
429        fn is_unconstrained(self) -> bool {
430            self.0.is_none()
431        }
432    }
433
    pin_project! {
        /// Future wrapper to ensure cooperative scheduling created by [`cooperative`].
        ///
        /// Each poll first goes through [`poll_proceed`] before the wrapped
        /// future is polled.
        #[must_use = "futures do nothing unless polled"]
        pub struct Coop<F: Future> {
            // The wrapped future; structurally pinned so it can be polled in place.
            #[pin]
            pub(crate) fut: F,
        }
    }
442
443    impl<F: Future> Future for Coop<F> {
444        type Output = F::Output;
445
446        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
447            let coop = ready!(poll_proceed(cx));
448            let me = self.project();
449            if let Poll::Ready(ret) = me.fut.poll(cx) {
450                coop.made_progress();
451                Poll::Ready(ret)
452            } else {
453                Poll::Pending
454            }
455        }
456    }
457
    /// Creates a wrapper future that makes the inner future cooperate with the Tokio scheduler.
    ///
    /// When polled, the wrapper will first call [`poll_proceed`] to consume task budget, and
    /// immediately yield if the budget has been depleted. If budget was available, the inner future
    /// is polled. The budget consumption will be made final using [`RestoreOnPending::made_progress`]
    /// if the inner future resolves to its final value.
    ///
    /// # Examples
    ///
    /// When you call `recv` on the `Receiver` of a [`tokio::sync::mpsc`](crate::sync::mpsc)
    /// channel, task budget will automatically be consumed when the next value is returned.
    /// This makes tasks that use Tokio mpsc channels automatically cooperative.
    ///
    /// If you're using [`futures::channel::mpsc`](https://docs.rs/futures/latest/futures/channel/mpsc/index.html)
    /// instead, automatic task budget consumption will not happen. This example shows how you can
    /// use `cooperative` to make `futures::channel::mpsc` channels cooperate with the scheduler in
    /// the same way Tokio channels do.
    ///
    /// ```
    /// use tokio::task::coop::cooperative;
    /// use futures::channel::mpsc::Receiver;
    /// use futures::stream::StreamExt;
    ///
    /// async fn receive_next<T>(receiver: &mut Receiver<T>) -> Option<T> {
    ///     // Use `StreamExt::next` to obtain a `Future` that resolves to the next value
    ///     let recv_future = receiver.next();
    ///     // Wrap it in a cooperative wrapper
    ///     let coop_future = cooperative(recv_future);
    ///     // And await
    ///     coop_future.await
    /// }
    /// ```
    #[inline]
    pub fn cooperative<F: Future>(fut: F) -> Coop<F> {
        Coop { fut }
    }
493}
494
// Unit tests for the budget bookkeeping done by `budget` and `poll_proceed`.
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    // Reads the budget currently installed in the context, falling back to an
    // unconstrained budget when it cannot be accessed.
    fn get() -> Budget {
        context::budget(|cell| cell.get()).unwrap_or(Budget::unconstrained())
    }

    #[test]
    fn budgeting() {
        use std::future::poll_fn;
        use tokio_test::*;

        // Outside of `budget(..)` the budget is unconstrained.
        assert!(get().0.is_none());

        let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));

        // Proceeding with an unconstrained budget leaves it unconstrained,
        // both before and after the guard is dropped.
        assert!(get().0.is_none());
        drop(coop);
        assert!(get().0.is_none());

        budget(|| {
            assert_eq!(get().0, Budget::initial().0);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            drop(coop);
            // we didn't make progress
            assert_eq!(get().0, Budget::initial().0);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            coop.made_progress();
            drop(coop);
            // we _did_ make progress
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
            coop.made_progress();
            drop(coop);
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);

            // A nested `budget(..)` starts from a fresh initial budget ...
            budget(|| {
                assert_eq!(get().0, Budget::initial().0);

                let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
                coop.made_progress();
                drop(coop);
                assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            });

            // ... and the outer budget is back in place once it returns.
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
        });

        // Leaving the outermost `budget(..)` restores the unconstrained budget.
        assert!(get().0.is_none());

        budget(|| {
            let n = get().0.unwrap();

            // Use up the entire budget, committing every decrement.
            for _ in 0..n {
                let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                coop.made_progress();
            }

            let mut task = task::spawn(poll_fn(|cx| {
                let coop = std::task::ready!(poll_proceed(cx));
                coop.made_progress();
                Poll::Ready(())
            }));

            // With no budget left, `poll_proceed` must report `Pending`.
            assert_pending!(task.poll());
        });
    }
}