1use std::{
2 fmt,
3 marker::PhantomData,
4 net,
5 rc::Rc,
6 task::{Context, Poll},
7};
8
9use actix_codec::{AsyncRead, AsyncWrite, Framed};
10use actix_rt::net::TcpStream;
11use actix_service::{
12 fn_service, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt as _,
13};
14use actix_utils::future::ready;
15use futures_core::future::LocalBoxFuture;
16use tracing::error;
17
18use super::{codec::Codec, dispatcher::Dispatcher, ExpectHandler, UpgradeHandler};
19use crate::{
20 body::{BoxBody, MessageBody},
21 config::ServiceConfig,
22 error::DispatchError,
23 service::HttpServiceHandler,
24 ConnectCallback, OnConnectData, Request, Response,
25};
26
27pub struct H1Service<T, S, B, X = ExpectHandler, U = UpgradeHandler> {
29 srv: S,
30 cfg: ServiceConfig,
31 expect: X,
32 upgrade: Option<U>,
33 on_connect_ext: Option<Rc<ConnectCallback<T>>>,
34 _phantom: PhantomData<B>,
35}
36
37impl<T, S, B> H1Service<T, S, B>
38where
39 S: ServiceFactory<Request, Config = ()>,
40 S::Error: Into<Response<BoxBody>>,
41 S::InitError: fmt::Debug,
42 S::Response: Into<Response<B>>,
43 B: MessageBody,
44{
45 pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
47 cfg: ServiceConfig,
48 service: F,
49 ) -> Self {
50 H1Service {
51 cfg,
52 srv: service.into_factory(),
53 expect: ExpectHandler,
54 upgrade: None,
55 on_connect_ext: None,
56 _phantom: PhantomData,
57 }
58 }
59}
60
61impl<S, B, X, U> H1Service<TcpStream, S, B, X, U>
62where
63 S: ServiceFactory<Request, Config = ()>,
64 S::Future: 'static,
65 S::Error: Into<Response<BoxBody>>,
66 S::InitError: fmt::Debug,
67 S::Response: Into<Response<B>>,
68
69 B: MessageBody,
70
71 X: ServiceFactory<Request, Config = (), Response = Request>,
72 X::Future: 'static,
73 X::Error: Into<Response<BoxBody>>,
74 X::InitError: fmt::Debug,
75
76 U: ServiceFactory<(Request, Framed<TcpStream, Codec>), Config = (), Response = ()>,
77 U::Future: 'static,
78 U::Error: fmt::Display + Into<Response<BoxBody>>,
79 U::InitError: fmt::Debug,
80{
81 pub fn tcp(
83 self,
84 ) -> impl ServiceFactory<TcpStream, Config = (), Response = (), Error = DispatchError, InitError = ()>
85 {
86 fn_service(|io: TcpStream| {
87 let peer_addr = io.peer_addr().ok();
88 ready(Ok((io, peer_addr)))
89 })
90 .and_then(self)
91 }
92}
93
94#[cfg(feature = "openssl")]
95mod openssl {
96 use actix_tls::accept::{
97 openssl::{
98 reexports::{Error as SslError, SslAcceptor},
99 Acceptor, TlsStream,
100 },
101 TlsError,
102 };
103
104 use super::*;
105
106 impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
107 where
108 S: ServiceFactory<Request, Config = ()>,
109 S::Future: 'static,
110 S::Error: Into<Response<BoxBody>>,
111 S::InitError: fmt::Debug,
112 S::Response: Into<Response<B>>,
113
114 B: MessageBody,
115
116 X: ServiceFactory<Request, Config = (), Response = Request>,
117 X::Future: 'static,
118 X::Error: Into<Response<BoxBody>>,
119 X::InitError: fmt::Debug,
120
121 U: ServiceFactory<
122 (Request, Framed<TlsStream<TcpStream>, Codec>),
123 Config = (),
124 Response = (),
125 >,
126 U::Future: 'static,
127 U::Error: fmt::Display + Into<Response<BoxBody>>,
128 U::InitError: fmt::Debug,
129 {
130 pub fn openssl(
132 self,
133 acceptor: SslAcceptor,
134 ) -> impl ServiceFactory<
135 TcpStream,
136 Config = (),
137 Response = (),
138 Error = TlsError<SslError, DispatchError>,
139 InitError = (),
140 > {
141 Acceptor::new(acceptor)
142 .map_init_err(|_| {
143 unreachable!("TLS acceptor service factory does not error on init")
144 })
145 .map_err(TlsError::into_service_error)
146 .map(|io: TlsStream<TcpStream>| {
147 let peer_addr = io.get_ref().peer_addr().ok();
148 (io, peer_addr)
149 })
150 .and_then(self.map_err(TlsError::Service))
151 }
152 }
153}
154
155#[cfg(feature = "rustls-0_20")]
156mod rustls_0_20 {
157 use std::io;
158
159 use actix_service::ServiceFactoryExt as _;
160 use actix_tls::accept::{
161 rustls_0_20::{reexports::ServerConfig, Acceptor, TlsStream},
162 TlsError,
163 };
164
165 use super::*;
166
167 impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
168 where
169 S: ServiceFactory<Request, Config = ()>,
170 S::Future: 'static,
171 S::Error: Into<Response<BoxBody>>,
172 S::InitError: fmt::Debug,
173 S::Response: Into<Response<B>>,
174
175 B: MessageBody,
176
177 X: ServiceFactory<Request, Config = (), Response = Request>,
178 X::Future: 'static,
179 X::Error: Into<Response<BoxBody>>,
180 X::InitError: fmt::Debug,
181
182 U: ServiceFactory<
183 (Request, Framed<TlsStream<TcpStream>, Codec>),
184 Config = (),
185 Response = (),
186 >,
187 U::Future: 'static,
188 U::Error: fmt::Display + Into<Response<BoxBody>>,
189 U::InitError: fmt::Debug,
190 {
191 pub fn rustls(
193 self,
194 config: ServerConfig,
195 ) -> impl ServiceFactory<
196 TcpStream,
197 Config = (),
198 Response = (),
199 Error = TlsError<io::Error, DispatchError>,
200 InitError = (),
201 > {
202 Acceptor::new(config)
203 .map_init_err(|_| {
204 unreachable!("TLS acceptor service factory does not error on init")
205 })
206 .map_err(TlsError::into_service_error)
207 .map(|io: TlsStream<TcpStream>| {
208 let peer_addr = io.get_ref().0.peer_addr().ok();
209 (io, peer_addr)
210 })
211 .and_then(self.map_err(TlsError::Service))
212 }
213 }
214}
215
216#[cfg(feature = "rustls-0_21")]
217mod rustls_0_21 {
218 use std::io;
219
220 use actix_service::ServiceFactoryExt as _;
221 use actix_tls::accept::{
222 rustls_0_21::{reexports::ServerConfig, Acceptor, TlsStream},
223 TlsError,
224 };
225
226 use super::*;
227
228 impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
229 where
230 S: ServiceFactory<Request, Config = ()>,
231 S::Future: 'static,
232 S::Error: Into<Response<BoxBody>>,
233 S::InitError: fmt::Debug,
234 S::Response: Into<Response<B>>,
235
236 B: MessageBody,
237
238 X: ServiceFactory<Request, Config = (), Response = Request>,
239 X::Future: 'static,
240 X::Error: Into<Response<BoxBody>>,
241 X::InitError: fmt::Debug,
242
243 U: ServiceFactory<
244 (Request, Framed<TlsStream<TcpStream>, Codec>),
245 Config = (),
246 Response = (),
247 >,
248 U::Future: 'static,
249 U::Error: fmt::Display + Into<Response<BoxBody>>,
250 U::InitError: fmt::Debug,
251 {
252 pub fn rustls_021(
254 self,
255 config: ServerConfig,
256 ) -> impl ServiceFactory<
257 TcpStream,
258 Config = (),
259 Response = (),
260 Error = TlsError<io::Error, DispatchError>,
261 InitError = (),
262 > {
263 Acceptor::new(config)
264 .map_init_err(|_| {
265 unreachable!("TLS acceptor service factory does not error on init")
266 })
267 .map_err(TlsError::into_service_error)
268 .map(|io: TlsStream<TcpStream>| {
269 let peer_addr = io.get_ref().0.peer_addr().ok();
270 (io, peer_addr)
271 })
272 .and_then(self.map_err(TlsError::Service))
273 }
274 }
275}
276
277#[cfg(feature = "rustls-0_22")]
278mod rustls_0_22 {
279 use std::io;
280
281 use actix_service::ServiceFactoryExt as _;
282 use actix_tls::accept::{
283 rustls_0_22::{reexports::ServerConfig, Acceptor, TlsStream},
284 TlsError,
285 };
286
287 use super::*;
288
289 impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
290 where
291 S: ServiceFactory<Request, Config = ()>,
292 S::Future: 'static,
293 S::Error: Into<Response<BoxBody>>,
294 S::InitError: fmt::Debug,
295 S::Response: Into<Response<B>>,
296
297 B: MessageBody,
298
299 X: ServiceFactory<Request, Config = (), Response = Request>,
300 X::Future: 'static,
301 X::Error: Into<Response<BoxBody>>,
302 X::InitError: fmt::Debug,
303
304 U: ServiceFactory<
305 (Request, Framed<TlsStream<TcpStream>, Codec>),
306 Config = (),
307 Response = (),
308 >,
309 U::Future: 'static,
310 U::Error: fmt::Display + Into<Response<BoxBody>>,
311 U::InitError: fmt::Debug,
312 {
313 pub fn rustls_0_22(
315 self,
316 config: ServerConfig,
317 ) -> impl ServiceFactory<
318 TcpStream,
319 Config = (),
320 Response = (),
321 Error = TlsError<io::Error, DispatchError>,
322 InitError = (),
323 > {
324 Acceptor::new(config)
325 .map_init_err(|_| {
326 unreachable!("TLS acceptor service factory does not error on init")
327 })
328 .map_err(TlsError::into_service_error)
329 .map(|io: TlsStream<TcpStream>| {
330 let peer_addr = io.get_ref().0.peer_addr().ok();
331 (io, peer_addr)
332 })
333 .and_then(self.map_err(TlsError::Service))
334 }
335 }
336}
337
338#[cfg(feature = "rustls-0_23")]
339mod rustls_0_23 {
340 use std::io;
341
342 use actix_service::ServiceFactoryExt as _;
343 use actix_tls::accept::{
344 rustls_0_23::{reexports::ServerConfig, Acceptor, TlsStream},
345 TlsError,
346 };
347
348 use super::*;
349
350 impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
351 where
352 S: ServiceFactory<Request, Config = ()>,
353 S::Future: 'static,
354 S::Error: Into<Response<BoxBody>>,
355 S::InitError: fmt::Debug,
356 S::Response: Into<Response<B>>,
357
358 B: MessageBody,
359
360 X: ServiceFactory<Request, Config = (), Response = Request>,
361 X::Future: 'static,
362 X::Error: Into<Response<BoxBody>>,
363 X::InitError: fmt::Debug,
364
365 U: ServiceFactory<
366 (Request, Framed<TlsStream<TcpStream>, Codec>),
367 Config = (),
368 Response = (),
369 >,
370 U::Future: 'static,
371 U::Error: fmt::Display + Into<Response<BoxBody>>,
372 U::InitError: fmt::Debug,
373 {
374 pub fn rustls_0_23(
376 self,
377 config: ServerConfig,
378 ) -> impl ServiceFactory<
379 TcpStream,
380 Config = (),
381 Response = (),
382 Error = TlsError<io::Error, DispatchError>,
383 InitError = (),
384 > {
385 Acceptor::new(config)
386 .map_init_err(|_| {
387 unreachable!("TLS acceptor service factory does not error on init")
388 })
389 .map_err(TlsError::into_service_error)
390 .map(|io: TlsStream<TcpStream>| {
391 let peer_addr = io.get_ref().0.peer_addr().ok();
392 (io, peer_addr)
393 })
394 .and_then(self.map_err(TlsError::Service))
395 }
396 }
397}
398
399impl<T, S, B, X, U> H1Service<T, S, B, X, U>
400where
401 S: ServiceFactory<Request, Config = ()>,
402 S::Error: Into<Response<BoxBody>>,
403 S::Response: Into<Response<B>>,
404 S::InitError: fmt::Debug,
405 B: MessageBody,
406{
407 pub fn expect<X1>(self, expect: X1) -> H1Service<T, S, B, X1, U>
408 where
409 X1: ServiceFactory<Request, Response = Request>,
410 X1::Error: Into<Response<BoxBody>>,
411 X1::InitError: fmt::Debug,
412 {
413 H1Service {
414 expect,
415 cfg: self.cfg,
416 srv: self.srv,
417 upgrade: self.upgrade,
418 on_connect_ext: self.on_connect_ext,
419 _phantom: PhantomData,
420 }
421 }
422
423 pub fn upgrade<U1>(self, upgrade: Option<U1>) -> H1Service<T, S, B, X, U1>
424 where
425 U1: ServiceFactory<(Request, Framed<T, Codec>), Response = ()>,
426 U1::Error: fmt::Display,
427 U1::InitError: fmt::Debug,
428 {
429 H1Service {
430 upgrade,
431 cfg: self.cfg,
432 srv: self.srv,
433 expect: self.expect,
434 on_connect_ext: self.on_connect_ext,
435 _phantom: PhantomData,
436 }
437 }
438
439 pub(crate) fn on_connect_ext(mut self, f: Option<Rc<ConnectCallback<T>>>) -> Self {
441 self.on_connect_ext = f;
442 self
443 }
444}
445
446impl<T, S, B, X, U> ServiceFactory<(T, Option<net::SocketAddr>)> for H1Service<T, S, B, X, U>
447where
448 T: AsyncRead + AsyncWrite + Unpin + 'static,
449
450 S: ServiceFactory<Request, Config = ()>,
451 S::Future: 'static,
452 S::Error: Into<Response<BoxBody>>,
453 S::Response: Into<Response<B>>,
454 S::InitError: fmt::Debug,
455
456 B: MessageBody,
457
458 X: ServiceFactory<Request, Config = (), Response = Request>,
459 X::Future: 'static,
460 X::Error: Into<Response<BoxBody>>,
461 X::InitError: fmt::Debug,
462
463 U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
464 U::Future: 'static,
465 U::Error: fmt::Display + Into<Response<BoxBody>>,
466 U::InitError: fmt::Debug,
467{
468 type Response = ();
469 type Error = DispatchError;
470 type Config = ();
471 type Service = H1ServiceHandler<T, S::Service, B, X::Service, U::Service>;
472 type InitError = ();
473 type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
474
475 fn new_service(&self, _: ()) -> Self::Future {
476 let service = self.srv.new_service(());
477 let expect = self.expect.new_service(());
478 let upgrade = self.upgrade.as_ref().map(|s| s.new_service(()));
479 let on_connect_ext = self.on_connect_ext.clone();
480 let cfg = self.cfg.clone();
481
482 Box::pin(async move {
483 let expect = expect.await.map_err(|err| {
484 tracing::error!("Initialization of HTTP expect service error: {err:?}");
485 })?;
486
487 let upgrade = match upgrade {
488 Some(upgrade) => {
489 let upgrade = upgrade.await.map_err(|err| {
490 tracing::error!("Initialization of HTTP upgrade service error: {err:?}");
491 })?;
492 Some(upgrade)
493 }
494 None => None,
495 };
496
497 let service = service
498 .await
499 .map_err(|err| error!("Initialization of HTTP service error: {err:?}"))?;
500
501 Ok(H1ServiceHandler::new(
502 cfg,
503 service,
504 expect,
505 upgrade,
506 on_connect_ext,
507 ))
508 })
509 }
510}
511
512pub type H1ServiceHandler<T, S, B, X, U> = HttpServiceHandler<T, S, B, X, U>;
514
515impl<T, S, B, X, U> Service<(T, Option<net::SocketAddr>)> for HttpServiceHandler<T, S, B, X, U>
516where
517 T: AsyncRead + AsyncWrite + Unpin,
518
519 S: Service<Request>,
520 S::Error: Into<Response<BoxBody>>,
521 S::Response: Into<Response<B>>,
522
523 B: MessageBody,
524
525 X: Service<Request, Response = Request>,
526 X::Error: Into<Response<BoxBody>>,
527
528 U: Service<(Request, Framed<T, Codec>), Response = ()>,
529 U::Error: fmt::Display + Into<Response<BoxBody>>,
530{
531 type Response = ();
532 type Error = DispatchError;
533 type Future = Dispatcher<T, S, B, X, U>;
534
535 fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
536 self._poll_ready(cx).map_err(|err| {
537 error!("HTTP/1 service readiness error: {:?}", err);
538 DispatchError::Service(err)
539 })
540 }
541
542 fn call(&self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future {
543 let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref());
544 Dispatcher::new(io, Rc::clone(&self.flow), self.cfg.clone(), addr, conn_data)
545 }
546}