tokio_util/sync/
cancellation_token.rs

1//! An asynchronously awaitable [`CancellationToken`].
2//! The token allows to signal a cancellation request to one or more tasks.
3pub(crate) mod guard;
4pub(crate) mod guard_ref;
5mod tree_node;
6
7use crate::loom::sync::Arc;
8use crate::util::MaybeDangling;
9use core::future::Future;
10use core::pin::Pin;
11use core::task::{Context, Poll};
12
13use guard::DropGuard;
14use guard_ref::DropGuardRef;
15use pin_project_lite::pin_project;
16
17/// A token which can be used to signal a cancellation request to one or more
18/// tasks.
19///
20/// Tasks can call [`CancellationToken::cancelled()`] in order to
21/// obtain a Future which will be resolved when cancellation is requested.
22///
23/// Cancellation can be requested through the [`CancellationToken::cancel`] method.
24///
25/// # Examples
26///
27/// ```no_run
28/// use tokio::select;
29/// use tokio_util::sync::CancellationToken;
30///
31/// #[tokio::main]
32/// async fn main() {
33///     let token = CancellationToken::new();
34///     let cloned_token = token.clone();
35///
36///     let join_handle = tokio::spawn(async move {
37///         // Wait for either cancellation or a very long time
38///         select! {
39///             _ = cloned_token.cancelled() => {
40///                 // The token was cancelled
41///                 5
42///             }
43///             _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
44///                 99
45///             }
46///         }
47///     });
48///
49///     tokio::spawn(async move {
50///         tokio::time::sleep(std::time::Duration::from_millis(10)).await;
51///         token.cancel();
52///     });
53///
54///     assert_eq!(5, join_handle.await.unwrap());
55/// }
56/// ```
57pub struct CancellationToken {
58    inner: Arc<tree_node::TreeNode>,
59}
60
61impl std::panic::UnwindSafe for CancellationToken {}
62impl std::panic::RefUnwindSafe for CancellationToken {}
63
64pin_project! {
65    /// A Future that is resolved once the corresponding [`CancellationToken`]
66    /// is cancelled.
67    #[must_use = "futures do nothing unless polled"]
68    pub struct WaitForCancellationFuture<'a> {
69        cancellation_token: &'a CancellationToken,
70        #[pin]
71        future: tokio::sync::futures::Notified<'a>,
72    }
73}
74
75pin_project! {
76    /// A Future that is resolved once the corresponding [`CancellationToken`]
77    /// is cancelled.
78    ///
79    /// This is the counterpart to [`WaitForCancellationFuture`] that takes
80    /// [`CancellationToken`] by value instead of using a reference.
81    #[must_use = "futures do nothing unless polled"]
82    pub struct WaitForCancellationFutureOwned {
83        // This field internally has a reference to the cancellation token, but camouflages
84        // the relationship with `'static`. To avoid Undefined Behavior, we must ensure
85        // that the reference is only used while the cancellation token is still alive. To
86        // do that, we ensure that the future is the first field, so that it is dropped
87        // before the cancellation token.
88        //
89        // We use `MaybeDanglingFuture` here because without it, the compiler could assert
90        // the reference inside `future` to be valid even after the destructor of that
91        // field runs. (Specifically, when the `WaitForCancellationFutureOwned` is passed
92        // as an argument to a function, the reference can be asserted to be valid for the
93        // rest of that function.) To avoid that, we use `MaybeDangling` which tells the
94        // compiler that the reference stored inside it might not be valid.
95        //
96        // See <https://users.rust-lang.org/t/unsafe-code-review-semi-owning-weak-rwlock-t-guard/95706>
97        // for more info.
98        #[pin]
99        future: MaybeDangling<tokio::sync::futures::Notified<'static>>,
100        cancellation_token: CancellationToken,
101    }
102}
103
104// ===== impl CancellationToken =====
105
106impl core::fmt::Debug for CancellationToken {
107    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
108        f.debug_struct("CancellationToken")
109            .field("is_cancelled", &self.is_cancelled())
110            .finish()
111    }
112}
113
114impl Clone for CancellationToken {
115    /// Creates a clone of the [`CancellationToken`] which will get cancelled
116    /// whenever the current token gets cancelled, and vice versa.
117    fn clone(&self) -> Self {
118        tree_node::increase_handle_refcount(&self.inner);
119        CancellationToken {
120            inner: self.inner.clone(),
121        }
122    }
123}
124
125impl Drop for CancellationToken {
126    fn drop(&mut self) {
127        tree_node::decrease_handle_refcount(&self.inner);
128    }
129}
130
131impl Default for CancellationToken {
132    fn default() -> CancellationToken {
133        CancellationToken::new()
134    }
135}
136
137impl CancellationToken {
138    /// Creates a new [`CancellationToken`] in the non-cancelled state.
139    pub fn new() -> CancellationToken {
140        CancellationToken {
141            inner: Arc::new(tree_node::TreeNode::new()),
142        }
143    }
144
145    /// Creates a [`CancellationToken`] which will get cancelled whenever the
146    /// current token gets cancelled. Unlike a cloned [`CancellationToken`],
147    /// cancelling a child token does not cancel the parent token.
148    ///
149    /// If the current token is already cancelled, the child token will get
150    /// returned in cancelled state.
151    ///
152    /// # Examples
153    ///
154    /// ```no_run
155    /// use tokio::select;
156    /// use tokio_util::sync::CancellationToken;
157    ///
158    /// #[tokio::main]
159    /// async fn main() {
160    ///     let token = CancellationToken::new();
161    ///     let child_token = token.child_token();
162    ///
163    ///     let join_handle = tokio::spawn(async move {
164    ///         // Wait for either cancellation or a very long time
165    ///         select! {
166    ///             _ = child_token.cancelled() => {
167    ///                 // The token was cancelled
168    ///                 5
169    ///             }
170    ///             _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
171    ///                 99
172    ///             }
173    ///         }
174    ///     });
175    ///
176    ///     tokio::spawn(async move {
177    ///         tokio::time::sleep(std::time::Duration::from_millis(10)).await;
178    ///         token.cancel();
179    ///     });
180    ///
181    ///     assert_eq!(5, join_handle.await.unwrap());
182    /// }
183    /// ```
184    pub fn child_token(&self) -> CancellationToken {
185        CancellationToken {
186            inner: tree_node::child_node(&self.inner),
187        }
188    }
189
190    /// Cancel the [`CancellationToken`] and all child tokens which had been
191    /// derived from it.
192    ///
193    /// This will wake up all tasks which are waiting for cancellation.
194    ///
195    /// Be aware that cancellation is not an atomic operation. It is possible
196    /// for another thread running in parallel with a call to `cancel` to first
197    /// receive `true` from `is_cancelled` on one child node, and then receive
198    /// `false` from `is_cancelled` on another child node. However, once the
199    /// call to `cancel` returns, all child nodes have been fully cancelled.
200    pub fn cancel(&self) {
201        tree_node::cancel(&self.inner);
202    }
203
204    /// Returns `true` if the `CancellationToken` is cancelled.
205    pub fn is_cancelled(&self) -> bool {
206        tree_node::is_cancelled(&self.inner)
207    }
208
209    /// Returns a [`Future`] that gets fulfilled when cancellation is requested.
210    ///
211    /// Equivalent to:
212    ///
213    /// ```ignore
214    /// async fn cancelled(&self);
215    /// ```
216    ///
217    /// The future will complete immediately if the token is already cancelled
218    /// when this method is called.
219    ///
220    /// # Cancellation safety
221    ///
222    /// This method is cancel safe.
223    pub fn cancelled(&self) -> WaitForCancellationFuture<'_> {
224        WaitForCancellationFuture {
225            cancellation_token: self,
226            future: self.inner.notified(),
227        }
228    }
229
230    /// Returns a [`Future`] that gets fulfilled when cancellation is requested.
231    ///
232    /// Equivalent to:
233    ///
234    /// ```ignore
235    /// async fn cancelled_owned(self);
236    /// ```
237    ///
238    /// The future will complete immediately if the token is already cancelled
239    /// when this method is called.
240    ///
241    /// The function takes self by value and returns a future that owns the
242    /// token.
243    ///
244    /// # Cancellation safety
245    ///
246    /// This method is cancel safe.
247    pub fn cancelled_owned(self) -> WaitForCancellationFutureOwned {
248        WaitForCancellationFutureOwned::new(self)
249    }
250
251    /// Creates a [`DropGuard`] for this token.
252    ///
253    /// Returned guard will cancel this token (and all its children) on drop
254    /// unless disarmed.
255    pub fn drop_guard(self) -> DropGuard {
256        DropGuard { inner: Some(self) }
257    }
258
259    /// Creates a [`DropGuardRef`] for this token.
260    ///
261    /// Returned guard will cancel this token (and all its children) on drop
262    /// unless disarmed.
263    pub fn drop_guard_ref(&self) -> DropGuardRef<'_> {
264        DropGuardRef { inner: Some(self) }
265    }
266
267    /// Runs a future to completion and returns its result wrapped inside of an `Option`
268    /// unless the [`CancellationToken`] is cancelled. In that case the function returns
269    /// `None` and the future gets dropped.
270    ///
271    /// # Fairness
272    ///
273    /// Calling this on an already-cancelled token directly returns `None`.
274    /// For all subsequent polls, in case of concurrent completion and
275    /// cancellation, this is biased towards the future completion.
276    ///
277    /// # Cancellation safety
278    ///
279    /// This method is only cancel safe if `fut` is cancel safe.
280    pub async fn run_until_cancelled<F>(&self, fut: F) -> Option<F::Output>
281    where
282        F: Future,
283    {
284        if self.is_cancelled() {
285            None
286        } else {
287            RunUntilCancelledFuture {
288                cancellation: self.cancelled(),
289                future: fut,
290            }
291            .await
292        }
293    }
294
295    /// Runs a future to completion and returns its result wrapped inside of an `Option`
296    /// unless the [`CancellationToken`] is cancelled. In that case the function returns
297    /// `None` and the future gets dropped.
298    ///
299    /// The function takes self by value and returns a future that owns the token.
300    ///
301    /// # Fairness
302    ///
303    /// Calling this on an already-cancelled token directly returns `None`.
304    /// For all subsequent polls, in case of concurrent completion and
305    /// cancellation, this is biased towards the future completion.
306    ///
307    /// # Cancellation safety
308    ///
309    /// This method is only cancel safe if `fut` is cancel safe.
310    pub async fn run_until_cancelled_owned<F>(self, fut: F) -> Option<F::Output>
311    where
312        F: Future,
313    {
314        self.run_until_cancelled(fut).await
315    }
316}
317
318// ===== impl WaitForCancellationFuture =====
319
320impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> {
321    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
322        f.debug_struct("WaitForCancellationFuture").finish()
323    }
324}
325
326impl<'a> Future for WaitForCancellationFuture<'a> {
327    type Output = ();
328
329    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
330        let mut this = self.project();
331        loop {
332            if this.cancellation_token.is_cancelled() {
333                return Poll::Ready(());
334            }
335
336            // No wakeups can be lost here because there is always a call to
337            // `is_cancelled` between the creation of the future and the call to
338            // `poll`, and the code that sets the cancelled flag does so before
339            // waking the `Notified`.
340            if this.future.as_mut().poll(cx).is_pending() {
341                return Poll::Pending;
342            }
343
344            this.future.set(this.cancellation_token.inner.notified());
345        }
346    }
347}
348
349// ===== impl WaitForCancellationFutureOwned =====
350
351impl core::fmt::Debug for WaitForCancellationFutureOwned {
352    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
353        f.debug_struct("WaitForCancellationFutureOwned").finish()
354    }
355}
356
357impl WaitForCancellationFutureOwned {
358    fn new(cancellation_token: CancellationToken) -> Self {
359        WaitForCancellationFutureOwned {
360            // cancellation_token holds a heap allocation and is guaranteed to have a
361            // stable deref, thus it would be ok to move the cancellation_token while
362            // the future holds a reference to it.
363            //
364            // # Safety
365            //
366            // cancellation_token is dropped after future due to the field ordering.
367            future: MaybeDangling::new(unsafe { Self::new_future(&cancellation_token) }),
368            cancellation_token,
369        }
370    }
371
372    /// # Safety
373    /// The returned future must be destroyed before the cancellation token is
374    /// destroyed.
375    unsafe fn new_future(
376        cancellation_token: &CancellationToken,
377    ) -> tokio::sync::futures::Notified<'static> {
378        let inner_ptr = Arc::as_ptr(&cancellation_token.inner);
379        // SAFETY: The `Arc::as_ptr` method guarantees that `inner_ptr` remains
380        // valid until the strong count of the Arc drops to zero, and the caller
381        // guarantees that they will drop the future before that happens.
382        (*inner_ptr).notified()
383    }
384}
385
386impl Future for WaitForCancellationFutureOwned {
387    type Output = ();
388
389    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
390        let mut this = self.project();
391
392        loop {
393            if this.cancellation_token.is_cancelled() {
394                return Poll::Ready(());
395            }
396
397            // No wakeups can be lost here because there is always a call to
398            // `is_cancelled` between the creation of the future and the call to
399            // `poll`, and the code that sets the cancelled flag does so before
400            // waking the `Notified`.
401            if this.future.as_mut().poll(cx).is_pending() {
402                return Poll::Pending;
403            }
404
405            // # Safety
406            //
407            // cancellation_token is dropped after future due to the field ordering.
408            this.future.set(MaybeDangling::new(unsafe {
409                Self::new_future(this.cancellation_token)
410            }));
411        }
412    }
413}
414
415pin_project! {
416    /// A Future that is resolved once the corresponding [`CancellationToken`]
417    /// is cancelled or a given Future gets resolved. It is biased towards the
418    /// Future completion.
419    #[must_use = "futures do nothing unless polled"]
420    pub(crate) struct RunUntilCancelledFuture<'a, F: Future> {
421        #[pin]
422        cancellation: WaitForCancellationFuture<'a>,
423        #[pin]
424        future: F,
425    }
426}
427
428impl<'a, F: Future> RunUntilCancelledFuture<'a, F> {
429    pub(crate) fn new(cancellation_token: &'a CancellationToken, future: F) -> Self {
430        Self {
431            cancellation: cancellation_token.cancelled(),
432            future,
433        }
434    }
435}
436
437impl<'a, F: Future> Future for RunUntilCancelledFuture<'a, F> {
438    type Output = Option<F::Output>;
439
440    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
441        let this = self.project();
442        if let Poll::Ready(res) = this.future.poll(cx) {
443            Poll::Ready(Some(res))
444        } else if this.cancellation.poll(cx).is_ready() {
445            Poll::Ready(None)
446        } else {
447            Poll::Pending
448        }
449    }
450}
451
452pin_project! {
453    /// A Future that is resolved once the corresponding [`CancellationToken`]
454    /// is cancelled or a given Future gets resolved. It is biased towards the
455    /// Future completion.
456    #[must_use = "futures do nothing unless polled"]
457    pub(crate) struct RunUntilCancelledFutureOwned<F: Future> {
458        #[pin]
459        cancellation: WaitForCancellationFutureOwned,
460        #[pin]
461        future: F,
462    }
463}
464
465impl<F: Future> Future for RunUntilCancelledFutureOwned<F> {
466    type Output = Option<F::Output>;
467
468    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
469        let this = self.project();
470        if let Poll::Ready(res) = this.future.poll(cx) {
471            Poll::Ready(Some(res))
472        } else if this.cancellation.poll(cx).is_ready() {
473            Poll::Ready(None)
474        } else {
475            Poll::Pending
476        }
477    }
478}
479
480impl<F: Future> RunUntilCancelledFutureOwned<F> {
481    pub(crate) fn new(cancellation_token: CancellationToken, future: F) -> Self {
482        Self {
483            cancellation: cancellation_token.cancelled_owned(),
484            future,
485        }
486    }
487}