tokio/sync/
set_once.rs

1use super::Notify;
2
3use crate::loom::cell::UnsafeCell;
4use crate::loom::sync::atomic::AtomicBool;
5
6use std::error::Error;
7use std::fmt;
8use std::future::{poll_fn, Future};
9use std::mem::MaybeUninit;
10use std::ops::Drop;
11use std::ptr;
12use std::sync::atomic::Ordering;
13use std::task::Poll;
14
// This file contains an implementation of a `SetOnce`. The value of a
// `SetOnce` can only be set once, during initialization.
//
//  1. When `value_set` is false, the `value` is not initialized and the wait()
//      future will keep on waiting.
//  2. When `value_set` is true, the wait() future completes and get() will
//      return Some(&T).
//
// The value cannot be changed after set() is called. Subsequent calls to set()
// will return a `SetOnceError`.
25
/// A thread-safe cell that can be written to only once.
///
/// A `SetOnce` is inspired by Python's [`asyncio.Event`] type. It can be
/// used to wait until the value of the `SetOnce` is set, like an "Event"
/// mechanism.
///
/// # Example
///
/// ```
/// use tokio::sync::{SetOnce, SetOnceError};
///
/// static ONCE: SetOnce<u32> = SetOnce::const_new();
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), SetOnceError<u32>> {
///
/// // set the value inside a task somewhere...
/// tokio::spawn(async move { ONCE.set(20) });
///
/// // checking with .get never waits; it returns `None` if nothing is set yet
/// println!("{:?}", ONCE.get());
///
/// // wait until the value is set; this suspends the current task until then
/// println!("{:?}", ONCE.wait().await);
///
/// Ok(())
/// # }
/// ```
///
/// A `SetOnce` is typically used for global variables that need to be
/// initialized once on first use, but need no further changes. The `SetOnce`
/// in Tokio allows the initialization procedure to be asynchronous.
///
/// # Example
///
/// ```
/// use tokio::sync::{SetOnce, SetOnceError};
/// use std::sync::Arc;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), SetOnceError<u32>> {
/// let once = SetOnce::new();
///
/// let arc = Arc::new(once);
/// let first_cl = Arc::clone(&arc);
/// let second_cl = Arc::clone(&arc);
///
/// // set the value inside a task
/// tokio::spawn(async move { first_cl.set(20) }).await.unwrap()?;
///
/// // wait inside task to not block the main thread
/// tokio::spawn(async move {
///     // wait inside async context for the value to be set
///     assert_eq!(*second_cl.wait().await, 20);
/// }).await.unwrap();
///
/// // subsequent set calls will fail
/// assert!(arc.set(30).is_err());
///
/// println!("{:?}", arc.get());
///
/// Ok(())
/// # }
/// ```
///
/// [`asyncio.Event`]: https://docs.python.org/3/library/asyncio-sync.html#asyncio.Event
pub struct SetOnce<T> {
    // `true` once `value` has been written. `set` stores it with `Release`
    // and `initialized` loads it with `Acquire`.
    value_set: AtomicBool,
    // Storage for the value; it only holds an initialized `T` while
    // `value_set` is `true`.
    value: UnsafeCell<MaybeUninit<T>>,
    // Used by `wait` to park until a value arrives. `set` also takes its
    // waiter-list lock so that only one `set` call can write at a time.
    notify: Notify,
}
96
97impl<T> Default for SetOnce<T> {
98    fn default() -> SetOnce<T> {
99        SetOnce::new()
100    }
101}
102
103impl<T: fmt::Debug> fmt::Debug for SetOnce<T> {
104    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
105        fmt.debug_struct("SetOnce")
106            .field("value", &self.get())
107            .finish()
108    }
109}
110
111impl<T: Clone> Clone for SetOnce<T> {
112    fn clone(&self) -> SetOnce<T> {
113        SetOnce::new_with(self.get().cloned())
114    }
115}
116
117impl<T: PartialEq> PartialEq for SetOnce<T> {
118    fn eq(&self, other: &SetOnce<T>) -> bool {
119        self.get() == other.get()
120    }
121}
122
// Marker impl: the `PartialEq` impl compares the `Option<&T>` returned by
// `get`, so equality is total whenever `T: Eq`.
impl<T: Eq> Eq for SetOnce<T> {}
124
125impl<T> Drop for SetOnce<T> {
126    fn drop(&mut self) {
127        // TODO: Use get_mut()
128        if self.value_set.load(Ordering::Relaxed) {
129            // SAFETY: If the value_set is true, then the value is initialized
130            // then there is a value to be dropped and this is safe
131            unsafe { self.value.with_mut(|ptr| ptr::drop_in_place(ptr as *mut T)) }
132        }
133    }
134}
135
136impl<T> From<T> for SetOnce<T> {
137    fn from(value: T) -> Self {
138        SetOnce {
139            value_set: AtomicBool::new(true),
140            value: UnsafeCell::new(MaybeUninit::new(value)),
141            notify: Notify::new(),
142        }
143    }
144}
145
146impl<T> SetOnce<T> {
147    /// Creates a new empty `SetOnce` instance.
148    pub fn new() -> Self {
149        Self {
150            value_set: AtomicBool::new(false),
151            value: UnsafeCell::new(MaybeUninit::uninit()),
152            notify: Notify::new(),
153        }
154    }
155
156    /// Creates a new empty `SetOnce` instance.
157    ///
158    /// Equivalent to `SetOnce::new`, except that it can be used in static
159    /// variables.
160    ///
161    /// When using the `tracing` [unstable feature], a `SetOnce` created with
162    /// `const_new` will not be instrumented. As such, it will not be visible
163    /// in [`tokio-console`]. Instead, [`SetOnce::new`] should be used to
164    /// create an instrumented object if that is needed.
165    ///
166    /// # Example
167    ///
168    /// ```
169    /// use tokio::sync::{SetOnce, SetOnceError};
170    ///
171    /// static ONCE: SetOnce<u32> = SetOnce::const_new();
172    ///
173    /// fn get_global_integer() -> Result<Option<&'static u32>, SetOnceError<u32>> {
174    ///     ONCE.set(2)?;
175    ///     Ok(ONCE.get())
176    /// }
177    ///
178    /// # #[tokio::main(flavor = "current_thread")]
179    /// # async fn main() -> Result<(), SetOnceError<u32>> {
180    /// let result = get_global_integer()?;
181    ///
182    /// assert_eq!(result, Some(&2));
183    /// Ok(())
184    /// # }
185    /// ```
186    ///
187    /// [`tokio-console`]: https://github.com/tokio-rs/console
188    /// [unstable feature]: crate#unstable-features
189    #[cfg(not(all(loom, test)))]
190    pub const fn const_new() -> Self {
191        Self {
192            value_set: AtomicBool::new(false),
193            value: UnsafeCell::new(MaybeUninit::uninit()),
194            notify: Notify::const_new(),
195        }
196    }
197
198    /// Creates a new `SetOnce` that contains the provided value, if any.
199    ///
200    /// If the `Option` is `None`, this is equivalent to `SetOnce::new`.
201    ///
202    /// [`SetOnce::new`]: crate::sync::SetOnce::new
203    pub fn new_with(value: Option<T>) -> Self {
204        if let Some(v) = value {
205            SetOnce::from(v)
206        } else {
207            SetOnce::new()
208        }
209    }
210
211    /// Creates a new `SetOnce` that contains the provided value.
212    ///
213    /// # Example
214    ///
215    /// When using the `tracing` [unstable feature], a `SetOnce` created with
216    /// `const_new_with` will not be instrumented. As such, it will not be
217    /// visible in [`tokio-console`]. Instead, [`SetOnce::new_with`] should be
218    /// used to create an instrumented object if that is needed.
219    ///
220    /// ```
221    /// use tokio::sync::SetOnce;
222    ///
223    /// static ONCE: SetOnce<u32> = SetOnce::const_new_with(1);
224    ///
225    /// fn get_global_integer() -> Option<&'static u32> {
226    ///     ONCE.get()
227    /// }
228    ///
229    /// # #[tokio::main(flavor = "current_thread")]
230    /// # async fn main() {
231    /// let result = get_global_integer();
232    ///
233    /// assert_eq!(result, Some(&1));
234    /// # }
235    /// ```
236    ///
237    /// [`tokio-console`]: https://github.com/tokio-rs/console
238    /// [unstable feature]: crate#unstable-features
239    #[cfg(not(all(loom, test)))]
240    pub const fn const_new_with(value: T) -> Self {
241        Self {
242            value_set: AtomicBool::new(true),
243            value: UnsafeCell::new(MaybeUninit::new(value)),
244            notify: Notify::const_new(),
245        }
246    }
247
    /// Returns `true` if the `SetOnce` currently contains a value, and `false`
    /// otherwise.
    ///
    /// This never waits; it only reads the `value_set` flag.
    pub fn initialized(&self) -> bool {
        // Acquire pairs with the `Release` store performed by `set` after it
        // writes the value, so observing `true` here also makes that write
        // visible to the caller.
        self.value_set.load(Ordering::Acquire)
    }
255
256    // SAFETY: The SetOnce must not be empty.
257    unsafe fn get_unchecked(&self) -> &T {
258        &*self.value.with(|ptr| (*ptr).as_ptr())
259    }
260
261    /// Returns a reference to the value currently stored in the `SetOnce`, or
262    /// `None` if the `SetOnce` is empty.
263    pub fn get(&self) -> Option<&T> {
264        if self.initialized() {
265            // SAFETY: the SetOnce is initialized, so we can safely
266            // call get_unchecked and return the value
267            Some(unsafe { self.get_unchecked() })
268        } else {
269            None
270        }
271    }
272
    /// Sets the value of the `SetOnce` to the given value if the `SetOnce` is
    /// empty.
    ///
    /// If the `SetOnce` already has a value, this call will fail with an
    /// [`SetOnceError`] that hands the rejected `value` back to the caller.
    ///
    /// [`SetOnceError`]: crate::sync::SetOnceError
    pub fn set(&self, value: T) -> Result<(), SetOnceError<T>> {
        // Fast path: bail out without taking the lock if a value is already
        // present.
        if self.initialized() {
            return Err(SetOnceError(value));
        }

        // Take the notify's waiter-list lock so that only one caller of `set`
        // can proceed past this point at a time.
        let guard = self.notify.lock_waiter_list();

        // Re-check under the lock: another `set` may have completed between
        // the first check and acquiring the lock.
        if self.initialized() {
            return Err(SetOnceError(value));
        }

        // SAFETY: We have locked the mutex and checked if the value is
        // initialized or not, so we can safely write to the value
        unsafe {
            self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value));
        }

        // Using release ordering so any thread that reads `true` from this
        // atomic is able to read the value we just stored.
        self.value_set.store(true, Ordering::Release);

        // Notify the waiting wakers that the value is set. This happens only
        // after the flag is published, so a woken `wait` sees the value.
        guard.notify_waiters();

        Ok(())
    }
308
309    /// Takes the value from the cell, destroying the cell in the process.
310    /// Returns `None` if the cell is empty.
311    pub fn into_inner(self) -> Option<T> {
312        // TODO: Use get_mut()
313        let value_set = self.value_set.load(Ordering::Relaxed);
314
315        if value_set {
316            // Since we have taken ownership of self, its drop implementation
317            // will be called by the end of this function, to prevent a double
318            // free we will set the value_set to false so that the drop
319            // implementation does not try to drop the value again.
320            self.value_set.store(false, Ordering::Relaxed);
321
322            // SAFETY: The SetOnce is currently initialized, we can assume the
323            // value is initialized and return that, when we return the value
324            // we give the drop handler to the return scope.
325            Some(unsafe { self.value.with_mut(|ptr| ptr::read(ptr).assume_init()) })
326        } else {
327            None
328        }
329    }
330
    /// Waits until the value is set.
    ///
    /// If the `SetOnce` is already initialized, it will return the value
    /// immediately.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe.
    pub async fn wait(&self) -> &T {
        loop {
            // Return as soon as a value is observable.
            if let Some(val) = self.get() {
                return val;
            }

            // No value yet: create a notification future and pin it so it can
            // be polled by hand below.
            let notify_fut = self.notify.notified();
            pin!(notify_fut);

            poll_fn(|cx| {
                // Register under the notify's internal lock.
                let ret = notify_fut.as_mut().poll(cx);
                // Re-check the flag after polling. If a value was set in the
                // meantime, finish this inner future so the outer loop goes
                // back to `get` instead of parking.
                if self.value_set.load(Ordering::Relaxed) {
                    return Poll::Ready(());
                }
                ret
            })
            .await;
        }
    }
359}
360
// Since `get` gives us access to immutable references of the SetOnce, SetOnce
// can only be Sync if T is Sync, otherwise SetOnce would allow sharing
// references of !Sync values across threads. We need T to be Send in order for
// SetOnce to be Sync because we can use `set` on `&SetOnce<T>` to send values
// (of type T) across threads.
unsafe impl<T: Sync + Send> Sync for SetOnce<T> {}
367
// Access to SetOnce's value is guarded by the atomic boolean flag
// `value_set` and the atomic operations performed on it, so as long as T
// itself is Send it is safe to send the SetOnce to another thread.
unsafe impl<T: Send> Send for SetOnce<T> {}
372
/// Error that can be returned from [`SetOnce::set`].
///
/// This error means that the `SetOnce` was already initialized when
/// `set` was called. The value that could not be stored is returned to the
/// caller in the error's only field.
///
/// [`SetOnce::set`]: crate::sync::SetOnce::set
#[derive(Debug, PartialEq, Eq)]
pub struct SetOnceError<T>(pub T);
381
382impl<T> fmt::Display for SetOnceError<T> {
383    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384        write!(f, "SetOnceError")
385    }
386}
387
// `Error` requires `Debug`, and the derived `Debug` impl for `SetOnceError<T>`
// only exists when `T: Debug`; hence the bound.
impl<T: fmt::Debug> Error for SetOnceError<T> {}