actix_server/
server.rs

1use std::{
2    future::Future,
3    io, mem,
4    pin::Pin,
5    task::{Context, Poll},
6    thread,
7    time::Duration,
8};
9
10use actix_rt::{time::sleep, System};
11use futures_core::{future::BoxFuture, Stream};
12use futures_util::stream::StreamExt as _;
13use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
14use tracing::{error, info};
15
16use crate::{
17    accept::Accept,
18    builder::ServerBuilder,
19    join_all::join_all,
20    service::InternalServiceFactory,
21    signals::{SignalKind, Signals},
22    waker_queue::{WakerInterest, WakerQueue},
23    worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
24    ServerHandle,
25};
26
27#[derive(Debug)]
28pub(crate) enum ServerCommand {
29    /// Worker failed to accept connection, indicating a probable panic.
30    ///
31    /// Contains index of faulted worker.
32    WorkerFaulted(usize),
33
34    /// Pause accepting connections.
35    ///
36    /// Contains return channel to notify caller of successful state change.
37    Pause(oneshot::Sender<()>),
38
39    /// Resume accepting connections.
40    ///
41    /// Contains return channel to notify caller of successful state change.
42    Resume(oneshot::Sender<()>),
43
44    /// Stop accepting connections and begin shutdown procedure.
45    Stop {
46        /// True if shut down should be graceful.
47        graceful: bool,
48
49        /// Return channel to notify caller that shutdown is complete.
50        completion: Option<oneshot::Sender<()>>,
51
52        /// Force System exit when true, overriding `ServerBuilder::system_exit()` if it is false.
53        force_system_stop: bool,
54    },
55}
56
57/// General purpose TCP server that runs services receiving Tokio `TcpStream`s.
58///
59/// Handles creating worker threads, restarting faulted workers, connection accepting, and
60/// back-pressure logic.
61///
62/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and
63/// distributes connections with a round-robin strategy.
64///
65/// The [Server] must be awaited or polled in order to start running. It will resolve when the
66/// server has fully shut down.
67///
68/// # Shutdown Signals
69/// On UNIX systems, `SIGTERM` will start a graceful shutdown and `SIGQUIT` or `SIGINT` will start a
70/// forced shutdown. On Windows, a Ctrl-C signal will start a forced shutdown.
71///
72/// A graceful shutdown will wait for all workers to stop first.
73///
74/// # Examples
75/// The following is a TCP echo server. Test using `telnet 127.0.0.1 8080`.
76///
77/// ```no_run
78/// use std::io;
79///
80/// use actix_rt::net::TcpStream;
81/// use actix_server::Server;
82/// use actix_service::{fn_service, ServiceFactoryExt as _};
83/// use bytes::BytesMut;
84/// use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
85///
86/// #[actix_rt::main]
87/// async fn main() -> io::Result<()> {
88///     let bind_addr = ("127.0.0.1", 8080);
89///
90///     Server::build()
91///         .bind("echo", bind_addr, move || {
92///             fn_service(move |mut stream: TcpStream| {
93///                 async move {
94///                     let mut size = 0;
95///                     let mut buf = BytesMut::new();
96///
97///                     loop {
98///                         match stream.read_buf(&mut buf).await {
99///                             // end of stream; bail from loop
100///                             Ok(0) => break,
101///
102///                             // write bytes back to stream
103///                             Ok(bytes_read) => {
104///                                 stream.write_all(&buf[size..]).await.unwrap();
105///                                 size += bytes_read;
106///                             }
107///
108///                             Err(err) => {
109///                                 eprintln!("Stream Error: {:?}", err);
110///                                 return Err(());
111///                             }
112///                         }
113///                     }
114///
115///                     Ok(())
116///                 }
117///             })
118///             .map_err(|err| eprintln!("Service Error: {:?}", err))
119///         })?
120///         .run()
121///         .await
122/// }
123/// ```
124#[must_use = "Server does nothing unless you `.await` or poll it"]
125pub struct Server {
126    handle: ServerHandle,
127    fut: BoxFuture<'static, io::Result<()>>,
128}
129
130impl Server {
131    /// Create server build.
132    pub fn build() -> ServerBuilder {
133        ServerBuilder::default()
134    }
135
136    pub(crate) fn new(builder: ServerBuilder) -> Self {
137        Server {
138            handle: ServerHandle::new(builder.cmd_tx.clone()),
139            fut: Box::pin(ServerInner::run(builder)),
140        }
141    }
142
143    /// Get a `Server` handle that can be used issue commands and change it's state.
144    ///
145    /// See [ServerHandle](ServerHandle) for usage.
146    pub fn handle(&self) -> ServerHandle {
147        self.handle.clone()
148    }
149}
150
151impl Future for Server {
152    type Output = io::Result<()>;
153
154    #[inline]
155    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
156        Pin::new(&mut Pin::into_inner(self).fut).poll(cx)
157    }
158}
159
160pub struct ServerInner {
161    worker_handles: Vec<WorkerHandleServer>,
162    accept_handle: Option<thread::JoinHandle<()>>,
163    worker_config: ServerWorkerConfig,
164    services: Vec<Box<dyn InternalServiceFactory>>,
165    waker_queue: WakerQueue,
166    system_stop: bool,
167    stopping: bool,
168}
169
170impl ServerInner {
171    async fn run(builder: ServerBuilder) -> io::Result<()> {
172        let (mut this, mut mux) = Self::run_sync(builder)?;
173
174        while let Some(cmd) = mux.next().await {
175            this.handle_cmd(cmd).await;
176
177            if this.stopping {
178                break;
179            }
180        }
181
182        Ok(())
183    }
184
185    fn run_sync(mut builder: ServerBuilder) -> io::Result<(Self, ServerEventMultiplexer)> {
186        // Give log information on what runtime will be used.
187        let is_actix = actix_rt::System::try_current().is_some();
188        let is_tokio = tokio::runtime::Handle::try_current().is_ok();
189
190        match (is_actix, is_tokio) {
191            (true, _) => info!("Actix runtime found; starting in Actix runtime"),
192            (_, true) => info!("Tokio runtime found; starting in existing Tokio runtime"),
193            (_, false) => panic!("Actix or Tokio runtime not found; halting"),
194        }
195
196        for (_, name, lst) in &builder.sockets {
197            info!(
198                r#"starting service: "{}", workers: {}, listening on: {}"#,
199                name,
200                builder.threads,
201                lst.local_addr()
202            );
203        }
204
205        let sockets = mem::take(&mut builder.sockets)
206            .into_iter()
207            .map(|t| (t.0, t.2))
208            .collect();
209
210        let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?;
211
212        let mux = ServerEventMultiplexer {
213            signal_fut: (builder.listen_os_signals).then(Signals::new),
214            cmd_rx: builder.cmd_rx,
215        };
216
217        let server = ServerInner {
218            waker_queue,
219            accept_handle: Some(accept_handle),
220            worker_handles,
221            worker_config: builder.worker_config,
222            services: builder.factories,
223            system_stop: builder.exit,
224            stopping: false,
225        };
226
227        Ok((server, mux))
228    }
229
230    async fn handle_cmd(&mut self, item: ServerCommand) {
231        match item {
232            ServerCommand::Pause(tx) => {
233                self.waker_queue.wake(WakerInterest::Pause);
234                let _ = tx.send(());
235            }
236
237            ServerCommand::Resume(tx) => {
238                self.waker_queue.wake(WakerInterest::Resume);
239                let _ = tx.send(());
240            }
241
242            ServerCommand::Stop {
243                graceful,
244                completion,
245                force_system_stop,
246            } => {
247                self.stopping = true;
248
249                // Signal accept thread to stop.
250                // Signal is non-blocking; we wait for thread to stop later.
251                self.waker_queue.wake(WakerInterest::Stop);
252
253                // send stop signal to workers
254                let workers_stop = self
255                    .worker_handles
256                    .iter()
257                    .map(|worker| worker.stop(graceful))
258                    .collect::<Vec<_>>();
259
260                if graceful {
261                    // wait for all workers to shut down
262                    let _ = join_all(workers_stop).await;
263                }
264
265                // wait for accept thread stop
266                self.accept_handle
267                    .take()
268                    .unwrap()
269                    .join()
270                    .expect("Accept thread must not panic in any case");
271
272                if let Some(tx) = completion {
273                    let _ = tx.send(());
274                }
275
276                if self.system_stop || force_system_stop {
277                    sleep(Duration::from_millis(300)).await;
278                    System::try_current().as_ref().map(System::stop);
279                }
280            }
281
282            ServerCommand::WorkerFaulted(idx) => {
283                // TODO: maybe just return with warning log if not found ?
284                assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
285
286                error!("worker {} has died; restarting", idx);
287
288                let factories = self
289                    .services
290                    .iter()
291                    .map(|service| service.clone_factory())
292                    .collect();
293
294                match ServerWorker::start(
295                    idx,
296                    factories,
297                    self.waker_queue.clone(),
298                    self.worker_config,
299                ) {
300                    Ok((handle_accept, handle_server)) => {
301                        *self
302                            .worker_handles
303                            .iter_mut()
304                            .find(|wrk| wrk.idx == idx)
305                            .unwrap() = handle_server;
306
307                        self.waker_queue.wake(WakerInterest::Worker(handle_accept));
308                    }
309
310                    Err(err) => error!("can not restart worker {}: {}", idx, err),
311                };
312            }
313        }
314    }
315
316    fn map_signal(signal: SignalKind) -> ServerCommand {
317        match signal {
318            SignalKind::Int => {
319                info!("SIGINT received; starting forced shutdown");
320                ServerCommand::Stop {
321                    graceful: false,
322                    completion: None,
323                    force_system_stop: true,
324                }
325            }
326
327            SignalKind::Term => {
328                info!("SIGTERM received; starting graceful shutdown");
329                ServerCommand::Stop {
330                    graceful: true,
331                    completion: None,
332                    force_system_stop: true,
333                }
334            }
335
336            SignalKind::Quit => {
337                info!("SIGQUIT received; starting forced shutdown");
338                ServerCommand::Stop {
339                    graceful: false,
340                    completion: None,
341                    force_system_stop: true,
342                }
343            }
344        }
345    }
346}
347
348struct ServerEventMultiplexer {
349    cmd_rx: UnboundedReceiver<ServerCommand>,
350    signal_fut: Option<Signals>,
351}
352
353impl Stream for ServerEventMultiplexer {
354    type Item = ServerCommand;
355
356    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
357        let this = Pin::into_inner(self);
358
359        if let Some(signal_fut) = &mut this.signal_fut {
360            if let Poll::Ready(signal) = Pin::new(signal_fut).poll(cx) {
361                this.signal_fut = None;
362                return Poll::Ready(Some(ServerInner::map_signal(signal)));
363            }
364        }
365
366        this.cmd_rx.poll_recv(cx)
367    }
368}