tokio\sync/
watch.rs

1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A multi-producer, multi-consumer channel that only retains the *last* sent
4//! value.
5//!
6//! This channel is useful for watching for changes to a value from multiple
7//! points in the code base, for example, changes to configuration values.
8//!
9//! # Usage
10//!
11//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12//! and consumer halves of the channel. The channel is created with an initial
13//! value.
14//!
15//! Each [`Receiver`] independently tracks the last value *seen* by its caller.
16//!
17//! To access the **current** value stored in the channel and mark it as *seen*
18//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
19//!
20//! To access the current value **without** marking it as *seen*, use
21//! [`Receiver::borrow()`]. (If the value has already been marked *seen*,
22//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
23//!
24//! For more information on when to use these methods, see
25//! [here](#borrow_and_update-versus-borrow).
26//!
27//! ## Change notifications
28//!
29//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
30//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
31//!
32//! * [`Receiver::changed()`] returns:
33//!     * `Ok(())` on receiving a new value.
34//!     * `Err(`[`RecvError`](error::RecvError)`)` if the
35//!       channel has been closed __AND__ the current value is *seen*.
36//! * If the current value is *unseen* when calling [`changed`], then
37//!   [`changed`] will return immediately. If the current value is *seen*, then
38//!   it will sleep until either a new message is sent via the [`Sender`] half,
39//!   or the [`Sender`] is dropped.
40//! * On completion, the [`changed`] method marks the new value as *seen*.
41//! * At creation, the initial value is considered *seen*. In other words,
42//!   [`Receiver::changed()`] will not return until a subsequent value is sent.
43//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`].
44//!   The current value at the time the [`Receiver`] is created is considered
45//!   *seen*.
46//!
47//! ## [`changed`] versus [`has_changed`]
48//!
49//! The [`Receiver`] half provides two methods for checking for changes
50//! in the channel, [`has_changed`] and [`changed`].
51//!
52//! * [`has_changed`] is a *synchronous* method that checks whether the current
53//!   value is seen or not and returns a boolean. This method does __not__ mark the
54//!   value as seen.
55//!
56//! * [`changed`] is an *asynchronous* method that will return once an unseen
57//!   value is in the channel. This method does mark the value as seen.
58//!
59//! Note there are two behavioral differences on when these two methods return
60//! an error.
61//!
62//! - [`has_changed`] errors if and only if the channel is closed.
63//! - [`changed`] errors if the channel has been closed __AND__
64//!   the current value is seen.
65//!
66//! See the example below that shows how these methods have different fallibility.
67//!
68//! ## [`borrow_and_update`] versus [`borrow`]
69//!
70//! If the receiver intends to await notifications from [`changed`] in a loop,
71//! [`Receiver::borrow_and_update()`] should be preferred over
72//! [`Receiver::borrow()`].  This avoids a potential race where a new value is
73//! sent between [`changed`] being ready and the value being read. (If
74//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.)
75//!
76//! If the receiver is only interested in the current value, and does not intend
77//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
78//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
79//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
80//! self`.
81//!
82//! # Examples
83//!
84//! The following example prints `hello! world! `.
85//!
86//! ```
87//! use tokio::sync::watch;
88//! use tokio::time::{Duration, sleep};
89//!
90//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
91//! let (tx, mut rx) = watch::channel("hello");
92//!
93//! tokio::spawn(async move {
94//!     // Use the equivalent of a "do-while" loop so the initial value is
95//!     // processed before awaiting the `changed()` future.
96//!     loop {
97//!         println!("{}! ", *rx.borrow_and_update());
98//!         if rx.changed().await.is_err() {
99//!             break;
100//!         }
101//!     }
102//! });
103//!
104//! sleep(Duration::from_millis(100)).await;
105//! tx.send("world")?;
106//! # Ok(())
107//! # }
108//! ```
109//!
110//! Difference on fallibility of [`changed`] versus [`has_changed`].
111//! ```
112//! use tokio::sync::watch;
113//!
114//! #[tokio::main(flavor = "current_thread")]
115//! # async fn main() {
116//! let (tx, mut rx) = watch::channel("hello");
117//! tx.send("goodbye").unwrap();
118//! drop(tx);
119//!
120//! // `has_changed` does not mark the value as seen and errors
121//! // since the channel is closed.
122//! assert!(rx.has_changed().is_err());
123//!
124//! // `changed` returns Ok since the value is not already marked as seen
125//! // even if the channel is closed.
126//! assert!(rx.changed().await.is_ok());
127//!
128//! // The `changed` call above marks the value as seen.
129//! // The next `changed` call now returns an error as the channel is closed
130//! // AND the current value is seen.
131//! assert!(rx.changed().await.is_err());
132//! # }
133//! ```
134//!
135//! # Closing
136//!
137//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
138//! when all [`Receiver`] handles have been dropped. This indicates that there
139//! is no further interest in the values being produced and work can be stopped.
140//!
141//! The value in the channel will not be dropped until all senders and all
142//! receivers have been dropped.
143//!
144//! # Thread safety
145//!
146//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
147//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
148//! handles may be moved to separate threads and also used concurrently.
149//!
150//! [`Sender`]: crate::sync::watch::Sender
151//! [`Receiver`]: crate::sync::watch::Receiver
152//! [`changed`]: crate::sync::watch::Receiver::changed
153//! [`has_changed`]: crate::sync::watch::Receiver::has_changed
154//! [`borrow`]: crate::sync::watch::Receiver::borrow
155//! [`borrow_and_update`]: crate::sync::watch::Receiver::borrow_and_update
156//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
157//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
158//! [`Receiver::borrow_and_update()`]:
159//!     crate::sync::watch::Receiver::borrow_and_update
160//! [`channel`]: crate::sync::watch::channel
161//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
162//! [`Sender::closed`]: crate::sync::watch::Sender::closed
163//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
164
165use crate::sync::notify::Notify;
166use crate::task::coop::cooperative;
167
168use crate::loom::sync::atomic::AtomicUsize;
169use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed};
170use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
171use std::fmt;
172use std::mem;
173use std::ops;
174use std::panic;
175
176/// Receives values from the associated [`Sender`](struct@Sender).
177///
178/// Instances are created by the [`channel`](fn@channel) function.
179///
180/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
181/// wrapper.
182///
183/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
184#[derive(Debug)]
185pub struct Receiver<T> {
186    /// Pointer to the shared state
187    shared: Arc<Shared<T>>,
188
189    /// Last observed version
190    version: Version,
191}
192
193/// Sends values to the associated [`Receiver`](struct@Receiver).
194///
195/// Instances are created by the [`channel`](fn@channel) function.
196#[derive(Debug)]
197pub struct Sender<T> {
198    shared: Arc<Shared<T>>,
199}
200
201impl<T> Clone for Sender<T> {
202    fn clone(&self) -> Self {
203        self.shared.ref_count_tx.fetch_add(1, Relaxed);
204
205        Self {
206            shared: self.shared.clone(),
207        }
208    }
209}
210
211impl<T: Default> Default for Sender<T> {
212    fn default() -> Self {
213        Self::new(T::default())
214    }
215}
216
217/// Returns a reference to the inner value.
218///
219/// Outstanding borrows hold a read lock on the inner value. This means that
220/// long-lived borrows could cause the producer half to block. It is recommended
221/// to keep the borrow as short-lived as possible. Additionally, if you are
222/// running in an environment that allows `!Send` futures, you must ensure that
223/// the returned `Ref` type is never held alive across an `.await` point,
224/// otherwise, it can lead to a deadlock.
225///
226/// The priority policy of the lock is dependent on the underlying lock
227/// implementation, and this type does not guarantee that any particular policy
228/// will be used. In particular, a producer which is waiting to acquire the lock
229/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
230///
231/// <details><summary>Potential deadlock example</summary>
232///
233/// ```text
234/// // Task 1 (on thread A)    |  // Task 2 (on thread B)
235/// let _ref1 = rx.borrow();   |
236///                            |  // will block
237///                            |  let _ = tx.send(());
238/// // may deadlock            |
239/// let _ref2 = rx.borrow();   |
240/// ```
241/// </details>
242#[derive(Debug)]
243pub struct Ref<'a, T> {
244    inner: RwLockReadGuard<'a, T>,
245    has_changed: bool,
246}
247
248impl<'a, T> Ref<'a, T> {
249    /// Indicates if the borrowed value is considered as _changed_ since the last
250    /// time it has been marked as seen.
251    ///
252    /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
253    ///
254    /// When borrowed from the [`Sender`] this function will always return `false`.
255    ///
256    /// # Examples
257    ///
258    /// ```
259    /// use tokio::sync::watch;
260    ///
261    /// # #[tokio::main(flavor = "current_thread")]
262    /// # async fn main() {
263    /// let (tx, mut rx) = watch::channel("hello");
264    ///
265    /// tx.send("goodbye").unwrap();
266    /// // The sender does never consider the value as changed.
267    /// assert!(!tx.borrow().has_changed());
268    ///
269    /// // Drop the sender immediately, just for testing purposes.
270    /// drop(tx);
271    ///
272    /// // Even if the sender has already been dropped...
273    /// assert!(rx.has_changed().is_err());
274    /// // ...the modified value is still readable and detected as changed.
275    /// assert_eq!(*rx.borrow(), "goodbye");
276    /// assert!(rx.borrow().has_changed());
277    ///
278    /// // Read the changed value and mark it as seen.
279    /// {
280    ///     let received = rx.borrow_and_update();
281    ///     assert_eq!(*received, "goodbye");
282    ///     assert!(received.has_changed());
283    ///     // Release the read lock when leaving this scope.
284    /// }
285    ///
286    /// // Now the value has already been marked as seen and could
287    /// // never be modified again (after the sender has been dropped).
288    /// assert!(!rx.borrow().has_changed());
289    /// # }
290    /// ```
291    pub fn has_changed(&self) -> bool {
292        self.has_changed
293    }
294}
295
296struct Shared<T> {
297    /// The most recent value.
298    value: RwLock<T>,
299
300    /// The current version.
301    ///
302    /// The lowest bit represents a "closed" state. The rest of the bits
303    /// represent the current version.
304    state: AtomicState,
305
306    /// Tracks the number of `Receiver` instances.
307    ref_count_rx: AtomicUsize,
308
309    /// Tracks the number of `Sender` instances.
310    ref_count_tx: AtomicUsize,
311
312    /// Notifies waiting receivers that the value changed.
313    notify_rx: big_notify::BigNotify,
314
315    /// Notifies any task listening for `Receiver` dropped events.
316    notify_tx: Notify,
317}
318
319impl<T: fmt::Debug> fmt::Debug for Shared<T> {
320    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321        let state = self.state.load();
322        f.debug_struct("Shared")
323            .field("value", &self.value)
324            .field("version", &state.version())
325            .field("is_closed", &state.is_closed())
326            .field("ref_count_rx", &self.ref_count_rx)
327            .finish()
328    }
329}
330
331pub mod error {
332    //! Watch error types.
333
334    use std::error::Error;
335    use std::fmt;
336
337    /// Error produced when sending a value fails.
338    #[derive(PartialEq, Eq, Clone, Copy)]
339    pub struct SendError<T>(pub T);
340
341    // ===== impl SendError =====
342
343    impl<T> fmt::Debug for SendError<T> {
344        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345            f.debug_struct("SendError").finish_non_exhaustive()
346        }
347    }
348
349    impl<T> fmt::Display for SendError<T> {
350        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
351            write!(fmt, "channel closed")
352        }
353    }
354
355    impl<T> Error for SendError<T> {}
356
357    /// Error produced when receiving a change notification.
358    #[derive(Debug, Clone)]
359    pub struct RecvError(pub(super) ());
360
361    // ===== impl RecvError =====
362
363    impl fmt::Display for RecvError {
364        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
365            write!(fmt, "channel closed")
366        }
367    }
368
369    impl Error for RecvError {}
370}
371
372mod big_notify {
373    use super::Notify;
374    use crate::sync::notify::Notified;
375
376    // To avoid contention on the lock inside the `Notify`, we store multiple
377    // copies of it. Then, we use either circular access or randomness to spread
378    // out threads over different `Notify` objects.
379    //
380    // Some simple benchmarks show that randomness performs slightly better than
381    // circular access (probably due to contention on `next`), so we prefer to
382    // use randomness when Tokio is compiled with a random number generator.
383    //
384    // When the random number generator is not available, we fall back to
385    // circular access.
386
387    pub(super) struct BigNotify {
388        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
389        next: std::sync::atomic::AtomicUsize,
390        inner: [Notify; 8],
391    }
392
393    impl BigNotify {
394        pub(super) fn new() -> Self {
395            Self {
396                #[cfg(not(all(
397                    not(loom),
398                    feature = "sync",
399                    any(feature = "rt", feature = "macros")
400                )))]
401                next: std::sync::atomic::AtomicUsize::new(0),
402                inner: Default::default(),
403            }
404        }
405
406        pub(super) fn notify_waiters(&self) {
407            for notify in &self.inner {
408                notify.notify_waiters();
409            }
410        }
411
412        /// This function implements the case where randomness is not available.
413        #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
414        pub(super) fn notified(&self) -> Notified<'_> {
415            let i = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % 8;
416            self.inner[i].notified()
417        }
418
419        /// This function implements the case where randomness is available.
420        #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
421        pub(super) fn notified(&self) -> Notified<'_> {
422            let i = crate::runtime::context::thread_rng_n(8) as usize;
423            self.inner[i].notified()
424        }
425    }
426}
427
428use self::state::{AtomicState, Version};
429mod state {
430    use crate::loom::sync::atomic::AtomicUsize;
431    use crate::loom::sync::atomic::Ordering;
432
433    const CLOSED_BIT: usize = 1;
434
435    // Using 2 as the step size preserves the `CLOSED_BIT`.
436    const STEP_SIZE: usize = 2;
437
438    /// The version part of the state. The lowest bit is always zero.
439    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
440    pub(super) struct Version(usize);
441
442    /// Snapshot of the state. The first bit is used as the CLOSED bit.
443    /// The remaining bits are used as the version.
444    ///
445    /// The CLOSED bit tracks whether all senders have been dropped. Dropping all
446    /// receivers does not set it.
447    #[derive(Copy, Clone, Debug)]
448    pub(super) struct StateSnapshot(usize);
449
450    /// The state stored in an atomic integer.
451    ///
452    /// The `Sender` uses `Release` ordering for storing a new state
453    /// and the `Receiver`s use `Acquire` ordering for loading the
454    /// current state. This ensures that written values are seen by
455    /// the `Receiver`s for a proper handover.
456    #[derive(Debug)]
457    pub(super) struct AtomicState(AtomicUsize);
458
459    impl Version {
460        /// Decrements the version.
461        pub(super) fn decrement(&mut self) {
462            // Using a wrapping decrement here is required to ensure that the
463            // operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()`
464            // which wraps on overflow.
465            self.0 = self.0.wrapping_sub(STEP_SIZE);
466        }
467
468        pub(super) const INITIAL: Self = Version(0);
469    }
470
471    impl StateSnapshot {
472        /// Extract the version from the state.
473        pub(super) fn version(self) -> Version {
474            Version(self.0 & !CLOSED_BIT)
475        }
476
477        /// Is the closed bit set?
478        pub(super) fn is_closed(self) -> bool {
479            (self.0 & CLOSED_BIT) == CLOSED_BIT
480        }
481    }
482
483    impl AtomicState {
484        /// Create a new `AtomicState` that is not closed and which has the
485        /// version set to `Version::INITIAL`.
486        pub(super) fn new() -> Self {
487            AtomicState(AtomicUsize::new(Version::INITIAL.0))
488        }
489
490        /// Load the current value of the state.
491        ///
492        /// Only used by the receiver and for debugging purposes.
493        ///
494        /// The receiver side (read-only) uses `Acquire` ordering for a proper handover
495        /// of the shared value with the sender side (single writer). The state is always
496        /// updated after modifying and before releasing the (exclusive) lock on the
497        /// shared value.
498        pub(super) fn load(&self) -> StateSnapshot {
499            StateSnapshot(self.0.load(Ordering::Acquire))
500        }
501
502        /// Increment the version counter.
503        pub(super) fn increment_version_while_locked(&self) {
504            // Use `Release` ordering to ensure that the shared value
505            // has been written before updating the version. The shared
506            // value is still protected by an exclusive lock during this
507            // method.
508            self.0.fetch_add(STEP_SIZE, Ordering::Release);
509        }
510
511        /// Set the closed bit in the state.
512        pub(super) fn set_closed(&self) {
513            self.0.fetch_or(CLOSED_BIT, Ordering::Release);
514        }
515    }
516}
517
518/// Creates a new watch channel, returning the "send" and "receive" handles.
519///
520/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
521/// Only the last value sent is made available to the [`Receiver`] half. All
522/// intermediate values are dropped.
523///
524/// # Examples
525///
526/// The following example prints `hello! world! `.
527///
528/// ```
529/// use tokio::sync::watch;
530/// use tokio::time::{Duration, sleep};
531///
532/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
533/// let (tx, mut rx) = watch::channel("hello");
534///
535/// tokio::spawn(async move {
536///     // Use the equivalent of a "do-while" loop so the initial value is
537///     // processed before awaiting the `changed()` future.
538///     loop {
539///         println!("{}! ", *rx.borrow_and_update());
540///         if rx.changed().await.is_err() {
541///             break;
542///         }
543///     }
544/// });
545///
546/// sleep(Duration::from_millis(100)).await;
547/// tx.send("world")?;
548/// # Ok(())
549/// # }
550/// ```
551///
552/// [`Sender`]: struct@Sender
553/// [`Receiver`]: struct@Receiver
554pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
555    let shared = Arc::new(Shared {
556        value: RwLock::new(init),
557        state: AtomicState::new(),
558        ref_count_rx: AtomicUsize::new(1),
559        ref_count_tx: AtomicUsize::new(1),
560        notify_rx: big_notify::BigNotify::new(),
561        notify_tx: Notify::new(),
562    });
563
564    let tx = Sender {
565        shared: shared.clone(),
566    };
567
568    let rx = Receiver {
569        shared,
570        version: Version::INITIAL,
571    };
572
573    (tx, rx)
574}
575
576impl<T> Receiver<T> {
577    fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
578        // No synchronization necessary as this is only used as a counter and
579        // not memory access.
580        shared.ref_count_rx.fetch_add(1, Relaxed);
581
582        Self { shared, version }
583    }
584
585    /// Returns a reference to the most recently sent value.
586    ///
587    /// This method does not mark the returned value as seen, so future calls to
588    /// [`changed`] may return immediately even if you have already seen the
589    /// value with a call to `borrow`.
590    ///
591    /// Outstanding borrows hold a read lock on the inner value. This means that
592    /// long-lived borrows could cause the producer half to block. It is recommended
593    /// to keep the borrow as short-lived as possible. Additionally, if you are
594    /// running in an environment that allows `!Send` futures, you must ensure that
595    /// the returned `Ref` type is never held alive across an `.await` point,
596    /// otherwise, it can lead to a deadlock.
597    ///
598    /// The priority policy of the lock is dependent on the underlying lock
599    /// implementation, and this type does not guarantee that any particular policy
600    /// will be used. In particular, a producer which is waiting to acquire the lock
601    /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
602    ///
603    /// <details><summary>Potential deadlock example</summary>
604    ///
605    /// ```text
606    /// // Task 1 (on thread A)    |  // Task 2 (on thread B)
607    /// let _ref1 = rx.borrow();   |
608    ///                            |  // will block
609    ///                            |  let _ = tx.send(());
610    /// // may deadlock            |
611    /// let _ref2 = rx.borrow();   |
612    /// ```
613    /// </details>
614    ///
615    /// For more information on when to use this method versus
616    /// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow).
617    ///
618    /// [`changed`]: Receiver::changed
619    /// [`borrow_and_update`]: Receiver::borrow_and_update
620    ///
621    /// # Examples
622    ///
623    /// ```
624    /// use tokio::sync::watch;
625    ///
626    /// let (_, rx) = watch::channel("hello");
627    /// assert_eq!(*rx.borrow(), "hello");
628    /// ```
629    pub fn borrow(&self) -> Ref<'_, T> {
630        let inner = self.shared.value.read();
631
632        // After obtaining a read-lock no concurrent writes could occur
633        // and the loaded version matches that of the borrowed reference.
634        let new_version = self.shared.state.load().version();
635        let has_changed = self.version != new_version;
636
637        Ref { inner, has_changed }
638    }
639
640    /// Returns a reference to the most recently sent value and marks that value
641    /// as seen.
642    ///
643    /// This method marks the current value as seen. Subsequent calls to [`changed`]
644    /// will not return immediately until the [`Sender`] has modified the shared
645    /// value again.
646    ///
647    /// Outstanding borrows hold a read lock on the inner value. This means that
648    /// long-lived borrows could cause the producer half to block. It is recommended
649    /// to keep the borrow as short-lived as possible. Additionally, if you are
650    /// running in an environment that allows `!Send` futures, you must ensure that
651    /// the returned `Ref` type is never held alive across an `.await` point,
652    /// otherwise, it can lead to a deadlock.
653    ///
654    /// The priority policy of the lock is dependent on the underlying lock
655    /// implementation, and this type does not guarantee that any particular policy
656    /// will be used. In particular, a producer which is waiting to acquire the lock
657    /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
658    ///
659    /// <details><summary>Potential deadlock example</summary>
660    ///
661    /// ```text
662    /// // Task 1 (on thread A)                |  // Task 2 (on thread B)
663    /// let _ref1 = rx1.borrow_and_update();   |
664    ///                                        |  // will block
665    ///                                        |  let _ = tx.send(());
666    /// // may deadlock                        |
667    /// let _ref2 = rx2.borrow_and_update();   |
668    /// ```
669    /// </details>
670    ///
671    /// For more information on when to use this method versus [`borrow`], see
672    /// [here](self#borrow_and_update-versus-borrow).
673    ///
674    /// [`changed`]: Receiver::changed
675    /// [`borrow`]: Receiver::borrow
676    pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
677        let inner = self.shared.value.read();
678
679        // After obtaining a read-lock no concurrent writes could occur
680        // and the loaded version matches that of the borrowed reference.
681        let new_version = self.shared.state.load().version();
682        let has_changed = self.version != new_version;
683
684        // Mark the shared value as seen by updating the version
685        self.version = new_version;
686
687        Ref { inner, has_changed }
688    }
689
690    /// Checks if this channel contains a message that this receiver has not yet
691    /// seen. The current value will not be marked as seen.
692    ///
693    /// Although this method is called `has_changed`, it does not check
694    /// messages for equality, so this call will return true even if the current
695    /// message is equal to the previous message.
696    ///
697    /// # Errors
698    ///
699    /// Returns a [`RecvError`](error::RecvError) if and only if the channel has been closed.
700    ///
701    /// # Examples
702    ///
703    /// ## Basic usage
704    ///
705    /// ```
706    /// use tokio::sync::watch;
707    ///
708    /// # #[tokio::main(flavor = "current_thread")]
709    /// # async fn main() {
710    /// let (tx, mut rx) = watch::channel("hello");
711    ///
712    /// tx.send("goodbye").unwrap();
713    ///
714    /// assert!(rx.has_changed().unwrap());
715    /// assert_eq!(*rx.borrow_and_update(), "goodbye");
716    ///
717    /// // The value has been marked as seen
718    /// assert!(!rx.has_changed().unwrap());
719    /// # }
720    /// ```
721    ///
722    /// ## Closed channel example
723    ///
724    /// ```
725    /// use tokio::sync::watch;
726    ///
727    /// # #[tokio::main(flavor = "current_thread")]
728    /// # async fn main() {
729    /// let (tx, rx) = watch::channel("hello");
730    /// tx.send("goodbye").unwrap();
731    ///
732    /// drop(tx);
733    ///
734    /// // The channel is closed
735    /// assert!(rx.has_changed().is_err());
736    /// # }
737    /// ```
738    pub fn has_changed(&self) -> Result<bool, error::RecvError> {
739        // Load the version from the state
740        let state = self.shared.state.load();
741        if state.is_closed() {
742            // All senders have dropped.
743            return Err(error::RecvError(()));
744        }
745        let new_version = state.version();
746
747        Ok(self.version != new_version)
748    }
749
750    /// Marks the state as changed.
751    ///
752    /// After invoking this method [`has_changed()`](Self::has_changed)
753    /// returns `true` and [`changed()`](Self::changed) returns
754    /// immediately, regardless of whether a new value has been sent.
755    ///
756    /// This is useful for triggering an initial change notification after
757    /// subscribing to synchronize new receivers.
758    pub fn mark_changed(&mut self) {
759        self.version.decrement();
760    }
761
762    /// Marks the state as unchanged.
763    ///
764    /// The current value will be considered seen by the receiver.
765    ///
766    /// This is useful if you are not interested in the current value
767    /// visible in the receiver.
768    pub fn mark_unchanged(&mut self) {
769        let current_version = self.shared.state.load().version();
770        self.version = current_version;
771    }
772
773    /// Waits for a change notification, then marks the current value as seen.
774    ///
775    /// If the current value in the channel has not yet been marked seen when
776    /// this method is called, the method marks that value seen and returns
777    /// immediately. If the newest value has already been marked seen, then the
778    /// method sleeps until a new message is sent by a [`Sender`] connected to
779    /// this `Receiver`, or until all [`Sender`]s are dropped.
780    ///
781    /// For more information, see
782    /// [*Change notifications*](self#change-notifications) in the module-level documentation.
783    ///
784    /// # Errors
785    ///
786    /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__
787    /// the current value is seen.
788    ///
789    /// # Cancel safety
790    ///
791    /// This method is cancel safe. If you use it as the event in a
792    /// [`tokio::select!`](crate::select) statement and some other branch
793    /// completes first, then it is guaranteed that no values have been marked
794    /// seen by this call to `changed`.
795    ///
796    /// [`Sender`]: struct@Sender
797    ///
798    /// # Examples
799    ///
800    /// ```
801    /// use tokio::sync::watch;
802    ///
803    /// # #[tokio::main(flavor = "current_thread")]
804    /// # async fn main() {
805    /// let (tx, mut rx) = watch::channel("hello");
806    ///
807    /// tokio::spawn(async move {
808    ///     tx.send("goodbye").unwrap();
809    /// });
810    ///
811    /// assert!(rx.changed().await.is_ok());
812    /// assert_eq!(*rx.borrow_and_update(), "goodbye");
813    ///
814    /// // The `tx` handle has been dropped
815    /// assert!(rx.changed().await.is_err());
816    /// # }
817    /// ```
818    pub async fn changed(&mut self) -> Result<(), error::RecvError> {
819        cooperative(changed_impl(&self.shared, &mut self.version)).await
820    }
821
822    /// Waits for a value that satisfies the provided condition.
823    ///
824    /// This method will call the provided closure whenever something is sent on
825    /// the channel. Once the closure returns `true`, this method will return a
826    /// reference to the value that was passed to the closure.
827    ///
828    /// Before `wait_for` starts waiting for changes, it will call the closure
829    /// on the current value. If the closure returns `true` when given the
830    /// current value, then `wait_for` will immediately return a reference to
831    /// the current value. This is the case even if the current value is already
832    /// considered seen.
833    ///
834    /// The watch channel only keeps track of the most recent value, so if
835    /// several messages are sent faster than `wait_for` is able to call the
836    /// closure, then it may skip some updates. Whenever the closure is called,
837    /// it will be called with the most recent value.
838    ///
839    /// When this function returns, the value that was passed to the closure
840    /// when it returned `true` will be considered seen.
841    ///
842    /// If the channel is closed, then `wait_for` will return a [`RecvError`].
843    /// Once this happens, no more messages can ever be sent on the channel.
844    /// When an error is returned, it is guaranteed that the closure has been
845    /// called on the last value, and that it returned `false` for that value.
846    /// (If the closure returned `true`, then the last value would have been
847    /// returned instead of the error.)
848    ///
849    /// Like the [`borrow`] method, the returned borrow holds a read lock on the
850    /// inner value. This means that long-lived borrows could cause the producer
851    /// half to block. It is recommended to keep the borrow as short-lived as
852    /// possible. See the documentation of `borrow` for more information on
853    /// this.
854    ///
855    /// [`borrow`]: Receiver::borrow
856    /// [`RecvError`]: error::RecvError
857    ///
858    /// # Cancel safety
859    ///
860    /// This method is cancel safe. If you use it as the event in a
861    /// [`tokio::select!`](crate::select) statement and some other branch
862    /// completes first, then it is guaranteed that the last seen value `val`
863    /// (if any) satisfies `f(val) == false`.
864    ///
865    /// # Panics
866    ///
867    /// If and only if the closure `f` panics. In that case, no resource owned
868    /// or shared by this [`Receiver`] will be poisoned.
869    ///
870    /// # Examples
871    ///
872    /// ```
873    /// use tokio::sync::watch;
874    /// use tokio::time::{sleep, Duration};
875    ///
876    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
877    /// async fn main() {
878    ///     let (tx, mut rx) = watch::channel("hello");
879    ///
880    ///     tokio::spawn(async move {
881    ///         sleep(Duration::from_secs(1)).await;
882    ///         tx.send("goodbye").unwrap();
883    ///     });
884    ///
885    ///     assert!(rx.wait_for(|val| *val == "goodbye").await.is_ok());
886    ///     assert_eq!(*rx.borrow(), "goodbye");
887    /// }
888    /// ```
889    pub async fn wait_for(
890        &mut self,
891        f: impl FnMut(&T) -> bool,
892    ) -> Result<Ref<'_, T>, error::RecvError> {
893        cooperative(self.wait_for_inner(f)).await
894    }
895
896    async fn wait_for_inner(
897        &mut self,
898        mut f: impl FnMut(&T) -> bool,
899    ) -> Result<Ref<'_, T>, error::RecvError> {
900        let mut closed = false;
901        loop {
902            {
903                let inner = self.shared.value.read();
904
905                let new_version = self.shared.state.load().version();
906                let has_changed = self.version != new_version;
907                self.version = new_version;
908
909                if !closed || has_changed {
910                    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner)));
911                    match result {
912                        Ok(true) => {
913                            return Ok(Ref { inner, has_changed });
914                        }
915                        Ok(false) => {
916                            // Skip the value.
917                        }
918                        Err(panicked) => {
919                            // Drop the read-lock to avoid poisoning it.
920                            drop(inner);
921                            // Forward the panic to the caller.
922                            panic::resume_unwind(panicked);
923                            // Unreachable
924                        }
925                    };
926                }
927            }
928
929            if closed {
930                return Err(error::RecvError(()));
931            }
932
933            // Wait for the value to change.
934            closed = changed_impl(&self.shared, &mut self.version).await.is_err();
935        }
936    }
937
938    /// Returns `true` if receivers belong to the same channel.
939    ///
940    /// # Examples
941    ///
942    /// ```
943    /// let (tx, rx) = tokio::sync::watch::channel(true);
944    /// let rx2 = rx.clone();
945    /// assert!(rx.same_channel(&rx2));
946    ///
947    /// let (tx3, rx3) = tokio::sync::watch::channel(true);
948    /// assert!(!rx3.same_channel(&rx2));
949    /// ```
950    pub fn same_channel(&self, other: &Self) -> bool {
951        Arc::ptr_eq(&self.shared, &other.shared)
952    }
953
954    cfg_process_driver! {
955        pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
956            maybe_changed(&self.shared, &mut self.version)
957        }
958    }
959}
960
961fn maybe_changed<T>(
962    shared: &Shared<T>,
963    version: &mut Version,
964) -> Option<Result<(), error::RecvError>> {
965    // Load the version from the state
966    let state = shared.state.load();
967    let new_version = state.version();
968
969    if *version != new_version {
970        // Observe the new version and return
971        *version = new_version;
972        return Some(Ok(()));
973    }
974
975    if state.is_closed() {
976        // All senders have been dropped.
977        return Some(Err(error::RecvError(())));
978    }
979
980    None
981}
982
983async fn changed_impl<T>(
984    shared: &Shared<T>,
985    version: &mut Version,
986) -> Result<(), error::RecvError> {
987    crate::trace::async_trace_leaf().await;
988
989    loop {
990        // In order to avoid a race condition, we first request a notification,
991        // **then** check the current value's version. If a new version exists,
992        // the notification request is dropped.
993        let notified = shared.notify_rx.notified();
994
995        if let Some(ret) = maybe_changed(shared, version) {
996            return ret;
997        }
998
999        notified.await;
1000        // loop around again in case the wake-up was spurious
1001    }
1002}
1003
1004impl<T> Clone for Receiver<T> {
1005    fn clone(&self) -> Self {
1006        let version = self.version;
1007        let shared = self.shared.clone();
1008
1009        Self::from_shared(version, shared)
1010    }
1011}
1012
1013impl<T> Drop for Receiver<T> {
1014    fn drop(&mut self) {
1015        // No synchronization necessary as this is only used as a counter and
1016        // not memory access.
1017        if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
1018            // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
1019            self.shared.notify_tx.notify_waiters();
1020        }
1021    }
1022}
1023
1024impl<T> Sender<T> {
1025    /// Creates the sending-half of the [`watch`] channel.
1026    ///
1027    /// See documentation of [`watch::channel`] for errors when calling this function.
1028    /// Beware that attempting to send a value when there are no receivers will
1029    /// return an error.
1030    ///
1031    /// [`watch`]: crate::sync::watch
1032    /// [`watch::channel`]: crate::sync::watch
1033    ///
1034    /// # Examples
1035    /// ```
1036    /// let sender = tokio::sync::watch::Sender::new(0u8);
1037    /// assert!(sender.send(3).is_err());
1038    /// let _rec = sender.subscribe();
1039    /// assert!(sender.send(4).is_ok());
1040    /// ```
1041    pub fn new(init: T) -> Self {
1042        let (tx, _) = channel(init);
1043        tx
1044    }
1045
1046    /// Sends a new value via the channel, notifying all receivers.
1047    ///
1048    /// This method fails if the channel is closed, which is the case when
1049    /// every receiver has been dropped. It is possible to reopen the channel
1050    /// using the [`subscribe`] method. However, when `send` fails, the value
1051    /// isn't made available for future receivers (but returned with the
1052    /// [`SendError`]).
1053    ///
1054    /// To always make a new value available for future receivers, even if no
1055    /// receiver currently exists, one of the other send methods
1056    /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
1057    /// used instead.
1058    ///
1059    /// [`subscribe`]: Sender::subscribe
1060    /// [`SendError`]: error::SendError
1061    /// [`send_if_modified`]: Sender::send_if_modified
1062    /// [`send_modify`]: Sender::send_modify
1063    /// [`send_replace`]: Sender::send_replace
1064    pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
1065        // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
1066        if 0 == self.receiver_count() {
1067            return Err(error::SendError(value));
1068        }
1069
1070        self.send_replace(value);
1071        Ok(())
1072    }
1073
1074    /// Modifies the watched value **unconditionally** in-place,
1075    /// notifying all receivers.
1076    ///
1077    /// This can be useful for modifying the watched value, without
1078    /// having to allocate a new instance. Additionally, this
1079    /// method permits sending values even when there are no receivers.
1080    ///
1081    /// Prefer to use the more versatile function [`Self::send_if_modified()`]
1082    /// if the value is only modified conditionally during the mutable borrow
1083    /// to prevent unneeded change notifications for unmodified values.
1084    ///
1085    /// # Panics
1086    ///
1087    /// This function panics when the invocation of the `modify` closure panics.
1088    /// No receivers are notified when panicking. All changes of the watched
1089    /// value applied by the closure before panicking will be visible in
1090    /// subsequent calls to `borrow`.
1091    ///
1092    /// # Examples
1093    ///
1094    /// ```
1095    /// use tokio::sync::watch;
1096    ///
1097    /// struct State {
1098    ///     counter: usize,
1099    /// }
1100    /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
1101    /// state_tx.send_modify(|state| state.counter += 1);
1102    /// assert_eq!(state_rx.borrow().counter, 1);
1103    /// ```
1104    pub fn send_modify<F>(&self, modify: F)
1105    where
1106        F: FnOnce(&mut T),
1107    {
1108        self.send_if_modified(|value| {
1109            modify(value);
1110            true
1111        });
1112    }
1113
1114    /// Modifies the watched value **conditionally** in-place,
1115    /// notifying all receivers only if modified.
1116    ///
1117    /// This can be useful for modifying the watched value, without
1118    /// having to allocate a new instance. Additionally, this
1119    /// method permits sending values even when there are no receivers.
1120    ///
1121    /// The `modify` closure must return `true` if the value has actually
1122    /// been modified during the mutable borrow. It should only return `false`
1123    /// if the value is guaranteed to be unmodified despite the mutable
1124    /// borrow.
1125    ///
1126    /// Receivers are only notified if the closure returned `true`. If the
1127    /// closure has modified the value but returned `false` this results
1128    /// in a *silent modification*, i.e. the modified value will be visible
1129    /// in subsequent calls to `borrow`, but receivers will not receive
1130    /// a change notification.
1131    ///
1132    /// Returns the result of the closure, i.e. `true` if the value has
1133    /// been modified and `false` otherwise.
1134    ///
1135    /// # Panics
1136    ///
1137    /// This function panics when the invocation of the `modify` closure panics.
1138    /// No receivers are notified when panicking. All changes of the watched
1139    /// value applied by the closure before panicking will be visible in
1140    /// subsequent calls to `borrow`.
1141    ///
1142    /// # Examples
1143    ///
1144    /// ```
1145    /// use tokio::sync::watch;
1146    ///
1147    /// struct State {
1148    ///     counter: usize,
1149    /// }
1150    /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
1151    /// let inc_counter_if_odd = |state: &mut State| {
1152    ///     if state.counter % 2 == 1 {
1153    ///         state.counter += 1;
1154    ///         return true;
1155    ///     }
1156    ///     false
1157    /// };
1158    ///
1159    /// assert_eq!(state_rx.borrow().counter, 1);
1160    ///
1161    /// assert!(!state_rx.has_changed().unwrap());
1162    /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
1163    /// assert!(state_rx.has_changed().unwrap());
1164    /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1165    ///
1166    /// assert!(!state_rx.has_changed().unwrap());
1167    /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
1168    /// assert!(!state_rx.has_changed().unwrap());
1169    /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1170    /// ```
1171    pub fn send_if_modified<F>(&self, modify: F) -> bool
1172    where
1173        F: FnOnce(&mut T) -> bool,
1174    {
1175        {
1176            // Acquire the write lock and update the value.
1177            let mut lock = self.shared.value.write();
1178
1179            // Update the value and catch possible panic inside func.
1180            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
1181            match result {
1182                Ok(modified) => {
1183                    if !modified {
1184                        // Abort, i.e. don't notify receivers if unmodified
1185                        return false;
1186                    }
1187                    // Continue if modified
1188                }
1189                Err(panicked) => {
1190                    // Drop the lock to avoid poisoning it.
1191                    drop(lock);
1192                    // Forward the panic to the caller.
1193                    panic::resume_unwind(panicked);
1194                    // Unreachable
1195                }
1196            };
1197
1198            self.shared.state.increment_version_while_locked();
1199
1200            // Release the write lock.
1201            //
1202            // Incrementing the version counter while holding the lock ensures
1203            // that receivers are able to figure out the version number of the
1204            // value they are currently looking at.
1205            drop(lock);
1206        }
1207
1208        self.shared.notify_rx.notify_waiters();
1209
1210        true
1211    }
1212
1213    /// Sends a new value via the channel, notifying all receivers and returning
1214    /// the previous value in the channel.
1215    ///
1216    /// This can be useful for reusing the buffers inside a watched value.
1217    /// Additionally, this method permits sending values even when there are no
1218    /// receivers.
1219    ///
1220    /// # Examples
1221    ///
1222    /// ```
1223    /// use tokio::sync::watch;
1224    ///
1225    /// let (tx, _rx) = watch::channel(1);
1226    /// assert_eq!(tx.send_replace(2), 1);
1227    /// assert_eq!(tx.send_replace(3), 2);
1228    /// ```
1229    pub fn send_replace(&self, mut value: T) -> T {
1230        // swap old watched value with the new one
1231        self.send_modify(|old| mem::swap(old, &mut value));
1232
1233        value
1234    }
1235
1236    /// Returns a reference to the most recently sent value
1237    ///
1238    /// Outstanding borrows hold a read lock on the inner value. This means that
1239    /// long-lived borrows could cause the producer half to block. It is recommended
1240    /// to keep the borrow as short-lived as possible. Additionally, if you are
1241    /// running in an environment that allows `!Send` futures, you must ensure that
1242    /// the returned `Ref` type is never held alive across an `.await` point,
1243    /// otherwise, it can lead to a deadlock.
1244    ///
1245    /// # Examples
1246    ///
1247    /// ```
1248    /// use tokio::sync::watch;
1249    ///
1250    /// let (tx, _) = watch::channel("hello");
1251    /// assert_eq!(*tx.borrow(), "hello");
1252    /// ```
1253    pub fn borrow(&self) -> Ref<'_, T> {
1254        let inner = self.shared.value.read();
1255
1256        // The sender/producer always sees the current version
1257        let has_changed = false;
1258
1259        Ref { inner, has_changed }
1260    }
1261
1262    /// Checks if the channel has been closed. This happens when all receivers
1263    /// have dropped.
1264    ///
1265    /// # Examples
1266    ///
1267    /// ```
1268    /// let (tx, rx) = tokio::sync::watch::channel(());
1269    /// assert!(!tx.is_closed());
1270    ///
1271    /// drop(rx);
1272    /// assert!(tx.is_closed());
1273    /// ```
1274    pub fn is_closed(&self) -> bool {
1275        self.receiver_count() == 0
1276    }
1277
1278    /// Completes when all receivers have dropped.
1279    ///
1280    /// This allows the producer to get notified when interest in the produced
1281    /// values is canceled and immediately stop doing work. Once a channel is
1282    /// closed, the only way to reopen it is to call [`Sender::subscribe`] to
1283    /// get a new receiver.
1284    ///
1285    /// If the channel becomes closed for a brief amount of time (e.g., the last
1286    /// receiver is dropped and then `subscribe` is called), then this call to
1287    /// `closed` might return, but it is also possible that it does not "notice"
1288    /// that the channel was closed for a brief amount of time.
1289    ///
1290    /// # Cancel safety
1291    ///
1292    /// This method is cancel safe.
1293    ///
1294    /// # Examples
1295    ///
1296    /// ```
1297    /// use tokio::sync::watch;
1298    ///
1299    /// # #[tokio::main(flavor = "current_thread")]
1300    /// # async fn main() {
1301    /// let (tx, rx) = watch::channel("hello");
1302    ///
1303    /// tokio::spawn(async move {
1304    ///     // use `rx`
1305    ///     drop(rx);
1306    /// });
1307    ///
1308    /// // Waits for `rx` to drop
1309    /// tx.closed().await;
1310    /// println!("the `rx` handles dropped")
1311    /// # }
1312    /// ```
1313    pub async fn closed(&self) {
1314        cooperative(async {
1315            crate::trace::async_trace_leaf().await;
1316
1317            while self.receiver_count() > 0 {
1318                let notified = self.shared.notify_tx.notified();
1319
1320                if self.receiver_count() == 0 {
1321                    return;
1322                }
1323
1324                notified.await;
1325                // The channel could have been reopened in the meantime by calling
1326                // `subscribe`, so we loop again.
1327            }
1328        })
1329        .await;
1330    }
1331
1332    /// Creates a new [`Receiver`] connected to this `Sender`.
1333    ///
1334    /// All messages sent before this call to `subscribe` are initially marked
1335    /// as seen by the new `Receiver`.
1336    ///
1337    /// This method can be called even if there are no other receivers. In this
1338    /// case, the channel is reopened.
1339    ///
1340    /// # Examples
1341    ///
1342    /// The new channel will receive messages sent on this `Sender`.
1343    ///
1344    /// ```
1345    /// use tokio::sync::watch;
1346    ///
1347    /// # #[tokio::main(flavor = "current_thread")]
1348    /// # async fn main() {
1349    /// let (tx, _rx) = watch::channel(0u64);
1350    ///
1351    /// tx.send(5).unwrap();
1352    ///
1353    /// let rx = tx.subscribe();
1354    /// assert_eq!(5, *rx.borrow());
1355    ///
1356    /// tx.send(10).unwrap();
1357    /// assert_eq!(10, *rx.borrow());
1358    /// # }
1359    /// ```
1360    ///
1361    /// The most recent message is considered seen by the channel, so this test
1362    /// is guaranteed to pass.
1363    ///
1364    /// ```
1365    /// use tokio::sync::watch;
1366    /// use tokio::time::Duration;
1367    ///
1368    /// # #[tokio::main(flavor = "current_thread")]
1369    /// # async fn main() {
1370    /// let (tx, _rx) = watch::channel(0u64);
1371    /// tx.send(5).unwrap();
1372    /// let mut rx = tx.subscribe();
1373    ///
1374    /// tokio::spawn(async move {
1375    ///     // by spawning and sleeping, the message is sent after `main`
1376    ///     // hits the call to `changed`.
1377    ///     # if false {
1378    ///     tokio::time::sleep(Duration::from_millis(10)).await;
1379    ///     # }
1380    ///     tx.send(100).unwrap();
1381    /// });
1382    ///
1383    /// rx.changed().await.unwrap();
1384    /// assert_eq!(100, *rx.borrow());
1385    /// # }
1386    /// ```
1387    pub fn subscribe(&self) -> Receiver<T> {
1388        let shared = self.shared.clone();
1389        let version = shared.state.load().version();
1390
1391        // The CLOSED bit in the state tracks only whether the sender is
1392        // dropped, so we do not need to unset it if this reopens the channel.
1393        Receiver::from_shared(version, shared)
1394    }
1395
1396    /// Returns the number of receivers that currently exist.
1397    ///
1398    /// # Examples
1399    ///
1400    /// ```
1401    /// use tokio::sync::watch;
1402    ///
1403    /// # #[tokio::main(flavor = "current_thread")]
1404    /// # async fn main() {
1405    /// let (tx, rx1) = watch::channel("hello");
1406    ///
1407    /// assert_eq!(1, tx.receiver_count());
1408    ///
1409    /// let mut _rx2 = rx1.clone();
1410    ///
1411    /// assert_eq!(2, tx.receiver_count());
1412    /// # }
1413    /// ```
1414    pub fn receiver_count(&self) -> usize {
1415        self.shared.ref_count_rx.load(Relaxed)
1416    }
1417
1418    /// Returns the number of senders that currently exist.
1419    ///
1420    /// # Examples
1421    ///
1422    /// ```
1423    /// use tokio::sync::watch;
1424    ///
1425    /// # #[tokio::main(flavor = "current_thread")]
1426    /// # async fn main() {
1427    /// let (tx1, rx) = watch::channel("hello");
1428    ///
1429    /// assert_eq!(1, tx1.sender_count());
1430    ///
1431    /// let tx2 = tx1.clone();
1432    ///
1433    /// assert_eq!(2, tx1.sender_count());
1434    /// assert_eq!(2, tx2.sender_count());
1435    /// # }
1436    /// ```
1437    pub fn sender_count(&self) -> usize {
1438        self.shared.ref_count_tx.load(Relaxed)
1439    }
1440
1441    /// Returns `true` if senders belong to the same channel.
1442    ///
1443    /// # Examples
1444    ///
1445    /// ```
1446    /// let (tx, rx) = tokio::sync::watch::channel(true);
1447    /// let tx2 = tx.clone();
1448    /// assert!(tx.same_channel(&tx2));
1449    ///
1450    /// let (tx3, rx3) = tokio::sync::watch::channel(true);
1451    /// assert!(!tx3.same_channel(&tx2));
1452    /// ```
1453    pub fn same_channel(&self, other: &Self) -> bool {
1454        Arc::ptr_eq(&self.shared, &other.shared)
1455    }
1456}
1457
1458impl<T> Drop for Sender<T> {
1459    fn drop(&mut self) {
1460        if self.shared.ref_count_tx.fetch_sub(1, AcqRel) == 1 {
1461            self.shared.state.set_closed();
1462            self.shared.notify_rx.notify_waiters();
1463        }
1464    }
1465}
1466
1467// ===== impl Ref =====
1468
1469impl<T> ops::Deref for Ref<'_, T> {
1470    type Target = T;
1471
1472    fn deref(&self) -> &T {
1473        self.inner.deref()
1474    }
1475}
1476
1477#[cfg(all(test, loom))]
1478mod tests {
1479    use futures::future::FutureExt;
1480    use loom::thread;
1481
1482    // test for https://github.com/tokio-rs/tokio/issues/3168
1483    #[test]
1484    fn watch_spurious_wakeup() {
1485        loom::model(|| {
1486            let (send, mut recv) = crate::sync::watch::channel(0i32);
1487
1488            send.send(1).unwrap();
1489
1490            let send_thread = thread::spawn(move || {
1491                send.send(2).unwrap();
1492                send
1493            });
1494
1495            recv.changed().now_or_never();
1496
1497            let send = send_thread.join().unwrap();
1498            let recv_thread = thread::spawn(move || {
1499                recv.changed().now_or_never();
1500                recv.changed().now_or_never();
1501                recv
1502            });
1503
1504            send.send(3).unwrap();
1505
1506            let mut recv = recv_thread.join().unwrap();
1507            let send_thread = thread::spawn(move || {
1508                send.send(2).unwrap();
1509            });
1510
1511            recv.changed().now_or_never();
1512
1513            send_thread.join().unwrap();
1514        });
1515    }
1516
1517    #[test]
1518    fn watch_borrow() {
1519        loom::model(|| {
1520            let (send, mut recv) = crate::sync::watch::channel(0i32);
1521
1522            assert!(send.borrow().eq(&0));
1523            assert!(recv.borrow().eq(&0));
1524
1525            send.send(1).unwrap();
1526            assert!(send.borrow().eq(&1));
1527
1528            let send_thread = thread::spawn(move || {
1529                send.send(2).unwrap();
1530                send
1531            });
1532
1533            recv.changed().now_or_never();
1534
1535            let send = send_thread.join().unwrap();
1536            let recv_thread = thread::spawn(move || {
1537                recv.changed().now_or_never();
1538                recv.changed().now_or_never();
1539                recv
1540            });
1541
1542            send.send(3).unwrap();
1543
1544            let recv = recv_thread.join().unwrap();
1545            assert!(recv.borrow().eq(&3));
1546            assert!(send.borrow().eq(&3));
1547
1548            send.send(2).unwrap();
1549
1550            thread::spawn(move || {
1551                assert!(recv.borrow().eq(&2));
1552            });
1553            assert!(send.borrow().eq(&2));
1554        });
1555    }
1556}