actix_http/
service.rs

1use std::{
2    fmt,
3    future::Future,
4    marker::PhantomData,
5    net,
6    pin::Pin,
7    rc::Rc,
8    task::{Context, Poll},
9};
10
11use actix_codec::{AsyncRead, AsyncWrite, Framed};
12use actix_rt::net::TcpStream;
13use actix_service::{
14    fn_service, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt as _,
15};
16use futures_core::{future::LocalBoxFuture, ready};
17use pin_project_lite::pin_project;
18use tracing::error;
19
20use crate::{
21    body::{BoxBody, MessageBody},
22    builder::HttpServiceBuilder,
23    error::DispatchError,
24    h1, ConnectCallback, OnConnectData, Protocol, Request, Response, ServiceConfig,
25};
26
/// A [`ServiceFactory`] for HTTP/1.1 and HTTP/2 connections.
///
/// Use [`build`](Self::build) to begin constructing service. Also see [`HttpServiceBuilder`].
///
/// # Automatic HTTP Version Selection
/// There are two ways to select the HTTP version of an incoming connection:
/// - One is to rely on the ALPN information that is provided when using TLS (HTTPS); both versions
///   are supported automatically when using either of the `.rustls()` or `.openssl()` finalizing
///   methods.
/// - The other is to read the first few bytes of the TCP stream. This is the only viable approach
///   for supporting H2C, which allows the HTTP/2 protocol to work over plaintext connections. Use
///   the `.tcp_auto_h2c()` finalizing method to enable this behavior.
///
/// # Examples
/// ```
/// # use std::convert::Infallible;
/// use actix_http::{HttpService, Request, Response, StatusCode};
///
/// // this service would be constructed in an actix_server::Server
///
/// # actix_rt::System::new().block_on(async {
/// HttpService::build()
///     // the builder finalizing method, other finalizers would not return an `HttpService`
///     .finish(|_req: Request| async move {
///         Ok::<_, Infallible>(
///             Response::build(StatusCode::OK).body("Hello!")
///         )
///     })
///     // the service finalizing method
///     // you can use `.tcp_auto_h2c()`, `.rustls()`, or `.openssl()` instead of `.tcp()`
///     .tcp();
/// # })
/// ```
pub struct HttpService<T, S, B, X = h1::ExpectHandler, U = h1::UpgradeHandler> {
    /// Factory for the main request-handling service.
    srv: S,
    /// Configuration cloned into every handler produced by this factory.
    cfg: ServiceConfig,
    /// Factory for the service that handles `Expect: 100-Continue` requests.
    expect: X,
    /// Optional factory for the service that handles `Connection: Upgrade` requests.
    upgrade: Option<U>,
    /// Optional connect callback, handed to every handler produced by this factory.
    on_connect_ext: Option<Rc<ConnectCallback<T>>>,
    /// Ties the response body type `B` to the service without storing a value of it.
    _phantom: PhantomData<B>,
}
68
69impl<T, S, B> HttpService<T, S, B>
70where
71    S: ServiceFactory<Request, Config = ()>,
72    S::Error: Into<Response<BoxBody>> + 'static,
73    S::InitError: fmt::Debug,
74    S::Response: Into<Response<B>> + 'static,
75    <S::Service as Service<Request>>::Future: 'static,
76    B: MessageBody + 'static,
77{
78    /// Constructs builder for `HttpService` instance.
79    pub fn build() -> HttpServiceBuilder<T, S> {
80        HttpServiceBuilder::default()
81    }
82}
83
84impl<T, S, B> HttpService<T, S, B>
85where
86    S: ServiceFactory<Request, Config = ()>,
87    S::Error: Into<Response<BoxBody>> + 'static,
88    S::InitError: fmt::Debug,
89    S::Response: Into<Response<B>> + 'static,
90    <S::Service as Service<Request>>::Future: 'static,
91    B: MessageBody + 'static,
92{
93    /// Constructs new `HttpService` instance from service with default config.
94    pub fn new<F: IntoServiceFactory<S, Request>>(service: F) -> Self {
95        HttpService {
96            cfg: ServiceConfig::default(),
97            srv: service.into_factory(),
98            expect: h1::ExpectHandler,
99            upgrade: None,
100            on_connect_ext: None,
101            _phantom: PhantomData,
102        }
103    }
104
105    /// Constructs new `HttpService` instance from config and service.
106    pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
107        cfg: ServiceConfig,
108        service: F,
109    ) -> Self {
110        HttpService {
111            cfg,
112            srv: service.into_factory(),
113            expect: h1::ExpectHandler,
114            upgrade: None,
115            on_connect_ext: None,
116            _phantom: PhantomData,
117        }
118    }
119}
120
121impl<T, S, B, X, U> HttpService<T, S, B, X, U>
122where
123    S: ServiceFactory<Request, Config = ()>,
124    S::Error: Into<Response<BoxBody>> + 'static,
125    S::InitError: fmt::Debug,
126    S::Response: Into<Response<B>> + 'static,
127    <S::Service as Service<Request>>::Future: 'static,
128    B: MessageBody,
129{
130    /// Sets service for `Expect: 100-Continue` handling.
131    ///
132    /// An expect service is called with requests that contain an `Expect` header. A successful
133    /// response type is also a request which will be forwarded to the main service.
134    pub fn expect<X1>(self, expect: X1) -> HttpService<T, S, B, X1, U>
135    where
136        X1: ServiceFactory<Request, Config = (), Response = Request>,
137        X1::Error: Into<Response<BoxBody>>,
138        X1::InitError: fmt::Debug,
139    {
140        HttpService {
141            expect,
142            cfg: self.cfg,
143            srv: self.srv,
144            upgrade: self.upgrade,
145            on_connect_ext: self.on_connect_ext,
146            _phantom: PhantomData,
147        }
148    }
149
150    /// Sets service for custom `Connection: Upgrade` handling.
151    ///
152    /// If service is provided then normal requests handling get halted and this service get called
153    /// with original request and framed object.
154    pub fn upgrade<U1>(self, upgrade: Option<U1>) -> HttpService<T, S, B, X, U1>
155    where
156        U1: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
157        U1::Error: fmt::Display,
158        U1::InitError: fmt::Debug,
159    {
160        HttpService {
161            upgrade,
162            cfg: self.cfg,
163            srv: self.srv,
164            expect: self.expect,
165            on_connect_ext: self.on_connect_ext,
166            _phantom: PhantomData,
167        }
168    }
169
170    /// Set connect callback with mutable access to request data container.
171    pub(crate) fn on_connect_ext(mut self, f: Option<Rc<ConnectCallback<T>>>) -> Self {
172        self.on_connect_ext = f;
173        self
174    }
175}
176
177impl<S, B, X, U> HttpService<TcpStream, S, B, X, U>
178where
179    S: ServiceFactory<Request, Config = ()>,
180    S::Future: 'static,
181    S::Error: Into<Response<BoxBody>> + 'static,
182    S::InitError: fmt::Debug,
183    S::Response: Into<Response<B>> + 'static,
184    <S::Service as Service<Request>>::Future: 'static,
185
186    B: MessageBody + 'static,
187
188    X: ServiceFactory<Request, Config = (), Response = Request>,
189    X::Future: 'static,
190    X::Error: Into<Response<BoxBody>>,
191    X::InitError: fmt::Debug,
192
193    U: ServiceFactory<(Request, Framed<TcpStream, h1::Codec>), Config = (), Response = ()>,
194    U::Future: 'static,
195    U::Error: fmt::Display + Into<Response<BoxBody>>,
196    U::InitError: fmt::Debug,
197{
198    /// Creates TCP stream service from HTTP service.
199    ///
200    /// The resulting service only supports HTTP/1.x.
201    pub fn tcp(
202        self,
203    ) -> impl ServiceFactory<TcpStream, Config = (), Response = (), Error = DispatchError, InitError = ()>
204    {
205        fn_service(|io: TcpStream| async {
206            let peer_addr = io.peer_addr().ok();
207            Ok((io, Protocol::Http1, peer_addr))
208        })
209        .and_then(self)
210    }
211
212    /// Creates TCP stream service from HTTP service that automatically selects HTTP/1.x or HTTP/2
213    /// on plaintext connections.
214    #[cfg(feature = "http2")]
215    pub fn tcp_auto_h2c(
216        self,
217    ) -> impl ServiceFactory<TcpStream, Config = (), Response = (), Error = DispatchError, InitError = ()>
218    {
219        fn_service(move |io: TcpStream| async move {
220            // subset of HTTP/2 preface defined by RFC 9113 ยง3.4
221            // this subset was chosen to maximize likelihood that peeking only once will allow us to
222            // reliably determine version or else it should fallback to h1 and fail quickly if data
223            // on the wire is junk
224            const H2_PREFACE: &[u8] = b"PRI * HTTP/2";
225
226            let mut buf = [0; 12];
227
228            io.peek(&mut buf).await?;
229
230            let proto = if buf == H2_PREFACE {
231                Protocol::Http2
232            } else {
233                Protocol::Http1
234            };
235
236            let peer_addr = io.peer_addr().ok();
237            Ok((io, proto, peer_addr))
238        })
239        .and_then(self)
240    }
241}
242
/// Configuration options used when accepting a TLS connection.
#[cfg(feature = "__tls")]
#[derive(Debug, Default)]
pub struct TlsAcceptorConfig {
    /// Handshake timeout to set on the TLS acceptor; `None` leaves the acceptor's own setting
    /// untouched.
    pub(crate) handshake_timeout: Option<std::time::Duration>,
}
249
#[cfg(feature = "__tls")]
impl TlsAcceptorConfig {
    /// Sets the TLS handshake timeout duration and returns the updated configuration.
    pub fn handshake_timeout(mut self, dur: std::time::Duration) -> Self {
        self.handshake_timeout = Some(dur);
        self
    }
}
260
#[cfg(feature = "openssl")]
mod openssl {
    use actix_service::ServiceFactoryExt as _;
    use actix_tls::accept::{
        openssl::{
            reexports::{Error as SslError, SslAcceptor},
            Acceptor, TlsStream,
        },
        TlsError,
    };

    use super::*;

    impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Request, Config = ()>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>> + 'static,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>> + 'static,
        <S::Service as Service<Request>>::Future: 'static,

        B: MessageBody + 'static,

        X: ServiceFactory<Request, Config = (), Response = Request>,
        X::Future: 'static,
        X::Error: Into<Response<BoxBody>>,
        X::InitError: fmt::Debug,

        U: ServiceFactory<
            (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
            Config = (),
            Response = (),
        >,
        U::Future: 'static,
        U::Error: fmt::Display + Into<Response<BoxBody>>,
        U::InitError: fmt::Debug,
    {
        /// Create OpenSSL based service.
        ///
        /// Same as `openssl_with_config` with a default `TlsAcceptorConfig`.
        pub fn openssl(
            self,
            acceptor: SslAcceptor,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<SslError, DispatchError>,
            InitError = (),
        > {
            self.openssl_with_config(acceptor, TlsAcceptorConfig::default())
        }

        /// Create OpenSSL based service with custom TLS acceptor configuration.
        ///
        /// After the TLS handshake, the connection is served as HTTP/2 only when the negotiated
        /// ALPN protocol is exactly `h2`; otherwise (including when no protocol was negotiated)
        /// it is served as HTTP/1.
        pub fn openssl_with_config(
            self,
            acceptor: SslAcceptor,
            tls_acceptor_config: TlsAcceptorConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<SslError, DispatchError>,
            InitError = (),
        > {
            let mut acceptor = Acceptor::new(acceptor);

            if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout {
                acceptor.set_handshake_timeout(handshake_timeout);
            }

            acceptor
                .map_init_err(|_| {
                    unreachable!("TLS acceptor service factory does not error on init")
                })
                .map_err(TlsError::into_service_error)
                .map(|io: TlsStream<TcpStream>| {
                    // The selected ALPN value is a single protocol identifier, so compare it
                    // as a whole; a substring search would misclassify any identifier that
                    // merely contains the bytes `h2`.
                    let proto = match io.ssl().selected_alpn_protocol() {
                        Some(alpn) if alpn == b"h2" => Protocol::Http2,
                        _ => Protocol::Http1,
                    };

                    let peer_addr = io.get_ref().peer_addr().ok();
                    (io, proto, peer_addr)
                })
                .and_then(self.map_err(TlsError::Service))
        }
    }
}
354
#[cfg(feature = "rustls-0_20")]
mod rustls_0_20 {
    use std::io;

    use actix_service::ServiceFactoryExt as _;
    use actix_tls::accept::{
        rustls_0_20::{reexports::ServerConfig, Acceptor, TlsStream},
        TlsError,
    };

    use super::*;

    impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Request, Config = ()>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>> + 'static,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>> + 'static,
        <S::Service as Service<Request>>::Future: 'static,

        B: MessageBody + 'static,

        X: ServiceFactory<Request, Config = (), Response = Request>,
        X::Future: 'static,
        X::Error: Into<Response<BoxBody>>,
        X::InitError: fmt::Debug,

        U: ServiceFactory<
            (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
            Config = (),
            Response = (),
        >,
        U::Future: 'static,
        U::Error: fmt::Display + Into<Response<BoxBody>>,
        U::InitError: fmt::Debug,
    {
        /// Create Rustls v0.20 based service.
        ///
        /// Same as `rustls_with_config` with a default `TlsAcceptorConfig`.
        pub fn rustls(
            self,
            config: ServerConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            self.rustls_with_config(config, TlsAcceptorConfig::default())
        }

        /// Create Rustls v0.20 based service with custom TLS acceptor configuration.
        ///
        /// `h2` and `http/1.1` are placed ahead of any ALPN protocols already present in
        /// `config`. After the handshake, the connection is served as HTTP/2 only when the
        /// negotiated ALPN protocol is exactly `h2`; otherwise it is served as HTTP/1.
        pub fn rustls_with_config(
            self,
            mut config: ServerConfig,
            tls_acceptor_config: TlsAcceptorConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            let mut protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
            protos.extend_from_slice(&config.alpn_protocols);
            config.alpn_protocols = protos;

            let mut acceptor = Acceptor::new(config);

            if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout {
                acceptor.set_handshake_timeout(handshake_timeout);
            }

            acceptor
                .map_init_err(|_| {
                    unreachable!("TLS acceptor service factory does not error on init")
                })
                .map_err(TlsError::into_service_error)
                .and_then(|io: TlsStream<TcpStream>| async {
                    // The negotiated ALPN value is a single protocol identifier, so compare
                    // it as a whole; a substring search would misclassify any identifier that
                    // merely contains the bytes `h2`.
                    let proto = match io.get_ref().1.alpn_protocol() {
                        Some(alpn) if alpn == b"h2" => Protocol::Http2,
                        _ => Protocol::Http1,
                    };
                    let peer_addr = io.get_ref().0.peer_addr().ok();
                    Ok((io, proto, peer_addr))
                })
                .and_then(self.map_err(TlsError::Service))
        }
    }
}
450
#[cfg(feature = "rustls-0_21")]
mod rustls_0_21 {
    use std::io;

    use actix_service::ServiceFactoryExt as _;
    use actix_tls::accept::{
        rustls_0_21::{reexports::ServerConfig, Acceptor, TlsStream},
        TlsError,
    };

    use super::*;

    impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Request, Config = ()>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>> + 'static,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>> + 'static,
        <S::Service as Service<Request>>::Future: 'static,

        B: MessageBody + 'static,

        X: ServiceFactory<Request, Config = (), Response = Request>,
        X::Future: 'static,
        X::Error: Into<Response<BoxBody>>,
        X::InitError: fmt::Debug,

        U: ServiceFactory<
            (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
            Config = (),
            Response = (),
        >,
        U::Future: 'static,
        U::Error: fmt::Display + Into<Response<BoxBody>>,
        U::InitError: fmt::Debug,
    {
        /// Create Rustls v0.21 based service.
        ///
        /// Same as `rustls_021_with_config` with a default `TlsAcceptorConfig`.
        pub fn rustls_021(
            self,
            config: ServerConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            self.rustls_021_with_config(config, TlsAcceptorConfig::default())
        }

        /// Create Rustls v0.21 based service with custom TLS acceptor configuration.
        ///
        /// `h2` and `http/1.1` are placed ahead of any ALPN protocols already present in
        /// `config`. After the handshake, the connection is served as HTTP/2 only when the
        /// negotiated ALPN protocol is exactly `h2`; otherwise it is served as HTTP/1.
        pub fn rustls_021_with_config(
            self,
            mut config: ServerConfig,
            tls_acceptor_config: TlsAcceptorConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            let mut protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
            protos.extend_from_slice(&config.alpn_protocols);
            config.alpn_protocols = protos;

            let mut acceptor = Acceptor::new(config);

            if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout {
                acceptor.set_handshake_timeout(handshake_timeout);
            }

            acceptor
                .map_init_err(|_| {
                    unreachable!("TLS acceptor service factory does not error on init")
                })
                .map_err(TlsError::into_service_error)
                .and_then(|io: TlsStream<TcpStream>| async {
                    // The negotiated ALPN value is a single protocol identifier, so compare
                    // it as a whole; a substring search would misclassify any identifier that
                    // merely contains the bytes `h2`.
                    let proto = match io.get_ref().1.alpn_protocol() {
                        Some(alpn) if alpn == b"h2" => Protocol::Http2,
                        _ => Protocol::Http1,
                    };
                    let peer_addr = io.get_ref().0.peer_addr().ok();
                    Ok((io, proto, peer_addr))
                })
                .and_then(self.map_err(TlsError::Service))
        }
    }
}
546
#[cfg(feature = "rustls-0_22")]
mod rustls_0_22 {
    use std::io;

    use actix_service::ServiceFactoryExt as _;
    use actix_tls::accept::{
        rustls_0_22::{reexports::ServerConfig, Acceptor, TlsStream},
        TlsError,
    };

    use super::*;

    impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Request, Config = ()>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>> + 'static,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>> + 'static,
        <S::Service as Service<Request>>::Future: 'static,

        B: MessageBody + 'static,

        X: ServiceFactory<Request, Config = (), Response = Request>,
        X::Future: 'static,
        X::Error: Into<Response<BoxBody>>,
        X::InitError: fmt::Debug,

        U: ServiceFactory<
            (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
            Config = (),
            Response = (),
        >,
        U::Future: 'static,
        U::Error: fmt::Display + Into<Response<BoxBody>>,
        U::InitError: fmt::Debug,
    {
        /// Create Rustls v0.22 based service.
        ///
        /// Same as `rustls_0_22_with_config` with a default `TlsAcceptorConfig`.
        pub fn rustls_0_22(
            self,
            config: ServerConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            self.rustls_0_22_with_config(config, TlsAcceptorConfig::default())
        }

        /// Create Rustls v0.22 based service with custom TLS acceptor configuration.
        ///
        /// `h2` and `http/1.1` are placed ahead of any ALPN protocols already present in
        /// `config`. After the handshake, the connection is served as HTTP/2 only when the
        /// negotiated ALPN protocol is exactly `h2`; otherwise it is served as HTTP/1.
        pub fn rustls_0_22_with_config(
            self,
            mut config: ServerConfig,
            tls_acceptor_config: TlsAcceptorConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            let mut protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
            protos.extend_from_slice(&config.alpn_protocols);
            config.alpn_protocols = protos;

            let mut acceptor = Acceptor::new(config);

            if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout {
                acceptor.set_handshake_timeout(handshake_timeout);
            }

            acceptor
                .map_init_err(|_| {
                    unreachable!("TLS acceptor service factory does not error on init")
                })
                .map_err(TlsError::into_service_error)
                .and_then(|io: TlsStream<TcpStream>| async {
                    // The negotiated ALPN value is a single protocol identifier, so compare
                    // it as a whole; a substring search would misclassify any identifier that
                    // merely contains the bytes `h2`.
                    let proto = match io.get_ref().1.alpn_protocol() {
                        Some(alpn) if alpn == b"h2" => Protocol::Http2,
                        _ => Protocol::Http1,
                    };
                    let peer_addr = io.get_ref().0.peer_addr().ok();
                    Ok((io, proto, peer_addr))
                })
                .and_then(self.map_err(TlsError::Service))
        }
    }
}
642
#[cfg(feature = "rustls-0_23")]
mod rustls_0_23 {
    use std::io;

    use actix_service::ServiceFactoryExt as _;
    use actix_tls::accept::{
        rustls_0_23::{reexports::ServerConfig, Acceptor, TlsStream},
        TlsError,
    };

    use super::*;

    impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Request, Config = ()>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>> + 'static,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>> + 'static,
        <S::Service as Service<Request>>::Future: 'static,

        B: MessageBody + 'static,

        X: ServiceFactory<Request, Config = (), Response = Request>,
        X::Future: 'static,
        X::Error: Into<Response<BoxBody>>,
        X::InitError: fmt::Debug,

        U: ServiceFactory<
            (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
            Config = (),
            Response = (),
        >,
        U::Future: 'static,
        U::Error: fmt::Display + Into<Response<BoxBody>>,
        U::InitError: fmt::Debug,
    {
        /// Create Rustls v0.23 based service.
        ///
        /// Same as `rustls_0_23_with_config` with a default `TlsAcceptorConfig`.
        pub fn rustls_0_23(
            self,
            config: ServerConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            self.rustls_0_23_with_config(config, TlsAcceptorConfig::default())
        }

        /// Create Rustls v0.23 based service with custom TLS acceptor configuration.
        ///
        /// `h2` and `http/1.1` are placed ahead of any ALPN protocols already present in
        /// `config`. After the handshake, the connection is served as HTTP/2 only when the
        /// negotiated ALPN protocol is exactly `h2`; otherwise it is served as HTTP/1.
        pub fn rustls_0_23_with_config(
            self,
            mut config: ServerConfig,
            tls_acceptor_config: TlsAcceptorConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            let mut protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
            protos.extend_from_slice(&config.alpn_protocols);
            config.alpn_protocols = protos;

            let mut acceptor = Acceptor::new(config);

            if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout {
                acceptor.set_handshake_timeout(handshake_timeout);
            }

            acceptor
                .map_init_err(|_| {
                    unreachable!("TLS acceptor service factory does not error on init")
                })
                .map_err(TlsError::into_service_error)
                .and_then(|io: TlsStream<TcpStream>| async {
                    // The negotiated ALPN value is a single protocol identifier, so compare
                    // it as a whole; a substring search would misclassify any identifier that
                    // merely contains the bytes `h2`.
                    let proto = match io.get_ref().1.alpn_protocol() {
                        Some(alpn) if alpn == b"h2" => Protocol::Http2,
                        _ => Protocol::Http1,
                    };
                    let peer_addr = io.get_ref().0.peer_addr().ok();
                    Ok((io, proto, peer_addr))
                })
                .and_then(self.map_err(TlsError::Service))
        }
    }
}
738
739impl<T, S, B, X, U> ServiceFactory<(T, Protocol, Option<net::SocketAddr>)>
740    for HttpService<T, S, B, X, U>
741where
742    T: AsyncRead + AsyncWrite + Unpin + 'static,
743
744    S: ServiceFactory<Request, Config = ()>,
745    S::Future: 'static,
746    S::Error: Into<Response<BoxBody>> + 'static,
747    S::InitError: fmt::Debug,
748    S::Response: Into<Response<B>> + 'static,
749    <S::Service as Service<Request>>::Future: 'static,
750
751    B: MessageBody + 'static,
752
753    X: ServiceFactory<Request, Config = (), Response = Request>,
754    X::Future: 'static,
755    X::Error: Into<Response<BoxBody>>,
756    X::InitError: fmt::Debug,
757
758    U: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
759    U::Future: 'static,
760    U::Error: fmt::Display + Into<Response<BoxBody>>,
761    U::InitError: fmt::Debug,
762{
763    type Response = ();
764    type Error = DispatchError;
765    type Config = ();
766    type Service = HttpServiceHandler<T, S::Service, B, X::Service, U::Service>;
767    type InitError = ();
768    type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
769
770    fn new_service(&self, _: ()) -> Self::Future {
771        let service = self.srv.new_service(());
772        let expect = self.expect.new_service(());
773        let upgrade = self.upgrade.as_ref().map(|s| s.new_service(()));
774        let on_connect_ext = self.on_connect_ext.clone();
775        let cfg = self.cfg.clone();
776
777        Box::pin(async move {
778            let expect = expect.await.map_err(|err| {
779                tracing::error!("Initialization of HTTP expect service error: {err:?}");
780            })?;
781
782            let upgrade = match upgrade {
783                Some(upgrade) => {
784                    let upgrade = upgrade.await.map_err(|err| {
785                        tracing::error!("Initialization of HTTP upgrade service error: {err:?}");
786                    })?;
787                    Some(upgrade)
788                }
789                None => None,
790            };
791
792            let service = service.await.map_err(|err| {
793                tracing::error!("Initialization of HTTP service error: {err:?}");
794            })?;
795
796            Ok(HttpServiceHandler::new(
797                cfg,
798                service,
799                expect,
800                upgrade,
801                on_connect_ext,
802            ))
803        })
804    }
805}
806
/// `Service` implementation for HTTP/1 and HTTP/2 transport
pub struct HttpServiceHandler<T, S, B, X, U>
where
    S: Service<Request>,
    X: Service<Request>,
    U: Service<(Request, Framed<T, h1::Codec>)>,
{
    /// Shared bundle of the main, expect and optional upgrade services.
    pub(super) flow: Rc<HttpFlow<S, X, U>>,
    /// Configuration cloned into each connection's dispatcher.
    pub(super) cfg: ServiceConfig,
    /// Optional connect callback used to build per-connection data.
    pub(super) on_connect_ext: Option<Rc<ConnectCallback<T>>>,
    /// Ties the response body type `B` to the handler without storing a value of it.
    _phantom: PhantomData<B>,
}
819
820impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U>
821where
822    S: Service<Request>,
823    S::Error: Into<Response<BoxBody>>,
824    X: Service<Request>,
825    X::Error: Into<Response<BoxBody>>,
826    U: Service<(Request, Framed<T, h1::Codec>)>,
827    U::Error: Into<Response<BoxBody>>,
828{
829    pub(super) fn new(
830        cfg: ServiceConfig,
831        service: S,
832        expect: X,
833        upgrade: Option<U>,
834        on_connect_ext: Option<Rc<ConnectCallback<T>>>,
835    ) -> HttpServiceHandler<T, S, B, X, U> {
836        HttpServiceHandler {
837            cfg,
838            on_connect_ext,
839            flow: HttpFlow::new(service, expect, upgrade),
840            _phantom: PhantomData,
841        }
842    }
843
844    pub(super) fn _poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Response<BoxBody>>> {
845        ready!(self.flow.expect.poll_ready(cx).map_err(Into::into))?;
846
847        ready!(self.flow.service.poll_ready(cx).map_err(Into::into))?;
848
849        if let Some(ref upg) = self.flow.upgrade {
850            ready!(upg.poll_ready(cx).map_err(Into::into))?;
851        };
852
853        Poll::Ready(Ok(()))
854    }
855}
856
/// A collection of services that describe an HTTP request flow.
pub(super) struct HttpFlow<S, X, U> {
    /// Main request-handling service.
    pub(super) service: S,
    /// Service handling requests that carry an `Expect` header.
    pub(super) expect: X,
    /// Optional service handling `Connection: Upgrade` requests.
    pub(super) upgrade: Option<U>,
}
863
864impl<S, X, U> HttpFlow<S, X, U> {
865    pub(super) fn new(service: S, expect: X, upgrade: Option<U>) -> Rc<Self> {
866        Rc::new(Self {
867            service,
868            expect,
869            upgrade,
870        })
871    }
872}
873
/// Connection-level service: takes an I/O object together with the protocol selected for it
/// and the peer address (if known), and drives that connection with the matching dispatcher.
impl<T, S, B, X, U> Service<(T, Protocol, Option<net::SocketAddr>)>
    for HttpServiceHandler<T, S, B, X, U>
where
    T: AsyncRead + AsyncWrite + Unpin,

    S: Service<Request>,
    S::Error: Into<Response<BoxBody>> + 'static,
    S::Future: 'static,
    S::Response: Into<Response<B>> + 'static,

    B: MessageBody + 'static,

    X: Service<Request, Response = Request>,
    X::Error: Into<Response<BoxBody>>,

    U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
    U::Error: fmt::Display + Into<Response<BoxBody>>,
{
    type Response = ();
    type Error = DispatchError;
    type Future = HttpServiceHandlerResponse<T, S, B, X, U>;

    /// Delegates to `_poll_ready`; a readiness error is logged and wrapped in
    /// `DispatchError::Service`.
    fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self._poll_ready(cx).map_err(|err| {
            error!("HTTP service readiness error: {:?}", err);
            DispatchError::Service(err)
        })
    }

    /// Builds the response future for one connection, choosing the initial state by `proto`.
    ///
    /// # Panics
    /// Panics when `proto` is `Protocol::Http2` and the `http2` feature is disabled, and (via
    /// `unimplemented!`) for any protocol other than HTTP/1 or HTTP/2.
    fn call(&self, (io, proto, peer_addr): (T, Protocol, Option<net::SocketAddr>)) -> Self::Future {
        // Built from a borrow of `io`, before `io` is moved into the handshake/dispatcher.
        let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref());

        match proto {
            // HTTP/2: start with the handshake; everything needed afterwards is stored next to
            // the handshake future.
            #[cfg(feature = "http2")]
            Protocol::Http2 => HttpServiceHandlerResponse {
                state: State::H2Handshake {
                    handshake: Some((
                        crate::h2::handshake_with_timeout(io, &self.cfg),
                        self.cfg.clone(),
                        Rc::clone(&self.flow),
                        conn_data,
                        peer_addr,
                    )),
                },
            },

            #[cfg(not(feature = "http2"))]
            Protocol::Http2 => {
                panic!("HTTP/2 support is disabled (enable with the `http2` feature flag)")
            }

            // HTTP/1: no handshake step, the dispatcher is created right away.
            Protocol::Http1 => HttpServiceHandlerResponse {
                state: State::H1 {
                    dispatcher: h1::Dispatcher::new(
                        io,
                        Rc::clone(&self.flow),
                        self.cfg.clone(),
                        peer_addr,
                        conn_data,
                    ),
                },
            },

            proto => unimplemented!("Unsupported HTTP version: {:?}.", proto),
        }
    }
}
941
// Connection state used when the `http2` feature is disabled: the only possible state is a
// running HTTP/1 dispatcher. The projection type generated by the macro is named `StateProj`.
#[cfg(not(feature = "http2"))]
pin_project! {
    #[project = StateProj]
    enum State<T, S, B, X, U>
    where
        // NOTE(review): the bounds on `T` are written as separate predicates rather than
        // `A + B + C`; presumably this is required by the `pin_project!` input grammar.
        // Confirm before merging them.
        T: AsyncRead,
        T: AsyncWrite,
        T: Unpin,

        S: Service<Request>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>>,

        B: MessageBody,

        X: Service<Request, Response = Request>,
        X::Error: Into<Response<BoxBody>>,

        U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
        U::Error: fmt::Display,
    {
        // HTTP/1 connection; the dispatcher is structurally pinned and polled in place.
        H1 { #[pin] dispatcher: h1::Dispatcher<T, S, B, X, U> },
    }
}
966
// Connection state used when the `http2` feature is enabled. A connection is either a running
// HTTP/1 dispatcher, a running HTTP/2 dispatcher, or an HTTP/2 connection whose handshake is
// still in progress. The projection type generated by the macro is named `StateProj`.
#[cfg(feature = "http2")]
pin_project! {
    #[project = StateProj]
    enum State<T, S, B, X, U>
    where
        // NOTE(review): the bounds on `T` are written as separate predicates rather than
        // `A + B + C`; presumably this is required by the `pin_project!` input grammar.
        // Confirm before merging them.
        T: AsyncRead,
        T: AsyncWrite,
        T: Unpin,

        S: Service<Request>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>>,

        B: MessageBody,

        X: Service<Request, Response = Request>,
        X::Error: Into<Response<BoxBody>>,

        U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
        U::Error: fmt::Display,
    {
        // HTTP/1 connection; the dispatcher is structurally pinned and polled in place.
        H1 { #[pin] dispatcher: h1::Dispatcher<T, S, B, X, U> },

        // HTTP/2 connection after a completed handshake.
        H2 { #[pin] dispatcher: crate::h2::Dispatcher<T, S, B, X, U> },

        // HTTP/2 connection before the handshake has completed. The tuple holds the handshake
        // future followed by the config, flow, on-connect data and peer address that are
        // handed to the HTTP/2 dispatcher afterwards. It is wrapped in `Option` so the
        // contents can be moved out with `take()` when switching to `H2`. This field is not
        // marked `#[pin]`.
        H2Handshake {
            handshake: Option<(
                crate::h2::HandshakeWithTimeout<T>,
                ServiceConfig,
                Rc<HttpFlow<S, X, U>>,
                OnConnectData,
                Option<net::SocketAddr>,
            )>,
        },
    }
}
1003
pin_project! {
    /// Future returned by the [`Service::call`] implementation of `HttpServiceHandler`.
    ///
    /// It wraps the per-connection `State` and resolves to `Result<(), DispatchError>`.
    pub struct HttpServiceHandlerResponse<T, S, B, X, U>
    where
        // NOTE(review): bounds on the same type are written as separate predicates rather
        // than `A + B`; presumably this is required by the `pin_project!` input grammar.
        // Confirm before merging them.
        T: AsyncRead,
        T: AsyncWrite,
        T: Unpin,

        S: Service<Request>,
        S::Error: Into<Response<BoxBody>>,
        S::Error: 'static,
        S::Future: 'static,
        S::Response: Into<Response<B>>,
        S::Response: 'static,

        B: MessageBody,

        X: Service<Request, Response = Request>,
        X::Error: Into<Response<BoxBody>>,

        U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
        U::Error: fmt::Display,
    {
        // Current connection state; pinned so the dispatcher inside can be polled in place.
        #[pin]
        state: State<T, S, B, X, U>,
    }
}
1030
1031impl<T, S, B, X, U> Future for HttpServiceHandlerResponse<T, S, B, X, U>
1032where
1033    T: AsyncRead + AsyncWrite + Unpin,
1034
1035    S: Service<Request>,
1036    S::Error: Into<Response<BoxBody>> + 'static,
1037    S::Future: 'static,
1038    S::Response: Into<Response<B>> + 'static,
1039
1040    B: MessageBody + 'static,
1041
1042    X: Service<Request, Response = Request>,
1043    X::Error: Into<Response<BoxBody>>,
1044
1045    U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
1046    U::Error: fmt::Display,
1047{
1048    type Output = Result<(), DispatchError>;
1049
1050    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1051        match self.as_mut().project().state.project() {
1052            StateProj::H1 { dispatcher } => dispatcher.poll(cx),
1053
1054            #[cfg(feature = "http2")]
1055            StateProj::H2 { dispatcher } => dispatcher.poll(cx),
1056
1057            #[cfg(feature = "http2")]
1058            StateProj::H2Handshake { handshake: data } => {
1059                match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) {
1060                    Ok((conn, timer)) => {
1061                        let (_, config, flow, conn_data, peer_addr) = data.take().unwrap();
1062
1063                        self.as_mut().project().state.set(State::H2 {
1064                            dispatcher: crate::h2::Dispatcher::new(
1065                                conn, flow, config, peer_addr, conn_data, timer,
1066                            ),
1067                        });
1068                        self.poll(cx)
1069                    }
1070                    Err(err) => {
1071                        tracing::trace!("H2 handshake error: {}", err);
1072                        Poll::Ready(Err(err))
1073                    }
1074                }
1075            }
1076        }
1077    }
1078}