Skip to content

Add max send buffer per stream option #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ pub struct Builder {
/// Initial target window size for new connections.
initial_target_connection_window_size: Option<u32>,

/// Maximum amount of bytes to "buffer" for writing per stream.
max_send_buffer_size: usize,

/// Maximum number of locally reset streams to keep at a time.
reset_stream_max: usize,

Expand Down Expand Up @@ -628,6 +631,7 @@ impl Builder {
/// ```
pub fn new() -> Builder {
Builder {
max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
initial_target_connection_window_size: None,
Expand Down Expand Up @@ -962,6 +966,24 @@ impl Builder {
self
}

/// Sets the maximum send buffer size per stream.
///
/// Once a stream has buffered up to (or over) the maximum, the stream's
/// flow control will not "poll" additional capacity. Once bytes for the
/// stream have been written to the connection, the send buffer capacity
/// will be freed up again.
///
/// The default is currently ~400MB, but may change.
///
/// # Panics
///
/// This function panics if `max` is larger than `u32::MAX`.
pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
assert!(max <= std::u32::MAX as usize);
self.max_send_buffer_size = max;
self
}

/// Enables or disables server push promises.
///
/// This value is included in the initial SETTINGS handshake. When set, the
Expand Down Expand Up @@ -1184,6 +1206,7 @@ where
proto::Config {
next_stream_id: builder.stream_id,
initial_max_send_streams: builder.initial_max_send_streams,
max_send_buffer_size: builder.max_send_buffer_size,
reset_stream_duration: builder.reset_stream_duration,
reset_stream_max: builder.reset_stream_max,
settings: builder.settings.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct DynConnection<'a, B: Buf = Bytes> {
pub(crate) struct Config {
pub next_stream_id: StreamId,
pub initial_max_send_streams: usize,
pub max_send_buffer_size: usize,
pub reset_stream_duration: Duration,
pub reset_stream_max: usize,
pub settings: frame::Settings,
Expand Down Expand Up @@ -108,6 +109,7 @@ where
.initial_window_size()
.unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE),
initial_max_send_streams: config.initial_max_send_streams,
local_max_buffer_size: config.max_send_buffer_size,
local_next_stream_id: config.next_stream_id,
local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
extended_connect_protocol_enabled: config
Expand Down
1 change: 1 addition & 0 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ pub type WindowSize = u32;
pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
pub const DEFAULT_RESET_STREAM_MAX: usize = 10;
pub const DEFAULT_RESET_STREAM_SECS: u64 = 30;
pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400;
3 changes: 3 additions & 0 deletions src/proto/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub struct Config {
/// MAX_CONCURRENT_STREAMS specified in the frame.
pub initial_max_send_streams: usize,

/// Max amount of DATA bytes to buffer per stream.
pub local_max_buffer_size: usize,

/// The stream ID to start the next local stream with
pub local_next_stream_id: StreamId,

Expand Down
12 changes: 6 additions & 6 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub(super) struct Send {
/// > the identified last stream.
max_stream_id: StreamId,

/// The maximum amount of bytes a stream should buffer.
max_buffer_size: usize,

/// Initial window size of locally initiated streams
init_window_sz: WindowSize,

Expand All @@ -52,6 +55,7 @@ impl Send {
pub fn new(config: &Config) -> Self {
Send {
init_window_sz: config.remote_init_window_sz,
max_buffer_size: config.local_max_buffer_size,
max_stream_id: StreamId::MAX,
next_stream_id: Ok(config.local_next_stream_id),
prioritize: Prioritize::new(config),
Expand Down Expand Up @@ -333,14 +337,10 @@ impl Send {

/// Current available stream send capacity
pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
let available = stream.send_flow.available().as_size();
let available = stream.send_flow.available().as_size() as usize;
let buffered = stream.buffered_send_data;

if available as usize <= buffered {
0
} else {
available - buffered as WindowSize
}
available.min(self.max_buffer_size).saturating_sub(buffered) as WindowSize
}

pub fn poll_reset(
Expand Down
23 changes: 23 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ pub struct Builder {

/// Initial target window size for new connections.
initial_target_connection_window_size: Option<u32>,

/// Maximum amount of bytes to "buffer" for writing per stream.
max_send_buffer_size: usize,
}

/// Send a response back to the client
Expand Down Expand Up @@ -633,6 +636,7 @@ impl Builder {
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
settings: Settings::default(),
initial_target_connection_window_size: None,
max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
}
}

Expand Down Expand Up @@ -870,6 +874,24 @@ impl Builder {
self
}

/// Sets the maximum send buffer size per stream.
///
/// Once a stream has buffered up to (or over) the maximum, the stream's
/// flow control will not "poll" additional capacity. Once bytes for the
/// stream have been written to the connection, the send buffer capacity
/// will be freed up again.
///
/// The default is currently ~400MB, but may change.
///
/// # Panics
///
/// This function panics if `max` is larger than `u32::MAX`.
pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
assert!(max <= std::u32::MAX as usize);
self.max_send_buffer_size = max;
self
}

/// Sets the maximum number of concurrent locally reset streams.
///
/// When a stream is explicitly reset by either calling
Expand Down Expand Up @@ -1290,6 +1312,7 @@ where
next_stream_id: 2.into(),
// Server does not need to locally initiate any streams
initial_max_send_streams: 0,
max_send_buffer_size: self.builder.max_send_buffer_size,
reset_stream_duration: self.builder.reset_stream_duration,
reset_stream_max: self.builder.reset_stream_max,
settings: self.builder.settings.clone(),
Expand Down
57 changes: 57 additions & 0 deletions tests/h2-tests/tests/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1611,3 +1611,60 @@ async fn poll_capacity_after_send_data_and_reserve() {

join(srv, h2).await;
}

#[tokio::test]
async fn max_send_buffer_size_overflow() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
.await;
srv.send_frame(frames::headers(1).response(200).eos()).await;
srv.recv_frame(frames::data(1, &[0; 10][..])).await;
srv.recv_frame(frames::data(1, &[][..]).eos()).await;
};

let client = async move {
let (mut client, mut conn) = client::Builder::new()
.max_send_buffer_size(5)
.handshake::<_, Bytes>(io)
.await
.unwrap();
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (response, mut stream) = client.send_request(request, false).unwrap();

let response = conn.drive(response).await.unwrap();
assert_eq!(response.status(), StatusCode::OK);

assert_eq!(stream.capacity(), 0);
stream.reserve_capacity(10);
assert_eq!(
stream.capacity(),
5,
"polled capacity not over max buffer size"
);

stream.send_data([0; 10][..].into(), false).unwrap();

stream.reserve_capacity(15);
assert_eq!(
stream.capacity(),
0,
"now with buffered over the max, don't overflow"
);
stream.send_data([0; 0][..].into(), true).unwrap();

// Wait for the connection to close
conn.await.unwrap();
};

join(srv, client).await;
}