actix_rt/
system.rs

1use std::{
2    cell::RefCell,
3    collections::HashMap,
4    future::Future,
5    io,
6    pin::Pin,
7    sync::atomic::{AtomicUsize, Ordering},
8    task::{Context, Poll},
9};
10
11use futures_core::ready;
12use tokio::sync::{mpsc, oneshot};
13
14use crate::{arbiter::ArbiterHandle, Arbiter};
15
16static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
17
18thread_local!(
19    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
20);
21
22/// A manager for a per-thread distributed async runtime.
23#[derive(Clone, Debug)]
24pub struct System {
25    id: usize,
26    sys_tx: mpsc::UnboundedSender<SystemCommand>,
27
28    /// Handle to the first [Arbiter] that is created with the System.
29    arbiter_handle: ArbiterHandle,
30}
31
32#[cfg(not(feature = "io-uring"))]
33impl System {
34    /// Create a new system.
35    ///
36    /// # Panics
37    /// Panics if underlying Tokio runtime can not be created.
38    #[allow(clippy::new_ret_no_self)]
39    pub fn new() -> SystemRunner {
40        Self::with_tokio_rt(|| {
41            crate::runtime::default_tokio_runtime()
42                .expect("Default Actix (Tokio) runtime could not be created.")
43        })
44    }
45
46    /// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
47    ///
48    /// [tokio-runtime]: tokio::runtime::Runtime
49    pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
50    where
51        F: FnOnce() -> tokio::runtime::Runtime,
52    {
53        let (stop_tx, stop_rx) = oneshot::channel();
54        let (sys_tx, sys_rx) = mpsc::unbounded_channel();
55
56        let rt = crate::runtime::Runtime::from(runtime_factory());
57        let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
58        let system = System::construct(sys_tx, sys_arbiter.clone());
59
60        system
61            .tx()
62            .send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
63            .unwrap();
64
65        // init background system arbiter
66        let sys_ctrl = SystemController::new(sys_rx, stop_tx);
67        rt.spawn(sys_ctrl);
68
69        SystemRunner { rt, stop_rx }
70    }
71}
72
73#[cfg(feature = "io-uring")]
74impl System {
75    /// Create a new system.
76    ///
77    /// # Panics
78    /// Panics if underlying Tokio runtime can not be created.
79    #[allow(clippy::new_ret_no_self)]
80    pub fn new() -> SystemRunner {
81        SystemRunner
82    }
83
84    /// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
85    ///
86    /// [tokio-runtime]: tokio::runtime::Runtime
87    #[doc(hidden)]
88    pub fn with_tokio_rt<F>(_: F) -> SystemRunner
89    where
90        F: FnOnce() -> tokio::runtime::Runtime,
91    {
92        unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet")
93    }
94}
95
96impl System {
97    /// Constructs new system and registers it on the current thread.
98    pub(crate) fn construct(
99        sys_tx: mpsc::UnboundedSender<SystemCommand>,
100        arbiter_handle: ArbiterHandle,
101    ) -> Self {
102        let sys = System {
103            sys_tx,
104            arbiter_handle,
105            id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
106        };
107
108        System::set_current(sys.clone());
109
110        sys
111    }
112
113    /// Get current running system.
114    ///
115    /// # Panics
116    /// Panics if no system is registered on the current thread.
117    pub fn current() -> System {
118        CURRENT.with(|cell| match *cell.borrow() {
119            Some(ref sys) => sys.clone(),
120            None => panic!("System is not running"),
121        })
122    }
123
124    /// Try to get current running system.
125    ///
126    /// Returns `None` if no System has been started.
127    ///
128    /// Unlike [`current`](Self::current), this never panics.
129    pub fn try_current() -> Option<System> {
130        CURRENT.with(|cell| cell.borrow().clone())
131    }
132
133    /// Get handle to a the System's initial [Arbiter].
134    pub fn arbiter(&self) -> &ArbiterHandle {
135        &self.arbiter_handle
136    }
137
138    /// Check if there is a System registered on the current thread.
139    pub fn is_registered() -> bool {
140        CURRENT.with(|sys| sys.borrow().is_some())
141    }
142
143    /// Register given system on current thread.
144    #[doc(hidden)]
145    pub fn set_current(sys: System) {
146        CURRENT.with(|cell| {
147            *cell.borrow_mut() = Some(sys);
148        })
149    }
150
151    /// Numeric system identifier.
152    ///
153    /// Useful when using multiple Systems.
154    pub fn id(&self) -> usize {
155        self.id
156    }
157
158    /// Stop the system (with code 0).
159    pub fn stop(&self) {
160        self.stop_with_code(0)
161    }
162
163    /// Stop the system with a given exit code.
164    pub fn stop_with_code(&self, code: i32) {
165        let _ = self.sys_tx.send(SystemCommand::Exit(code));
166    }
167
168    pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> {
169        &self.sys_tx
170    }
171}
172
173/// Runner that keeps a [System]'s event loop alive until stop message is received.
174#[cfg(not(feature = "io-uring"))]
175#[must_use = "A SystemRunner does nothing unless `run` is called."]
176#[derive(Debug)]
177pub struct SystemRunner {
178    rt: crate::runtime::Runtime,
179    stop_rx: oneshot::Receiver<i32>,
180}
181
182#[cfg(not(feature = "io-uring"))]
183impl SystemRunner {
184    /// Starts event loop and will return once [System] is [stopped](System::stop).
185    pub fn run(self) -> io::Result<()> {
186        let exit_code = self.run_with_code()?;
187
188        match exit_code {
189            0 => Ok(()),
190            nonzero => Err(io::Error::new(
191                io::ErrorKind::Other,
192                format!("Non-zero exit code: {}", nonzero),
193            )),
194        }
195    }
196
197    /// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
198    pub fn run_with_code(self) -> io::Result<i32> {
199        let SystemRunner { rt, stop_rx, .. } = self;
200
201        // run loop
202        rt.block_on(stop_rx)
203            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
204    }
205
206    /// Retrieves a reference to the underlying [Actix runtime](crate::Runtime) associated with this
207    /// `SystemRunner` instance.
208    ///
209    /// The Actix runtime is responsible for managing the event loop for an Actix system and
210    /// executing asynchronous tasks. This method provides access to the runtime, allowing direct
211    /// interaction with its features.
212    ///
213    /// In a typical use case, you might need to share the same runtime between different
214    /// parts of your project. For example, some components might require a [`Runtime`] to spawn
215    /// tasks on the same runtime.
216    ///
217    /// Read more in the documentation for [`Runtime`].
218    ///
219    /// # Examples
220    ///
221    /// ```
222    /// let system_runner = actix_rt::System::new();
223    /// let actix_runtime = system_runner.runtime();
224    ///
225    /// // Use the runtime to spawn an async task or perform other operations
226    /// ```
227    ///
228    /// # Note
229    ///
230    /// While this method provides an immutable reference to the Actix runtime, which is safe to
231    /// share across threads, be aware that spawning blocking tasks on the Actix runtime could
232    /// potentially impact system performance. This is because the Actix runtime is responsible for
233    /// driving the system, and blocking tasks could delay other tasks in the run loop.
234    ///
235    /// [`Runtime`]: crate::Runtime
236    pub fn runtime(&self) -> &crate::runtime::Runtime {
237        &self.rt
238    }
239
240    /// Runs the provided future, blocking the current thread until the future completes.
241    #[track_caller]
242    #[inline]
243    pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
244        self.rt.block_on(fut)
245    }
246}
247
248/// Runner that keeps a [System]'s event loop alive until stop message is received.
249#[cfg(feature = "io-uring")]
250#[must_use = "A SystemRunner does nothing unless `run` is called."]
251#[derive(Debug)]
252pub struct SystemRunner;
253
254#[cfg(feature = "io-uring")]
255impl SystemRunner {
256    /// Starts event loop and will return once [System] is [stopped](System::stop).
257    pub fn run(self) -> io::Result<()> {
258        unimplemented!("SystemRunner::run is not implemented for io-uring feature yet");
259    }
260
261    /// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
262    pub fn run_with_code(self) -> io::Result<i32> {
263        unimplemented!("SystemRunner::run_with_code is not implemented for io-uring feature yet");
264    }
265
266    /// Runs the provided future, blocking the current thread until the future completes.
267    #[inline]
268    pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
269        tokio_uring::start(async move {
270            let (stop_tx, stop_rx) = oneshot::channel();
271            let (sys_tx, sys_rx) = mpsc::unbounded_channel();
272
273            let sys_arbiter = Arbiter::in_new_system();
274            let system = System::construct(sys_tx, sys_arbiter.clone());
275
276            system
277                .tx()
278                .send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
279                .unwrap();
280
281            // init background system arbiter
282            let sys_ctrl = SystemController::new(sys_rx, stop_tx);
283            tokio_uring::spawn(sys_ctrl);
284
285            let res = fut.await;
286            drop(stop_rx);
287            res
288        })
289    }
290}
291
292#[derive(Debug)]
293pub(crate) enum SystemCommand {
294    Exit(i32),
295    RegisterArbiter(usize, ArbiterHandle),
296    DeregisterArbiter(usize),
297}
298
299/// There is one `SystemController` per [System]. It runs in the background, keeping track of
300/// [Arbiter]s and is able to distribute a system-wide stop command.
301#[derive(Debug)]
302pub(crate) struct SystemController {
303    stop_tx: Option<oneshot::Sender<i32>>,
304    cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
305    arbiters: HashMap<usize, ArbiterHandle>,
306}
307
308impl SystemController {
309    pub(crate) fn new(
310        cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
311        stop_tx: oneshot::Sender<i32>,
312    ) -> Self {
313        SystemController {
314            cmd_rx,
315            stop_tx: Some(stop_tx),
316            arbiters: HashMap::with_capacity(4),
317        }
318    }
319}
320
321impl Future for SystemController {
322    type Output = ();
323
324    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
325        // process all items currently buffered in channel
326        loop {
327            match ready!(self.cmd_rx.poll_recv(cx)) {
328                // channel closed; no more messages can be received
329                None => return Poll::Ready(()),
330
331                // process system command
332                Some(cmd) => match cmd {
333                    SystemCommand::Exit(code) => {
334                        // stop all arbiters
335                        for arb in self.arbiters.values() {
336                            arb.stop();
337                        }
338
339                        // stop event loop
340                        // will only fire once
341                        if let Some(stop_tx) = self.stop_tx.take() {
342                            let _ = stop_tx.send(code);
343                        }
344                    }
345
346                    SystemCommand::RegisterArbiter(id, arb) => {
347                        self.arbiters.insert(id, arb);
348                    }
349
350                    SystemCommand::DeregisterArbiter(id) => {
351                        self.arbiters.remove(&id);
352                    }
353                },
354            }
355        }
356    }
357}