actix_server/
builder.rs

1use std::{io, num::NonZeroUsize, time::Duration};
2
3use actix_rt::net::TcpStream;
4use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
5
6use crate::{
7    server::ServerCommand,
8    service::{InternalServiceFactory, ServerServiceFactory, StreamNewService},
9    socket::{create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs},
10    worker::ServerWorkerConfig,
11    Server,
12};
13
/// Multipath TCP (MPTCP) preference.
///
/// Currently only useful on Linux.
///
/// The type is a plain, field-less enum, so it is `Copy` and can be compared and hashed
/// directly.
///
#[cfg_attr(target_os = "linux", doc = "Also see [`ServerBuilder::mptcp()`].")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MpTcp {
    /// MPTCP will not be used when binding sockets.
    Disabled,

    /// MPTCP will be attempted when binding sockets. If errors occur, regular TCP will be
    /// attempted, too.
    TcpFallback,

    /// MPTCP will be used when binding sockets (with no fallback).
    NoFallback,
}
31
/// [Server] builder.
///
/// Collects listeners, service factories and worker settings through chained method calls;
/// [`run()`](Self::run()) hands the finished configuration to [`Server`].
pub struct ServerBuilder {
    /// Number of workers to start; set by `workers()`.
    pub(crate) threads: usize,
    /// Next listener token to hand out; advanced by `next_token()`.
    pub(crate) token: usize,
    /// Pending-connection backlog passed along when `bind()` creates TCP listeners.
    pub(crate) backlog: u32,
    /// Service factories, one pushed per registered listener and tagged with its token.
    pub(crate) factories: Vec<Box<dyn InternalServiceFactory>>,
    /// Registered listeners as `(token, service name, listener)`.
    pub(crate) sockets: Vec<(usize, String, MioListener)>,
    /// MPTCP preference passed along when `bind()` creates TCP listeners.
    pub(crate) mptcp: MpTcp,
    /// Set to `true` by `system_exit()`.
    pub(crate) exit: bool,
    /// Starts as `true`; set to `false` by `disable_signals()`.
    pub(crate) listen_os_signals: bool,
    /// Sending half of the unbounded `ServerCommand` channel created in `new()`.
    pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
    /// Receiving half of the unbounded `ServerCommand` channel created in `new()`.
    pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>,
    /// Per-worker settings: max blocking threads, max concurrent connections and shutdown
    /// timeout.
    pub(crate) worker_config: ServerWorkerConfig,
}
46
47impl Default for ServerBuilder {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl ServerBuilder {
54    /// Create new Server builder instance
55    pub fn new() -> ServerBuilder {
56        let (cmd_tx, cmd_rx) = unbounded_channel();
57
58        ServerBuilder {
59            threads: std::thread::available_parallelism().map_or(2, NonZeroUsize::get),
60            token: 0,
61            factories: Vec::new(),
62            sockets: Vec::new(),
63            backlog: 2048,
64            mptcp: MpTcp::Disabled,
65            exit: false,
66            listen_os_signals: true,
67            cmd_tx,
68            cmd_rx,
69            worker_config: ServerWorkerConfig::default(),
70        }
71    }
72
73    /// Sets number of workers to start.
74    ///
75    /// See [`bind()`](Self::bind()) for more details on how worker count affects the number of
76    /// server factory instantiations.
77    ///
78    /// The default worker count is the determined by [`std::thread::available_parallelism()`]. See
79    /// its documentation to determine what behavior you should expect when server is run.
80    ///
81    /// `num` must be greater than 0.
82    ///
83    /// # Panics
84    ///
85    /// Panics if `num` is 0.
86    pub fn workers(mut self, num: usize) -> Self {
87        assert_ne!(num, 0, "workers must be greater than 0");
88        self.threads = num;
89        self
90    }
91
92    /// Set max number of threads for each worker's blocking task thread pool.
93    ///
94    /// One thread pool is set up **per worker**; not shared across workers.
95    ///
96    /// # Examples:
97    /// ```
98    /// # use actix_server::ServerBuilder;
99    /// let builder = ServerBuilder::new()
100    ///     .workers(4) // server has 4 worker thread.
101    ///     .worker_max_blocking_threads(4); // every worker has 4 max blocking threads.
102    /// ```
103    ///
104    /// See [tokio::runtime::Builder::max_blocking_threads] for behavior reference.
105    pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
106        self.worker_config.max_blocking_threads(num);
107        self
108    }
109
110    /// Set the maximum number of pending connections.
111    ///
112    /// This refers to the number of clients that can be waiting to be served. Exceeding this number
113    /// results in the client getting an error when attempting to connect. It should only affect
114    /// servers under significant load.
115    ///
116    /// Generally set in the 64-2048 range. Default value is 2048.
117    ///
118    /// This method should be called before `bind()` method call.
119    pub fn backlog(mut self, num: u32) -> Self {
120        self.backlog = num;
121        self
122    }
123
124    /// Sets MultiPath TCP (MPTCP) preference on bound sockets.
125    ///
126    /// Multipath TCP (MPTCP) builds on top of TCP to improve connection redundancy and performance
127    /// by sharing a network data stream across multiple underlying TCP sessions. See [mptcp.dev]
128    /// for more info about MPTCP itself.
129    ///
130    /// MPTCP is available on Linux kernel version 5.6 and higher. In addition, you'll also need to
131    /// ensure the kernel option is enabled using `sysctl net.mptcp.enabled=1`.
132    ///
133    /// This method will have no effect if called after a `bind()`.
134    ///
135    /// [mptcp.dev]: https://www.mptcp.dev
136    #[cfg(target_os = "linux")]
137    pub fn mptcp(mut self, mptcp_enabled: MpTcp) -> Self {
138        self.mptcp = mptcp_enabled;
139        self
140    }
141
142    /// Sets the maximum per-worker number of concurrent connections.
143    ///
144    /// All socket listeners will stop accepting connections when this limit is reached for
145    /// each worker.
146    ///
147    /// By default max connections is set to a 25k per worker.
148    pub fn max_concurrent_connections(mut self, num: usize) -> Self {
149        self.worker_config.max_concurrent_connections(num);
150        self
151    }
152
153    #[doc(hidden)]
154    #[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")]
155    pub fn maxconn(self, num: usize) -> Self {
156        self.max_concurrent_connections(num)
157    }
158
159    /// Sets flag to stop Actix `System` after server shutdown.
160    ///
161    /// This has no effect when server is running in a Tokio-only runtime.
162    pub fn system_exit(mut self) -> Self {
163        self.exit = true;
164        self
165    }
166
167    /// Disables OS signal handling.
168    pub fn disable_signals(mut self) -> Self {
169        self.listen_os_signals = false;
170        self
171    }
172
173    /// Timeout for graceful workers shutdown in seconds.
174    ///
175    /// After receiving a stop signal, workers have this much time to finish serving requests.
176    /// Workers still alive after the timeout are force dropped.
177    ///
178    /// By default shutdown timeout sets to 30 seconds.
179    pub fn shutdown_timeout(mut self, sec: u64) -> Self {
180        self.worker_config
181            .shutdown_timeout(Duration::from_secs(sec));
182        self
183    }
184
185    /// Adds new service to the server.
186    ///
187    /// Note that, if a DNS lookup is required, resolving hostnames is a blocking operation.
188    ///
189    /// # Worker Count
190    ///
191    /// The `factory` will be instantiated multiple times in most scenarios. The number of
192    /// instantiations is number of [`workers`](Self::workers()) × number of sockets resolved by
193    /// `addrs`.
194    ///
195    /// For example, if you've manually set [`workers`](Self::workers()) to 2, and use `127.0.0.1`
196    /// as the bind `addrs`, then `factory` will be instantiated twice. However, using `localhost`
197    /// as the bind `addrs` can often resolve to both `127.0.0.1` (IPv4) _and_ `::1` (IPv6), causing
198    /// the `factory` to be instantiated 4 times (2 workers × 2 bind addresses).
199    ///
200    /// Using a bind address of `0.0.0.0`, which signals to use all interfaces, may also multiple
201    /// the number of instantiations in a similar way.
202    ///
203    /// # Errors
204    ///
205    /// Returns an `io::Error` if:
206    /// - `addrs` cannot be resolved into one or more socket addresses;
207    /// - all the resolved socket addresses are already bound.
208    pub fn bind<F, U, N>(mut self, name: N, addrs: U, factory: F) -> io::Result<Self>
209    where
210        F: ServerServiceFactory<TcpStream>,
211        U: ToSocketAddrs,
212        N: AsRef<str>,
213    {
214        let sockets = bind_addr(addrs, self.backlog, &self.mptcp)?;
215
216        tracing::trace!("binding server to: {sockets:?}");
217
218        for lst in sockets {
219            let token = self.next_token();
220
221            self.factories.push(StreamNewService::create(
222                name.as_ref().to_string(),
223                token,
224                factory.clone(),
225                lst.local_addr()?,
226            ));
227
228            self.sockets
229                .push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
230        }
231
232        Ok(self)
233    }
234
235    /// Adds service to the server using a socket listener already bound.
236    ///
237    /// # Worker Count
238    ///
239    /// The `factory` will be instantiated multiple times in most scenarios. The number of
240    /// instantiations is: number of [`workers`](Self::workers()).
241    pub fn listen<F, N: AsRef<str>>(
242        mut self,
243        name: N,
244        lst: StdTcpListener,
245        factory: F,
246    ) -> io::Result<Self>
247    where
248        F: ServerServiceFactory<TcpStream>,
249    {
250        lst.set_nonblocking(true)?;
251        let addr = lst.local_addr()?;
252
253        let token = self.next_token();
254        self.factories.push(StreamNewService::create(
255            name.as_ref().to_string(),
256            token,
257            factory,
258            addr,
259        ));
260
261        self.sockets
262            .push((token, name.as_ref().to_string(), MioListener::from(lst)));
263
264        Ok(self)
265    }
266
267    /// Starts processing incoming connections and return server controller.
268    pub fn run(self) -> Server {
269        if self.sockets.is_empty() {
270            panic!("Server should have at least one bound socket");
271        } else {
272            tracing::info!("starting {} workers", self.threads);
273            Server::new(self)
274        }
275    }
276
277    fn next_token(&mut self) -> usize {
278        let token = self.token;
279        self.token += 1;
280        token
281    }
282}
283
284#[cfg(unix)]
285impl ServerBuilder {
286    /// Adds new service to the server using a UDS (unix domain socket) address.
287    ///
288    /// # Worker Count
289    ///
290    /// The `factory` will be instantiated multiple times in most scenarios. The number of
291    /// instantiations is: number of [`workers`](Self::workers()).
292    pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
293    where
294        F: ServerServiceFactory<actix_rt::net::UnixStream>,
295        N: AsRef<str>,
296        U: AsRef<std::path::Path>,
297    {
298        // The path must not exist when we try to bind.
299        // Try to remove it to avoid bind error.
300        if let Err(err) = std::fs::remove_file(addr.as_ref()) {
301            // NotFound is expected and not an issue. Anything else is.
302            if err.kind() != std::io::ErrorKind::NotFound {
303                return Err(err);
304            }
305        }
306
307        let lst = crate::socket::StdUnixListener::bind(addr)?;
308        self.listen_uds(name, lst, factory)
309    }
310
311    /// Adds new service to the server using a UDS (unix domain socket) listener already bound.
312    ///
313    /// Useful when running as a systemd service and a socket FD is acquired externally.
314    ///
315    /// # Worker Count
316    ///
317    /// The `factory` will be instantiated multiple times in most scenarios. The number of
318    /// instantiations is: number of [`workers`](Self::workers()).
319    pub fn listen_uds<F, N: AsRef<str>>(
320        mut self,
321        name: N,
322        lst: crate::socket::StdUnixListener,
323        factory: F,
324    ) -> io::Result<Self>
325    where
326        F: ServerServiceFactory<actix_rt::net::UnixStream>,
327    {
328        use std::net::{IpAddr, Ipv4Addr};
329
330        lst.set_nonblocking(true)?;
331
332        let token = self.next_token();
333        let addr = crate::socket::StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
334
335        self.factories.push(StreamNewService::create(
336            name.as_ref().to_string(),
337            token,
338            factory,
339            addr,
340        ));
341
342        self.sockets
343            .push((token, name.as_ref().to_string(), MioListener::from(lst)));
344
345        Ok(self)
346    }
347}
348
349pub(super) fn bind_addr<S: ToSocketAddrs>(
350    addr: S,
351    backlog: u32,
352    mptcp: &MpTcp,
353) -> io::Result<Vec<MioTcpListener>> {
354    let mut opt_err = None;
355    let mut success = false;
356    let mut sockets = Vec::new();
357
358    for addr in addr.to_socket_addrs()? {
359        match create_mio_tcp_listener(addr, backlog, mptcp) {
360            Ok(lst) => {
361                success = true;
362                sockets.push(lst);
363            }
364            Err(err) => opt_err = Some(err),
365        }
366    }
367
368    if success {
369        Ok(sockets)
370    } else if let Some(err) = opt_err.take() {
371        Err(err)
372    } else {
373        Err(io::Error::new(
374            io::ErrorKind::Other,
375            "Can not bind to address.",
376        ))
377    }
378}