tokio/sync/oneshot.rs
1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A one-shot channel is used for sending a single message between
4//! asynchronous tasks. The [`channel`] function is used to create a
5//! [`Sender`] and [`Receiver`] handle pair that form the channel.
6//!
7//! The `Sender` handle is used by the producer to send the value.
8//! The `Receiver` handle is used by the consumer to receive the value.
9//!
10//! Each handle can be used on separate tasks.
11//!
12//! Since the `send` method is not async, it can be used anywhere. This includes
13//! sending between two runtimes, and using it from non-async code.
14//!
15//! If the [`Receiver`] is closed before receiving a message which has already
16//! been sent, the message will remain in the channel until the receiver is
17//! dropped, at which point the message will be dropped immediately.
18//!
19//! # Examples
20//!
21//! ```
22//! use tokio::sync::oneshot;
23//!
24//! # #[tokio::main(flavor = "current_thread")]
25//! # async fn main() {
26//! let (tx, rx) = oneshot::channel();
27//!
28//! tokio::spawn(async move {
29//! if let Err(_) = tx.send(3) {
30//! println!("the receiver dropped");
31//! }
32//! });
33//!
34//! match rx.await {
35//! Ok(v) => println!("got = {:?}", v),
36//! Err(_) => println!("the sender dropped"),
37//! }
38//! # }
39//! ```
40//!
41//! If the sender is dropped without sending, the receiver will fail with
42//! [`error::RecvError`]:
43//!
44//! ```
45//! use tokio::sync::oneshot;
46//!
47//! # #[tokio::main(flavor = "current_thread")]
48//! # async fn main() {
49//! let (tx, rx) = oneshot::channel::<u32>();
50//!
51//! tokio::spawn(async move {
52//! drop(tx);
53//! });
54//!
55//! match rx.await {
56//! Ok(_) => panic!("This doesn't happen"),
57//! Err(_) => println!("the sender dropped"),
58//! }
59//! # }
60//! ```
61//!
62//! To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
63//! the channel.
64//!
65//! ```
66//! use tokio::sync::oneshot;
67//! use tokio::time::{interval, sleep, Duration};
68//!
69//! # #[tokio::main(flavor = "current_thread")]
70//! # async fn _doc() {}
71//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
72//! # async fn main() {
73//! let (send, mut recv) = oneshot::channel();
74//! let mut interval = interval(Duration::from_millis(100));
75//!
76//! # let handle =
77//! tokio::spawn(async move {
78//! sleep(Duration::from_secs(1)).await;
79//! send.send("shut down").unwrap();
80//! });
81//!
82//! loop {
83//! tokio::select! {
84//! _ = interval.tick() => println!("Another 100ms"),
85//! msg = &mut recv => {
86//! println!("Got message: {}", msg.unwrap());
87//! break;
88//! }
89//! }
90//! }
91//! # handle.await.unwrap();
92//! # }
93//! ```
94//!
95//! To use a `Sender` from a destructor, put it in an [`Option`] and call
96//! [`Option::take`].
97//!
98//! ```
99//! use tokio::sync::oneshot;
100//!
101//! struct SendOnDrop {
102//! sender: Option<oneshot::Sender<&'static str>>,
103//! }
104//! impl Drop for SendOnDrop {
105//! fn drop(&mut self) {
106//! if let Some(sender) = self.sender.take() {
107//! // Using `let _ =` to ignore send errors.
108//! let _ = sender.send("I got dropped!");
109//! }
110//! }
111//! }
112//!
113//! # #[tokio::main(flavor = "current_thread")]
114//! # async fn _doc() {}
115//! # #[tokio::main(flavor = "current_thread")]
116//! # async fn main() {
117//! let (send, recv) = oneshot::channel();
118//!
119//! let send_on_drop = SendOnDrop { sender: Some(send) };
120//! drop(send_on_drop);
121//!
122//! assert_eq!(recv.await, Ok("I got dropped!"));
123//! # }
124//! ```
125
126use crate::loom::cell::UnsafeCell;
127use crate::loom::sync::atomic::AtomicUsize;
128use crate::loom::sync::Arc;
129#[cfg(all(tokio_unstable, feature = "tracing"))]
130use crate::util::trace;
131
132use std::fmt;
133use std::future::Future;
134use std::mem::MaybeUninit;
135use std::pin::Pin;
136use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
137use std::task::Poll::{Pending, Ready};
138use std::task::{ready, Context, Poll, Waker};
139
140/// Sends a value to the associated [`Receiver`].
141///
142/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
143/// [`channel`](fn@channel) function.
144///
145/// # Examples
146///
147/// ```
148/// use tokio::sync::oneshot;
149///
150/// # #[tokio::main(flavor = "current_thread")]
151/// # async fn main() {
152/// let (tx, rx) = oneshot::channel();
153///
154/// tokio::spawn(async move {
155/// if let Err(_) = tx.send(3) {
156/// println!("the receiver dropped");
157/// }
158/// });
159///
160/// match rx.await {
161/// Ok(v) => println!("got = {:?}", v),
162/// Err(_) => println!("the sender dropped"),
163/// }
164/// # }
165/// ```
166///
167/// If the sender is dropped without sending, the receiver will fail with
168/// [`error::RecvError`]:
169///
170/// ```
171/// use tokio::sync::oneshot;
172///
173/// # #[tokio::main(flavor = "current_thread")]
174/// # async fn main() {
175/// let (tx, rx) = oneshot::channel::<u32>();
176///
177/// tokio::spawn(async move {
178/// drop(tx);
179/// });
180///
181/// match rx.await {
182/// Ok(_) => panic!("This doesn't happen"),
183/// Err(_) => println!("the sender dropped"),
184/// }
185/// # }
186/// ```
187///
188/// To use a `Sender` from a destructor, put it in an [`Option`] and call
189/// [`Option::take`].
190///
191/// ```
192/// use tokio::sync::oneshot;
193///
194/// struct SendOnDrop {
195/// sender: Option<oneshot::Sender<&'static str>>,
196/// }
197/// impl Drop for SendOnDrop {
198/// fn drop(&mut self) {
199/// if let Some(sender) = self.sender.take() {
200/// // Using `let _ =` to ignore send errors.
201/// let _ = sender.send("I got dropped!");
202/// }
203/// }
204/// }
205///
206/// # #[tokio::main(flavor = "current_thread")]
207/// # async fn _doc() {}
208/// # #[tokio::main(flavor = "current_thread")]
209/// # async fn main() {
210/// let (send, recv) = oneshot::channel();
211///
212/// let send_on_drop = SendOnDrop { sender: Some(send) };
213/// drop(send_on_drop);
214///
215/// assert_eq!(recv.await, Ok("I got dropped!"));
216/// # }
217/// ```
218///
219/// [`Option`]: std::option::Option
220/// [`Option::take`]: std::option::Option::take
221#[derive(Debug)]
222pub struct Sender<T> {
223 inner: Option<Arc<Inner<T>>>,
224 #[cfg(all(tokio_unstable, feature = "tracing"))]
225 resource_span: tracing::Span,
226}
227
228/// Receives a value from the associated [`Sender`].
229///
230/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
231/// [`channel`](fn@channel) function.
232///
233/// This channel has no `recv` method because the receiver itself implements the
234/// [`Future`] trait. To receive a `Result<T, `[`error::RecvError`]`>`, `.await` the `Receiver` object directly.
235///
236/// The `poll` method on the `Future` trait is allowed to spuriously return
237/// `Poll::Pending` even if the message has been sent. If such a spurious
238/// failure happens, then the caller will be woken when the spurious failure has
239/// been resolved so that the caller can attempt to receive the message again.
240/// Note that receiving such a wakeup does not guarantee that the next call will
241/// succeed — it could fail with another spurious failure. (A spurious failure
242/// does not mean that the message is lost. It is just delayed.)
243///
244/// [`Future`]: trait@std::future::Future
245///
246/// # Examples
247///
248/// ```
249/// use tokio::sync::oneshot;
250///
251/// # #[tokio::main(flavor = "current_thread")]
252/// # async fn main() {
253/// let (tx, rx) = oneshot::channel();
254///
255/// tokio::spawn(async move {
256/// if let Err(_) = tx.send(3) {
257/// println!("the receiver dropped");
258/// }
259/// });
260///
261/// match rx.await {
262/// Ok(v) => println!("got = {:?}", v),
263/// Err(_) => println!("the sender dropped"),
264/// }
265/// # }
266/// ```
267///
268/// If the sender is dropped without sending, the receiver will fail with
269/// [`error::RecvError`]:
270///
271/// ```
272/// use tokio::sync::oneshot;
273///
274/// # #[tokio::main(flavor = "current_thread")]
275/// # async fn main() {
276/// let (tx, rx) = oneshot::channel::<u32>();
277///
278/// tokio::spawn(async move {
279/// drop(tx);
280/// });
281///
282/// match rx.await {
283/// Ok(_) => panic!("This doesn't happen"),
284/// Err(_) => println!("the sender dropped"),
285/// }
286/// # }
287/// ```
288///
289/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
290/// channel.
291///
292/// ```
293/// use tokio::sync::oneshot;
294/// use tokio::time::{interval, sleep, Duration};
295///
296/// # #[tokio::main(flavor = "current_thread")]
297/// # async fn _doc() {}
298/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
299/// # async fn main() {
300/// let (send, mut recv) = oneshot::channel();
301/// let mut interval = interval(Duration::from_millis(100));
302///
303/// # let handle =
304/// tokio::spawn(async move {
305/// sleep(Duration::from_secs(1)).await;
306/// send.send("shut down").unwrap();
307/// });
308///
309/// loop {
310/// tokio::select! {
311/// _ = interval.tick() => println!("Another 100ms"),
312/// msg = &mut recv => {
313/// println!("Got message: {}", msg.unwrap());
314/// break;
315/// }
316/// }
317/// }
318/// # handle.await.unwrap();
319/// # }
320/// ```
321#[derive(Debug)]
322pub struct Receiver<T> {
323 inner: Option<Arc<Inner<T>>>,
324 #[cfg(all(tokio_unstable, feature = "tracing"))]
325 resource_span: tracing::Span,
326 #[cfg(all(tokio_unstable, feature = "tracing"))]
327 async_op_span: tracing::Span,
328 #[cfg(all(tokio_unstable, feature = "tracing"))]
329 async_op_poll_span: tracing::Span,
330}
331
332pub mod error {
333 //! `Oneshot` error types.
334
335 use std::fmt;
336
337 /// Error returned by the `Future` implementation for `Receiver`.
338 ///
339 /// This error is returned by the receiver when the sender is dropped without sending.
340 #[derive(Debug, Eq, PartialEq, Clone)]
341 pub struct RecvError(pub(super) ());
342
343 /// Error returned by the `try_recv` function on `Receiver`.
344 #[derive(Debug, Eq, PartialEq, Clone)]
345 pub enum TryRecvError {
346 /// The send half of the channel has not yet sent a value.
347 Empty,
348
349 /// The send half of the channel was dropped without sending a value.
350 Closed,
351 }
352
353 // ===== impl RecvError =====
354
355 impl fmt::Display for RecvError {
356 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
357 write!(fmt, "channel closed")
358 }
359 }
360
361 impl std::error::Error for RecvError {}
362
363 // ===== impl TryRecvError =====
364
365 impl fmt::Display for TryRecvError {
366 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
367 match self {
368 TryRecvError::Empty => write!(fmt, "channel empty"),
369 TryRecvError::Closed => write!(fmt, "channel closed"),
370 }
371 }
372 }
373
374 impl std::error::Error for TryRecvError {}
375}
376
377use self::error::*;
378
379struct Inner<T> {
380 /// Manages the state of the inner cell.
381 state: AtomicUsize,
382
383 /// The value. This is set by `Sender` and read by `Receiver`. The state of
384 /// the cell is tracked by `state`.
385 value: UnsafeCell<Option<T>>,
386
387 /// The task to notify when the receiver drops without consuming the value.
388 ///
389 /// ## Safety
390 ///
391 /// The `TX_TASK_SET` bit in the `state` field is set if this field is
392 /// initialized. If that bit is unset, this field may be uninitialized.
393 tx_task: Task,
394
395 /// The task to notify when the value is sent.
396 ///
397 /// ## Safety
398 ///
399 /// The `RX_TASK_SET` bit in the `state` field is set if this field is
400 /// initialized. If that bit is unset, this field may be uninitialized.
401 rx_task: Task,
402}
403
404struct Task(UnsafeCell<MaybeUninit<Waker>>);
405
406impl Task {
407 unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
408 self.with_task(|w| w.will_wake(cx.waker()))
409 }
410
411 unsafe fn with_task<F, R>(&self, f: F) -> R
412 where
413 F: FnOnce(&Waker) -> R,
414 {
415 self.0.with(|ptr| {
416 let waker: *const Waker = (*ptr).as_ptr();
417 f(&*waker)
418 })
419 }
420
421 unsafe fn drop_task(&self) {
422 self.0.with_mut(|ptr| {
423 let ptr: *mut Waker = (*ptr).as_mut_ptr();
424 ptr.drop_in_place();
425 });
426 }
427
428 unsafe fn set_task(&self, cx: &mut Context<'_>) {
429 self.0.with_mut(|ptr| {
430 let ptr: *mut Waker = (*ptr).as_mut_ptr();
431 ptr.write(cx.waker().clone());
432 });
433 }
434}
435
436#[derive(Clone, Copy)]
437struct State(usize);
438
439/// Creates a new one-shot channel for sending single values across asynchronous
440/// tasks.
441///
442/// The function returns separate "send" and "receive" handles. The `Sender`
443/// handle is used by the producer to send the value. The `Receiver` handle is
444/// used by the consumer to receive the value.
445///
446/// Each handle can be used on separate tasks.
447///
448/// # Examples
449///
450/// ```
451/// use tokio::sync::oneshot;
452///
453/// # #[tokio::main(flavor = "current_thread")]
454/// # async fn main() {
455/// let (tx, rx) = oneshot::channel();
456///
457/// tokio::spawn(async move {
458/// if let Err(_) = tx.send(3) {
459/// println!("the receiver dropped");
460/// }
461/// });
462///
463/// match rx.await {
464/// Ok(v) => println!("got = {:?}", v),
465/// Err(_) => println!("the sender dropped"),
466/// }
467/// # }
468/// ```
469#[track_caller]
470pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
471 #[cfg(all(tokio_unstable, feature = "tracing"))]
472 let resource_span = {
473 let location = std::panic::Location::caller();
474
475 let resource_span = tracing::trace_span!(
476 parent: None,
477 "runtime.resource",
478 concrete_type = "Sender|Receiver",
479 kind = "Sync",
480 loc.file = location.file(),
481 loc.line = location.line(),
482 loc.col = location.column(),
483 );
484
485 resource_span.in_scope(|| {
486 tracing::trace!(
487 target: "runtime::resource::state_update",
488 tx_dropped = false,
489 tx_dropped.op = "override",
490 )
491 });
492
493 resource_span.in_scope(|| {
494 tracing::trace!(
495 target: "runtime::resource::state_update",
496 rx_dropped = false,
497 rx_dropped.op = "override",
498 )
499 });
500
501 resource_span.in_scope(|| {
502 tracing::trace!(
503 target: "runtime::resource::state_update",
504 value_sent = false,
505 value_sent.op = "override",
506 )
507 });
508
509 resource_span.in_scope(|| {
510 tracing::trace!(
511 target: "runtime::resource::state_update",
512 value_received = false,
513 value_received.op = "override",
514 )
515 });
516
517 resource_span
518 };
519
520 let inner = Arc::new(Inner {
521 state: AtomicUsize::new(State::new().as_usize()),
522 value: UnsafeCell::new(None),
523 tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
524 rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
525 });
526
527 let tx = Sender {
528 inner: Some(inner.clone()),
529 #[cfg(all(tokio_unstable, feature = "tracing"))]
530 resource_span: resource_span.clone(),
531 };
532
533 #[cfg(all(tokio_unstable, feature = "tracing"))]
534 let async_op_span = resource_span
535 .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
536
537 #[cfg(all(tokio_unstable, feature = "tracing"))]
538 let async_op_poll_span =
539 async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
540
541 let rx = Receiver {
542 inner: Some(inner),
543 #[cfg(all(tokio_unstable, feature = "tracing"))]
544 resource_span,
545 #[cfg(all(tokio_unstable, feature = "tracing"))]
546 async_op_span,
547 #[cfg(all(tokio_unstable, feature = "tracing"))]
548 async_op_poll_span,
549 };
550
551 (tx, rx)
552}
553
554impl<T> Sender<T> {
555 /// Attempts to send a value on this channel, returning it back if it could
556 /// not be sent.
557 ///
558 /// This method consumes `self` as only one value may ever be sent on a `oneshot`
559 /// channel. It is not marked async because sending a message to an `oneshot`
560 /// channel never requires any form of waiting. Because of this, the `send`
561 /// method can be used in both synchronous and asynchronous code without
562 /// problems.
563 ///
564 /// A successful send occurs when it is determined that the other end of the
565 /// channel has not hung up already. An unsuccessful send would be one where
566 /// the corresponding receiver has already been deallocated. Note that a
567 /// return value of `Err` means that the data will never be received, but
568 /// a return value of `Ok` does *not* mean that the data will be received.
569 /// It is possible for the corresponding receiver to hang up immediately
570 /// after this function returns `Ok`.
571 ///
572 /// # Examples
573 ///
574 /// Send a value to another task
575 ///
576 /// ```
577 /// use tokio::sync::oneshot;
578 ///
579 /// # #[tokio::main(flavor = "current_thread")]
580 /// # async fn main() {
581 /// let (tx, rx) = oneshot::channel();
582 ///
583 /// tokio::spawn(async move {
584 /// if let Err(_) = tx.send(3) {
585 /// println!("the receiver dropped");
586 /// }
587 /// });
588 ///
589 /// match rx.await {
590 /// Ok(v) => println!("got = {:?}", v),
591 /// Err(_) => println!("the sender dropped"),
592 /// }
593 /// # }
594 /// ```
595 pub fn send(mut self, t: T) -> Result<(), T> {
596 let inner = self.inner.take().unwrap();
597
598 inner.value.with_mut(|ptr| unsafe {
599 // SAFETY: The receiver will not access the `UnsafeCell` unless the
600 // channel has been marked as "complete" (the `VALUE_SENT` state bit
601 // is set).
602 // That bit is only set by the sender later on in this method, and
603 // calling this method consumes `self`. Therefore, if it was possible to
604 // call this method, we know that the `VALUE_SENT` bit is unset, and
605 // the receiver is not currently accessing the `UnsafeCell`.
606 *ptr = Some(t);
607 });
608
609 if !inner.complete() {
610 unsafe {
611 // SAFETY: The receiver will not access the `UnsafeCell` unless
612 // the channel has been marked as "complete". Calling
613 // `complete()` will return true if this bit is set, and false
614 // if it is not set. Thus, if `complete()` returned false, it is
615 // safe for us to access the value, because we know that the
616 // receiver will not.
617 return Err(inner.consume_value().unwrap());
618 }
619 }
620
621 #[cfg(all(tokio_unstable, feature = "tracing"))]
622 self.resource_span.in_scope(|| {
623 tracing::trace!(
624 target: "runtime::resource::state_update",
625 value_sent = true,
626 value_sent.op = "override",
627 )
628 });
629
630 Ok(())
631 }
632
633 /// Waits for the associated [`Receiver`] handle to close.
634 ///
635 /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
636 /// [`Receiver`] value is dropped.
637 ///
638 /// This function is useful when paired with `select!` to abort a
639 /// computation when the receiver is no longer interested in the result.
640 ///
641 /// # Return
642 ///
643 /// Returns a `Future` which must be awaited on.
644 ///
645 /// [`Receiver`]: Receiver
646 /// [`close`]: Receiver::close
647 ///
648 /// # Examples
649 ///
650 /// Basic usage
651 ///
652 /// ```
653 /// use tokio::sync::oneshot;
654 ///
655 /// # #[tokio::main(flavor = "current_thread")]
656 /// # async fn main() {
657 /// let (mut tx, rx) = oneshot::channel::<()>();
658 ///
659 /// tokio::spawn(async move {
660 /// drop(rx);
661 /// });
662 ///
663 /// tx.closed().await;
664 /// println!("the receiver dropped");
665 /// # }
666 /// ```
667 ///
668 /// Paired with select
669 ///
670 /// ```
671 /// use tokio::sync::oneshot;
672 /// use tokio::time::{self, Duration};
673 ///
674 /// async fn compute() -> String {
675 /// // Complex computation returning a `String`
676 /// # "hello".to_string()
677 /// }
678 ///
679 /// # #[tokio::main(flavor = "current_thread")]
680 /// # async fn main() {
681 /// let (mut tx, rx) = oneshot::channel();
682 ///
683 /// tokio::spawn(async move {
684 /// tokio::select! {
685 /// _ = tx.closed() => {
686 /// // The receiver dropped, no need to do any further work
687 /// }
688 /// value = compute() => {
689 /// // The send can fail if the channel was closed at the exact same
690 /// // time as when compute() finished, so just ignore the failure.
691 /// let _ = tx.send(value);
692 /// }
693 /// }
694 /// });
695 ///
696 /// // Wait for up to 10 seconds
697 /// let _ = time::timeout(Duration::from_secs(10), rx).await;
698 /// # }
699 /// ```
700 pub async fn closed(&mut self) {
701 use std::future::poll_fn;
702
703 #[cfg(all(tokio_unstable, feature = "tracing"))]
704 let resource_span = self.resource_span.clone();
705 #[cfg(all(tokio_unstable, feature = "tracing"))]
706 let closed = trace::async_op(
707 || poll_fn(|cx| self.poll_closed(cx)),
708 resource_span,
709 "Sender::closed",
710 "poll_closed",
711 false,
712 );
713 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
714 let closed = poll_fn(|cx| self.poll_closed(cx));
715
716 closed.await;
717 }
718
719 /// Returns `true` if the associated [`Receiver`] handle has been dropped.
720 ///
721 /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
722 /// [`Receiver`] value is dropped.
723 ///
724 /// If `true` is returned, a call to `send` will always result in an error.
725 ///
726 /// [`Receiver`]: Receiver
727 /// [`close`]: Receiver::close
728 ///
729 /// # Examples
730 ///
731 /// ```
732 /// use tokio::sync::oneshot;
733 ///
734 /// # #[tokio::main(flavor = "current_thread")]
735 /// # async fn main() {
736 /// let (tx, rx) = oneshot::channel();
737 ///
738 /// assert!(!tx.is_closed());
739 ///
740 /// drop(rx);
741 ///
742 /// assert!(tx.is_closed());
743 /// assert!(tx.send("never received").is_err());
744 /// # }
745 /// ```
746 pub fn is_closed(&self) -> bool {
747 let inner = self.inner.as_ref().unwrap();
748
749 let state = State::load(&inner.state, Acquire);
750 state.is_closed()
751 }
752
753 /// Checks whether the `oneshot` channel has been closed, and if not, schedules the
754 /// `Waker` in the provided `Context` to receive a notification when the channel is
755 /// closed.
756 ///
757 /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
758 /// [`Receiver`] value is dropped.
759 ///
760 /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
761 /// to the most recent call will be scheduled to receive a wakeup.
762 ///
763 /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
764 /// [`close`]: fn@crate::sync::oneshot::Receiver::close
765 ///
766 /// # Return value
767 ///
768 /// This function returns:
769 ///
770 /// * `Poll::Pending` if the channel is still open.
771 /// * `Poll::Ready(())` if the channel is closed.
772 ///
773 /// # Examples
774 ///
775 /// ```
776 /// use tokio::sync::oneshot;
777 ///
778 /// use std::future::poll_fn;
779 ///
780 /// # #[tokio::main(flavor = "current_thread")]
781 /// # async fn main() {
782 /// let (mut tx, mut rx) = oneshot::channel::<()>();
783 ///
784 /// tokio::spawn(async move {
785 /// rx.close();
786 /// });
787 ///
788 /// poll_fn(|cx| tx.poll_closed(cx)).await;
789 ///
790 /// println!("the receiver dropped");
791 /// # }
792 /// ```
793 pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
794 ready!(crate::trace::trace_leaf(cx));
795
796 // Keep track of task budget
797 let coop = ready!(crate::task::coop::poll_proceed(cx));
798
799 let inner = self.inner.as_ref().unwrap();
800
801 let mut state = State::load(&inner.state, Acquire);
802
803 if state.is_closed() {
804 coop.made_progress();
805 return Ready(());
806 }
807
808 if state.is_tx_task_set() {
809 let will_notify = unsafe { inner.tx_task.will_wake(cx) };
810
811 if !will_notify {
812 state = State::unset_tx_task(&inner.state);
813
814 if state.is_closed() {
815 // Set the flag again so that the waker is released in drop
816 State::set_tx_task(&inner.state);
817 coop.made_progress();
818 return Ready(());
819 } else {
820 unsafe { inner.tx_task.drop_task() };
821 }
822 }
823 }
824
825 if !state.is_tx_task_set() {
826 // Attempt to set the task
827 unsafe {
828 inner.tx_task.set_task(cx);
829 }
830
831 // Update the state
832 state = State::set_tx_task(&inner.state);
833
834 if state.is_closed() {
835 coop.made_progress();
836 return Ready(());
837 }
838 }
839
840 Pending
841 }
842}
843
844impl<T> Drop for Sender<T> {
845 fn drop(&mut self) {
846 if let Some(inner) = self.inner.as_ref() {
847 inner.complete();
848 #[cfg(all(tokio_unstable, feature = "tracing"))]
849 self.resource_span.in_scope(|| {
850 tracing::trace!(
851 target: "runtime::resource::state_update",
852 tx_dropped = true,
853 tx_dropped.op = "override",
854 )
855 });
856 }
857 }
858}
859
860impl<T> Receiver<T> {
861 /// Prevents the associated [`Sender`] handle from sending a value.
862 ///
863 /// Any `send` operation which happens after calling `close` is guaranteed
864 /// to fail. After calling `close`, [`try_recv`] should be called to
865 /// receive a value if one was sent **before** the call to `close`
866 /// completed.
867 ///
868 /// This function is useful to perform a graceful shutdown and ensure that a
869 /// value will not be sent into the channel and never received.
870 ///
871 /// `close` is no-op if a message is already received or the channel
872 /// is already closed.
873 ///
874 /// [`Sender`]: Sender
875 /// [`try_recv`]: Receiver::try_recv
876 ///
877 /// # Examples
878 ///
879 /// Prevent a value from being sent
880 ///
881 /// ```
882 /// use tokio::sync::oneshot;
883 /// use tokio::sync::oneshot::error::TryRecvError;
884 ///
885 /// # #[tokio::main(flavor = "current_thread")]
886 /// # async fn main() {
887 /// let (tx, mut rx) = oneshot::channel();
888 ///
889 /// assert!(!tx.is_closed());
890 ///
891 /// rx.close();
892 ///
893 /// assert!(tx.is_closed());
894 /// assert!(tx.send("never received").is_err());
895 ///
896 /// match rx.try_recv() {
897 /// Err(TryRecvError::Closed) => {}
898 /// _ => unreachable!(),
899 /// }
900 /// # }
901 /// ```
902 ///
903 /// Receive a value sent **before** calling `close`
904 ///
905 /// ```
906 /// use tokio::sync::oneshot;
907 ///
908 /// # #[tokio::main(flavor = "current_thread")]
909 /// # async fn main() {
910 /// let (tx, mut rx) = oneshot::channel();
911 ///
912 /// assert!(tx.send("will receive").is_ok());
913 ///
914 /// rx.close();
915 ///
916 /// let msg = rx.try_recv().unwrap();
917 /// assert_eq!(msg, "will receive");
918 /// # }
919 /// ```
920 pub fn close(&mut self) {
921 if let Some(inner) = self.inner.as_ref() {
922 inner.close();
923 #[cfg(all(tokio_unstable, feature = "tracing"))]
924 self.resource_span.in_scope(|| {
925 tracing::trace!(
926 target: "runtime::resource::state_update",
927 rx_dropped = true,
928 rx_dropped.op = "override",
929 )
930 });
931 }
932 }
933
934 /// Checks if this receiver is terminated.
935 ///
936 /// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
937 /// If so, this receiver should no longer be polled.
938 ///
939 /// # Examples
940 ///
941 /// Sending a value and polling it.
942 ///
943 /// ```
944 /// use tokio::sync::oneshot;
945 ///
946 /// use std::task::Poll;
947 ///
948 /// # #[tokio::main(flavor = "current_thread")]
949 /// # async fn main() {
950 /// let (tx, mut rx) = oneshot::channel();
951 ///
952 /// // A receiver is not terminated when it is initialized.
953 /// assert!(!rx.is_terminated());
954 ///
955 /// // A receiver is not terminated it is polled and is still pending.
956 /// let poll = futures::poll!(&mut rx);
957 /// assert_eq!(poll, Poll::Pending);
958 /// assert!(!rx.is_terminated());
959 ///
960 /// // A receiver is not terminated if a value has been sent, but not yet read.
961 /// tx.send(0).unwrap();
962 /// assert!(!rx.is_terminated());
963 ///
964 /// // A receiver *is* terminated after it has been polled and yielded a value.
965 /// assert_eq!((&mut rx).await, Ok(0));
966 /// assert!(rx.is_terminated());
967 /// # }
968 /// ```
969 ///
970 /// Dropping the sender.
971 ///
972 /// ```
973 /// use tokio::sync::oneshot;
974 ///
975 /// # #[tokio::main(flavor = "current_thread")]
976 /// # async fn main() {
977 /// let (tx, mut rx) = oneshot::channel::<()>();
978 ///
979 /// // A receiver is not immediately terminated when the sender is dropped.
980 /// drop(tx);
981 /// assert!(!rx.is_terminated());
982 ///
983 /// // A receiver *is* terminated after it has been polled and yielded an error.
984 /// let _ = (&mut rx).await.unwrap_err();
985 /// assert!(rx.is_terminated());
986 /// # }
987 /// ```
988 pub fn is_terminated(&self) -> bool {
989 self.inner.is_none()
990 }
991
992 /// Checks if a channel is empty.
993 ///
994 /// This method returns `true` if the channel has no messages.
995 ///
996 /// It is not necessarily safe to poll an empty receiver, which may have
997 /// already yielded a value. Use [`is_terminated()`][Self::is_terminated]
998 /// to check whether or not a receiver can be safely polled, instead.
999 ///
1000 /// # Examples
1001 ///
1002 /// Sending a value.
1003 ///
1004 /// ```
1005 /// use tokio::sync::oneshot;
1006 ///
1007 /// # #[tokio::main(flavor = "current_thread")]
1008 /// # async fn main() {
1009 /// let (tx, mut rx) = oneshot::channel();
1010 /// assert!(rx.is_empty());
1011 ///
1012 /// tx.send(0).unwrap();
1013 /// assert!(!rx.is_empty());
1014 ///
1015 /// let _ = (&mut rx).await;
1016 /// assert!(rx.is_empty());
1017 /// # }
1018 /// ```
1019 ///
1020 /// Dropping the sender.
1021 ///
1022 /// ```
1023 /// use tokio::sync::oneshot;
1024 ///
1025 /// # #[tokio::main(flavor = "current_thread")]
1026 /// # async fn main() {
1027 /// let (tx, mut rx) = oneshot::channel::<()>();
1028 ///
1029 /// // A channel is empty if the sender is dropped.
1030 /// drop(tx);
1031 /// assert!(rx.is_empty());
1032 ///
1033 /// // A closed channel still yields an error, however.
1034 /// (&mut rx).await.expect_err("should yield an error");
1035 /// assert!(rx.is_empty());
1036 /// # }
1037 /// ```
1038 ///
1039 /// Terminated channels are empty.
1040 ///
1041 /// ```should_panic,ignore-wasm
1042 /// use tokio::sync::oneshot;
1043 ///
1044 /// #[tokio::main]
1045 /// async fn main() {
1046 /// let (tx, mut rx) = oneshot::channel();
1047 /// tx.send(0).unwrap();
1048 /// let _ = (&mut rx).await;
1049 ///
1050 /// // NB: an empty channel is not necessarily safe to poll!
1051 /// assert!(rx.is_empty());
1052 /// let _ = (&mut rx).await;
1053 /// }
1054 /// ```
1055 pub fn is_empty(&self) -> bool {
1056 let Some(inner) = self.inner.as_ref() else {
1057 // The channel has already terminated.
1058 return true;
1059 };
1060
1061 let state = State::load(&inner.state, Acquire);
1062 if state.is_complete() {
1063 // SAFETY: If `state.is_complete()` returns true, then the
1064 // `VALUE_SENT` bit has been set and the sender side of the
1065 // channel will no longer attempt to access the inner
1066 // `UnsafeCell`. Therefore, it is now safe for us to access the
1067 // cell.
1068 //
1069 // The channel is empty if it does not have a value.
1070 unsafe { !inner.has_value() }
1071 } else {
1072 // The receiver closed the channel or no value has been sent yet.
1073 true
1074 }
1075 }
1076
1077 /// Attempts to receive a value.
1078 ///
1079 /// If a pending value exists in the channel, it is returned. If no value
1080 /// has been sent, the current task **will not** be registered for
1081 /// future notification.
1082 ///
1083 /// This function is useful to call from outside the context of an
1084 /// asynchronous task.
1085 ///
1086 /// Note that unlike the `poll` method, the `try_recv` method cannot fail
1087 /// spuriously. Any send or close event that happens before this call to
1088 /// `try_recv` will be correctly returned to the caller.
1089 ///
1090 /// # Return
1091 ///
1092 /// - `Ok(T)` if a value is pending in the channel.
1093 /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
1094 /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
1095 /// a value, or if the message has already been received.
1096 ///
1097 /// # Examples
1098 ///
1099 /// `try_recv` before a value is sent, then after.
1100 ///
1101 /// ```
1102 /// use tokio::sync::oneshot;
1103 /// use tokio::sync::oneshot::error::TryRecvError;
1104 ///
1105 /// # #[tokio::main(flavor = "current_thread")]
1106 /// # async fn main() {
1107 /// let (tx, mut rx) = oneshot::channel();
1108 ///
1109 /// match rx.try_recv() {
1110 /// // The channel is currently empty
1111 /// Err(TryRecvError::Empty) => {}
1112 /// _ => unreachable!(),
1113 /// }
1114 ///
1115 /// // Send a value
1116 /// tx.send("hello").unwrap();
1117 ///
1118 /// match rx.try_recv() {
1119 /// Ok(value) => assert_eq!(value, "hello"),
1120 /// _ => unreachable!(),
1121 /// }
1122 /// # }
1123 /// ```
1124 ///
1125 /// `try_recv` when the sender dropped before sending a value
1126 ///
1127 /// ```
1128 /// use tokio::sync::oneshot;
1129 /// use tokio::sync::oneshot::error::TryRecvError;
1130 ///
1131 /// # #[tokio::main(flavor = "current_thread")]
1132 /// # async fn main() {
1133 /// let (tx, mut rx) = oneshot::channel::<()>();
1134 ///
1135 /// drop(tx);
1136 ///
1137 /// match rx.try_recv() {
1138 /// // The channel will never receive a value.
1139 /// Err(TryRecvError::Closed) => {}
1140 /// _ => unreachable!(),
1141 /// }
1142 /// # }
1143 /// ```
1144 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1145 let result = if let Some(inner) = self.inner.as_ref() {
1146 let state = State::load(&inner.state, Acquire);
1147
1148 if state.is_complete() {
1149 // SAFETY: If `state.is_complete()` returns true, then the
1150 // `VALUE_SENT` bit has been set and the sender side of the
1151 // channel will no longer attempt to access the inner
1152 // `UnsafeCell`. Therefore, it is now safe for us to access the
1153 // cell.
1154 match unsafe { inner.consume_value() } {
1155 Some(value) => {
1156 #[cfg(all(tokio_unstable, feature = "tracing"))]
1157 self.resource_span.in_scope(|| {
1158 tracing::trace!(
1159 target: "runtime::resource::state_update",
1160 value_received = true,
1161 value_received.op = "override",
1162 )
1163 });
1164 Ok(value)
1165 }
1166 None => Err(TryRecvError::Closed),
1167 }
1168 } else if state.is_closed() {
1169 Err(TryRecvError::Closed)
1170 } else {
1171 // Not ready, this does not clear `inner`
1172 return Err(TryRecvError::Empty);
1173 }
1174 } else {
1175 Err(TryRecvError::Closed)
1176 };
1177
1178 self.inner = None;
1179 result
1180 }
1181
1182 /// Blocking receive to call outside of asynchronous contexts.
1183 ///
1184 /// # Panics
1185 ///
1186 /// This function panics if called within an asynchronous execution
1187 /// context.
1188 ///
1189 /// # Examples
1190 ///
1191 /// ```
1192 /// # #[cfg(not(target_family = "wasm"))]
1193 /// # {
1194 /// use std::thread;
1195 /// use tokio::sync::oneshot;
1196 ///
1197 /// #[tokio::main]
1198 /// async fn main() {
1199 /// let (tx, rx) = oneshot::channel::<u8>();
1200 ///
1201 /// let sync_code = thread::spawn(move || {
1202 /// assert_eq!(Ok(10), rx.blocking_recv());
1203 /// });
1204 ///
1205 /// let _ = tx.send(10);
1206 /// sync_code.join().unwrap();
1207 /// }
1208 /// # }
1209 /// ```
1210 #[track_caller]
1211 #[cfg(feature = "sync")]
1212 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
1213 pub fn blocking_recv(self) -> Result<T, RecvError> {
1214 crate::future::block_on(self)
1215 }
1216}
1217
1218impl<T> Drop for Receiver<T> {
1219 fn drop(&mut self) {
1220 if let Some(inner) = self.inner.as_ref() {
1221 let state = inner.close();
1222
1223 if state.is_complete() {
1224 // SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
1225 // so only the receiver can access the value.
1226 drop(unsafe { inner.consume_value() });
1227 }
1228
1229 #[cfg(all(tokio_unstable, feature = "tracing"))]
1230 self.resource_span.in_scope(|| {
1231 tracing::trace!(
1232 target: "runtime::resource::state_update",
1233 rx_dropped = true,
1234 rx_dropped.op = "override",
1235 )
1236 });
1237 }
1238 }
1239}
1240
1241impl<T> Future for Receiver<T> {
1242 type Output = Result<T, RecvError>;
1243
1244 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1245 // If `inner` is `None`, then `poll()` has already completed.
1246 #[cfg(all(tokio_unstable, feature = "tracing"))]
1247 let _res_span = self.resource_span.clone().entered();
1248 #[cfg(all(tokio_unstable, feature = "tracing"))]
1249 let _ao_span = self.async_op_span.clone().entered();
1250 #[cfg(all(tokio_unstable, feature = "tracing"))]
1251 let _ao_poll_span = self.async_op_poll_span.clone().entered();
1252
1253 let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1254 #[cfg(all(tokio_unstable, feature = "tracing"))]
1255 let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
1256
1257 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1258 let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
1259
1260 res
1261 } else {
1262 panic!("called after complete");
1263 };
1264
1265 self.inner = None;
1266 Ready(ret)
1267 }
1268}
1269
1270impl<T> Inner<T> {
1271 fn complete(&self) -> bool {
1272 let prev = State::set_complete(&self.state);
1273
1274 if prev.is_closed() {
1275 return false;
1276 }
1277
1278 if prev.is_rx_task_set() {
1279 // TODO: Consume waker?
1280 unsafe {
1281 self.rx_task.with_task(Waker::wake_by_ref);
1282 }
1283 }
1284
1285 true
1286 }
1287
1288 fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1289 ready!(crate::trace::trace_leaf(cx));
1290 // Keep track of task budget
1291 let coop = ready!(crate::task::coop::poll_proceed(cx));
1292
1293 // Load the state
1294 let mut state = State::load(&self.state, Acquire);
1295
1296 if state.is_complete() {
1297 coop.made_progress();
1298 match unsafe { self.consume_value() } {
1299 Some(value) => Ready(Ok(value)),
1300 None => Ready(Err(RecvError(()))),
1301 }
1302 } else if state.is_closed() {
1303 coop.made_progress();
1304 Ready(Err(RecvError(())))
1305 } else {
1306 if state.is_rx_task_set() {
1307 let will_notify = unsafe { self.rx_task.will_wake(cx) };
1308
1309 // Check if the task is still the same
1310 if !will_notify {
1311 // Unset the task
1312 state = State::unset_rx_task(&self.state);
1313 if state.is_complete() {
1314 // Set the flag again so that the waker is released in drop
1315 State::set_rx_task(&self.state);
1316
1317 coop.made_progress();
1318 // SAFETY: If `state.is_complete()` returns true, then the
1319 // `VALUE_SENT` bit has been set and the sender side of the
1320 // channel will no longer attempt to access the inner
1321 // `UnsafeCell`. Therefore, it is now safe for us to access the
1322 // cell.
1323 return match unsafe { self.consume_value() } {
1324 Some(value) => Ready(Ok(value)),
1325 None => Ready(Err(RecvError(()))),
1326 };
1327 } else {
1328 unsafe { self.rx_task.drop_task() };
1329 }
1330 }
1331 }
1332
1333 if !state.is_rx_task_set() {
1334 // Attempt to set the task
1335 unsafe {
1336 self.rx_task.set_task(cx);
1337 }
1338
1339 // Update the state
1340 state = State::set_rx_task(&self.state);
1341
1342 if state.is_complete() {
1343 coop.made_progress();
1344 match unsafe { self.consume_value() } {
1345 Some(value) => Ready(Ok(value)),
1346 None => Ready(Err(RecvError(()))),
1347 }
1348 } else {
1349 Pending
1350 }
1351 } else {
1352 Pending
1353 }
1354 }
1355 }
1356
1357 /// Called by `Receiver` to indicate that the value will never be received.
1358 fn close(&self) -> State {
1359 let prev = State::set_closed(&self.state);
1360
1361 if prev.is_tx_task_set() && !prev.is_complete() {
1362 unsafe {
1363 self.tx_task.with_task(Waker::wake_by_ref);
1364 }
1365 }
1366
1367 prev
1368 }
1369
1370 /// Consumes the value. This function does not check `state`.
1371 ///
1372 /// # Safety
1373 ///
1374 /// Calling this method concurrently on multiple threads will result in a
1375 /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1376 /// sender *or* the receiver will call this method at a given point in time.
1377 /// If `VALUE_SENT` is not set, then only the sender may call this method;
1378 /// if it is set, then only the receiver may call this method.
1379 unsafe fn consume_value(&self) -> Option<T> {
1380 self.value.with_mut(|ptr| (*ptr).take())
1381 }
1382
1383 /// Returns true if there is a value. This function does not check `state`.
1384 ///
1385 /// # Safety
1386 ///
1387 /// Calling this method concurrently on multiple threads will result in a
1388 /// data race. The `VALUE_SENT` state bit is used to ensure that only the
1389 /// sender *or* the receiver will call this method at a given point in time.
1390 /// If `VALUE_SENT` is not set, then only the sender may call this method;
1391 /// if it is set, then only the receiver may call this method.
1392 unsafe fn has_value(&self) -> bool {
1393 self.value.with(|ptr| (*ptr).is_some())
1394 }
1395}
1396
1397unsafe impl<T: Send> Send for Inner<T> {}
1398unsafe impl<T: Send> Sync for Inner<T> {}
1399
1400fn mut_load(this: &mut AtomicUsize) -> usize {
1401 this.with_mut(|v| *v)
1402}
1403
1404impl<T> Drop for Inner<T> {
1405 fn drop(&mut self) {
1406 let state = State(mut_load(&mut self.state));
1407
1408 if state.is_rx_task_set() {
1409 unsafe {
1410 self.rx_task.drop_task();
1411 }
1412 }
1413
1414 if state.is_tx_task_set() {
1415 unsafe {
1416 self.tx_task.drop_task();
1417 }
1418 }
1419
1420 // SAFETY: we have `&mut self`, and therefore we have
1421 // exclusive access to the value.
1422 unsafe {
1423 // Note: the assertion holds because if the value has been sent by sender,
1424 // we must ensure that the value must have been consumed by the receiver before
1425 // dropping the `Inner`.
1426 debug_assert!(self.consume_value().is_none());
1427 }
1428 }
1429}
1430
1431impl<T: fmt::Debug> fmt::Debug for Inner<T> {
1432 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1433 use std::sync::atomic::Ordering::Relaxed;
1434
1435 fmt.debug_struct("Inner")
1436 .field("state", &State::load(&self.state, Relaxed))
1437 .finish()
1438 }
1439}
1440
1441/// Indicates that a waker for the receiving task has been set.
1442///
1443/// # Safety
1444///
1445/// If this bit is not set, the `rx_task` field may be uninitialized.
1446const RX_TASK_SET: usize = 0b00001;
1447/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
1448///
1449/// # Safety
1450///
1451/// This bit controls which side of the channel is permitted to access the
1452/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
1453/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
1454/// the sender.
1455const VALUE_SENT: usize = 0b00010;
1456const CLOSED: usize = 0b00100;
1457
1458/// Indicates that a waker for the sending task has been set.
1459///
1460/// # Safety
1461///
1462/// If this bit is not set, the `tx_task` field may be uninitialized.
1463const TX_TASK_SET: usize = 0b01000;
1464
1465impl State {
1466 fn new() -> State {
1467 State(0)
1468 }
1469
1470 fn is_complete(self) -> bool {
1471 self.0 & VALUE_SENT == VALUE_SENT
1472 }
1473
1474 fn set_complete(cell: &AtomicUsize) -> State {
1475 // This method is a compare-and-swap loop rather than a fetch-or like
1476 // other `set_$WHATEVER` methods on `State`. This is because we must
1477 // check if the state has been closed before setting the `VALUE_SENT`
1478 // bit.
1479 //
1480 // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
1481 // bit is already set, because `VALUE_SENT` will tell the receiver that
1482 // it's okay to access the inner `UnsafeCell`. Immediately after calling
1483 // `set_complete`, if the channel was closed, the sender will _also_
1484 // access the `UnsafeCell` to take the value back out, so if a
1485 // `poll_recv` or `try_recv` call is occurring concurrently, both
1486 // threads may try to access the `UnsafeCell` if we were to set the
1487 // `VALUE_SENT` bit on a closed channel.
1488 let mut state = cell.load(Ordering::Relaxed);
1489 loop {
1490 if State(state).is_closed() {
1491 break;
1492 }
1493 // TODO: This could be `Release`, followed by an `Acquire` fence *if*
1494 // the `RX_TASK_SET` flag is set. However, `loom` does not support
1495 // fences yet.
1496 match cell.compare_exchange_weak(
1497 state,
1498 state | VALUE_SENT,
1499 Ordering::AcqRel,
1500 Ordering::Acquire,
1501 ) {
1502 Ok(_) => break,
1503 Err(actual) => state = actual,
1504 }
1505 }
1506 State(state)
1507 }
1508
1509 fn is_rx_task_set(self) -> bool {
1510 self.0 & RX_TASK_SET == RX_TASK_SET
1511 }
1512
1513 fn set_rx_task(cell: &AtomicUsize) -> State {
1514 let val = cell.fetch_or(RX_TASK_SET, AcqRel);
1515 State(val | RX_TASK_SET)
1516 }
1517
1518 fn unset_rx_task(cell: &AtomicUsize) -> State {
1519 let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
1520 State(val & !RX_TASK_SET)
1521 }
1522
1523 fn is_closed(self) -> bool {
1524 self.0 & CLOSED == CLOSED
1525 }
1526
1527 fn set_closed(cell: &AtomicUsize) -> State {
1528 // Acquire because we want all later writes (attempting to poll) to be
1529 // ordered after this.
1530 let val = cell.fetch_or(CLOSED, Acquire);
1531 State(val)
1532 }
1533
1534 fn set_tx_task(cell: &AtomicUsize) -> State {
1535 let val = cell.fetch_or(TX_TASK_SET, AcqRel);
1536 State(val | TX_TASK_SET)
1537 }
1538
1539 fn unset_tx_task(cell: &AtomicUsize) -> State {
1540 let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
1541 State(val & !TX_TASK_SET)
1542 }
1543
1544 fn is_tx_task_set(self) -> bool {
1545 self.0 & TX_TASK_SET == TX_TASK_SET
1546 }
1547
1548 fn as_usize(self) -> usize {
1549 self.0
1550 }
1551
1552 fn load(cell: &AtomicUsize, order: Ordering) -> State {
1553 let val = cell.load(order);
1554 State(val)
1555 }
1556}
1557
1558impl fmt::Debug for State {
1559 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1560 fmt.debug_struct("State")
1561 .field("is_complete", &self.is_complete())
1562 .field("is_closed", &self.is_closed())
1563 .field("is_rx_task_set", &self.is_rx_task_set())
1564 .field("is_tx_task_set", &self.is_tx_task_set())
1565 .finish()
1566 }
1567}