Skip to content

Commit f7d388f

Browse files
committed
Implement server::Builder::max_send_buffer_size
1 parent ce81583 commit f7d388f

File tree

5 files changed

+95
-4
lines changed

5 files changed

+95
-4
lines changed

src/proto/connection.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ where
140140
self.inner.streams.set_target_connection_window_size(size);
141141
}
142142

143+
pub(crate) fn set_max_send_buffer_size(&mut self, max: usize) {
144+
self.inner.streams.set_max_send_buffer_size(max);
145+
}
146+
143147
/// Send a new SETTINGS frame with an updated initial window size.
144148
pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
145149
let mut settings = frame::Settings::default();

src/proto/streams/prioritize.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ pub(super) struct Prioritize {
5151

5252
/// What `DATA` frame is currently being sent in the codec.
5353
in_flight_data_frame: InFlightData,
54+
55+
/// The max send buffer size allowed.
56+
max_send_buffer_size: usize,
57+
58+
/// The current send buffer size.
59+
current_send_buffer_size: usize,
5460
}
5561

5662
#[derive(Debug, Eq, PartialEq)]
@@ -93,9 +99,17 @@ impl Prioritize {
9399
flow,
94100
last_opened_id: StreamId::ZERO,
95101
in_flight_data_frame: InFlightData::Nothing,
102+
max_send_buffer_size: usize::MAX,
103+
current_send_buffer_size: 0,
96104
}
97105
}
98106

107+
pub fn set_max_send_buffer_size(&mut self, max: usize, store: &mut Store, counts: &mut Counts) {
108+
self.max_send_buffer_size = max;
109+
110+
self.assign_connection_capacity(0, store, counts);
111+
}
112+
99113
/// Queue a frame to be sent to the remote
100114
pub fn queue_frame<B>(
101115
&mut self,
@@ -157,6 +171,8 @@ impl Prioritize {
157171
}
158172
}
159173

174+
self.current_send_buffer_size += sz as usize;
175+
160176
// Update the buffered data counter
161177
stream.buffered_send_data += sz as usize;
162178

@@ -350,7 +366,7 @@ impl Prioritize {
350366
self.flow.assign_capacity(inc);
351367

352368
// Assign newly acquired capacity to streams pending capacity.
353-
while self.flow.available() > 0 {
369+
while self.available() > 0 {
354370
let stream = match self.pending_capacity.pop(store) {
355371
Some(stream) => stream,
356372
None => return,
@@ -373,6 +389,14 @@ impl Prioritize {
373389
}
374390
}
375391

392+
pub(crate) fn available(&self) -> WindowSize {
393+
cmp::min(
394+
self.flow.available().as_size() as usize,
395+
self.max_send_buffer_size
396+
.saturating_sub(self.current_send_buffer_size),
397+
) as WindowSize
398+
}
399+
376400
/// Request capacity to send data
377401
fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
378402
let total_requested = stream.requested_send_capacity;
@@ -413,7 +437,7 @@ impl Prioritize {
413437
);
414438

415439
// The amount of currently available capacity on the connection
416-
let conn_available = self.flow.available().as_size();
440+
let conn_available = self.available();
417441

418442
// First check if capacity is immediately available
419443
if conn_available > 0 {
@@ -630,6 +654,10 @@ impl Prioritize {
630654
tracing::trace!(?frame, "dropping");
631655
}
632656

657+
// FIXME(nox): Do we need to wake something up when the current send
658+
// buffer size was equal to max send buffer size and decreased?
659+
self.current_send_buffer_size -= stream.buffered_send_data;
660+
633661
stream.buffered_send_data = 0;
634662
stream.requested_send_capacity = 0;
635663
if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
@@ -736,6 +764,8 @@ impl Prioritize {
736764
tracing::trace_span!("updating stream flow").in_scope(|| {
737765
stream.send_flow.send_data(len);
738766

767+
self.current_send_buffer_size -= len as usize;
768+
739769
// Decrement the stream's buffered data counter
740770
debug_assert!(stream.buffered_send_data >= len as usize);
741771
stream.buffered_send_data -= len as usize;

src/proto/streams/send.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use http;
1111
use std::task::{Context, Poll, Waker};
1212
use tokio::io::AsyncWrite;
1313

14-
use std::io;
14+
use std::{cmp, io};
1515

1616
/// Manages state transitions related to outbound frames.
1717
#[derive(Debug)]
@@ -118,6 +118,10 @@ impl Send {
118118
Ok(())
119119
}
120120

121+
pub fn set_max_send_buffer_size(&mut self, max: usize, store: &mut Store, counts: &mut Counts) {
122+
self.prioritize.set_max_send_buffer_size(max, store, counts);
123+
}
124+
121125
pub fn send_headers<B>(
122126
&mut self,
123127
frame: frame::Headers,
@@ -335,7 +339,10 @@ impl Send {
335339
if available as usize <= buffered {
336340
0
337341
} else {
338-
available - buffered as WindowSize
342+
cmp::min(
343+
available - buffered as WindowSize,
344+
self.prioritize.available(),
345+
)
339346
}
340347
}
341348

src/proto/streams/streams.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,15 @@ where
127127
.set_target_connection_window(size, &mut me.actions.task)
128128
}
129129

130+
pub fn set_max_send_buffer_size(&mut self, max: usize) {
131+
let mut me = self.inner.lock().unwrap();
132+
let me = &mut *me;
133+
134+
me.actions
135+
.send
136+
.set_max_send_buffer_size(max, &mut me.store, &mut me.counts);
137+
}
138+
130139
pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
131140
let mut me = self.inner.lock().unwrap();
132141
let me = &mut *me;

src/server.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,9 @@ pub struct Builder {
245245

246246
/// Initial target window size for new connections.
247247
initial_target_connection_window_size: Option<u32>,
248+
249+
/// Max send buffer size.
250+
max_send_buffer_size: Option<usize>,
248251
}
249252

250253
/// Send a response back to the client
@@ -451,6 +454,11 @@ where
451454
self.connection.set_target_window_size(size);
452455
}
453456

457+
/// Sets the max send buffer size for the whole connection.
458+
pub fn set_max_send_buffer_size(&mut self, max: usize) {
459+
self.connection.set_max_send_buffer_size(max);
460+
}
461+
454462
/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
455463
/// flow control for received data.
456464
///
@@ -620,6 +628,7 @@ impl Builder {
620628
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
621629
settings: Settings::default(),
622630
initial_target_connection_window_size: None,
631+
max_send_buffer_size: None,
623632
}
624633
}
625634

@@ -763,6 +772,35 @@ impl Builder {
763772
self
764773
}
765774

775+
/// Sets the max size of the send buffer.
776+
///
777+
/// This setting is also used to limit the maximum amount of data
778+
/// buffered to be sent.
779+
///
780+
/// # Examples
781+
///
782+
/// ```
783+
/// # use tokio::io::{AsyncRead, AsyncWrite};
784+
/// # use h2::server::*;
785+
/// #
786+
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
787+
/// # -> Handshake<T>
788+
/// # {
789+
/// // `server_fut` is a future representing the completion of the HTTP/2
790+
/// // handshake.
791+
/// let server_fut = Builder::new()
792+
/// .max_send_buffer_size(16 * 1024)
793+
/// .handshake(my_io);
794+
/// # server_fut
795+
/// # }
796+
/// #
797+
/// # pub fn main() {}
798+
/// ```
799+
pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
800+
self.max_send_buffer_size = Some(max);
801+
self
802+
}
803+
766804
/// Sets the maximum number of concurrent streams.
767805
///
768806
/// The maximum concurrent streams setting only controls the maximum number
@@ -1280,6 +1318,9 @@ where
12801318
if let Some(sz) = self.builder.initial_target_connection_window_size {
12811319
c.set_target_window_size(sz);
12821320
}
1321+
if let Some(sz) = self.builder.max_send_buffer_size {
1322+
c.set_max_send_buffer_size(sz);
1323+
}
12831324
Ok(c)
12841325
})
12851326
}

0 commit comments

Comments
 (0)