Skip to content

Commit e7c6b5a

Browse files
committed
feat(proto,server,service): completely remove server module's reliance on tower::Service
1 parent e2adbb7 commit e7c6b5a

File tree

5 files changed

+23
-147
lines changed

5 files changed

+23
-147
lines changed

src/proto/h1/dispatch.rs

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,13 @@ pub(crate) trait Dispatch {
3131
cx: &mut task::Context<'_>,
3232
) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
3333
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
34-
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>;
3534
fn should_poll(&self) -> bool;
3635
}
3736

3837
cfg_server! {
39-
use crate::service::TowerHttpService;
38+
use crate::service::HttpService;
4039

41-
pub(crate) struct Server<S: TowerHttpService<B>, B> {
40+
pub(crate) struct Server<S: HttpService<B>, B> {
4241
in_flight: Pin<Box<Option<S::Future>>>,
4342
pub(crate) service: S,
4443
}
@@ -235,15 +234,6 @@ where
235234
}
236235

237236
fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
238-
// can dispatch receive, or does it still care about, an incoming message?
239-
match ready!(self.dispatch.poll_ready(cx)) {
240-
Ok(()) => (),
241-
Err(()) => {
242-
trace!("dispatch no longer receiving messages");
243-
self.close();
244-
return Poll::Ready(Ok(()));
245-
}
246-
}
247237
// dispatch is ready for a message, try to read one
248238
match ready!(self.conn.poll_read_head(cx)) {
249239
Some(Ok((mut head, body_len, wants))) => {
@@ -454,7 +444,7 @@ impl<'a, T> Drop for OptGuard<'a, T> {
454444
cfg_server! {
455445
impl<S, B> Server<S, B>
456446
where
457-
S: TowerHttpService<B>,
447+
S: HttpService<B>,
458448
{
459449
pub(crate) fn new(service: S) -> Server<S, B> {
460450
Server {
@@ -469,11 +459,11 @@ cfg_server! {
469459
}
470460

471461
// Service is never pinned
472-
impl<S: TowerHttpService<B>, B> Unpin for Server<S, B> {}
462+
impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
473463

474464
impl<S, Bs> Dispatch for Server<S, Body>
475465
where
476-
S: TowerHttpService<Body, ResBody = Bs>,
466+
S: HttpService<Body, ResBody = Bs>,
477467
S::Error: Into<Box<dyn StdError + Send + Sync>>,
478468
Bs: HttpBody,
479469
{
@@ -519,17 +509,6 @@ cfg_server! {
519509
Ok(())
520510
}
521511

522-
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
523-
if self.in_flight.is_some() {
524-
Poll::Pending
525-
} else {
526-
self.service.poll_ready(cx).map_err(|_e| {
527-
// FIXME: return error value.
528-
trace!("service closed");
529-
})
530-
}
531-
}
532-
533512
fn should_poll(&self) -> bool {
534513
self.in_flight.is_some()
535514
}
@@ -631,19 +610,6 @@ cfg_client! {
631610
}
632611
}
633612

634-
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
635-
match self.callback {
636-
Some(ref mut cb) => match cb.poll_canceled(cx) {
637-
Poll::Ready(()) => {
638-
trace!("callback receiver has dropped");
639-
Poll::Ready(Err(()))
640-
}
641-
Poll::Pending => Poll::Ready(Ok(())),
642-
},
643-
None => Poll::Ready(Err(())),
644-
}
645-
}
646-
647613
fn should_poll(&self) -> bool {
648614
self.callback.is_none()
649615
}

src/proto/h2/server.rs

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::headers;
2020
use crate::proto::h2::ping::Recorder;
2121
use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
2222
use crate::proto::Dispatched;
23-
use crate::service::TowerHttpService;
23+
use crate::service::HttpService;
2424

2525
use crate::upgrade::{OnUpgrade, Pending, Upgraded};
2626
use crate::{Body, Response};
@@ -76,7 +76,7 @@ impl Default for Config {
7676
pin_project! {
7777
pub(crate) struct Server<T, S, B, E>
7878
where
79-
S: TowerHttpService<Body>,
79+
S: HttpService<Body>,
8080
B: HttpBody,
8181
{
8282
exec: E,
@@ -109,7 +109,7 @@ where
109109
impl<T, S, B, E> Server<T, S, B, E>
110110
where
111111
T: AsyncRead + AsyncWrite + Unpin,
112-
S: TowerHttpService<Body, ResBody = B>,
112+
S: HttpService<Body, ResBody = B>,
113113
S::Error: Into<Box<dyn StdError + Send + Sync>>,
114114
B: HttpBody + 'static,
115115
E: ConnStreamExec<S::Future, B>,
@@ -181,7 +181,7 @@ where
181181
impl<T, S, B, E> Future for Server<T, S, B, E>
182182
where
183183
T: AsyncRead + AsyncWrite + Unpin,
184-
S: TowerHttpService<Body, ResBody = B>,
184+
S: HttpService<Body, ResBody = B>,
185185
S::Error: Into<Box<dyn StdError + Send + Sync>>,
186186
B: HttpBody + 'static,
187187
E: ConnStreamExec<S::Future, B>,
@@ -236,46 +236,14 @@ where
236236
exec: &mut E,
237237
) -> Poll<crate::Result<()>>
238238
where
239-
S: TowerHttpService<Body, ResBody = B>,
239+
S: HttpService<Body, ResBody = B>,
240240
S::Error: Into<Box<dyn StdError + Send + Sync>>,
241241
E: ConnStreamExec<S::Future, B>,
242242
{
243243
if self.closing.is_none() {
244244
loop {
245245
self.poll_ping(cx);
246246

247-
// Check that the service is ready to accept a new request.
248-
//
249-
// - If not, just drive the connection some.
250-
// - If ready, try to accept a new request from the connection.
251-
match service.poll_ready(cx) {
252-
Poll::Ready(Ok(())) => (),
253-
Poll::Pending => {
254-
// use `poll_closed` instead of `poll_accept`,
255-
// in order to avoid accepting a request.
256-
ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
257-
trace!("incoming connection complete");
258-
return Poll::Ready(Ok(()));
259-
}
260-
Poll::Ready(Err(err)) => {
261-
let err = crate::Error::new_user_service(err);
262-
debug!("service closed: {}", err);
263-
264-
let reason = err.h2_reason();
265-
if reason == Reason::NO_ERROR {
266-
// NO_ERROR is only used for graceful shutdowns...
267-
trace!("interpreting NO_ERROR user error as graceful_shutdown");
268-
self.conn.graceful_shutdown();
269-
} else {
270-
trace!("abruptly shutting down with {:?}", reason);
271-
self.conn.abrupt_shutdown(reason);
272-
}
273-
self.closing = Some(err);
274-
break;
275-
}
276-
}
277-
278-
// When the service is ready, accepts an incoming request.
279247
match ready!(self.conn.poll_accept(cx)) {
280248
Some(Ok((req, mut respond))) => {
281249
trace!("incoming request");

src/server/conn.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ cfg_feature! {
7272
use crate::common::Never;
7373
use crate::common::exec::{ConnStreamExec, Exec};
7474
use crate::proto;
75-
use crate::service::TowerHttpService;
75+
use crate::service::HttpService;
7676

7777
pub(super) use self::upgrades::UpgradeableConnection;
7878
}
@@ -123,7 +123,7 @@ pin_project! {
123123
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
124124
pub struct Connection<T, S, E = Exec>
125125
where
126-
S: TowerHttpService<Body>,
126+
S: HttpService<Body>,
127127
{
128128
pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
129129
fallback: Fallback<E>,
@@ -151,7 +151,7 @@ pin_project! {
151151
#[project = ProtoServerProj]
152152
pub(super) enum ProtoServer<T, B, S, E = Exec>
153153
where
154-
S: TowerHttpService<Body>,
154+
S: HttpService<Body>,
155155
B: HttpBody,
156156
{
157157
H1 {
@@ -598,7 +598,7 @@ impl<E> Http<E> {
598598
/// ```
599599
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
600600
where
601-
S: TowerHttpService<Body, ResBody = Bd>,
601+
S: HttpService<Body, ResBody = Bd>,
602602
S::Error: Into<Box<dyn StdError + Send + Sync>>,
603603
Bd: HttpBody + 'static,
604604
Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
@@ -678,7 +678,7 @@ impl<E> Http<E> {
678678
#[cfg(any(feature = "http1", feature = "http2"))]
679679
impl<I, B, S, E> Connection<I, S, E>
680680
where
681-
S: TowerHttpService<Body, ResBody = B>,
681+
S: HttpService<Body, ResBody = B>,
682682
S::Error: Into<Box<dyn StdError + Send + Sync>>,
683683
I: AsyncRead + AsyncWrite + Unpin,
684684
B: HttpBody + 'static,
@@ -848,7 +848,7 @@ where
848848
#[cfg(any(feature = "http1", feature = "http2"))]
849849
impl<I, B, S, E> Future for Connection<I, S, E>
850850
where
851-
S: TowerHttpService<Body, ResBody = B>,
851+
S: HttpService<Body, ResBody = B>,
852852
S::Error: Into<Box<dyn StdError + Send + Sync>>,
853853
I: AsyncRead + AsyncWrite + Unpin + 'static,
854854
B: HttpBody + 'static,
@@ -895,7 +895,7 @@ where
895895
#[cfg(any(feature = "http1", feature = "http2"))]
896896
impl<I, S> fmt::Debug for Connection<I, S>
897897
where
898-
S: TowerHttpService<Body>,
898+
S: HttpService<Body>,
899899
{
900900
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
901901
f.debug_struct("Connection").finish()
@@ -928,7 +928,7 @@ impl Default for ConnectionMode {
928928
impl<T, B, S, E> Future for ProtoServer<T, B, S, E>
929929
where
930930
T: AsyncRead + AsyncWrite + Unpin,
931-
S: TowerHttpService<Body, ResBody = B>,
931+
S: HttpService<Body, ResBody = B>,
932932
S::Error: Into<Box<dyn StdError + Send + Sync>>,
933933
B: HttpBody + 'static,
934934
B::Error: Into<Box<dyn StdError + Send + Sync>>,
@@ -963,14 +963,14 @@ mod upgrades {
963963
#[allow(missing_debug_implementations)]
964964
pub struct UpgradeableConnection<T, S, E>
965965
where
966-
S: TowerHttpService<Body>,
966+
S: HttpService<Body>,
967967
{
968968
pub(super) inner: Connection<T, S, E>,
969969
}
970970

971971
impl<I, B, S, E> UpgradeableConnection<I, S, E>
972972
where
973-
S: TowerHttpService<Body, ResBody = B>,
973+
S: HttpService<Body, ResBody = B>,
974974
S::Error: Into<Box<dyn StdError + Send + Sync>>,
975975
I: AsyncRead + AsyncWrite + Unpin,
976976
B: HttpBody + 'static,
@@ -988,7 +988,7 @@ mod upgrades {
988988

989989
impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
990990
where
991-
S: TowerHttpService<Body, ResBody = B>,
991+
S: HttpService<Body, ResBody = B>,
992992
S::Error: Into<Box<dyn StdError + Send + Sync>>,
993993
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
994994
B: HttpBody + 'static,

src/service/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
2424
mod http;
2525
mod service;
26-
mod tower_http;
2726
mod util;
2827

29-
pub(super) use self::tower_http::TowerHttpService;
28+
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "server"))]
29+
pub(super) use self::http::HttpService;
3030

3131
pub use self::util::service_fn;

src/service/tower_http.rs

Lines changed: 0 additions & 58 deletions
This file was deleted.

0 commit comments

Comments
 (0)