tokio_util/sync/cancellation_token.rs
1//! An asynchronously awaitable [`CancellationToken`].
2//! The token allows to signal a cancellation request to one or more tasks.
3pub(crate) mod guard;
4pub(crate) mod guard_ref;
5mod tree_node;
6
7use crate::loom::sync::Arc;
8use crate::util::MaybeDangling;
9use core::future::Future;
10use core::pin::Pin;
11use core::task::{Context, Poll};
12
13use guard::DropGuard;
14use guard_ref::DropGuardRef;
15use pin_project_lite::pin_project;
16
17/// A token which can be used to signal a cancellation request to one or more
18/// tasks.
19///
20/// Tasks can call [`CancellationToken::cancelled()`] in order to
21/// obtain a Future which will be resolved when cancellation is requested.
22///
23/// Cancellation can be requested through the [`CancellationToken::cancel`] method.
24///
25/// # Examples
26///
27/// ```no_run
28/// use tokio::select;
29/// use tokio_util::sync::CancellationToken;
30///
31/// #[tokio::main]
32/// async fn main() {
33/// let token = CancellationToken::new();
34/// let cloned_token = token.clone();
35///
36/// let join_handle = tokio::spawn(async move {
37/// // Wait for either cancellation or a very long time
38/// select! {
39/// _ = cloned_token.cancelled() => {
40/// // The token was cancelled
41/// 5
42/// }
43/// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
44/// 99
45/// }
46/// }
47/// });
48///
49/// tokio::spawn(async move {
50/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
51/// token.cancel();
52/// });
53///
54/// assert_eq!(5, join_handle.await.unwrap());
55/// }
56/// ```
57pub struct CancellationToken {
58 inner: Arc<tree_node::TreeNode>,
59}
60
61impl std::panic::UnwindSafe for CancellationToken {}
62impl std::panic::RefUnwindSafe for CancellationToken {}
63
64pin_project! {
65 /// A Future that is resolved once the corresponding [`CancellationToken`]
66 /// is cancelled.
67 #[must_use = "futures do nothing unless polled"]
68 pub struct WaitForCancellationFuture<'a> {
69 cancellation_token: &'a CancellationToken,
70 #[pin]
71 future: tokio::sync::futures::Notified<'a>,
72 }
73}
74
75pin_project! {
76 /// A Future that is resolved once the corresponding [`CancellationToken`]
77 /// is cancelled.
78 ///
79 /// This is the counterpart to [`WaitForCancellationFuture`] that takes
80 /// [`CancellationToken`] by value instead of using a reference.
81 #[must_use = "futures do nothing unless polled"]
82 pub struct WaitForCancellationFutureOwned {
83 // This field internally has a reference to the cancellation token, but camouflages
84 // the relationship with `'static`. To avoid Undefined Behavior, we must ensure
85 // that the reference is only used while the cancellation token is still alive. To
86 // do that, we ensure that the future is the first field, so that it is dropped
87 // before the cancellation token.
88 //
89 // We use `MaybeDanglingFuture` here because without it, the compiler could assert
90 // the reference inside `future` to be valid even after the destructor of that
91 // field runs. (Specifically, when the `WaitForCancellationFutureOwned` is passed
92 // as an argument to a function, the reference can be asserted to be valid for the
93 // rest of that function.) To avoid that, we use `MaybeDangling` which tells the
94 // compiler that the reference stored inside it might not be valid.
95 //
96 // See <https://users.rust-lang.org/t/unsafe-code-review-semi-owning-weak-rwlock-t-guard/95706>
97 // for more info.
98 #[pin]
99 future: MaybeDangling<tokio::sync::futures::Notified<'static>>,
100 cancellation_token: CancellationToken,
101 }
102}
103
104// ===== impl CancellationToken =====
105
106impl core::fmt::Debug for CancellationToken {
107 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
108 f.debug_struct("CancellationToken")
109 .field("is_cancelled", &self.is_cancelled())
110 .finish()
111 }
112}
113
114impl Clone for CancellationToken {
115 /// Creates a clone of the [`CancellationToken`] which will get cancelled
116 /// whenever the current token gets cancelled, and vice versa.
117 fn clone(&self) -> Self {
118 tree_node::increase_handle_refcount(&self.inner);
119 CancellationToken {
120 inner: self.inner.clone(),
121 }
122 }
123}
124
125impl Drop for CancellationToken {
126 fn drop(&mut self) {
127 tree_node::decrease_handle_refcount(&self.inner);
128 }
129}
130
131impl Default for CancellationToken {
132 fn default() -> CancellationToken {
133 CancellationToken::new()
134 }
135}
136
137impl CancellationToken {
138 /// Creates a new [`CancellationToken`] in the non-cancelled state.
139 pub fn new() -> CancellationToken {
140 CancellationToken {
141 inner: Arc::new(tree_node::TreeNode::new()),
142 }
143 }
144
145 /// Creates a [`CancellationToken`] which will get cancelled whenever the
146 /// current token gets cancelled. Unlike a cloned [`CancellationToken`],
147 /// cancelling a child token does not cancel the parent token.
148 ///
149 /// If the current token is already cancelled, the child token will get
150 /// returned in cancelled state.
151 ///
152 /// # Examples
153 ///
154 /// ```no_run
155 /// use tokio::select;
156 /// use tokio_util::sync::CancellationToken;
157 ///
158 /// #[tokio::main]
159 /// async fn main() {
160 /// let token = CancellationToken::new();
161 /// let child_token = token.child_token();
162 ///
163 /// let join_handle = tokio::spawn(async move {
164 /// // Wait for either cancellation or a very long time
165 /// select! {
166 /// _ = child_token.cancelled() => {
167 /// // The token was cancelled
168 /// 5
169 /// }
170 /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
171 /// 99
172 /// }
173 /// }
174 /// });
175 ///
176 /// tokio::spawn(async move {
177 /// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
178 /// token.cancel();
179 /// });
180 ///
181 /// assert_eq!(5, join_handle.await.unwrap());
182 /// }
183 /// ```
184 pub fn child_token(&self) -> CancellationToken {
185 CancellationToken {
186 inner: tree_node::child_node(&self.inner),
187 }
188 }
189
190 /// Cancel the [`CancellationToken`] and all child tokens which had been
191 /// derived from it.
192 ///
193 /// This will wake up all tasks which are waiting for cancellation.
194 ///
195 /// Be aware that cancellation is not an atomic operation. It is possible
196 /// for another thread running in parallel with a call to `cancel` to first
197 /// receive `true` from `is_cancelled` on one child node, and then receive
198 /// `false` from `is_cancelled` on another child node. However, once the
199 /// call to `cancel` returns, all child nodes have been fully cancelled.
200 pub fn cancel(&self) {
201 tree_node::cancel(&self.inner);
202 }
203
204 /// Returns `true` if the `CancellationToken` is cancelled.
205 pub fn is_cancelled(&self) -> bool {
206 tree_node::is_cancelled(&self.inner)
207 }
208
209 /// Returns a [`Future`] that gets fulfilled when cancellation is requested.
210 ///
211 /// Equivalent to:
212 ///
213 /// ```ignore
214 /// async fn cancelled(&self);
215 /// ```
216 ///
217 /// The future will complete immediately if the token is already cancelled
218 /// when this method is called.
219 ///
220 /// # Cancellation safety
221 ///
222 /// This method is cancel safe.
223 pub fn cancelled(&self) -> WaitForCancellationFuture<'_> {
224 WaitForCancellationFuture {
225 cancellation_token: self,
226 future: self.inner.notified(),
227 }
228 }
229
230 /// Returns a [`Future`] that gets fulfilled when cancellation is requested.
231 ///
232 /// Equivalent to:
233 ///
234 /// ```ignore
235 /// async fn cancelled_owned(self);
236 /// ```
237 ///
238 /// The future will complete immediately if the token is already cancelled
239 /// when this method is called.
240 ///
241 /// The function takes self by value and returns a future that owns the
242 /// token.
243 ///
244 /// # Cancellation safety
245 ///
246 /// This method is cancel safe.
247 pub fn cancelled_owned(self) -> WaitForCancellationFutureOwned {
248 WaitForCancellationFutureOwned::new(self)
249 }
250
251 /// Creates a [`DropGuard`] for this token.
252 ///
253 /// Returned guard will cancel this token (and all its children) on drop
254 /// unless disarmed.
255 pub fn drop_guard(self) -> DropGuard {
256 DropGuard { inner: Some(self) }
257 }
258
259 /// Creates a [`DropGuardRef`] for this token.
260 ///
261 /// Returned guard will cancel this token (and all its children) on drop
262 /// unless disarmed.
263 pub fn drop_guard_ref(&self) -> DropGuardRef<'_> {
264 DropGuardRef { inner: Some(self) }
265 }
266
267 /// Runs a future to completion and returns its result wrapped inside of an `Option`
268 /// unless the [`CancellationToken`] is cancelled. In that case the function returns
269 /// `None` and the future gets dropped.
270 ///
271 /// # Fairness
272 ///
273 /// Calling this on an already-cancelled token directly returns `None`.
274 /// For all subsequent polls, in case of concurrent completion and
275 /// cancellation, this is biased towards the future completion.
276 ///
277 /// # Cancellation safety
278 ///
279 /// This method is only cancel safe if `fut` is cancel safe.
280 pub async fn run_until_cancelled<F>(&self, fut: F) -> Option<F::Output>
281 where
282 F: Future,
283 {
284 if self.is_cancelled() {
285 None
286 } else {
287 RunUntilCancelledFuture {
288 cancellation: self.cancelled(),
289 future: fut,
290 }
291 .await
292 }
293 }
294
295 /// Runs a future to completion and returns its result wrapped inside of an `Option`
296 /// unless the [`CancellationToken`] is cancelled. In that case the function returns
297 /// `None` and the future gets dropped.
298 ///
299 /// The function takes self by value and returns a future that owns the token.
300 ///
301 /// # Fairness
302 ///
303 /// Calling this on an already-cancelled token directly returns `None`.
304 /// For all subsequent polls, in case of concurrent completion and
305 /// cancellation, this is biased towards the future completion.
306 ///
307 /// # Cancellation safety
308 ///
309 /// This method is only cancel safe if `fut` is cancel safe.
310 pub async fn run_until_cancelled_owned<F>(self, fut: F) -> Option<F::Output>
311 where
312 F: Future,
313 {
314 self.run_until_cancelled(fut).await
315 }
316}
317
318// ===== impl WaitForCancellationFuture =====
319
320impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> {
321 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
322 f.debug_struct("WaitForCancellationFuture").finish()
323 }
324}
325
326impl<'a> Future for WaitForCancellationFuture<'a> {
327 type Output = ();
328
329 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
330 let mut this = self.project();
331 loop {
332 if this.cancellation_token.is_cancelled() {
333 return Poll::Ready(());
334 }
335
336 // No wakeups can be lost here because there is always a call to
337 // `is_cancelled` between the creation of the future and the call to
338 // `poll`, and the code that sets the cancelled flag does so before
339 // waking the `Notified`.
340 if this.future.as_mut().poll(cx).is_pending() {
341 return Poll::Pending;
342 }
343
344 this.future.set(this.cancellation_token.inner.notified());
345 }
346 }
347}
348
349// ===== impl WaitForCancellationFutureOwned =====
350
351impl core::fmt::Debug for WaitForCancellationFutureOwned {
352 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
353 f.debug_struct("WaitForCancellationFutureOwned").finish()
354 }
355}
356
357impl WaitForCancellationFutureOwned {
358 fn new(cancellation_token: CancellationToken) -> Self {
359 WaitForCancellationFutureOwned {
360 // cancellation_token holds a heap allocation and is guaranteed to have a
361 // stable deref, thus it would be ok to move the cancellation_token while
362 // the future holds a reference to it.
363 //
364 // # Safety
365 //
366 // cancellation_token is dropped after future due to the field ordering.
367 future: MaybeDangling::new(unsafe { Self::new_future(&cancellation_token) }),
368 cancellation_token,
369 }
370 }
371
372 /// # Safety
373 /// The returned future must be destroyed before the cancellation token is
374 /// destroyed.
375 unsafe fn new_future(
376 cancellation_token: &CancellationToken,
377 ) -> tokio::sync::futures::Notified<'static> {
378 let inner_ptr = Arc::as_ptr(&cancellation_token.inner);
379 // SAFETY: The `Arc::as_ptr` method guarantees that `inner_ptr` remains
380 // valid until the strong count of the Arc drops to zero, and the caller
381 // guarantees that they will drop the future before that happens.
382 (*inner_ptr).notified()
383 }
384}
385
386impl Future for WaitForCancellationFutureOwned {
387 type Output = ();
388
389 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
390 let mut this = self.project();
391
392 loop {
393 if this.cancellation_token.is_cancelled() {
394 return Poll::Ready(());
395 }
396
397 // No wakeups can be lost here because there is always a call to
398 // `is_cancelled` between the creation of the future and the call to
399 // `poll`, and the code that sets the cancelled flag does so before
400 // waking the `Notified`.
401 if this.future.as_mut().poll(cx).is_pending() {
402 return Poll::Pending;
403 }
404
405 // # Safety
406 //
407 // cancellation_token is dropped after future due to the field ordering.
408 this.future.set(MaybeDangling::new(unsafe {
409 Self::new_future(this.cancellation_token)
410 }));
411 }
412 }
413}
414
415pin_project! {
416 /// A Future that is resolved once the corresponding [`CancellationToken`]
417 /// is cancelled or a given Future gets resolved. It is biased towards the
418 /// Future completion.
419 #[must_use = "futures do nothing unless polled"]
420 pub(crate) struct RunUntilCancelledFuture<'a, F: Future> {
421 #[pin]
422 cancellation: WaitForCancellationFuture<'a>,
423 #[pin]
424 future: F,
425 }
426}
427
428impl<'a, F: Future> RunUntilCancelledFuture<'a, F> {
429 pub(crate) fn new(cancellation_token: &'a CancellationToken, future: F) -> Self {
430 Self {
431 cancellation: cancellation_token.cancelled(),
432 future,
433 }
434 }
435}
436
437impl<'a, F: Future> Future for RunUntilCancelledFuture<'a, F> {
438 type Output = Option<F::Output>;
439
440 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
441 let this = self.project();
442 if let Poll::Ready(res) = this.future.poll(cx) {
443 Poll::Ready(Some(res))
444 } else if this.cancellation.poll(cx).is_ready() {
445 Poll::Ready(None)
446 } else {
447 Poll::Pending
448 }
449 }
450}
451
452pin_project! {
453 /// A Future that is resolved once the corresponding [`CancellationToken`]
454 /// is cancelled or a given Future gets resolved. It is biased towards the
455 /// Future completion.
456 #[must_use = "futures do nothing unless polled"]
457 pub(crate) struct RunUntilCancelledFutureOwned<F: Future> {
458 #[pin]
459 cancellation: WaitForCancellationFutureOwned,
460 #[pin]
461 future: F,
462 }
463}
464
465impl<F: Future> Future for RunUntilCancelledFutureOwned<F> {
466 type Output = Option<F::Output>;
467
468 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
469 let this = self.project();
470 if let Poll::Ready(res) = this.future.poll(cx) {
471 Poll::Ready(Some(res))
472 } else if this.cancellation.poll(cx).is_ready() {
473 Poll::Ready(None)
474 } else {
475 Poll::Pending
476 }
477 }
478}
479
480impl<F: Future> RunUntilCancelledFutureOwned<F> {
481 pub(crate) fn new(cancellation_token: CancellationToken, future: F) -> Self {
482 Self {
483 cancellation: cancellation_token.cancelled_owned(),
484 future,
485 }
486 }
487}