actix_server/builder.rs
1use std::{io, num::NonZeroUsize, time::Duration};
2
3use actix_rt::net::TcpStream;
4use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
5
6use crate::{
7 server::ServerCommand,
8 service::{InternalServiceFactory, ServerServiceFactory, StreamNewService},
9 socket::{create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs},
10 worker::ServerWorkerConfig,
11 Server,
12};
13
/// Preference for using Multipath TCP (MPTCP) when binding sockets.
///
/// At present this only has a practical effect on Linux.
///
#[cfg_attr(target_os = "linux", doc = "Also see [`ServerBuilder::mptcp()`].")]
#[derive(Debug, Clone)]
pub enum MpTcp {
    /// Never use MPTCP when binding sockets.
    Disabled,

    /// Try MPTCP first when binding sockets; if that produces an error, regular TCP is
    /// attempted as well.
    TcpFallback,

    /// Always use MPTCP when binding sockets; there is no fallback to regular TCP.
    NoFallback,
}
31
32/// [Server] builder.
33pub struct ServerBuilder {
34 pub(crate) threads: usize,
35 pub(crate) token: usize,
36 pub(crate) backlog: u32,
37 pub(crate) factories: Vec<Box<dyn InternalServiceFactory>>,
38 pub(crate) sockets: Vec<(usize, String, MioListener)>,
39 pub(crate) mptcp: MpTcp,
40 pub(crate) exit: bool,
41 pub(crate) listen_os_signals: bool,
42 pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
43 pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>,
44 pub(crate) worker_config: ServerWorkerConfig,
45}
46
47impl Default for ServerBuilder {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl ServerBuilder {
54 /// Create new Server builder instance
55 pub fn new() -> ServerBuilder {
56 let (cmd_tx, cmd_rx) = unbounded_channel();
57
58 ServerBuilder {
59 threads: std::thread::available_parallelism().map_or(2, NonZeroUsize::get),
60 token: 0,
61 factories: Vec::new(),
62 sockets: Vec::new(),
63 backlog: 2048,
64 mptcp: MpTcp::Disabled,
65 exit: false,
66 listen_os_signals: true,
67 cmd_tx,
68 cmd_rx,
69 worker_config: ServerWorkerConfig::default(),
70 }
71 }
72
73 /// Sets number of workers to start.
74 ///
75 /// See [`bind()`](Self::bind()) for more details on how worker count affects the number of
76 /// server factory instantiations.
77 ///
78 /// The default worker count is the determined by [`std::thread::available_parallelism()`]. See
79 /// its documentation to determine what behavior you should expect when server is run.
80 ///
81 /// `num` must be greater than 0.
82 ///
83 /// # Panics
84 ///
85 /// Panics if `num` is 0.
86 pub fn workers(mut self, num: usize) -> Self {
87 assert_ne!(num, 0, "workers must be greater than 0");
88 self.threads = num;
89 self
90 }
91
92 /// Set max number of threads for each worker's blocking task thread pool.
93 ///
94 /// One thread pool is set up **per worker**; not shared across workers.
95 ///
96 /// # Examples:
97 /// ```
98 /// # use actix_server::ServerBuilder;
99 /// let builder = ServerBuilder::new()
100 /// .workers(4) // server has 4 worker thread.
101 /// .worker_max_blocking_threads(4); // every worker has 4 max blocking threads.
102 /// ```
103 ///
104 /// See [tokio::runtime::Builder::max_blocking_threads] for behavior reference.
105 pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
106 self.worker_config.max_blocking_threads(num);
107 self
108 }
109
110 /// Set the maximum number of pending connections.
111 ///
112 /// This refers to the number of clients that can be waiting to be served. Exceeding this number
113 /// results in the client getting an error when attempting to connect. It should only affect
114 /// servers under significant load.
115 ///
116 /// Generally set in the 64-2048 range. Default value is 2048.
117 ///
118 /// This method should be called before `bind()` method call.
119 pub fn backlog(mut self, num: u32) -> Self {
120 self.backlog = num;
121 self
122 }
123
124 /// Sets MultiPath TCP (MPTCP) preference on bound sockets.
125 ///
126 /// Multipath TCP (MPTCP) builds on top of TCP to improve connection redundancy and performance
127 /// by sharing a network data stream across multiple underlying TCP sessions. See [mptcp.dev]
128 /// for more info about MPTCP itself.
129 ///
130 /// MPTCP is available on Linux kernel version 5.6 and higher. In addition, you'll also need to
131 /// ensure the kernel option is enabled using `sysctl net.mptcp.enabled=1`.
132 ///
133 /// This method will have no effect if called after a `bind()`.
134 ///
135 /// [mptcp.dev]: https://www.mptcp.dev
136 #[cfg(target_os = "linux")]
137 pub fn mptcp(mut self, mptcp_enabled: MpTcp) -> Self {
138 self.mptcp = mptcp_enabled;
139 self
140 }
141
142 /// Sets the maximum per-worker number of concurrent connections.
143 ///
144 /// All socket listeners will stop accepting connections when this limit is reached for
145 /// each worker.
146 ///
147 /// By default max connections is set to a 25k per worker.
148 pub fn max_concurrent_connections(mut self, num: usize) -> Self {
149 self.worker_config.max_concurrent_connections(num);
150 self
151 }
152
153 #[doc(hidden)]
154 #[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")]
155 pub fn maxconn(self, num: usize) -> Self {
156 self.max_concurrent_connections(num)
157 }
158
159 /// Sets flag to stop Actix `System` after server shutdown.
160 ///
161 /// This has no effect when server is running in a Tokio-only runtime.
162 pub fn system_exit(mut self) -> Self {
163 self.exit = true;
164 self
165 }
166
167 /// Disables OS signal handling.
168 pub fn disable_signals(mut self) -> Self {
169 self.listen_os_signals = false;
170 self
171 }
172
173 /// Timeout for graceful workers shutdown in seconds.
174 ///
175 /// After receiving a stop signal, workers have this much time to finish serving requests.
176 /// Workers still alive after the timeout are force dropped.
177 ///
178 /// By default shutdown timeout sets to 30 seconds.
179 pub fn shutdown_timeout(mut self, sec: u64) -> Self {
180 self.worker_config
181 .shutdown_timeout(Duration::from_secs(sec));
182 self
183 }
184
185 /// Adds new service to the server.
186 ///
187 /// Note that, if a DNS lookup is required, resolving hostnames is a blocking operation.
188 ///
189 /// # Worker Count
190 ///
191 /// The `factory` will be instantiated multiple times in most scenarios. The number of
192 /// instantiations is number of [`workers`](Self::workers()) × number of sockets resolved by
193 /// `addrs`.
194 ///
195 /// For example, if you've manually set [`workers`](Self::workers()) to 2, and use `127.0.0.1`
196 /// as the bind `addrs`, then `factory` will be instantiated twice. However, using `localhost`
197 /// as the bind `addrs` can often resolve to both `127.0.0.1` (IPv4) _and_ `::1` (IPv6), causing
198 /// the `factory` to be instantiated 4 times (2 workers × 2 bind addresses).
199 ///
200 /// Using a bind address of `0.0.0.0`, which signals to use all interfaces, may also multiple
201 /// the number of instantiations in a similar way.
202 ///
203 /// # Errors
204 ///
205 /// Returns an `io::Error` if:
206 /// - `addrs` cannot be resolved into one or more socket addresses;
207 /// - all the resolved socket addresses are already bound.
208 pub fn bind<F, U, N>(mut self, name: N, addrs: U, factory: F) -> io::Result<Self>
209 where
210 F: ServerServiceFactory<TcpStream>,
211 U: ToSocketAddrs,
212 N: AsRef<str>,
213 {
214 let sockets = bind_addr(addrs, self.backlog, &self.mptcp)?;
215
216 tracing::trace!("binding server to: {sockets:?}");
217
218 for lst in sockets {
219 let token = self.next_token();
220
221 self.factories.push(StreamNewService::create(
222 name.as_ref().to_string(),
223 token,
224 factory.clone(),
225 lst.local_addr()?,
226 ));
227
228 self.sockets
229 .push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
230 }
231
232 Ok(self)
233 }
234
235 /// Adds service to the server using a socket listener already bound.
236 ///
237 /// # Worker Count
238 ///
239 /// The `factory` will be instantiated multiple times in most scenarios. The number of
240 /// instantiations is: number of [`workers`](Self::workers()).
241 pub fn listen<F, N: AsRef<str>>(
242 mut self,
243 name: N,
244 lst: StdTcpListener,
245 factory: F,
246 ) -> io::Result<Self>
247 where
248 F: ServerServiceFactory<TcpStream>,
249 {
250 lst.set_nonblocking(true)?;
251 let addr = lst.local_addr()?;
252
253 let token = self.next_token();
254 self.factories.push(StreamNewService::create(
255 name.as_ref().to_string(),
256 token,
257 factory,
258 addr,
259 ));
260
261 self.sockets
262 .push((token, name.as_ref().to_string(), MioListener::from(lst)));
263
264 Ok(self)
265 }
266
267 /// Starts processing incoming connections and return server controller.
268 pub fn run(self) -> Server {
269 if self.sockets.is_empty() {
270 panic!("Server should have at least one bound socket");
271 } else {
272 tracing::info!("starting {} workers", self.threads);
273 Server::new(self)
274 }
275 }
276
277 fn next_token(&mut self) -> usize {
278 let token = self.token;
279 self.token += 1;
280 token
281 }
282}
283
284#[cfg(unix)]
285impl ServerBuilder {
286 /// Adds new service to the server using a UDS (unix domain socket) address.
287 ///
288 /// # Worker Count
289 ///
290 /// The `factory` will be instantiated multiple times in most scenarios. The number of
291 /// instantiations is: number of [`workers`](Self::workers()).
292 pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
293 where
294 F: ServerServiceFactory<actix_rt::net::UnixStream>,
295 N: AsRef<str>,
296 U: AsRef<std::path::Path>,
297 {
298 // The path must not exist when we try to bind.
299 // Try to remove it to avoid bind error.
300 if let Err(err) = std::fs::remove_file(addr.as_ref()) {
301 // NotFound is expected and not an issue. Anything else is.
302 if err.kind() != std::io::ErrorKind::NotFound {
303 return Err(err);
304 }
305 }
306
307 let lst = crate::socket::StdUnixListener::bind(addr)?;
308 self.listen_uds(name, lst, factory)
309 }
310
311 /// Adds new service to the server using a UDS (unix domain socket) listener already bound.
312 ///
313 /// Useful when running as a systemd service and a socket FD is acquired externally.
314 ///
315 /// # Worker Count
316 ///
317 /// The `factory` will be instantiated multiple times in most scenarios. The number of
318 /// instantiations is: number of [`workers`](Self::workers()).
319 pub fn listen_uds<F, N: AsRef<str>>(
320 mut self,
321 name: N,
322 lst: crate::socket::StdUnixListener,
323 factory: F,
324 ) -> io::Result<Self>
325 where
326 F: ServerServiceFactory<actix_rt::net::UnixStream>,
327 {
328 use std::net::{IpAddr, Ipv4Addr};
329
330 lst.set_nonblocking(true)?;
331
332 let token = self.next_token();
333 let addr = crate::socket::StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
334
335 self.factories.push(StreamNewService::create(
336 name.as_ref().to_string(),
337 token,
338 factory,
339 addr,
340 ));
341
342 self.sockets
343 .push((token, name.as_ref().to_string(), MioListener::from(lst)));
344
345 Ok(self)
346 }
347}
348
349pub(super) fn bind_addr<S: ToSocketAddrs>(
350 addr: S,
351 backlog: u32,
352 mptcp: &MpTcp,
353) -> io::Result<Vec<MioTcpListener>> {
354 let mut opt_err = None;
355 let mut success = false;
356 let mut sockets = Vec::new();
357
358 for addr in addr.to_socket_addrs()? {
359 match create_mio_tcp_listener(addr, backlog, mptcp) {
360 Ok(lst) => {
361 success = true;
362 sockets.push(lst);
363 }
364 Err(err) => opt_err = Some(err),
365 }
366 }
367
368 if success {
369 Ok(sockets)
370 } else if let Some(err) = opt_err.take() {
371 Err(err)
372 } else {
373 Err(io::Error::new(
374 io::ErrorKind::Other,
375 "Can not bind to address.",
376 ))
377 }
378}