1use std::{
2 fmt,
3 future::Future,
4 marker::PhantomData,
5 net,
6 pin::Pin,
7 rc::Rc,
8 task::{Context, Poll},
9};
10
11use actix_codec::{AsyncRead, AsyncWrite, Framed};
12use actix_rt::net::TcpStream;
13use actix_service::{
14 fn_service, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt as _,
15};
16use futures_core::{future::LocalBoxFuture, ready};
17use pin_project_lite::pin_project;
18use tracing::error;
19
20use crate::{
21 body::{BoxBody, MessageBody},
22 builder::HttpServiceBuilder,
23 error::DispatchError,
24 h1, ConnectCallback, OnConnectData, Protocol, Request, Response, ServiceConfig,
25};
26
27pub struct HttpService<T, S, B, X = h1::ExpectHandler, U = h1::UpgradeHandler> {
61 srv: S,
62 cfg: ServiceConfig,
63 expect: X,
64 upgrade: Option<U>,
65 on_connect_ext: Option<Rc<ConnectCallback<T>>>,
66 _phantom: PhantomData<B>,
67}
68
69impl<T, S, B> HttpService<T, S, B>
70where
71 S: ServiceFactory<Request, Config = ()>,
72 S::Error: Into<Response<BoxBody>> + 'static,
73 S::InitError: fmt::Debug,
74 S::Response: Into<Response<B>> + 'static,
75 <S::Service as Service<Request>>::Future: 'static,
76 B: MessageBody + 'static,
77{
78 pub fn build() -> HttpServiceBuilder<T, S> {
80 HttpServiceBuilder::default()
81 }
82}
83
84impl<T, S, B> HttpService<T, S, B>
85where
86 S: ServiceFactory<Request, Config = ()>,
87 S::Error: Into<Response<BoxBody>> + 'static,
88 S::InitError: fmt::Debug,
89 S::Response: Into<Response<B>> + 'static,
90 <S::Service as Service<Request>>::Future: 'static,
91 B: MessageBody + 'static,
92{
93 pub fn new<F: IntoServiceFactory<S, Request>>(service: F) -> Self {
95 HttpService {
96 cfg: ServiceConfig::default(),
97 srv: service.into_factory(),
98 expect: h1::ExpectHandler,
99 upgrade: None,
100 on_connect_ext: None,
101 _phantom: PhantomData,
102 }
103 }
104
105 pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
107 cfg: ServiceConfig,
108 service: F,
109 ) -> Self {
110 HttpService {
111 cfg,
112 srv: service.into_factory(),
113 expect: h1::ExpectHandler,
114 upgrade: None,
115 on_connect_ext: None,
116 _phantom: PhantomData,
117 }
118 }
119}
120
121impl<T, S, B, X, U> HttpService<T, S, B, X, U>
122where
123 S: ServiceFactory<Request, Config = ()>,
124 S::Error: Into<Response<BoxBody>> + 'static,
125 S::InitError: fmt::Debug,
126 S::Response: Into<Response<B>> + 'static,
127 <S::Service as Service<Request>>::Future: 'static,
128 B: MessageBody,
129{
130 pub fn expect<X1>(self, expect: X1) -> HttpService<T, S, B, X1, U>
135 where
136 X1: ServiceFactory<Request, Config = (), Response = Request>,
137 X1::Error: Into<Response<BoxBody>>,
138 X1::InitError: fmt::Debug,
139 {
140 HttpService {
141 expect,
142 cfg: self.cfg,
143 srv: self.srv,
144 upgrade: self.upgrade,
145 on_connect_ext: self.on_connect_ext,
146 _phantom: PhantomData,
147 }
148 }
149
150 pub fn upgrade<U1>(self, upgrade: Option<U1>) -> HttpService<T, S, B, X, U1>
155 where
156 U1: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
157 U1::Error: fmt::Display,
158 U1::InitError: fmt::Debug,
159 {
160 HttpService {
161 upgrade,
162 cfg: self.cfg,
163 srv: self.srv,
164 expect: self.expect,
165 on_connect_ext: self.on_connect_ext,
166 _phantom: PhantomData,
167 }
168 }
169
170 pub(crate) fn on_connect_ext(mut self, f: Option<Rc<ConnectCallback<T>>>) -> Self {
172 self.on_connect_ext = f;
173 self
174 }
175}
176
177impl<S, B, X, U> HttpService<TcpStream, S, B, X, U>
178where
179 S: ServiceFactory<Request, Config = ()>,
180 S::Future: 'static,
181 S::Error: Into<Response<BoxBody>> + 'static,
182 S::InitError: fmt::Debug,
183 S::Response: Into<Response<B>> + 'static,
184 <S::Service as Service<Request>>::Future: 'static,
185
186 B: MessageBody + 'static,
187
188 X: ServiceFactory<Request, Config = (), Response = Request>,
189 X::Future: 'static,
190 X::Error: Into<Response<BoxBody>>,
191 X::InitError: fmt::Debug,
192
193 U: ServiceFactory<(Request, Framed<TcpStream, h1::Codec>), Config = (), Response = ()>,
194 U::Future: 'static,
195 U::Error: fmt::Display + Into<Response<BoxBody>>,
196 U::InitError: fmt::Debug,
197{
198 pub fn tcp(
202 self,
203 ) -> impl ServiceFactory<TcpStream, Config = (), Response = (), Error = DispatchError, InitError = ()>
204 {
205 fn_service(|io: TcpStream| async {
206 let peer_addr = io.peer_addr().ok();
207 Ok((io, Protocol::Http1, peer_addr))
208 })
209 .and_then(self)
210 }
211
212 #[cfg(feature = "http2")]
215 pub fn tcp_auto_h2c(
216 self,
217 ) -> impl ServiceFactory<TcpStream, Config = (), Response = (), Error = DispatchError, InitError = ()>
218 {
219 fn_service(move |io: TcpStream| async move {
220 const H2_PREFACE: &[u8] = b"PRI * HTTP/2";
225
226 let mut buf = [0; 12];
227
228 io.peek(&mut buf).await?;
229
230 let proto = if buf == H2_PREFACE {
231 Protocol::Http2
232 } else {
233 Protocol::Http1
234 };
235
236 let peer_addr = io.peer_addr().ok();
237 Ok((io, proto, peer_addr))
238 })
239 .and_then(self)
240 }
241}
242
243#[cfg(feature = "__tls")]
245#[derive(Debug, Default)]
246pub struct TlsAcceptorConfig {
247 pub(crate) handshake_timeout: Option<std::time::Duration>,
248}
249
250#[cfg(feature = "__tls")]
251impl TlsAcceptorConfig {
252 pub fn handshake_timeout(self, dur: std::time::Duration) -> Self {
254 Self {
255 handshake_timeout: Some(dur),
256 }
258 }
259}
260
261#[cfg(feature = "openssl")]
262mod openssl {
263 use actix_service::ServiceFactoryExt as _;
264 use actix_tls::accept::{
265 openssl::{
266 reexports::{Error as SslError, SslAcceptor},
267 Acceptor, TlsStream,
268 },
269 TlsError,
270 };
271
272 use super::*;
273
274 impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
275 where
276 S: ServiceFactory<Request, Config = ()>,
277 S::Future: 'static,
278 S::Error: Into<Response<BoxBody>> + 'static,
279 S::InitError: fmt::Debug,
280 S::Response: Into<Response<B>> + 'static,
281 <S::Service as Service<Request>>::Future: 'static,
282
283 B: MessageBody + 'static,
284
285 X: ServiceFactory<Request, Config = (), Response = Request>,
286 X::Future: 'static,
287 X::Error: Into<Response<BoxBody>>,
288 X::InitError: fmt::Debug,
289
290 U: ServiceFactory<
291 (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
292 Config = (),
293 Response = (),
294 >,
295 U::Future: 'static,
296 U::Error: fmt::Display + Into<Response<BoxBody>>,
297 U::InitError: fmt::Debug,
298 {
299 pub fn openssl(
301 self,
302 acceptor: SslAcceptor,
303 ) -> impl ServiceFactory<
304 TcpStream,
305 Config = (),
306 Response = (),
307 Error = TlsError<SslError, DispatchError>,
308 InitError = (),
309 > {
310 self.openssl_with_config(acceptor, TlsAcceptorConfig::default())
311 }
312
313 pub fn openssl_with_config(
315 self,
316 acceptor: SslAcceptor,
317 tls_acceptor_config: TlsAcceptorConfig,
318 ) -> impl ServiceFactory<
319 TcpStream,
320 Config = (),
321 Response = (),
322 Error = TlsError<SslError, DispatchError>,
323 InitError = (),
324 > {
325 let mut acceptor = Acceptor::new(acceptor);
326
327 if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout {
328 acceptor.set_handshake_timeout(handshake_timeout);
329 }
330
331 acceptor
332 .map_init_err(|_| {
333 unreachable!("TLS acceptor service factory does not error on init")
334 })
335 .map_err(TlsError::into_service_error)
336 .map(|io: TlsStream<TcpStream>| {
337 let proto = if let Some(protos) = io.ssl().selected_alpn_protocol() {
338 if protos.windows(2).any(|window| window == b"h2") {
339 Protocol::Http2
340 } else {
341 Protocol::Http1
342 }
343 } else {
344 Protocol::Http1
345 };
346
347 let peer_addr = io.get_ref().peer_addr().ok();
348 (io, proto, peer_addr)
349 })
350 .and_then(self.map_err(TlsError::Service))
351 }
352 }
353}
354
355#[cfg(feature = "rustls-0_20")]
356mod rustls_0_20 {
357 use std::io;
358
359 use actix_service::ServiceFactoryExt as _;
360 use actix_tls::accept::{
361 rustls_0_20::{reexports::ServerConfig, Acceptor, TlsStream},
362 TlsError,
363 };
364
365 use super::*;
366
367 impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
368 where
369 S: ServiceFactory<Request, Config = ()>,
370 S::Future: 'static,
371 S::Error: Into<Response<BoxBody>> + 'static,
372 S::InitError: fmt::Debug,
373 S::Response: Into<Response<B>> + 'static,
374 <S::Service as Service<Request>>::Future: 'static,
375
376 B: MessageBody + 'static,
377
378 X: ServiceFactory<Request, Config = (), Response = Request>,
379 X::Future: 'static,
380 X::Error: Into<Response<BoxBody>>,
381 X::InitError: fmt::Debug,
382
383 U: ServiceFactory<
384 (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
385 Config = (),
386 Response = (),
387 >,
388 U::Future: 'static,
389 U::Error: fmt::Display + Into<Response<BoxBody>>,
390 U::InitError: fmt::Debug,
391 {
392 pub fn rustls(
394 self,
395 config: ServerConfig,
396 ) -> impl ServiceFactory<
397 TcpStream,
398 Config = (),
399 Response = (),
400 Error = TlsError<io::Error, DispatchError>,
401 InitError = (),
402 > {
403 self.rustls_with_config(config, TlsAcceptorConfig::default())
404 }
405
406 pub fn rustls_with_config(
408 self,
409 mut config: ServerConfig,
410 tls_acceptor_config: TlsAcceptorConfig,
411 ) -> impl ServiceFactory<
412 TcpStream,
413 Config = (),
414 Response = (),
415 Error = TlsError<io::Error, DispatchError>,
416 InitError = (),
417 > {
418 let mut protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
419 protos.extend_from_slice(&config.alpn_protocols);
420 config.alpn_protocols = protos;
421
422 let mut acceptor = Acceptor::new(config);
423
424 if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout {
425 acceptor.set_handshake_timeout(handshake_timeout);
426 }
427
428 acceptor
429 .map_init_err(|_| {
430 unreachable!("TLS acceptor service factory does not error on init")
431 })
432 .map_err(TlsError::into_service_error)
433 .and_then(|io: TlsStream<TcpStream>| async {
434 let proto = if let Some(protos) = io.get_ref().1.alpn_protocol() {
435 if protos.windows(2).any(|window| window == b"h2") {
436 Protocol::Http2
437 } else {
438 Protocol::Http1
439 }
440 } else {
441 Protocol::Http1
442 };
443 let peer_addr = io.get_ref().0.peer_addr().ok();
444 Ok((io, proto, peer_addr))
445 })
446 .and_then(self.map_err(TlsError::Service))
447 }
448 }
449}
450
451#[cfg(feature = "rustls-0_21")]
452mod rustls_0_21 {
453 use std::io;
454
455 use actix_service::ServiceFactoryExt as _;
456 use actix_tls::accept::{
457 rustls_0_21::{reexports::ServerConfig, Acceptor, TlsStream},
458 TlsError,
459 };
460
461 use super::*;
462
463 impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
464 where
465 S: ServiceFactory<Request, Config = ()>,
466 S::Future: 'static,
467 S::Error: Into<Response<BoxBody>> + 'static,
468 S::InitError: fmt::Debug,
469 S::Response: Into<Response<B>> + 'static,
470 <S::Service as Service<Request>>::Future: 'static,
471
472 B: MessageBody + 'static,
473
474 X: ServiceFactory<Request, Config = (), Response = Request>,
475 X::Future: 'static,
476 X::Error: Into<Response<BoxBody>>,
477 X::InitError: fmt::Debug,
478
479 U: ServiceFactory<
480 (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
481 Config = (),
482 Response = (),
483 >,
484 U::Future: 'static,
485 U::Error: fmt::Display + Into<Response<BoxBody>>,
486 U::InitError: fmt::Debug,
487 {
488 pub fn rustls_021(
490 self,
491 config: ServerConfig,
492 ) -> impl ServiceFactory<
493 TcpStream,
494 Config = (),
495 Response = (),
496 Error = TlsError<io::Error, DispatchError>,
497 InitError = (),
498 > {
499 self.rustls_021_with_config(config, TlsAcceptorConfig::default())
500 }
501
502 pub fn rustls_021_with_config(
504 self,
505 mut config: ServerConfig,
506 tls_acceptor_config: TlsAcceptorConfig,
507 ) -> impl ServiceFactory<
508 TcpStream,
509 Config = (),
510 Response = (),
511 Error = TlsError<io::Error, DispatchError>,
512 InitError = (),
513 > {
514 let mut protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
515 protos.extend_from_slice(&config.alpn_protocols);
516 config.alpn_protocols = protos;
517
518 let mut acceptor = Acceptor::new(config);
519
520 if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout {
521 acceptor.set_handshake_timeout(handshake_timeout);
522 }
523
524 acceptor
525 .map_init_err(|_| {
526 unreachable!("TLS acceptor service factory does not error on init")
527 })
528 .map_err(TlsError::into_service_error)
529 .and_then(|io: TlsStream<TcpStream>| async {
530 let proto = if let Some(protos) = io.get_ref().1.alpn_protocol() {
531 if protos.windows(2).any(|window| window == b"h2") {
532 Protocol::Http2
533 } else {
534 Protocol::Http1
535 }
536 } else {
537 Protocol::Http1
538 };
539 let peer_addr = io.get_ref().0.peer_addr().ok();
540 Ok((io, proto, peer_addr))
541 })
542 .and_then(self.map_err(TlsError::Service))
543 }
544 }
545}
546
547#[cfg(feature = "rustls-0_22")]
548mod rustls_0_22 {
549 use std::io;
550
551 use actix_service::ServiceFactoryExt as _;
552 use actix_tls::accept::{
553 rustls_0_22::{reexports::ServerConfig, Acceptor, TlsStream},
554 TlsError,
555 };
556
557 use super::*;
558
559 impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
560 where
561 S: ServiceFactory<Request, Config = ()>,
562 S::Future: 'static,
563 S::Error: Into<Response<BoxBody>> + 'static,
564 S::InitError: fmt::Debug,
565 S::Response: Into<Response<B>> + 'static,
566 <S::Service as Service<Request>>::Future: 'static,
567
568 B: MessageBody + 'static,
569
570 X: ServiceFactory<Request, Config = (), Response = Request>,
571 X::Future: 'static,
572 X::Error: Into<Response<BoxBody>>,
573 X::InitError: fmt::Debug,
574
575 U: ServiceFactory<
576 (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
577 Config = (),
578 Response = (),
579 >,
580 U::Future: 'static,
581 U::Error: fmt::Display + Into<Response<BoxBody>>,
582 U::InitError: fmt::Debug,
583 {
584 pub fn rustls_0_22(
586 self,
587 config: ServerConfig,
588 ) -> impl ServiceFactory<
589 TcpStream,
590 Config = (),
591 Response = (),
592 Error = TlsError<io::Error, DispatchError>,
593 InitError = (),
594 > {
595 self.rustls_0_22_with_config(config, TlsAcceptorConfig::default())
596 }
597
598 pub fn rustls_0_22_with_config(
600 self,
601 mut config: ServerConfig,
602 tls_acceptor_config: TlsAcceptorConfig,
603 ) -> impl ServiceFactory<
604 TcpStream,
605 Config = (),
606 Response = (),
607 Error = TlsError<io::Error, DispatchError>,
608 InitError = (),
609 > {
610 let mut protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
611 protos.extend_from_slice(&config.alpn_protocols);
612 config.alpn_protocols = protos;
613
614 let mut acceptor = Acceptor::new(config);
615
616 if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout {
617 acceptor.set_handshake_timeout(handshake_timeout);
618 }
619
620 acceptor
621 .map_init_err(|_| {
622 unreachable!("TLS acceptor service factory does not error on init")
623 })
624 .map_err(TlsError::into_service_error)
625 .and_then(|io: TlsStream<TcpStream>| async {
626 let proto = if let Some(protos) = io.get_ref().1.alpn_protocol() {
627 if protos.windows(2).any(|window| window == b"h2") {
628 Protocol::Http2
629 } else {
630 Protocol::Http1
631 }
632 } else {
633 Protocol::Http1
634 };
635 let peer_addr = io.get_ref().0.peer_addr().ok();
636 Ok((io, proto, peer_addr))
637 })
638 .and_then(self.map_err(TlsError::Service))
639 }
640 }
641}
642
643#[cfg(feature = "rustls-0_23")]
644mod rustls_0_23 {
645 use std::io;
646
647 use actix_service::ServiceFactoryExt as _;
648 use actix_tls::accept::{
649 rustls_0_23::{reexports::ServerConfig, Acceptor, TlsStream},
650 TlsError,
651 };
652
653 use super::*;
654
655 impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
656 where
657 S: ServiceFactory<Request, Config = ()>,
658 S::Future: 'static,
659 S::Error: Into<Response<BoxBody>> + 'static,
660 S::InitError: fmt::Debug,
661 S::Response: Into<Response<B>> + 'static,
662 <S::Service as Service<Request>>::Future: 'static,
663
664 B: MessageBody + 'static,
665
666 X: ServiceFactory<Request, Config = (), Response = Request>,
667 X::Future: 'static,
668 X::Error: Into<Response<BoxBody>>,
669 X::InitError: fmt::Debug,
670
671 U: ServiceFactory<
672 (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
673 Config = (),
674 Response = (),
675 >,
676 U::Future: 'static,
677 U::Error: fmt::Display + Into<Response<BoxBody>>,
678 U::InitError: fmt::Debug,
679 {
680 pub fn rustls_0_23(
682 self,
683 config: ServerConfig,
684 ) -> impl ServiceFactory<
685 TcpStream,
686 Config = (),
687 Response = (),
688 Error = TlsError<io::Error, DispatchError>,
689 InitError = (),
690 > {
691 self.rustls_0_23_with_config(config, TlsAcceptorConfig::default())
692 }
693
694 pub fn rustls_0_23_with_config(
696 self,
697 mut config: ServerConfig,
698 tls_acceptor_config: TlsAcceptorConfig,
699 ) -> impl ServiceFactory<
700 TcpStream,
701 Config = (),
702 Response = (),
703 Error = TlsError<io::Error, DispatchError>,
704 InitError = (),
705 > {
706 let mut protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
707 protos.extend_from_slice(&config.alpn_protocols);
708 config.alpn_protocols = protos;
709
710 let mut acceptor = Acceptor::new(config);
711
712 if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout {
713 acceptor.set_handshake_timeout(handshake_timeout);
714 }
715
716 acceptor
717 .map_init_err(|_| {
718 unreachable!("TLS acceptor service factory does not error on init")
719 })
720 .map_err(TlsError::into_service_error)
721 .and_then(|io: TlsStream<TcpStream>| async {
722 let proto = if let Some(protos) = io.get_ref().1.alpn_protocol() {
723 if protos.windows(2).any(|window| window == b"h2") {
724 Protocol::Http2
725 } else {
726 Protocol::Http1
727 }
728 } else {
729 Protocol::Http1
730 };
731 let peer_addr = io.get_ref().0.peer_addr().ok();
732 Ok((io, proto, peer_addr))
733 })
734 .and_then(self.map_err(TlsError::Service))
735 }
736 }
737}
738
739impl<T, S, B, X, U> ServiceFactory<(T, Protocol, Option<net::SocketAddr>)>
740 for HttpService<T, S, B, X, U>
741where
742 T: AsyncRead + AsyncWrite + Unpin + 'static,
743
744 S: ServiceFactory<Request, Config = ()>,
745 S::Future: 'static,
746 S::Error: Into<Response<BoxBody>> + 'static,
747 S::InitError: fmt::Debug,
748 S::Response: Into<Response<B>> + 'static,
749 <S::Service as Service<Request>>::Future: 'static,
750
751 B: MessageBody + 'static,
752
753 X: ServiceFactory<Request, Config = (), Response = Request>,
754 X::Future: 'static,
755 X::Error: Into<Response<BoxBody>>,
756 X::InitError: fmt::Debug,
757
758 U: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
759 U::Future: 'static,
760 U::Error: fmt::Display + Into<Response<BoxBody>>,
761 U::InitError: fmt::Debug,
762{
763 type Response = ();
764 type Error = DispatchError;
765 type Config = ();
766 type Service = HttpServiceHandler<T, S::Service, B, X::Service, U::Service>;
767 type InitError = ();
768 type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
769
770 fn new_service(&self, _: ()) -> Self::Future {
771 let service = self.srv.new_service(());
772 let expect = self.expect.new_service(());
773 let upgrade = self.upgrade.as_ref().map(|s| s.new_service(()));
774 let on_connect_ext = self.on_connect_ext.clone();
775 let cfg = self.cfg.clone();
776
777 Box::pin(async move {
778 let expect = expect.await.map_err(|err| {
779 tracing::error!("Initialization of HTTP expect service error: {err:?}");
780 })?;
781
782 let upgrade = match upgrade {
783 Some(upgrade) => {
784 let upgrade = upgrade.await.map_err(|err| {
785 tracing::error!("Initialization of HTTP upgrade service error: {err:?}");
786 })?;
787 Some(upgrade)
788 }
789 None => None,
790 };
791
792 let service = service.await.map_err(|err| {
793 tracing::error!("Initialization of HTTP service error: {err:?}");
794 })?;
795
796 Ok(HttpServiceHandler::new(
797 cfg,
798 service,
799 expect,
800 upgrade,
801 on_connect_ext,
802 ))
803 })
804 }
805}
806
807pub struct HttpServiceHandler<T, S, B, X, U>
809where
810 S: Service<Request>,
811 X: Service<Request>,
812 U: Service<(Request, Framed<T, h1::Codec>)>,
813{
814 pub(super) flow: Rc<HttpFlow<S, X, U>>,
815 pub(super) cfg: ServiceConfig,
816 pub(super) on_connect_ext: Option<Rc<ConnectCallback<T>>>,
817 _phantom: PhantomData<B>,
818}
819
820impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U>
821where
822 S: Service<Request>,
823 S::Error: Into<Response<BoxBody>>,
824 X: Service<Request>,
825 X::Error: Into<Response<BoxBody>>,
826 U: Service<(Request, Framed<T, h1::Codec>)>,
827 U::Error: Into<Response<BoxBody>>,
828{
829 pub(super) fn new(
830 cfg: ServiceConfig,
831 service: S,
832 expect: X,
833 upgrade: Option<U>,
834 on_connect_ext: Option<Rc<ConnectCallback<T>>>,
835 ) -> HttpServiceHandler<T, S, B, X, U> {
836 HttpServiceHandler {
837 cfg,
838 on_connect_ext,
839 flow: HttpFlow::new(service, expect, upgrade),
840 _phantom: PhantomData,
841 }
842 }
843
844 pub(super) fn _poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Response<BoxBody>>> {
845 ready!(self.flow.expect.poll_ready(cx).map_err(Into::into))?;
846
847 ready!(self.flow.service.poll_ready(cx).map_err(Into::into))?;
848
849 if let Some(ref upg) = self.flow.upgrade {
850 ready!(upg.poll_ready(cx).map_err(Into::into))?;
851 };
852
853 Poll::Ready(Ok(()))
854 }
855}
856
857pub(super) struct HttpFlow<S, X, U> {
859 pub(super) service: S,
860 pub(super) expect: X,
861 pub(super) upgrade: Option<U>,
862}
863
864impl<S, X, U> HttpFlow<S, X, U> {
865 pub(super) fn new(service: S, expect: X, upgrade: Option<U>) -> Rc<Self> {
866 Rc::new(Self {
867 service,
868 expect,
869 upgrade,
870 })
871 }
872}
873
874impl<T, S, B, X, U> Service<(T, Protocol, Option<net::SocketAddr>)>
875 for HttpServiceHandler<T, S, B, X, U>
876where
877 T: AsyncRead + AsyncWrite + Unpin,
878
879 S: Service<Request>,
880 S::Error: Into<Response<BoxBody>> + 'static,
881 S::Future: 'static,
882 S::Response: Into<Response<B>> + 'static,
883
884 B: MessageBody + 'static,
885
886 X: Service<Request, Response = Request>,
887 X::Error: Into<Response<BoxBody>>,
888
889 U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
890 U::Error: fmt::Display + Into<Response<BoxBody>>,
891{
892 type Response = ();
893 type Error = DispatchError;
894 type Future = HttpServiceHandlerResponse<T, S, B, X, U>;
895
896 fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
897 self._poll_ready(cx).map_err(|err| {
898 error!("HTTP service readiness error: {:?}", err);
899 DispatchError::Service(err)
900 })
901 }
902
903 fn call(&self, (io, proto, peer_addr): (T, Protocol, Option<net::SocketAddr>)) -> Self::Future {
904 let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref());
905
906 match proto {
907 #[cfg(feature = "http2")]
908 Protocol::Http2 => HttpServiceHandlerResponse {
909 state: State::H2Handshake {
910 handshake: Some((
911 crate::h2::handshake_with_timeout(io, &self.cfg),
912 self.cfg.clone(),
913 Rc::clone(&self.flow),
914 conn_data,
915 peer_addr,
916 )),
917 },
918 },
919
920 #[cfg(not(feature = "http2"))]
921 Protocol::Http2 => {
922 panic!("HTTP/2 support is disabled (enable with the `http2` feature flag)")
923 }
924
925 Protocol::Http1 => HttpServiceHandlerResponse {
926 state: State::H1 {
927 dispatcher: h1::Dispatcher::new(
928 io,
929 Rc::clone(&self.flow),
930 self.cfg.clone(),
931 peer_addr,
932 conn_data,
933 ),
934 },
935 },
936
937 proto => unimplemented!("Unsupported HTTP version: {:?}.", proto),
938 }
939 }
940}
941
942#[cfg(not(feature = "http2"))]
943pin_project! {
944 #[project = StateProj]
945 enum State<T, S, B, X, U>
946 where
947 T: AsyncRead,
948 T: AsyncWrite,
949 T: Unpin,
950
951 S: Service<Request>,
952 S::Future: 'static,
953 S::Error: Into<Response<BoxBody>>,
954
955 B: MessageBody,
956
957 X: Service<Request, Response = Request>,
958 X::Error: Into<Response<BoxBody>>,
959
960 U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
961 U::Error: fmt::Display,
962 {
963 H1 { #[pin] dispatcher: h1::Dispatcher<T, S, B, X, U> },
964 }
965}
966
967#[cfg(feature = "http2")]
968pin_project! {
969 #[project = StateProj]
970 enum State<T, S, B, X, U>
971 where
972 T: AsyncRead,
973 T: AsyncWrite,
974 T: Unpin,
975
976 S: Service<Request>,
977 S::Future: 'static,
978 S::Error: Into<Response<BoxBody>>,
979
980 B: MessageBody,
981
982 X: Service<Request, Response = Request>,
983 X::Error: Into<Response<BoxBody>>,
984
985 U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
986 U::Error: fmt::Display,
987 {
988 H1 { #[pin] dispatcher: h1::Dispatcher<T, S, B, X, U> },
989
990 H2 { #[pin] dispatcher: crate::h2::Dispatcher<T, S, B, X, U> },
991
992 H2Handshake {
993 handshake: Option<(
994 crate::h2::HandshakeWithTimeout<T>,
995 ServiceConfig,
996 Rc<HttpFlow<S, X, U>>,
997 OnConnectData,
998 Option<net::SocketAddr>,
999 )>,
1000 },
1001 }
1002}
1003
1004pin_project! {
1005 pub struct HttpServiceHandlerResponse<T, S, B, X, U>
1006 where
1007 T: AsyncRead,
1008 T: AsyncWrite,
1009 T: Unpin,
1010
1011 S: Service<Request>,
1012 S::Error: Into<Response<BoxBody>>,
1013 S::Error: 'static,
1014 S::Future: 'static,
1015 S::Response: Into<Response<B>>,
1016 S::Response: 'static,
1017
1018 B: MessageBody,
1019
1020 X: Service<Request, Response = Request>,
1021 X::Error: Into<Response<BoxBody>>,
1022
1023 U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
1024 U::Error: fmt::Display,
1025 {
1026 #[pin]
1027 state: State<T, S, B, X, U>,
1028 }
1029}
1030
1031impl<T, S, B, X, U> Future for HttpServiceHandlerResponse<T, S, B, X, U>
1032where
1033 T: AsyncRead + AsyncWrite + Unpin,
1034
1035 S: Service<Request>,
1036 S::Error: Into<Response<BoxBody>> + 'static,
1037 S::Future: 'static,
1038 S::Response: Into<Response<B>> + 'static,
1039
1040 B: MessageBody + 'static,
1041
1042 X: Service<Request, Response = Request>,
1043 X::Error: Into<Response<BoxBody>>,
1044
1045 U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
1046 U::Error: fmt::Display,
1047{
1048 type Output = Result<(), DispatchError>;
1049
1050 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1051 match self.as_mut().project().state.project() {
1052 StateProj::H1 { dispatcher } => dispatcher.poll(cx),
1053
1054 #[cfg(feature = "http2")]
1055 StateProj::H2 { dispatcher } => dispatcher.poll(cx),
1056
1057 #[cfg(feature = "http2")]
1058 StateProj::H2Handshake { handshake: data } => {
1059 match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) {
1060 Ok((conn, timer)) => {
1061 let (_, config, flow, conn_data, peer_addr) = data.take().unwrap();
1062
1063 self.as_mut().project().state.set(State::H2 {
1064 dispatcher: crate::h2::Dispatcher::new(
1065 conn, flow, config, peer_addr, conn_data, timer,
1066 ),
1067 });
1068 self.poll(cx)
1069 }
1070 Err(err) => {
1071 tracing::trace!("H2 handshake error: {}", err);
1072 Poll::Ready(Err(err))
1073 }
1074 }
1075 }
1076 }
1077 }
1078}