Skip to content

Commit b9b11f3

Browse files
authored
Merge pull request #9 from jtgeibel/remove-over-capacity-rejection
Remove rejection of requests when over capacity
2 parents eb27387 + 2941b38 commit b9b11f3

File tree

6 files changed

+13
-88
lines changed

6 files changed

+13
-88
lines changed

examples/server.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,14 @@ use conduit_router::RouteBuilder;
77
use std::io;
88
use std::thread::sleep;
99

10-
const MAX_THREADS: usize = 1;
11-
1210
#[tokio::main]
1311
async fn main() {
1412
env_logger::init();
1513

1614
let app = build_conduit_handler();
1715
let addr = ([127, 0, 0, 1], 12345).into();
1816

19-
Server::serve(&addr, app, MAX_THREADS).await;
17+
Server::serve(&addr, app).await;
2018
}
2119

2220
fn build_conduit_handler() -> impl Handler {

src/lib.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@
1717
//! use conduit_hyper::Server;
1818
//! use tokio::runtime::Runtime;
1919
//!
20-
//! const MAX_THREADS: usize = 10;
21-
//!
2220
//! #[tokio::main]
2321
//! async fn main() {
2422
//! let app = build_conduit_handler();
2523
//! let addr = ([127, 0, 0, 1], 12345).into();
26-
//! let server = Server::serve(&addr, app, MAX_THREADS);
24+
//! let server = Server::serve(&addr, app);
2725
//!
2826
//! server.await;
2927
//! }

src/server.rs

+2-6
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,8 @@ impl Server {
1919
/// `tokio::Runtime` it is not possible to furter configure the `hyper::Server`. If more
2020
/// control, such as configuring a graceful shutdown is necessary, then call
2121
/// `Service::from_blocking` instead.
22-
pub fn serve<H: conduit::Handler>(
23-
addr: &SocketAddr,
24-
handler: H,
25-
max_threads: usize,
26-
) -> impl Future {
27-
let handler = Arc::new(BlockingHandler::new(handler, max_threads));
22+
pub fn serve<H: conduit::Handler>(addr: &SocketAddr, handler: H) -> impl Future {
23+
let handler = Arc::new(BlockingHandler::new(handler));
2824
let make_service = make_service_fn(move |socket: &AddrStream| {
2925
let handler = handler.clone();
3026
let remote_addr = socket.remote_addr();

src/service.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ impl Service {
3232
/// # }
3333
/// # }
3434
/// # let app = Endpoint();
35-
/// const MAX_THREADS: usize = 10;
36-
/// let handler = Arc::new(BlockingHandler::new(app, MAX_THREADS));
35+
/// let handler = Arc::new(BlockingHandler::new(app));
3736
/// let make_service =
3837
/// hyper::service::make_service_fn(move |socket: &hyper::server::conn::AddrStream| {
3938
/// let addr = socket.remote_addr();

src/service/blocking_handler.rs

+3-51
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,20 @@ use crate::service::ServiceError;
44
use crate::{ConduitResponse, HyperResponse};
55

66
use std::net::SocketAddr;
7-
use std::sync::{
8-
atomic::{AtomicUsize, Ordering},
9-
Arc,
10-
};
7+
use std::sync::Arc;
118

12-
use conduit::{header, Handler, StatusCode};
9+
use conduit::{Handler, StatusCode};
1310
use hyper::{Body, Request, Response};
1411
use tracing::error;
1512

1613
#[derive(Debug)]
1714
pub struct BlockingHandler<H: Handler> {
18-
thread_count: AtomicUsize,
19-
max_thread_count: usize,
2015
handler: Arc<H>,
2116
}
2217

2318
impl<H: Handler> BlockingHandler<H> {
24-
pub fn new(handler: H, max_thread_count: usize) -> Self {
19+
pub fn new(handler: H) -> Self {
2520
Self {
26-
thread_count: AtomicUsize::new(0),
27-
max_thread_count,
2821
handler: Arc::new(handler),
2922
}
3023
}
@@ -40,14 +33,6 @@ impl<H: Handler> BlockingHandler<H> {
4033
let full_body = hyper::body::to_bytes(body).await?;
4134
let mut request_info = RequestInfo::new(parts, full_body);
4235

43-
// The _drop_on_return ensures the counter is decreased for all exit paths
44-
let (_drop_on_return, prev_count) = ThreadCounter::begin_with(&self.thread_count);
45-
46-
// Comparison is against the "previous value" from an atomic fetch_add, so using `>=`
47-
if prev_count >= self.max_thread_count {
48-
return Ok(over_capacity_error_response());
49-
}
50-
5136
let handler = self.handler.clone();
5237
tokio::task::spawn_blocking(move || {
5338
let mut request = ConduitRequest::new(&mut request_info, remote_addr);
@@ -83,36 +68,3 @@ fn server_error_response(message: &str) -> HyperResponse {
8368
.body(body)
8469
.expect("Unexpected invalid header")
8570
}
86-
87-
/// Logs an error message and returns a 503 status saying the service is over capacity
88-
fn over_capacity_error_response() -> HyperResponse {
89-
const RETRY_AFTER: u32 = 2;
90-
error!("Server Capacity Exceeded");
91-
let body = hyper::Body::from(format!(
92-
"Service Unavailable: Please retry after {} seconds.",
93-
RETRY_AFTER
94-
));
95-
Response::builder()
96-
.status(StatusCode::SERVICE_UNAVAILABLE)
97-
.header(header::RETRY_AFTER, RETRY_AFTER)
98-
.body(body)
99-
.expect("Unexpected invalid header")
100-
}
101-
102-
/// A struct that stores a reference to an atomic counter so it can be decremented when dropped
103-
struct ThreadCounter<'a> {
104-
counter: &'a AtomicUsize,
105-
}
106-
107-
impl<'a> ThreadCounter<'a> {
108-
fn begin_with(counter: &'a AtomicUsize) -> (Self, usize) {
109-
let previous = counter.fetch_add(1, Ordering::SeqCst);
110-
(Self { counter }, previous)
111-
}
112-
}
113-
114-
impl<'a> Drop for ThreadCounter<'a> {
115-
fn drop(&mut self) {
116-
self.counter.fetch_sub(1, Ordering::SeqCst);
117-
}
118-
}

src/tests.rs

+5-23
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use conduit::{box_error, Body, Handler, HandlerResult, RequestExt, Response, StatusCode};
2-
use futures_util::future::{Future, FutureExt};
2+
use futures_util::future::Future;
33
use hyper::{body::to_bytes, service::Service};
44

55
use super::service::{BlockingHandler, ServiceError};
@@ -71,7 +71,6 @@ impl Handler for Sleep {
7171

7272
fn make_service<H: Handler>(
7373
handler: H,
74-
max_thread_count: usize,
7574
) -> impl Service<
7675
hyper::Request<hyper::Body>,
7776
Response = HyperResponse,
@@ -80,7 +79,7 @@ fn make_service<H: Handler>(
8079
> {
8180
use hyper::service::service_fn;
8281

83-
let handler = std::sync::Arc::new(BlockingHandler::new(handler, max_thread_count));
82+
let handler = std::sync::Arc::new(BlockingHandler::new(handler));
8483

8584
service_fn(move |request: hyper::Request<hyper::Body>| {
8685
let remote_addr = ([0, 0, 0, 0], 0).into();
@@ -89,7 +88,7 @@ fn make_service<H: Handler>(
8988
}
9089

9190
async fn simulate_request<H: Handler>(handler: H) -> HyperResponse {
92-
let mut service = make_service(handler, 1);
91+
let mut service = make_service(handler);
9392
service.call(hyper::Request::default()).await.unwrap()
9493
}
9594

@@ -128,7 +127,7 @@ async fn recover_from_panic() {
128127

129128
#[tokio::test]
130129
async fn normalize_path() {
131-
let mut service = make_service(AssertPathNormalized, 1);
130+
let mut service = make_service(AssertPathNormalized);
132131
let req = hyper::Request::put("//removed/.././.././normalized")
133132
.body(hyper::Body::default())
134133
.unwrap();
@@ -144,26 +143,9 @@ async fn normalize_path() {
144143
assert_eq!(resp.headers().len(), 1);
145144
}
146145

147-
#[tokio::test]
148-
async fn limits_thread_count() {
149-
let mut service = make_service(Sleep, 1);
150-
let first = service.call(hyper::Request::default());
151-
let second = service.call(hyper::Request::default());
152-
153-
let first_completed = futures_util::select! {
154-
// The first thead is spawned and sleeps for 100ms
155-
sleep = first.fuse() => sleep,
156-
// The second request is rejected immediately
157-
over_capacity = second.fuse() => over_capacity,
158-
}
159-
.unwrap();
160-
161-
assert_eq!(first_completed.status(), StatusCode::SERVICE_UNAVAILABLE)
162-
}
163-
164146
#[tokio::test]
165147
async fn sleeping_doesnt_block_another_request() {
166-
let mut service = make_service(Sleep, 2);
148+
let mut service = make_service(Sleep);
167149

168150
let first = service.call(hyper::Request::default());
169151
let second = service.call(hyper::Request::default());

0 commit comments

Comments
 (0)