Skip to content

Commit ce05581

Browse files
authored
Merge pull request #7 from jtgeibel/new-body
Add support for changes to `conduit::Body`
2 parents 12c6f74 + eeb37d1 commit ce05581

File tree

7 files changed

+79
-37
lines changed

7 files changed

+79
-37
lines changed

Cargo.toml

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "conduit-hyper"
3-
version = "0.3.0-alpha.1"
3+
version = "0.3.0-alpha.2"
44
authors = ["Justin Geibel <[email protected]>"]
55
license = "MIT OR Apache-2.0"
66
description = "Host a conduit based web application on a hyper server"
@@ -9,15 +9,15 @@ readme = "README.md"
99
edition = "2018"
1010

1111
[dependencies]
12-
conduit = "0.9.0-alpha.1"
12+
conduit = "0.9.0-alpha.2"
1313
futures = "0.3"
1414
hyper = "0.13"
1515
http = "0.2"
1616
tracing = { version = "0.1", features = ["log"] }
17-
tokio = { version = "0.2", features = ["blocking", "rt-threaded"] }
17+
tokio = { version = "0.2", features = ["blocking", "fs", "rt-threaded", "stream"] }
1818
tower-service = "0.3"
1919

2020
[dev-dependencies]
21-
conduit-router = "0.9.0-alpha.1"
21+
conduit-router = "0.9.0-alpha.2"
2222
env_logger = "0.7"
2323
tokio = { version = "0.2", features = ["macros"] }

examples/server.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![deny(clippy::all)]
22

3-
use conduit::{header, static_to_body, Handler, RequestExt, Response, ResponseResult};
3+
use conduit::{header, Body, Handler, RequestExt, Response, ResponseResult};
44
use conduit_hyper::Server;
55
use conduit_router::RouteBuilder;
66

@@ -35,7 +35,7 @@ fn endpoint(_: &mut dyn RequestExt) -> ResponseResult<http::Error> {
3535
Response::builder()
3636
.header(header::CONTENT_TYPE, "text/plain; charset=utf-8")
3737
.header(header::CONTENT_LENGTH, body.len())
38-
.body(static_to_body(body))
38+
.body(Body::from_static(body))
3939
}
4040

4141
fn panic(_: &mut dyn RequestExt) -> ResponseResult<http::Error> {

src/file_stream.rs

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use std::task::{Context, Poll};
2+
use std::{io::Error, pin::Pin};
3+
4+
use hyper::body::{Body, Bytes};
5+
use tokio::{fs::File, io::AsyncRead, stream::Stream};
6+
7+
const BUFFER_SIZE: usize = 8 * 1024;
8+
9+
pub struct FileStream {
10+
file: File,
11+
buffer: Box<[u8; BUFFER_SIZE]>,
12+
}
13+
14+
impl FileStream {
15+
pub fn from_std(file: std::fs::File) -> Self {
16+
let buffer = Box::new([0; BUFFER_SIZE]);
17+
let file = File::from_std(file);
18+
Self { file, buffer }
19+
}
20+
21+
pub fn into_streamed_body(self) -> Body {
22+
Body::wrap_stream(self)
23+
}
24+
}
25+
26+
impl Stream for FileStream {
27+
type Item = Result<Bytes, Error>;
28+
29+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30+
let Self {
31+
ref mut file,
32+
ref mut buffer,
33+
} = *self;
34+
match Pin::new(file).poll_read(cx, &mut buffer[..]) {
35+
Poll::Ready(Ok(0)) => Poll::Ready(None),
36+
Poll::Ready(Ok(size)) => Poll::Ready(Some(Ok(self.buffer[..size].to_owned().into()))),
37+
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
38+
Poll::Pending => Poll::Pending,
39+
}
40+
}
41+
}

src/lib.rs

+18-2
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,18 @@
3535
//! }
3636
//! #
3737
//! # use std::{error, io};
38-
//! # use conduit::{box_error, static_to_body, Response, RequestExt, HandlerResult};
38+
//! # use conduit::{box_error, Body, Response, RequestExt, HandlerResult};
3939
//! #
4040
//! # struct Endpoint();
4141
//! # impl Handler for Endpoint {
4242
//! # fn call(&self, _: &mut dyn RequestExt) -> HandlerResult {
43-
//! # Response::builder().body(static_to_body(b"")).map_err(box_error)
43+
//! # Response::builder().body(Body::empty()).map_err(box_error)
4444
//! # }
4545
//! # }
4646
//! ```
4747
4848
mod adaptor;
49+
mod file_stream;
4950
mod server;
5051
mod service;
5152
#[cfg(test)]
@@ -55,3 +56,18 @@ pub use server::Server;
5556
pub use service::{BlockingHandler, Service};
5657

5758
type HyperResponse = hyper::Response<hyper::Body>;
59+
type ConduitResponse = conduit::Response<conduit::Body>;
60+
use crate::file_stream::FileStream;
61+
62+
/// Turns a `ConduitResponse` into a `HyperResponse`
63+
fn conduit_into_hyper(response: ConduitResponse) -> HyperResponse {
64+
use conduit::Body::*;
65+
66+
let (parts, body) = response.into_parts();
67+
let body = match body {
68+
Static(slice) => slice.into(),
69+
Owned(vec) => vec.into(),
70+
File(file) => FileStream::from_std(file).into_streamed_body(),
71+
};
72+
HyperResponse::from_parts(parts, body)
73+
}

src/service.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ impl Service {
2323
/// ```no_run
2424
/// # use std::sync::Arc;
2525
/// # use conduit_hyper::{BlockingHandler, Service};
26-
/// # use conduit::{box_error, static_to_body, Handler, HandlerResult, RequestExt, Response};
26+
/// # use conduit::{box_error, Body, Handler, HandlerResult, RequestExt, Response};
2727
/// #
2828
/// # struct Endpoint();
2929
/// # impl Handler for Endpoint {
3030
/// # fn call(&self, _: &mut dyn RequestExt) -> HandlerResult {
31-
/// # Response::builder().body(static_to_body(b"")).map_err(box_error)
31+
/// # Response::builder().body(Body::empty()).map_err(box_error)
3232
/// # }
3333
/// # }
3434
/// # let app = Endpoint();

src/service/blocking_handler.rs

+8-21
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
use crate::adaptor::{ConduitRequest, RequestInfo};
22
use crate::service::ServiceError;
3-
use crate::HyperResponse;
3+
use crate::{conduit_into_hyper, HyperResponse};
44

55
use std::net::SocketAddr;
66
use std::sync::{
77
atomic::{AtomicUsize, Ordering},
88
Arc,
99
};
1010

11-
use conduit::Handler;
11+
use conduit::{header, Handler, StatusCode};
1212
use hyper::{Body, Request, Response};
1313
use tracing::error;
1414

15-
type ConduitResponse = Response<conduit::Body>;
16-
1715
#[derive(Debug)]
1816
pub struct BlockingHandler<H: Handler> {
1917
thread_count: AtomicUsize,
@@ -54,31 +52,20 @@ impl<H: Handler> BlockingHandler<H> {
5452
let mut request = ConduitRequest::new(&mut request_info, remote_addr);
5553
handler
5654
.call(&mut request)
57-
.map(good_response)
55+
.map(conduit_into_hyper)
5856
.unwrap_or_else(|e| server_error_response(&e.to_string()))
5957
})
6058
.await
6159
.map_err(Into::into)
6260
}
6361
}
6462

65-
/// Builds a `hyper::Response` given a `conduit:Response`
66-
fn good_response(mut response: ConduitResponse) -> HyperResponse {
67-
let mut body = Vec::new();
68-
if response.body_mut().write_body(&mut body).is_err() {
69-
return server_error_response("Error writing body");
70-
}
71-
72-
let (parts, _) = response.into_parts();
73-
Response::from_parts(parts, body.into())
74-
}
75-
7663
/// Logs an error message and returns a generic status 500 response
7764
fn server_error_response(message: &str) -> HyperResponse {
7865
error!("Internal Server Error: {}", message);
79-
let body = Body::from("Internal Server Error");
66+
let body = hyper::Body::from("Internal Server Error");
8067
Response::builder()
81-
.status(500)
68+
.status(StatusCode::INTERNAL_SERVER_ERROR)
8269
.body(body)
8370
.expect("Unexpected invalid header")
8471
}
@@ -87,13 +74,13 @@ fn server_error_response(message: &str) -> HyperResponse {
8774
fn over_capacity_error_response() -> HyperResponse {
8875
const RETRY_AFTER: u32 = 2;
8976
error!("Server Capacity Exceeded");
90-
let body = Body::from(format!(
77+
let body = hyper::Body::from(format!(
9178
"Service Unavailable: Please retry after {} seconds.",
9279
RETRY_AFTER
9380
));
9481
Response::builder()
95-
.status(503)
96-
.header("Retry-After", RETRY_AFTER)
82+
.status(StatusCode::SERVICE_UNAVAILABLE)
83+
.header(header::RETRY_AFTER, RETRY_AFTER)
9784
.body(body)
9885
.expect("Unexpected invalid header")
9986
}

src/tests.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use conduit::{
2-
box_error, static_to_body, Handler, HandlerResult, RequestExt, Response, StatusCode,
3-
};
1+
use conduit::{box_error, Body, Handler, HandlerResult, RequestExt, Response, StatusCode};
42
use futures::prelude::*;
53
use hyper::{body::to_bytes, service::Service};
64

@@ -12,7 +10,7 @@ impl Handler for OkResult {
1210
fn call(&self, _req: &mut dyn RequestExt) -> HandlerResult {
1311
Response::builder()
1412
.header("ok", "value")
15-
.body(static_to_body(b"Hello, world!"))
13+
.body(Body::from_static(b"Hello, world!"))
1614
.map_err(box_error)
1715
}
1816
}
@@ -37,7 +35,7 @@ impl Handler for InvalidHeader {
3735
fn call(&self, _req: &mut dyn RequestExt) -> HandlerResult {
3836
Response::builder()
3937
.header("invalid-value", "\r\n")
40-
.body(static_to_body(b"discarded"))
38+
.body(Body::from_static(b"discarded"))
4139
.map_err(box_error)
4240
}
4341
}
@@ -47,7 +45,7 @@ impl Handler for InvalidStatus {
4745
fn call(&self, _req: &mut dyn RequestExt) -> HandlerResult {
4846
Response::builder()
4947
.status(1000)
50-
.body(static_to_body(b""))
48+
.body(Body::empty())
5149
.map_err(box_error)
5250
}
5351
}

0 commit comments

Comments
 (0)