From df6619a492aeb9c9fc81ee6fbf04edbaba71d239 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Thu, 28 Jul 2022 22:54:10 +0200 Subject: [PATCH 01/12] chore: remove reexport of tower_service::Service as hyper::service::Service --- examples/service_struct_impl.rs | 1 + src/server/conn.rs | 2 +- src/service/mod.rs | 2 -- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/service_struct_impl.rs b/examples/service_struct_impl.rs index 781e1e7a16..bd4106d501 100644 --- a/examples/service_struct_impl.rs +++ b/examples/service_struct_impl.rs @@ -4,6 +4,7 @@ use hyper::server::conn::Http; use hyper::service::Service; use hyper::{Recv, Request, Response}; use tokio::net::TcpListener; +use tower::Service; use std::future::Future; use std::net::SocketAddr; diff --git a/src/server/conn.rs b/src/server/conn.rs index f7d9a90784..14f88e3895 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -605,7 +605,7 @@ impl Http { /// # Example /// /// ``` - /// # use hyper::{Recv, Request, Response}; + /// # use hyper::{Body, Recv, Request, Response}; /// # use hyper::service::Service; /// # use hyper::server::conn::Http; /// # use tokio::io::{AsyncRead, AsyncWrite}; diff --git a/src/service/mod.rs b/src/service/mod.rs index c82939d632..18d8b3bbce 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -21,8 +21,6 @@ //! if you need to implement `Service` for a type manually, you can follow the example //! in `service_struct_impl.rs`. -pub use tower_service::Service; - mod http; mod util; From fede1ccbd4ac3315f4c46529ee2e3487a50bcc26 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Thu, 28 Jul 2022 22:55:32 +0200 Subject: [PATCH 02/12] feat(service): add Service trait --- src/service/mod.rs | 1 + src/service/service.rs | 43 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 src/service/service.rs diff --git a/src/service/mod.rs b/src/service/mod.rs index 18d8b3bbce..1e6f82d9ce 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -22,6 +22,7 @@ //! in `service_struct_impl.rs`. mod http; +mod service; mod util; #[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))] diff --git a/src/service/service.rs b/src/service/service.rs new file mode 100644 index 0000000000..6db69cab0b --- /dev/null +++ b/src/service/service.rs @@ -0,0 +1,43 @@ +use std::future::Future; + +/// An asynchronous function from a `Request` to a `Response`. +/// +/// The `Service` trait is a simplified interface making it easy to write +/// network applications in a modular and reusable way, decoupled from the +/// underlying protocol. +/// +/// # Functional +/// +/// A `Service` is a function of a `Request`. It immediately returns a +/// `Future` representing the eventual completion of processing the +/// request. The actual request processing may happen at any time in the +/// future, on any thread or executor. The processing may depend on calling +/// other services. At some point in the future, the processing will complete, +/// and the `Future` will resolve to a response or error. +/// +/// At a high level, the `Service::call` function represents an RPC request. The +/// `Service` value can be a server or a client. +pub trait Service { + /// Responses given by the service. + type Response; + + /// Errors produced by the service. + type Error; + + /// The future response value. + type Future: Future>; + + /// Process the request and return the response asynchronously. + /// + /// This function is expected to be callable off task. As such, + /// implementations should take care to not call `poll_ready`. + /// + /// Before dispatching a request, `poll_ready` must be called and return + /// `Poll::Ready(Ok(()))`. + /// + /// # Panics + /// + /// Implementations are permitted to panic if `call` is invoked without + /// obtaining `Poll::Ready(Ok(()))` from `poll_ready`. + fn call(&mut self, req: Request) -> Self::Future; +} From ca17e6f08c9b06053ae0fb4ddc51f0d1ba2c9efc Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Thu, 28 Jul 2022 23:07:12 +0200 Subject: [PATCH 03/12] chore: rename service::http::HttpService to service::tower_http::TowerHttpService --- src/proto/h1/dispatch.rs | 10 +++++----- src/proto/h2/server.rs | 10 +++++----- src/server/conn.rs | 24 ++++++++++++------------ src/service/mod.rs | 5 ++--- src/service/{http.rs => tower_http.rs} | 4 ++-- 5 files changed, 26 insertions(+), 27 deletions(-) rename src/service/{http.rs => tower_http.rs} (93%) diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 5c75f302fa..bc709ae7f5 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -34,9 +34,9 @@ pub(crate) trait Dispatch { } cfg_server! { - use crate::service::HttpService; + use crate::service::TowerHttpService; - pub(crate) struct Server, B> { + pub(crate) struct Server, B> { in_flight: Pin>>, pub(crate) service: S, } @@ -446,7 +446,7 @@ impl<'a, T> Drop for OptGuard<'a, T> { cfg_server! { impl Server where - S: HttpService, + S: TowerHttpService, { pub(crate) fn new(service: S) -> Server { Server { @@ -461,11 +461,11 @@ cfg_server! { } // Service is never pinned - impl, B> Unpin for Server {} + impl, B> Unpin for Server {} impl Dispatch for Server where - S: HttpService, + S: TowerHttpService, S::Error: Into>, Bs: Body, { diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index f2c2e7d763..0201d0cb9d 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -21,7 +21,7 @@ use crate::headers; use crate::proto::h2::ping::Recorder; use crate::proto::h2::{H2Upgraded, UpgradedSendStream}; use crate::proto::Dispatched; -use crate::service::HttpService; +use crate::service::TowerHttpService; use crate::upgrade::{OnUpgrade, Pending, Upgraded}; use crate::{Recv, Response}; @@ -77,7 +77,7 @@ impl Default for Config { pin_project! { pub(crate) struct Server where - S: HttpService, + S: TowerHttpService, B: Body, { exec: E, @@ -111,7 +111,7 @@ where impl Server where T: AsyncRead + AsyncWrite + Unpin, - S: HttpService, + S: TowerHttpService, S::Error: Into>, B: Body + 'static, E: ConnStreamExec, @@ -190,7 +190,7 @@ where impl Future for Server where T: AsyncRead + AsyncWrite + Unpin, - S: HttpService, + S: TowerHttpService, S::Error: Into>, B: Body + 'static, E: ConnStreamExec, @@ -249,7 +249,7 @@ where exec: &mut E, ) -> Poll> where - S: HttpService, + S: TowerHttpService, S::Error: Into>, E: ConnStreamExec, { diff --git a/src/server/conn.rs b/src/server/conn.rs index 14f88e3895..db30bc9daa 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -75,7 +75,7 @@ cfg_feature! { use crate::common::Never; use crate::common::exec::{ConnStreamExec, Exec}; use crate::proto; - use crate::service::HttpService; + use crate::service::TowerHttpService; pub(super) use self::upgrades::UpgradeableConnection; } @@ -127,7 +127,7 @@ pin_project! { #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] pub struct Connection where - S: HttpService, + S: TowerHttpService, { pub(super) conn: Option>, fallback: Fallback, @@ -155,7 +155,7 @@ pin_project! { #[project = ProtoServerProj] pub(super) enum ProtoServer where - S: HttpService, + S: TowerHttpService, B: Body, { H1 { @@ -605,7 +605,7 @@ impl Http { /// # Example /// /// ``` - /// # use hyper::{Body, Recv, Request, Response}; + /// # use hyper::{Recv, Request, Response}; /// # use hyper::service::Service; /// # use hyper::server::conn::Http; /// # use tokio::io::{AsyncRead, AsyncWrite}; @@ -627,7 +627,7 @@ impl Http { /// ``` pub fn serve_connection(&self, io: I, service: S) -> Connection where - S: HttpService, + S: TowerHttpService, S::Error: Into>, Bd: Body + 'static, Bd::Error: Into>, @@ -720,7 +720,7 @@ impl Http { #[cfg(any(feature = "http1", feature = "http2"))] impl Connection where - S: HttpService, + S: TowerHttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Body + 'static, @@ -901,7 +901,7 @@ where #[cfg(any(feature = "http1", feature = "http2"))] impl Future for Connection where - S: HttpService, + S: TowerHttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static, @@ -948,7 +948,7 @@ where #[cfg(any(feature = "http1", feature = "http2"))] impl fmt::Debug for Connection where - S: HttpService, + S: TowerHttpService, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection").finish() @@ -981,7 +981,7 @@ impl Default for ConnectionMode { impl Future for ProtoServer where T: AsyncRead + AsyncWrite + Unpin, - S: HttpService, + S: TowerHttpService, S::Error: Into>, B: Body + 'static, B::Error: Into>, @@ -1016,14 +1016,14 @@ mod upgrades { #[allow(missing_debug_implementations)] pub struct UpgradeableConnection where - S: HttpService, + S: TowerHttpService, { pub(super) inner: Connection, } impl UpgradeableConnection where - S: HttpService, + S: TowerHttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Body + 'static, @@ -1041,7 +1041,7 @@ mod upgrades { impl Future for UpgradeableConnection where - S: HttpService, + S: TowerHttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, diff --git a/src/service/mod.rs b/src/service/mod.rs index 1e6f82d9ce..75c5b8e489 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -21,11 +21,10 @@ //! if you need to implement `Service` for a type manually, you can follow the example //! in `service_struct_impl.rs`. -mod http; +mod tower_http; mod service; mod util; -#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))] -pub(super) use self::http::HttpService; +pub(super) use self::tower_http::TowerHttpService; pub use self::util::service_fn; diff --git a/src/service/http.rs b/src/service/tower_http.rs similarity index 93% rename from src/service/http.rs rename to src/service/tower_http.rs index ff05586498..5c877bb636 100644 --- a/src/service/http.rs +++ b/src/service/tower_http.rs @@ -5,7 +5,7 @@ use crate::common::{task, Future, Poll}; use crate::{Request, Response}; /// An asynchronous function from `Request` to `Response`. -pub trait HttpService: sealed::Sealed { +pub trait TowerHttpService: sealed::Sealed { /// The `Body` body of the `http::Response`. type ResBody: Body; @@ -26,7 +26,7 @@ pub trait HttpService: sealed::Sealed { fn call(&mut self, req: Request) -> Self::Future; } -impl HttpService for T +impl TowerHttpService for T where T: tower_service::Service, Response = Response>, B2: Body, From ab78dbc46438122f0c423745ddb82a59f8594f5d Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Thu, 28 Jul 2022 23:09:45 +0200 Subject: [PATCH 04/12] feat(service): add service::HttpService that extends service::Service --- src/service/http.rs | 52 +++++++++++++++++++++++++++++++++++++++++++++ src/service/mod.rs | 3 ++- 2 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 src/service/http.rs diff --git a/src/service/http.rs b/src/service/http.rs new file mode 100644 index 0000000000..3e0933658e --- /dev/null +++ b/src/service/http.rs @@ -0,0 +1,52 @@ +use std::error::Error as StdError; + +use crate::body::HttpBody; +use crate::common::Future; +use crate::service::service::Service; +use crate::{Request, Response}; + +/// An asynchronous function from `Request` to `Response`. +pub trait HttpService: sealed::Sealed { + /// The `HttpBody` body of the `http::Response`. + type ResBody: HttpBody; + + /// The error type that can occur within this `Service`. + /// + /// Note: Returning an `Error` to a hyper server will cause the connection + /// to be abruptly aborted. In most cases, it is better to return a `Response` + /// with a 4xx or 5xx status code. + type Error: Into>; + + /// The `Future` returned by this `Service`. + type Future: Future, Self::Error>>; + + #[doc(hidden)] + fn call(&mut self, req: Request) -> Self::Future; +} + +impl HttpService for T +where + T: Service, Response = Response>, + B2: HttpBody, + T::Error: Into>, +{ + type ResBody = B2; + + type Error = T::Error; + type Future = T::Future; + + fn call(&mut self, req: Request) -> Self::Future { + Service::call(self, req) + } +} + +impl sealed::Sealed for T +where + T: Service, Response = Response>, + B2: HttpBody, +{ +} + +mod sealed { + pub trait Sealed {} +} diff --git a/src/service/mod.rs b/src/service/mod.rs index 75c5b8e489..c022f513cd 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -21,8 +21,9 @@ //! if you need to implement `Service` for a type manually, you can follow the example //! in `service_struct_impl.rs`. -mod tower_http; +mod http; mod service; +mod tower_http; mod util; pub(super) use self::tower_http::TowerHttpService; From 0e077c58d3f3dfd17d0d8def990ff105aaaf44f8 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Thu, 18 Aug 2022 20:40:39 +0200 Subject: [PATCH 05/12] feat(proto,server,service): completely remove server module's reliance on tower::Service --- src/proto/h1/dispatch.rs | 44 ++++------------------------- src/proto/h2/server.rs | 42 ++++------------------------ src/server/conn.rs | 22 +++++++-------- src/service/http.rs | 10 +++---- src/service/mod.rs | 4 +-- src/service/tower_http.rs | 58 --------------------------------------- 6 files changed, 28 insertions(+), 152 deletions(-) delete mode 100644 src/service/tower_http.rs diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index bc709ae7f5..cbbde9e746 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -29,14 +29,13 @@ pub(crate) trait Dispatch { cx: &mut task::Context<'_>, ) -> Poll>>; fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Recv)>) -> crate::Result<()>; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; fn should_poll(&self) -> bool; } cfg_server! { - use crate::service::TowerHttpService; + use crate::service::HttpService; - pub(crate) struct Server, B> { + pub(crate) struct Server, B> { in_flight: Pin>>, pub(crate) service: S, } @@ -233,15 +232,6 @@ where } fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll> { - // can dispatch receive, or does it still care about, an incoming message? - match ready!(self.dispatch.poll_ready(cx)) { - Ok(()) => (), - Err(()) => { - trace!("dispatch no longer receiving messages"); - self.close(); - return Poll::Ready(Ok(())); - } - } // dispatch is ready for a message, try to read one match ready!(self.conn.poll_read_head(cx)) { Some(Ok((mut head, body_len, wants))) => { @@ -446,7 +436,7 @@ impl<'a, T> Drop for OptGuard<'a, T> { cfg_server! { impl Server where - S: TowerHttpService, + S: HttpService, { pub(crate) fn new(service: S) -> Server { Server { @@ -461,11 +451,11 @@ cfg_server! { } // Service is never pinned - impl, B> Unpin for Server {} + impl, B> Unpin for Server {} impl Dispatch for Server where - S: TowerHttpService, + S: HttpService, S::Error: Into>, Bs: Body, { @@ -511,17 +501,6 @@ cfg_server! { Ok(()) } - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - if self.in_flight.is_some() { - Poll::Pending - } else { - self.service.poll_ready(cx).map_err(|_e| { - // FIXME: return error value. - trace!("service closed"); - }) - } - } - fn should_poll(&self) -> bool { self.in_flight.is_some() } @@ -623,19 +602,6 @@ cfg_client! { } } - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - match self.callback { - Some(ref mut cb) => match cb.poll_canceled(cx) { - Poll::Ready(()) => { - trace!("callback receiver has dropped"); - Poll::Ready(Err(())) - } - Poll::Pending => Poll::Ready(Ok(())), - }, - None => Poll::Ready(Err(())), - } - } - fn should_poll(&self) -> bool { self.callback.is_none() } diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 0201d0cb9d..c0e1df47be 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -21,7 +21,7 @@ use crate::headers; use crate::proto::h2::ping::Recorder; use crate::proto::h2::{H2Upgraded, UpgradedSendStream}; use crate::proto::Dispatched; -use crate::service::TowerHttpService; +use crate::service::HttpService; use crate::upgrade::{OnUpgrade, Pending, Upgraded}; use crate::{Recv, Response}; @@ -77,7 +77,7 @@ impl Default for Config { pin_project! { pub(crate) struct Server where - S: TowerHttpService, + S: HttpService, B: Body, { exec: E, @@ -111,7 +111,7 @@ where impl Server where T: AsyncRead + AsyncWrite + Unpin, - S: TowerHttpService, + S: HttpService, S::Error: Into>, B: Body + 'static, E: ConnStreamExec, @@ -190,7 +190,7 @@ where impl Future for Server where T: AsyncRead + AsyncWrite + Unpin, - S: TowerHttpService, + S: HttpService, S::Error: Into>, B: Body + 'static, E: ConnStreamExec, @@ -249,7 +249,7 @@ where exec: &mut E, ) -> Poll> where - S: TowerHttpService, + S: HttpService, S::Error: Into>, E: ConnStreamExec, { @@ -257,38 +257,6 @@ where loop { self.poll_ping(cx); - // Check that the service is ready to accept a new request. - // - // - If not, just drive the connection some. - // - If ready, try to accept a new request from the connection. - match service.poll_ready(cx) { - Poll::Ready(Ok(())) => (), - Poll::Pending => { - // use `poll_closed` instead of `poll_accept`, - // in order to avoid accepting a request. - ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?; - trace!("incoming connection complete"); - return Poll::Ready(Ok(())); - } - Poll::Ready(Err(err)) => { - let err = crate::Error::new_user_service(err); - debug!("service closed: {}", err); - - let reason = err.h2_reason(); - if reason == Reason::NO_ERROR { - // NO_ERROR is only used for graceful shutdowns... - trace!("interpreting NO_ERROR user error as graceful_shutdown"); - self.conn.graceful_shutdown(); - } else { - trace!("abruptly shutting down with {:?}", reason); - self.conn.abrupt_shutdown(reason); - } - self.closing = Some(err); - break; - } - } - - // When the service is ready, accepts an incoming request. match ready!(self.conn.poll_accept(cx)) { Some(Ok((req, mut respond))) => { trace!("incoming request"); diff --git a/src/server/conn.rs b/src/server/conn.rs index db30bc9daa..f7d9a90784 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -75,7 +75,7 @@ cfg_feature! { use crate::common::Never; use crate::common::exec::{ConnStreamExec, Exec}; use crate::proto; - use crate::service::TowerHttpService; + use crate::service::HttpService; pub(super) use self::upgrades::UpgradeableConnection; } @@ -127,7 +127,7 @@ pin_project! { #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] pub struct Connection where - S: TowerHttpService, + S: HttpService, { pub(super) conn: Option>, fallback: Fallback, @@ -155,7 +155,7 @@ pin_project! { #[project = ProtoServerProj] pub(super) enum ProtoServer where - S: TowerHttpService, + S: HttpService, B: Body, { H1 { @@ -627,7 +627,7 @@ impl Http { /// ``` pub fn serve_connection(&self, io: I, service: S) -> Connection where - S: TowerHttpService, + S: HttpService, S::Error: Into>, Bd: Body + 'static, Bd::Error: Into>, @@ -720,7 +720,7 @@ impl Http { #[cfg(any(feature = "http1", feature = "http2"))] impl Connection where - S: TowerHttpService, + S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Body + 'static, @@ -901,7 +901,7 @@ where #[cfg(any(feature = "http1", feature = "http2"))] impl Future for Connection where - S: TowerHttpService, + S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static, @@ -948,7 +948,7 @@ where #[cfg(any(feature = "http1", feature = "http2"))] impl fmt::Debug for Connection where - S: TowerHttpService, + S: HttpService, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection").finish() @@ -981,7 +981,7 @@ impl Default for ConnectionMode { impl Future for ProtoServer where T: AsyncRead + AsyncWrite + Unpin, - S: TowerHttpService, + S: HttpService, S::Error: Into>, B: Body + 'static, B::Error: Into>, @@ -1016,14 +1016,14 @@ mod upgrades { #[allow(missing_debug_implementations)] pub struct UpgradeableConnection where - S: TowerHttpService, + S: HttpService, { pub(super) inner: Connection, } impl UpgradeableConnection where - S: TowerHttpService, + S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Body + 'static, @@ -1041,7 +1041,7 @@ mod upgrades { impl Future for UpgradeableConnection where - S: TowerHttpService, + S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, diff --git a/src/service/http.rs b/src/service/http.rs index 3e0933658e..dbbdaa107b 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -1,14 +1,14 @@ use std::error::Error as StdError; -use crate::body::HttpBody; +use crate::body::Body; use crate::common::Future; use crate::service::service::Service; use crate::{Request, Response}; /// An asynchronous function from `Request` to `Response`. pub trait HttpService: sealed::Sealed { - /// The `HttpBody` body of the `http::Response`. - type ResBody: HttpBody; + /// The `Body` body of the `http::Response`. + type ResBody: Body; /// The error type that can occur within this `Service`. /// @@ -27,7 +27,7 @@ pub trait HttpService: sealed::Sealed { impl HttpService for T where T: Service, Response = Response>, - B2: HttpBody, + B2: Body, T::Error: Into>, { type ResBody = B2; @@ -43,7 +43,7 @@ where impl sealed::Sealed for T where T: Service, Response = Response>, - B2: HttpBody, + B2: Body, { } diff --git a/src/service/mod.rs b/src/service/mod.rs index c022f513cd..1e6f82d9ce 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -23,9 +23,9 @@ mod http; mod service; -mod tower_http; mod util; -pub(super) use self::tower_http::TowerHttpService; +#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))] +pub(super) use self::http::HttpService; pub use self::util::service_fn; diff --git a/src/service/tower_http.rs b/src/service/tower_http.rs deleted file mode 100644 index 5c877bb636..0000000000 --- a/src/service/tower_http.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::error::Error as StdError; - -use crate::body::Body; -use crate::common::{task, Future, Poll}; -use crate::{Request, Response}; - -/// An asynchronous function from `Request` to `Response`. -pub trait TowerHttpService: sealed::Sealed { - /// The `Body` body of the `http::Response`. - type ResBody: Body; - - /// The error type that can occur within this `Service`. - /// - /// Note: Returning an `Error` to a hyper server will cause the connection - /// to be abruptly aborted. In most cases, it is better to return a `Response` - /// with a 4xx or 5xx status code. - type Error: Into>; - - /// The `Future` returned by this `Service`. - type Future: Future, Self::Error>>; - - #[doc(hidden)] - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; - - #[doc(hidden)] - fn call(&mut self, req: Request) -> Self::Future; -} - -impl TowerHttpService for T -where - T: tower_service::Service, Response = Response>, - B2: Body, - T::Error: Into>, -{ - type ResBody = B2; - - type Error = T::Error; - type Future = T::Future; - - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - tower_service::Service::poll_ready(self, cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - tower_service::Service::call(self, req) - } -} - -impl sealed::Sealed for T -where - T: tower_service::Service, Response = Response>, - B2: Body, -{ -} - -mod sealed { - pub trait Sealed {} -} From 7e298fe69473c6420777ac73bdbd80ab588f3440 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Fri, 19 Aug 2022 16:02:06 +0200 Subject: [PATCH 06/12] chore: fix all examples and tests --- examples/service_struct_impl.rs | 6 --- src/service/mod.rs | 2 + src/service/util.rs | 10 ++-- tests/server.rs | 87 ++------------------------------- 4 files changed, 9 insertions(+), 96 deletions(-) diff --git a/examples/service_struct_impl.rs b/examples/service_struct_impl.rs index bd4106d501..ad73c6855c 100644 --- a/examples/service_struct_impl.rs +++ b/examples/service_struct_impl.rs @@ -4,12 +4,10 @@ use hyper::server::conn::Http; use hyper::service::Service; use hyper::{Recv, Request, Response}; use tokio::net::TcpListener; -use tower::Service; use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; -use std::task::{Context, Poll}; type Counter = i32; @@ -43,10 +41,6 @@ impl Service> for Svc { type Error = hyper::Error; type Future = Pin> + Send>>; - fn poll_ready(&mut self, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: Request) -> Self::Future { fn mk_response(s: String) -> Result>, hyper::Error> { Ok(Response::builder().body(Full::new(Bytes::from(s))).unwrap()) diff --git a/src/service/mod.rs b/src/service/mod.rs index 1e6f82d9ce..dccccd3cc6 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -27,5 +27,7 @@ mod util; #[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))] pub(super) use self::http::HttpService; +#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))] +pub use self::service::Service; pub use self::util::service_fn; diff --git a/src/service/util.rs b/src/service/util.rs index f6dda7711f..a41945951c 100644 --- a/src/service/util.rs +++ b/src/service/util.rs @@ -3,7 +3,8 @@ use std::fmt; use std::marker::PhantomData; use crate::body::Body; -use crate::common::{task, Future, Poll}; +use crate::common::Future; +use crate::service::service::Service; use crate::{Request, Response}; /// Create a `Service` from a function. @@ -43,8 +44,7 @@ pub struct ServiceFn { _req: PhantomData, } -impl tower_service::Service> - for ServiceFn +impl Service> for ServiceFn where F: FnMut(Request) -> Ret, ReqBody: Body, @@ -56,10 +56,6 @@ where type Error = E; type Future = Ret; - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: Request) -> Self::Future { (self.f)(req) } diff --git a/tests/server.rs b/tests/server.rs index c294a70f21..0486357a75 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -29,7 +29,7 @@ use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpSt use hyper::body::Body; use hyper::server::conn::Http; -use hyper::service::service_fn; +use hyper::service::{service_fn, Service}; use hyper::{Method, Recv, Request, Response, StatusCode, Uri, Version}; mod support; @@ -2310,77 +2310,6 @@ fn http2_body_user_error_sends_reset_reason() { assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); } -struct Http2ReadyErrorSvc; - -impl tower_service::Service> for Http2ReadyErrorSvc { - type Response = Response; - type Error = h2::Error; - type Future = Box< - dyn futures_core::Future> - + Send - + Sync - + Unpin, - >; - - fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll> { - Poll::Ready(Err::<(), _>(h2::Error::from( - h2::Reason::INADEQUATE_SECURITY, - ))) - } - - fn call(&mut self, _: hyper::Request) -> Self::Future { - unreachable!("poll_ready error should have shutdown conn"); - } -} - -#[tokio::test] -#[ignore] // sometimes ECONNRESET wins the race -async fn http2_service_poll_ready_error_sends_goaway() { - use std::error::Error; - - let _ = pretty_env_logger::try_init(); - - let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) - .await - .unwrap(); - - let addr_str = format!("http://{}", listener.local_addr().unwrap()); - - tokio::task::spawn(async move { - loop { - tokio::select! { - res = listener.accept() => { - let (stream, _) = res.unwrap(); - - tokio::task::spawn(async move { - let mut http = Http::new(); - http.http2_only(true); - - let service = Http2ReadyErrorSvc; - http.serve_connection(stream, service).await.unwrap(); - }); - } - } - } - }); - - let uri = addr_str.parse().expect("server addr should parse"); - let err = dbg!(TestClient::new() - .http2_only() - .get(uri) - .await - .expect_err("client.get should fail")); - - // client request should have gotten the specific GOAWAY error... - let h2_err = err - .source() - .expect("source") - .downcast_ref::() - .expect("downcast"); - - assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); -} - #[test] fn skips_content_length_for_304_responses() { let server = serve(); @@ -2789,15 +2718,11 @@ enum Msg { End, } -impl tower_service::Service> for TestService { +impl Service> for TestService { type Response = Response; type Error = BoxError; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Ok(()).into() - } - fn call(&mut self, mut req: Request) -> Self::Future { let tx = self.tx.clone(); let replies = self.reply.clone(); @@ -2856,22 +2781,18 @@ const HELLO: &str = "hello"; struct HelloWorld; -impl tower_service::Service> for HelloWorld { +impl Service> for HelloWorld { type Response = Response>; type Error = hyper::Error; type Future = future::Ready>; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Ok(()).into() - } - fn call(&mut self, _req: Request) -> Self::Future { let response = Response::new(Full::new(HELLO.into())); future::ok(response) } } -fn unreachable_service() -> impl tower_service::Service< +fn unreachable_service() -> impl Service< http::Request, Response = http::Response, Error = BoxError, From 72927a653f644b59ea7cb6364dfbe2830108f2f5 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Fri, 19 Aug 2022 19:53:06 +0200 Subject: [PATCH 07/12] feat(client): implement hyper::service::Service instead of tower::Service on client::conn::SendRequest --- Cargo.toml | 2 -- src/client/conn/mod.rs | 9 +-------- src/service/mod.rs | 5 ++++- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 11d15ce3b0..0126eae8ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,6 @@ h2 = { version = "0.3.9", optional = true } itoa = "1" tracing = { version = "0.1", default-features = false, features = ["std"] } pin-project-lite = "0.2.4" -tower-service = "0.3" tokio = { version = "1", features = ["sync"] } want = "0.3" @@ -65,7 +64,6 @@ tokio = { version = "1", features = [ ] } tokio-test = "0.4" tokio-util = { version = "0.7", features = ["codec"] } -tower = { version = "0.4", features = ["make", "util"] } url = "2.2" [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies] diff --git a/src/client/conn/mod.rs b/src/client/conn/mod.rs index ae5ce15b71..81d1b80068 100644 --- a/src/client/conn/mod.rs +++ b/src/client/conn/mod.rs @@ -41,9 +41,6 @@ //! let response = request_sender.send_request(request).await?; //! assert!(response.status() == StatusCode::OK); //! -//! // To send via the same connection again, it may not work as it may not be ready, -//! // so we have to wait until the request_sender becomes ready. -//! request_sender.ready().await?; //! let request = Request::builder() //! .header("Host", "example.com") //! .method("GET") @@ -69,7 +66,6 @@ use futures_util::future; use httparse::ParserConfig; use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; -use tower_service::Service; use tracing::{debug, trace}; use super::dispatch; @@ -86,6 +82,7 @@ use crate::rt::Executor; use crate::upgrade::Upgraded; use crate::{Recv, Request, Response}; use crate::{common::time::Time, rt::Timer}; +use crate::service::Service; #[cfg(feature = "http1")] pub mod http1; @@ -274,10 +271,6 @@ where type Error = crate::Error; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - self.poll_ready(cx) - } - fn call(&mut self, req: Request) -> Self::Future { self.send_request(req) } diff --git a/src/service/mod.rs b/src/service/mod.rs index dccccd3cc6..d149acf063 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -27,7 +27,10 @@ mod util; #[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))] pub(super) use self::http::HttpService; -#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))] +#[cfg(all( + any(feature = "http1", feature = "http2"), + any(feature = "server", feature = "client") +))] pub use self::service::Service; pub use self::util::service_fn; From 82f5fa54def4a8dfb4874284d0a800208ce9face Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Fri, 19 Aug 2022 21:06:36 +0200 Subject: [PATCH 08/12] chore(service): remove mentions of poll_ready in Service doc-strings --- src/service/service.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/service/service.rs b/src/service/service.rs index 6db69cab0b..b5de9bec20 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -28,16 +28,5 @@ pub trait Service { type Future: Future>; /// Process the request and return the response asynchronously. - /// - /// This function is expected to be callable off task. As such, - /// implementations should take care to not call `poll_ready`. - /// - /// Before dispatching a request, `poll_ready` must be called and return - /// `Poll::Ready(Ok(()))`. - /// - /// # Panics - /// - /// Implementations are permitted to panic if `call` is invoked without - /// obtaining `Poll::Ready(Ok(()))` from `poll_ready`. fn call(&mut self, req: Request) -> Self::Future; } From 8c121b724118d8e85b0227077b74167bed560cbf Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Sat, 20 Aug 2022 10:43:01 +0200 Subject: [PATCH 09/12] chore(client): remove Service implementation for client::conn::SendRequest --- src/client/conn/mod.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/src/client/conn/mod.rs b/src/client/conn/mod.rs index 81d1b80068..5cff769968 100644 --- a/src/client/conn/mod.rs +++ b/src/client/conn/mod.rs @@ -82,7 +82,6 @@ use crate::rt::Executor; use crate::upgrade::Upgraded; use crate::{Recv, Request, Response}; use crate::{common::time::Time, rt::Timer}; -use crate::service::Service; #[cfg(feature = "http1")] pub mod http1; @@ -263,19 +262,6 @@ where } } -impl Service> for SendRequest -where - B: Body + 'static, -{ - type Response = Response; - type Error = crate::Error; - type Future = ResponseFuture; - - fn call(&mut self, req: Request) -> Self::Future { - self.send_request(req) - } -} - impl fmt::Debug for SendRequest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SendRequest").finish() From 1c553e5b850567535c7b01a090f174fdef67e060 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Mon, 5 Sep 2022 23:06:45 +0200 Subject: [PATCH 10/12] feat(proto): add poll_ready back to Dispatch trait, implement it simplified for both Client and Server --- src/proto/h1/dispatch.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index cbbde9e746..49a53463ea 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -29,6 +29,7 @@ pub(crate) trait Dispatch { cx: &mut task::Context<'_>, ) -> Poll>>; fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Recv)>) -> crate::Result<()>; + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; fn should_poll(&self) -> bool; } @@ -501,6 +502,14 @@ cfg_server! { Ok(()) } + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + if self.in_flight.is_some() { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } + } + fn should_poll(&self) -> bool { self.in_flight.is_some() } @@ -602,6 +611,10 @@ cfg_client! { } } + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + fn should_poll(&self) -> bool { self.callback.is_none() } From e2804d2714be1414da81bc0fc82ccd37bd245b06 Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Wed, 7 Sep 2022 02:02:36 +0200 Subject: [PATCH 11/12] chore: revert full Dispatch.poll_ready implementation for Client and fix error in doc-test --- src/client/conn/mod.rs | 1 - src/proto/h1/dispatch.rs | 13 +++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/client/conn/mod.rs b/src/client/conn/mod.rs index 5cff769968..7a2f3bc3a6 100644 --- a/src/client/conn/mod.rs +++ b/src/client/conn/mod.rs @@ -18,7 +18,6 @@ //! use http_body_util::Empty; //! use hyper::client::conn; //! use tokio::net::TcpStream; -//! use tower::ServiceExt; //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 49a53463ea..8995ddc70c 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -611,8 +611,17 @@ cfg_client! { } } - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + match self.callback { + Some(ref mut cb) => match cb.poll_canceled(cx) { + Poll::Ready(()) => { + trace!("callback receiver has dropped"); + Poll::Ready(Err(())) + } + Poll::Pending => Poll::Ready(Ok(())), + }, + None => Poll::Ready(Err(())), + } } fn should_poll(&self) -> bool { From 40adb7f014e17333d0a71e2a6713206975632eeb Mon Sep 17 00:00:00 2001 From: Tomek Karwowski Date: Thu, 8 Sep 2022 20:40:40 +0200 Subject: [PATCH 12/12] chore(proto): revert check in Dispatcher.poll_read_head with updated comment wording --- src/proto/h1/dispatch.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 8995ddc70c..8605f246d5 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -233,6 +233,16 @@ where } fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll> { + // can dispatch receive, or does it still care about other incoming message? + match ready!(self.dispatch.poll_ready(cx)) { + Ok(()) => (), + Err(()) => { + trace!("dispatch no longer receiving messages"); + self.close(); + return Poll::Ready(Ok(())); + } + } + // dispatch is ready for a message, try to read one match ready!(self.conn.poll_read_head(cx)) { Some(Ok((mut head, body_len, wants))) => {