1use std::{
2 future::Future,
3 io, mem,
4 pin::Pin,
5 task::{Context, Poll},
6 thread,
7 time::Duration,
8};
9
10use actix_rt::{time::sleep, System};
11use futures_core::{future::BoxFuture, Stream};
12use futures_util::stream::StreamExt as _;
13use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
14use tracing::{error, info};
15
16use crate::{
17 accept::Accept,
18 builder::ServerBuilder,
19 join_all::join_all,
20 service::InternalServiceFactory,
21 signals::{SignalKind, Signals},
22 waker_queue::{WakerInterest, WakerQueue},
23 worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
24 ServerHandle,
25};
26
27#[derive(Debug)]
28pub(crate) enum ServerCommand {
29 WorkerFaulted(usize),
33
34 Pause(oneshot::Sender<()>),
38
39 Resume(oneshot::Sender<()>),
43
44 Stop {
46 graceful: bool,
48
49 completion: Option<oneshot::Sender<()>>,
51
52 force_system_stop: bool,
54 },
55}
56
57#[must_use = "Server does nothing unless you `.await` or poll it"]
125pub struct Server {
126 handle: ServerHandle,
127 fut: BoxFuture<'static, io::Result<()>>,
128}
129
130impl Server {
131 pub fn build() -> ServerBuilder {
133 ServerBuilder::default()
134 }
135
136 pub(crate) fn new(builder: ServerBuilder) -> Self {
137 Server {
138 handle: ServerHandle::new(builder.cmd_tx.clone()),
139 fut: Box::pin(ServerInner::run(builder)),
140 }
141 }
142
143 pub fn handle(&self) -> ServerHandle {
147 self.handle.clone()
148 }
149}
150
151impl Future for Server {
152 type Output = io::Result<()>;
153
154 #[inline]
155 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
156 Pin::new(&mut Pin::into_inner(self).fut).poll(cx)
157 }
158}
159
160pub struct ServerInner {
161 worker_handles: Vec<WorkerHandleServer>,
162 accept_handle: Option<thread::JoinHandle<()>>,
163 worker_config: ServerWorkerConfig,
164 services: Vec<Box<dyn InternalServiceFactory>>,
165 waker_queue: WakerQueue,
166 system_stop: bool,
167 stopping: bool,
168}
169
170impl ServerInner {
171 async fn run(builder: ServerBuilder) -> io::Result<()> {
172 let (mut this, mut mux) = Self::run_sync(builder)?;
173
174 while let Some(cmd) = mux.next().await {
175 this.handle_cmd(cmd).await;
176
177 if this.stopping {
178 break;
179 }
180 }
181
182 Ok(())
183 }
184
185 fn run_sync(mut builder: ServerBuilder) -> io::Result<(Self, ServerEventMultiplexer)> {
186 let is_actix = actix_rt::System::try_current().is_some();
188 let is_tokio = tokio::runtime::Handle::try_current().is_ok();
189
190 match (is_actix, is_tokio) {
191 (true, _) => info!("Actix runtime found; starting in Actix runtime"),
192 (_, true) => info!("Tokio runtime found; starting in existing Tokio runtime"),
193 (_, false) => panic!("Actix or Tokio runtime not found; halting"),
194 }
195
196 for (_, name, lst) in &builder.sockets {
197 info!(
198 r#"starting service: "{}", workers: {}, listening on: {}"#,
199 name,
200 builder.threads,
201 lst.local_addr()
202 );
203 }
204
205 let sockets = mem::take(&mut builder.sockets)
206 .into_iter()
207 .map(|t| (t.0, t.2))
208 .collect();
209
210 let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?;
211
212 let mux = ServerEventMultiplexer {
213 signal_fut: (builder.listen_os_signals).then(Signals::new),
214 cmd_rx: builder.cmd_rx,
215 };
216
217 let server = ServerInner {
218 waker_queue,
219 accept_handle: Some(accept_handle),
220 worker_handles,
221 worker_config: builder.worker_config,
222 services: builder.factories,
223 system_stop: builder.exit,
224 stopping: false,
225 };
226
227 Ok((server, mux))
228 }
229
230 async fn handle_cmd(&mut self, item: ServerCommand) {
231 match item {
232 ServerCommand::Pause(tx) => {
233 self.waker_queue.wake(WakerInterest::Pause);
234 let _ = tx.send(());
235 }
236
237 ServerCommand::Resume(tx) => {
238 self.waker_queue.wake(WakerInterest::Resume);
239 let _ = tx.send(());
240 }
241
242 ServerCommand::Stop {
243 graceful,
244 completion,
245 force_system_stop,
246 } => {
247 self.stopping = true;
248
249 self.waker_queue.wake(WakerInterest::Stop);
252
253 let workers_stop = self
255 .worker_handles
256 .iter()
257 .map(|worker| worker.stop(graceful))
258 .collect::<Vec<_>>();
259
260 if graceful {
261 let _ = join_all(workers_stop).await;
263 }
264
265 self.accept_handle
267 .take()
268 .unwrap()
269 .join()
270 .expect("Accept thread must not panic in any case");
271
272 if let Some(tx) = completion {
273 let _ = tx.send(());
274 }
275
276 if self.system_stop || force_system_stop {
277 sleep(Duration::from_millis(300)).await;
278 System::try_current().as_ref().map(System::stop);
279 }
280 }
281
282 ServerCommand::WorkerFaulted(idx) => {
283 assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
285
286 error!("worker {} has died; restarting", idx);
287
288 let factories = self
289 .services
290 .iter()
291 .map(|service| service.clone_factory())
292 .collect();
293
294 match ServerWorker::start(
295 idx,
296 factories,
297 self.waker_queue.clone(),
298 self.worker_config,
299 ) {
300 Ok((handle_accept, handle_server)) => {
301 *self
302 .worker_handles
303 .iter_mut()
304 .find(|wrk| wrk.idx == idx)
305 .unwrap() = handle_server;
306
307 self.waker_queue.wake(WakerInterest::Worker(handle_accept));
308 }
309
310 Err(err) => error!("can not restart worker {}: {}", idx, err),
311 };
312 }
313 }
314 }
315
316 fn map_signal(signal: SignalKind) -> ServerCommand {
317 match signal {
318 SignalKind::Int => {
319 info!("SIGINT received; starting forced shutdown");
320 ServerCommand::Stop {
321 graceful: false,
322 completion: None,
323 force_system_stop: true,
324 }
325 }
326
327 SignalKind::Term => {
328 info!("SIGTERM received; starting graceful shutdown");
329 ServerCommand::Stop {
330 graceful: true,
331 completion: None,
332 force_system_stop: true,
333 }
334 }
335
336 SignalKind::Quit => {
337 info!("SIGQUIT received; starting forced shutdown");
338 ServerCommand::Stop {
339 graceful: false,
340 completion: None,
341 force_system_stop: true,
342 }
343 }
344 }
345 }
346}
347
348struct ServerEventMultiplexer {
349 cmd_rx: UnboundedReceiver<ServerCommand>,
350 signal_fut: Option<Signals>,
351}
352
353impl Stream for ServerEventMultiplexer {
354 type Item = ServerCommand;
355
356 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
357 let this = Pin::into_inner(self);
358
359 if let Some(signal_fut) = &mut this.signal_fut {
360 if let Poll::Ready(signal) = Pin::new(signal_fut).poll(cx) {
361 this.signal_fut = None;
362 return Poll::Ready(Some(ServerInner::map_signal(signal)));
363 }
364 }
365
366 this.cmd_rx.poll_recv(cx)
367 }
368}