Module watch

A multi-producer, multi-consumer channel that only retains the last sent value.

This channel is useful for watching for changes to a value from multiple points in the code base, for example, changes to configuration values.

§Usage

channel returns a Sender / Receiver pair. These are the producer and consumer halves of the channel. The channel is created with an initial value.

Each Receiver independently tracks the last value seen by its caller.

To access the current value stored in the channel and mark it as seen by a given Receiver, use Receiver::borrow_and_update().

To access the current value without marking it as seen, use Receiver::borrow(). (If the value has already been marked seen, Receiver::borrow() is equivalent to Receiver::borrow_and_update().)

For more information on when to use these methods, see the section on borrow_and_update versus borrow below.

§Change notifications

The Receiver half provides an asynchronous changed method. This method is ready when a new, unseen value is sent via the Sender half.

  • Receiver::changed() returns:
    • Ok(()) on receiving a new value.
    • Err(RecvError) if the channel has been closed AND the current value is seen.
  • If the current value is unseen when calling changed, then changed will return immediately. If the current value is seen, then it will sleep until either a new message is sent via the Sender half, or the Sender is dropped.
  • On completion, the changed method marks the new value as seen.
  • At creation, the initial value is considered seen. In other words, Receiver::changed() will not return until a subsequent value is sent.
  • New Receiver instances can be created with Sender::subscribe(). The current value at the time the Receiver is created is considered seen.

§changed versus has_changed

The Receiver half provides two methods for checking for changes in the channel, has_changed and changed.

  • has_changed is a synchronous method that checks whether the current value is seen or not and returns a boolean. This method does not mark the value as seen.

  • changed is an asynchronous method that will return once an unseen value is in the channel. This method does mark the value as seen.

Note that the two methods differ in when they return an error.

  • has_changed errors if and only if the channel is closed.
  • changed errors if the channel has been closed AND the current value is seen.

See the example below that shows how these methods have different fallibility.

§borrow_and_update versus borrow

If the receiver intends to await notifications from changed in a loop, Receiver::borrow_and_update() should be preferred over Receiver::borrow(). This avoids a potential race where a new value is sent between changed being ready and the value being read. (If Receiver::borrow() is used, the loop may run twice with the same value.)

If the receiver is only interested in the current value, and does not intend to wait for changes, then Receiver::borrow() can be used. It may be more convenient to use borrow since it’s an &self method—borrow_and_update requires &mut self.

§Examples

The following example prints hello! followed by world!, each on its own line.

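use tokio::sync::watch;
use tokio::time::{Duration, sleep};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, mut rx) = watch::channel("hello");

    let watcher = tokio::spawn(async move {
        // Use the equivalent of a "do-while" loop so the initial value is
        // processed before awaiting the `changed()` future.
        loop {
            println!("{}! ", *rx.borrow_and_update());
            if rx.changed().await.is_err() {
                break;
            }
        }
    });

    sleep(Duration::from_millis(100)).await;
    tx.send("world")?;

    // Dropping the sender closes the channel, which ends the watcher loop
    // once it has printed the last value.
    drop(tx);
    watcher.await?;
    Ok(())
}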

The following example shows how changed and has_changed differ in fallibility.

use tokio::sync::watch;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = watch::channel("hello");
    tx.send("goodbye").unwrap();
    drop(tx);

    // `has_changed` does not mark the value as seen and errors
    // since the channel is closed.
    assert!(rx.has_changed().is_err());

    // `changed` returns Ok since the value is not already marked as seen
    // even if the channel is closed.
    assert!(rx.changed().await.is_ok());

    // The `changed` call above marks the value as seen.
    // The next `changed` call now returns an error as the channel is closed
    // AND the current value is seen.
    assert!(rx.changed().await.is_err());
}

§Closing

Sender::is_closed and Sender::closed allow the producer to detect when all Receiver handles have been dropped. This indicates that there is no further interest in the values being produced and work can be stopped.

The value in the channel will not be dropped until all senders and all receivers have been dropped.

§Thread safety

Both Sender and Receiver are thread safe. They can be moved to other threads and can be used in a concurrent environment. Clones of Receiver handles may be moved to separate threads and also used concurrently.

Modules§

error
Watch error types.

Structs§

Receiver
Receives values from the associated Sender.
Ref
Returns a reference to the inner value.
Sender
Sends values to the associated Receiver.

Functions§

channel
Creates a new watch channel, returning the “send” and “receive” handles.