1use std::{
2 cell::RefCell,
3 collections::HashMap,
4 future::Future,
5 io,
6 pin::Pin,
7 sync::atomic::{AtomicUsize, Ordering},
8 task::{Context, Poll},
9};
10
11use futures_core::ready;
12use tokio::sync::{mpsc, oneshot};
13
14use crate::{arbiter::ArbiterHandle, Arbiter};
15
16static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
17
18thread_local!(
19 static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
20);
21
22#[derive(Clone, Debug)]
24pub struct System {
25 id: usize,
26 sys_tx: mpsc::UnboundedSender<SystemCommand>,
27
28 arbiter_handle: ArbiterHandle,
30}
31
32#[cfg(not(feature = "io-uring"))]
33impl System {
34 #[allow(clippy::new_ret_no_self)]
39 pub fn new() -> SystemRunner {
40 Self::with_tokio_rt(|| {
41 crate::runtime::default_tokio_runtime()
42 .expect("Default Actix (Tokio) runtime could not be created.")
43 })
44 }
45
46 pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
50 where
51 F: FnOnce() -> tokio::runtime::Runtime,
52 {
53 let (stop_tx, stop_rx) = oneshot::channel();
54 let (sys_tx, sys_rx) = mpsc::unbounded_channel();
55
56 let rt = crate::runtime::Runtime::from(runtime_factory());
57 let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
58 let system = System::construct(sys_tx, sys_arbiter.clone());
59
60 system
61 .tx()
62 .send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
63 .unwrap();
64
65 let sys_ctrl = SystemController::new(sys_rx, stop_tx);
67 rt.spawn(sys_ctrl);
68
69 SystemRunner { rt, stop_rx }
70 }
71}
72
#[cfg(feature = "io-uring")]
impl System {
    /// Returns a [`SystemRunner`].
    ///
    /// With the `io-uring` feature the runner is a unit value; nothing is set up here.
    // `new` deliberately returns the runner rather than `Self`.
    #[allow(clippy::new_ret_no_self)]
    pub fn new() -> SystemRunner {
        SystemRunner
    }

    /// Placeholder kept so the API matches the non-`io-uring` build.
    ///
    /// # Panics
    /// Always panics: custom Tokio runtimes are not supported with the `io-uring` feature.
    #[doc(hidden)]
    pub fn with_tokio_rt<F>(_runtime_factory: F) -> SystemRunner
    where
        F: FnOnce() -> tokio::runtime::Runtime,
    {
        unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet")
    }
}
95
96impl System {
97 pub(crate) fn construct(
99 sys_tx: mpsc::UnboundedSender<SystemCommand>,
100 arbiter_handle: ArbiterHandle,
101 ) -> Self {
102 let sys = System {
103 sys_tx,
104 arbiter_handle,
105 id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
106 };
107
108 System::set_current(sys.clone());
109
110 sys
111 }
112
113 pub fn current() -> System {
118 CURRENT.with(|cell| match *cell.borrow() {
119 Some(ref sys) => sys.clone(),
120 None => panic!("System is not running"),
121 })
122 }
123
124 pub fn try_current() -> Option<System> {
130 CURRENT.with(|cell| cell.borrow().clone())
131 }
132
133 pub fn arbiter(&self) -> &ArbiterHandle {
135 &self.arbiter_handle
136 }
137
138 pub fn is_registered() -> bool {
140 CURRENT.with(|sys| sys.borrow().is_some())
141 }
142
143 #[doc(hidden)]
145 pub fn set_current(sys: System) {
146 CURRENT.with(|cell| {
147 *cell.borrow_mut() = Some(sys);
148 })
149 }
150
151 pub fn id(&self) -> usize {
155 self.id
156 }
157
158 pub fn stop(&self) {
160 self.stop_with_code(0)
161 }
162
163 pub fn stop_with_code(&self, code: i32) {
165 let _ = self.sys_tx.send(SystemCommand::Exit(code));
166 }
167
168 pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> {
169 &self.sys_tx
170 }
171}
172
173#[cfg(not(feature = "io-uring"))]
175#[must_use = "A SystemRunner does nothing unless `run` is called."]
176#[derive(Debug)]
177pub struct SystemRunner {
178 rt: crate::runtime::Runtime,
179 stop_rx: oneshot::Receiver<i32>,
180}
181
182#[cfg(not(feature = "io-uring"))]
183impl SystemRunner {
184 pub fn run(self) -> io::Result<()> {
186 let exit_code = self.run_with_code()?;
187
188 match exit_code {
189 0 => Ok(()),
190 nonzero => Err(io::Error::new(
191 io::ErrorKind::Other,
192 format!("Non-zero exit code: {}", nonzero),
193 )),
194 }
195 }
196
197 pub fn run_with_code(self) -> io::Result<i32> {
199 let SystemRunner { rt, stop_rx, .. } = self;
200
201 rt.block_on(stop_rx)
203 .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
204 }
205
206 pub fn runtime(&self) -> &crate::runtime::Runtime {
237 &self.rt
238 }
239
240 #[track_caller]
242 #[inline]
243 pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
244 self.rt.block_on(fut)
245 }
246}
247
/// Runner used with the `io-uring` feature.
///
/// It carries no state; the system is set up inside [`SystemRunner::block_on`].
#[cfg(feature = "io-uring")]
#[derive(Debug)]
#[must_use = "A SystemRunner does nothing unless `run` is called."]
pub struct SystemRunner;
253
#[cfg(feature = "io-uring")]
impl SystemRunner {
    /// Not available with the `io-uring` feature.
    ///
    /// # Panics
    /// Always panics.
    pub fn run(self) -> io::Result<()> {
        unimplemented!("SystemRunner::run is not implemented for io-uring feature yet")
    }

    /// Not available with the `io-uring` feature.
    ///
    /// # Panics
    /// Always panics.
    pub fn run_with_code(self) -> io::Result<i32> {
        unimplemented!("SystemRunner::run_with_code is not implemented for io-uring feature yet")
    }

    /// Starts a `tokio_uring` runtime, sets up a system inside it, runs `fut` to completion and
    /// returns its output.
    ///
    /// The system is registered as the current system of the calling thread (see
    /// [`System::construct`]) and a `SystemController` is spawned before `fut` is awaited.
    #[inline]
    pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
        let bootstrap = async move {
            // Command channel feeds the controller; the oneshot would carry the exit code.
            let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
            let (stop_tx, stop_rx) = oneshot::channel();

            let root_arbiter = Arbiter::in_new_system();
            let system = System::construct(cmd_tx, root_arbiter.clone());

            // Register the system arbiter under the key `usize::MAX`. The receiver is still
            // held locally at this point, so the send is not expected to fail.
            system
                .tx()
                .send(SystemCommand::RegisterArbiter(usize::MAX, root_arbiter))
                .unwrap();

            // Run the controller in the background.
            tokio_uring::spawn(SystemController::new(cmd_rx, stop_tx));

            let output = fut.await;

            // The receiver is kept alive while `fut` runs and then discarded unread: the exit
            // code is not reported by `block_on`.
            drop(stop_rx);

            output
        };

        tokio_uring::start(bootstrap)
    }
}
291
292#[derive(Debug)]
293pub(crate) enum SystemCommand {
294 Exit(i32),
295 RegisterArbiter(usize, ArbiterHandle),
296 DeregisterArbiter(usize),
297}
298
299#[derive(Debug)]
302pub(crate) struct SystemController {
303 stop_tx: Option<oneshot::Sender<i32>>,
304 cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
305 arbiters: HashMap<usize, ArbiterHandle>,
306}
307
308impl SystemController {
309 pub(crate) fn new(
310 cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
311 stop_tx: oneshot::Sender<i32>,
312 ) -> Self {
313 SystemController {
314 cmd_rx,
315 stop_tx: Some(stop_tx),
316 arbiters: HashMap::with_capacity(4),
317 }
318 }
319}
320
321impl Future for SystemController {
322 type Output = ();
323
324 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
325 loop {
327 match ready!(self.cmd_rx.poll_recv(cx)) {
328 None => return Poll::Ready(()),
330
331 Some(cmd) => match cmd {
333 SystemCommand::Exit(code) => {
334 for arb in self.arbiters.values() {
336 arb.stop();
337 }
338
339 if let Some(stop_tx) = self.stop_tx.take() {
342 let _ = stop_tx.send(code);
343 }
344 }
345
346 SystemCommand::RegisterArbiter(id, arb) => {
347 self.arbiters.insert(id, arb);
348 }
349
350 SystemCommand::DeregisterArbiter(id) => {
351 self.arbiters.remove(&id);
352 }
353 },
354 }
355 }
356 }
357}