actix_http/h1/
service.rs

1use std::{
2    fmt,
3    marker::PhantomData,
4    net,
5    rc::Rc,
6    task::{Context, Poll},
7};
8
9use actix_codec::{AsyncRead, AsyncWrite, Framed};
10use actix_rt::net::TcpStream;
11use actix_service::{
12    fn_service, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt as _,
13};
14use actix_utils::future::ready;
15use futures_core::future::LocalBoxFuture;
16use tracing::error;
17
18use super::{codec::Codec, dispatcher::Dispatcher, ExpectHandler, UpgradeHandler};
19use crate::{
20    body::{BoxBody, MessageBody},
21    config::ServiceConfig,
22    error::DispatchError,
23    service::HttpServiceHandler,
24    ConnectCallback, OnConnectData, Request, Response,
25};
26
27/// `ServiceFactory` implementation for HTTP1 transport
28pub struct H1Service<T, S, B, X = ExpectHandler, U = UpgradeHandler> {
29    srv: S,
30    cfg: ServiceConfig,
31    expect: X,
32    upgrade: Option<U>,
33    on_connect_ext: Option<Rc<ConnectCallback<T>>>,
34    _phantom: PhantomData<B>,
35}
36
37impl<T, S, B> H1Service<T, S, B>
38where
39    S: ServiceFactory<Request, Config = ()>,
40    S::Error: Into<Response<BoxBody>>,
41    S::InitError: fmt::Debug,
42    S::Response: Into<Response<B>>,
43    B: MessageBody,
44{
45    /// Create new `HttpService` instance with config.
46    pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
47        cfg: ServiceConfig,
48        service: F,
49    ) -> Self {
50        H1Service {
51            cfg,
52            srv: service.into_factory(),
53            expect: ExpectHandler,
54            upgrade: None,
55            on_connect_ext: None,
56            _phantom: PhantomData,
57        }
58    }
59}
60
61impl<S, B, X, U> H1Service<TcpStream, S, B, X, U>
62where
63    S: ServiceFactory<Request, Config = ()>,
64    S::Future: 'static,
65    S::Error: Into<Response<BoxBody>>,
66    S::InitError: fmt::Debug,
67    S::Response: Into<Response<B>>,
68
69    B: MessageBody,
70
71    X: ServiceFactory<Request, Config = (), Response = Request>,
72    X::Future: 'static,
73    X::Error: Into<Response<BoxBody>>,
74    X::InitError: fmt::Debug,
75
76    U: ServiceFactory<(Request, Framed<TcpStream, Codec>), Config = (), Response = ()>,
77    U::Future: 'static,
78    U::Error: fmt::Display + Into<Response<BoxBody>>,
79    U::InitError: fmt::Debug,
80{
81    /// Create simple tcp stream service
82    pub fn tcp(
83        self,
84    ) -> impl ServiceFactory<TcpStream, Config = (), Response = (), Error = DispatchError, InitError = ()>
85    {
86        fn_service(|io: TcpStream| {
87            let peer_addr = io.peer_addr().ok();
88            ready(Ok((io, peer_addr)))
89        })
90        .and_then(self)
91    }
92}
93
94#[cfg(feature = "openssl")]
95mod openssl {
96    use actix_tls::accept::{
97        openssl::{
98            reexports::{Error as SslError, SslAcceptor},
99            Acceptor, TlsStream,
100        },
101        TlsError,
102    };
103
104    use super::*;
105
106    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
107    where
108        S: ServiceFactory<Request, Config = ()>,
109        S::Future: 'static,
110        S::Error: Into<Response<BoxBody>>,
111        S::InitError: fmt::Debug,
112        S::Response: Into<Response<B>>,
113
114        B: MessageBody,
115
116        X: ServiceFactory<Request, Config = (), Response = Request>,
117        X::Future: 'static,
118        X::Error: Into<Response<BoxBody>>,
119        X::InitError: fmt::Debug,
120
121        U: ServiceFactory<
122            (Request, Framed<TlsStream<TcpStream>, Codec>),
123            Config = (),
124            Response = (),
125        >,
126        U::Future: 'static,
127        U::Error: fmt::Display + Into<Response<BoxBody>>,
128        U::InitError: fmt::Debug,
129    {
130        /// Create OpenSSL based service.
131        pub fn openssl(
132            self,
133            acceptor: SslAcceptor,
134        ) -> impl ServiceFactory<
135            TcpStream,
136            Config = (),
137            Response = (),
138            Error = TlsError<SslError, DispatchError>,
139            InitError = (),
140        > {
141            Acceptor::new(acceptor)
142                .map_init_err(|_| {
143                    unreachable!("TLS acceptor service factory does not error on init")
144                })
145                .map_err(TlsError::into_service_error)
146                .map(|io: TlsStream<TcpStream>| {
147                    let peer_addr = io.get_ref().peer_addr().ok();
148                    (io, peer_addr)
149                })
150                .and_then(self.map_err(TlsError::Service))
151        }
152    }
153}
154
155#[cfg(feature = "rustls-0_20")]
156mod rustls_0_20 {
157    use std::io;
158
159    use actix_service::ServiceFactoryExt as _;
160    use actix_tls::accept::{
161        rustls_0_20::{reexports::ServerConfig, Acceptor, TlsStream},
162        TlsError,
163    };
164
165    use super::*;
166
167    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
168    where
169        S: ServiceFactory<Request, Config = ()>,
170        S::Future: 'static,
171        S::Error: Into<Response<BoxBody>>,
172        S::InitError: fmt::Debug,
173        S::Response: Into<Response<B>>,
174
175        B: MessageBody,
176
177        X: ServiceFactory<Request, Config = (), Response = Request>,
178        X::Future: 'static,
179        X::Error: Into<Response<BoxBody>>,
180        X::InitError: fmt::Debug,
181
182        U: ServiceFactory<
183            (Request, Framed<TlsStream<TcpStream>, Codec>),
184            Config = (),
185            Response = (),
186        >,
187        U::Future: 'static,
188        U::Error: fmt::Display + Into<Response<BoxBody>>,
189        U::InitError: fmt::Debug,
190    {
191        /// Create Rustls v0.20 based service.
192        pub fn rustls(
193            self,
194            config: ServerConfig,
195        ) -> impl ServiceFactory<
196            TcpStream,
197            Config = (),
198            Response = (),
199            Error = TlsError<io::Error, DispatchError>,
200            InitError = (),
201        > {
202            Acceptor::new(config)
203                .map_init_err(|_| {
204                    unreachable!("TLS acceptor service factory does not error on init")
205                })
206                .map_err(TlsError::into_service_error)
207                .map(|io: TlsStream<TcpStream>| {
208                    let peer_addr = io.get_ref().0.peer_addr().ok();
209                    (io, peer_addr)
210                })
211                .and_then(self.map_err(TlsError::Service))
212        }
213    }
214}
215
216#[cfg(feature = "rustls-0_21")]
217mod rustls_0_21 {
218    use std::io;
219
220    use actix_service::ServiceFactoryExt as _;
221    use actix_tls::accept::{
222        rustls_0_21::{reexports::ServerConfig, Acceptor, TlsStream},
223        TlsError,
224    };
225
226    use super::*;
227
228    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
229    where
230        S: ServiceFactory<Request, Config = ()>,
231        S::Future: 'static,
232        S::Error: Into<Response<BoxBody>>,
233        S::InitError: fmt::Debug,
234        S::Response: Into<Response<B>>,
235
236        B: MessageBody,
237
238        X: ServiceFactory<Request, Config = (), Response = Request>,
239        X::Future: 'static,
240        X::Error: Into<Response<BoxBody>>,
241        X::InitError: fmt::Debug,
242
243        U: ServiceFactory<
244            (Request, Framed<TlsStream<TcpStream>, Codec>),
245            Config = (),
246            Response = (),
247        >,
248        U::Future: 'static,
249        U::Error: fmt::Display + Into<Response<BoxBody>>,
250        U::InitError: fmt::Debug,
251    {
252        /// Create Rustls v0.21 based service.
253        pub fn rustls_021(
254            self,
255            config: ServerConfig,
256        ) -> impl ServiceFactory<
257            TcpStream,
258            Config = (),
259            Response = (),
260            Error = TlsError<io::Error, DispatchError>,
261            InitError = (),
262        > {
263            Acceptor::new(config)
264                .map_init_err(|_| {
265                    unreachable!("TLS acceptor service factory does not error on init")
266                })
267                .map_err(TlsError::into_service_error)
268                .map(|io: TlsStream<TcpStream>| {
269                    let peer_addr = io.get_ref().0.peer_addr().ok();
270                    (io, peer_addr)
271                })
272                .and_then(self.map_err(TlsError::Service))
273        }
274    }
275}
276
277#[cfg(feature = "rustls-0_22")]
278mod rustls_0_22 {
279    use std::io;
280
281    use actix_service::ServiceFactoryExt as _;
282    use actix_tls::accept::{
283        rustls_0_22::{reexports::ServerConfig, Acceptor, TlsStream},
284        TlsError,
285    };
286
287    use super::*;
288
289    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
290    where
291        S: ServiceFactory<Request, Config = ()>,
292        S::Future: 'static,
293        S::Error: Into<Response<BoxBody>>,
294        S::InitError: fmt::Debug,
295        S::Response: Into<Response<B>>,
296
297        B: MessageBody,
298
299        X: ServiceFactory<Request, Config = (), Response = Request>,
300        X::Future: 'static,
301        X::Error: Into<Response<BoxBody>>,
302        X::InitError: fmt::Debug,
303
304        U: ServiceFactory<
305            (Request, Framed<TlsStream<TcpStream>, Codec>),
306            Config = (),
307            Response = (),
308        >,
309        U::Future: 'static,
310        U::Error: fmt::Display + Into<Response<BoxBody>>,
311        U::InitError: fmt::Debug,
312    {
313        /// Create Rustls v0.22 based service.
314        pub fn rustls_0_22(
315            self,
316            config: ServerConfig,
317        ) -> impl ServiceFactory<
318            TcpStream,
319            Config = (),
320            Response = (),
321            Error = TlsError<io::Error, DispatchError>,
322            InitError = (),
323        > {
324            Acceptor::new(config)
325                .map_init_err(|_| {
326                    unreachable!("TLS acceptor service factory does not error on init")
327                })
328                .map_err(TlsError::into_service_error)
329                .map(|io: TlsStream<TcpStream>| {
330                    let peer_addr = io.get_ref().0.peer_addr().ok();
331                    (io, peer_addr)
332                })
333                .and_then(self.map_err(TlsError::Service))
334        }
335    }
336}
337
338#[cfg(feature = "rustls-0_23")]
339mod rustls_0_23 {
340    use std::io;
341
342    use actix_service::ServiceFactoryExt as _;
343    use actix_tls::accept::{
344        rustls_0_23::{reexports::ServerConfig, Acceptor, TlsStream},
345        TlsError,
346    };
347
348    use super::*;
349
350    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
351    where
352        S: ServiceFactory<Request, Config = ()>,
353        S::Future: 'static,
354        S::Error: Into<Response<BoxBody>>,
355        S::InitError: fmt::Debug,
356        S::Response: Into<Response<B>>,
357
358        B: MessageBody,
359
360        X: ServiceFactory<Request, Config = (), Response = Request>,
361        X::Future: 'static,
362        X::Error: Into<Response<BoxBody>>,
363        X::InitError: fmt::Debug,
364
365        U: ServiceFactory<
366            (Request, Framed<TlsStream<TcpStream>, Codec>),
367            Config = (),
368            Response = (),
369        >,
370        U::Future: 'static,
371        U::Error: fmt::Display + Into<Response<BoxBody>>,
372        U::InitError: fmt::Debug,
373    {
374        /// Create Rustls v0.23 based service.
375        pub fn rustls_0_23(
376            self,
377            config: ServerConfig,
378        ) -> impl ServiceFactory<
379            TcpStream,
380            Config = (),
381            Response = (),
382            Error = TlsError<io::Error, DispatchError>,
383            InitError = (),
384        > {
385            Acceptor::new(config)
386                .map_init_err(|_| {
387                    unreachable!("TLS acceptor service factory does not error on init")
388                })
389                .map_err(TlsError::into_service_error)
390                .map(|io: TlsStream<TcpStream>| {
391                    let peer_addr = io.get_ref().0.peer_addr().ok();
392                    (io, peer_addr)
393                })
394                .and_then(self.map_err(TlsError::Service))
395        }
396    }
397}
398
399impl<T, S, B, X, U> H1Service<T, S, B, X, U>
400where
401    S: ServiceFactory<Request, Config = ()>,
402    S::Error: Into<Response<BoxBody>>,
403    S::Response: Into<Response<B>>,
404    S::InitError: fmt::Debug,
405    B: MessageBody,
406{
407    pub fn expect<X1>(self, expect: X1) -> H1Service<T, S, B, X1, U>
408    where
409        X1: ServiceFactory<Request, Response = Request>,
410        X1::Error: Into<Response<BoxBody>>,
411        X1::InitError: fmt::Debug,
412    {
413        H1Service {
414            expect,
415            cfg: self.cfg,
416            srv: self.srv,
417            upgrade: self.upgrade,
418            on_connect_ext: self.on_connect_ext,
419            _phantom: PhantomData,
420        }
421    }
422
423    pub fn upgrade<U1>(self, upgrade: Option<U1>) -> H1Service<T, S, B, X, U1>
424    where
425        U1: ServiceFactory<(Request, Framed<T, Codec>), Response = ()>,
426        U1::Error: fmt::Display,
427        U1::InitError: fmt::Debug,
428    {
429        H1Service {
430            upgrade,
431            cfg: self.cfg,
432            srv: self.srv,
433            expect: self.expect,
434            on_connect_ext: self.on_connect_ext,
435            _phantom: PhantomData,
436        }
437    }
438
439    /// Set on connect callback.
440    pub(crate) fn on_connect_ext(mut self, f: Option<Rc<ConnectCallback<T>>>) -> Self {
441        self.on_connect_ext = f;
442        self
443    }
444}
445
446impl<T, S, B, X, U> ServiceFactory<(T, Option<net::SocketAddr>)> for H1Service<T, S, B, X, U>
447where
448    T: AsyncRead + AsyncWrite + Unpin + 'static,
449
450    S: ServiceFactory<Request, Config = ()>,
451    S::Future: 'static,
452    S::Error: Into<Response<BoxBody>>,
453    S::Response: Into<Response<B>>,
454    S::InitError: fmt::Debug,
455
456    B: MessageBody,
457
458    X: ServiceFactory<Request, Config = (), Response = Request>,
459    X::Future: 'static,
460    X::Error: Into<Response<BoxBody>>,
461    X::InitError: fmt::Debug,
462
463    U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
464    U::Future: 'static,
465    U::Error: fmt::Display + Into<Response<BoxBody>>,
466    U::InitError: fmt::Debug,
467{
468    type Response = ();
469    type Error = DispatchError;
470    type Config = ();
471    type Service = H1ServiceHandler<T, S::Service, B, X::Service, U::Service>;
472    type InitError = ();
473    type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
474
475    fn new_service(&self, _: ()) -> Self::Future {
476        let service = self.srv.new_service(());
477        let expect = self.expect.new_service(());
478        let upgrade = self.upgrade.as_ref().map(|s| s.new_service(()));
479        let on_connect_ext = self.on_connect_ext.clone();
480        let cfg = self.cfg.clone();
481
482        Box::pin(async move {
483            let expect = expect.await.map_err(|err| {
484                tracing::error!("Initialization of HTTP expect service error: {err:?}");
485            })?;
486
487            let upgrade = match upgrade {
488                Some(upgrade) => {
489                    let upgrade = upgrade.await.map_err(|err| {
490                        tracing::error!("Initialization of HTTP upgrade service error: {err:?}");
491                    })?;
492                    Some(upgrade)
493                }
494                None => None,
495            };
496
497            let service = service
498                .await
499                .map_err(|err| error!("Initialization of HTTP service error: {err:?}"))?;
500
501            Ok(H1ServiceHandler::new(
502                cfg,
503                service,
504                expect,
505                upgrade,
506                on_connect_ext,
507            ))
508        })
509    }
510}
511
512/// `Service` implementation for HTTP/1 transport
513pub type H1ServiceHandler<T, S, B, X, U> = HttpServiceHandler<T, S, B, X, U>;
514
515impl<T, S, B, X, U> Service<(T, Option<net::SocketAddr>)> for HttpServiceHandler<T, S, B, X, U>
516where
517    T: AsyncRead + AsyncWrite + Unpin,
518
519    S: Service<Request>,
520    S::Error: Into<Response<BoxBody>>,
521    S::Response: Into<Response<B>>,
522
523    B: MessageBody,
524
525    X: Service<Request, Response = Request>,
526    X::Error: Into<Response<BoxBody>>,
527
528    U: Service<(Request, Framed<T, Codec>), Response = ()>,
529    U::Error: fmt::Display + Into<Response<BoxBody>>,
530{
531    type Response = ();
532    type Error = DispatchError;
533    type Future = Dispatcher<T, S, B, X, U>;
534
535    fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
536        self._poll_ready(cx).map_err(|err| {
537            error!("HTTP/1 service readiness error: {:?}", err);
538            DispatchError::Service(err)
539        })
540    }
541
542    fn call(&self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future {
543        let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref());
544        Dispatcher::new(io, Rc::clone(&self.flow), self.cfg.clone(), addr, conn_data)
545    }
546}