Skip to content

Commit d837e1f

Browse files
committed
Implement server::Builder::max_send_buffer_size
1 parent ce81583 commit d837e1f

File tree

5 files changed

+95
-3
lines changed

5 files changed

+95
-3
lines changed

src/proto/connection.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ where
140140
self.inner.streams.set_target_connection_window_size(size);
141141
}
142142

143+
pub(crate) fn set_max_send_buffer_size(&mut self, max: usize) {
144+
self.inner.streams.set_max_send_buffer_size(max);
145+
}
146+
143147
/// Send a new SETTINGS frame with an updated initial window size.
144148
pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
145149
let mut settings = frame::Settings::default();

src/proto/streams/prioritize.rs

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ pub(super) struct Prioritize {
5151

5252
/// What `DATA` frame is currently being sent in the codec.
5353
in_flight_data_frame: InFlightData,
54+
55+
/// The max send buffer size allowed.
56+
max_send_buffer_size: usize,
57+
58+
/// The current send buffer size.
59+
current_send_buffer_size: usize,
5460
}
5561

5662
#[derive(Debug, Eq, PartialEq)]
@@ -93,9 +99,17 @@ impl Prioritize {
9399
flow,
94100
last_opened_id: StreamId::ZERO,
95101
in_flight_data_frame: InFlightData::Nothing,
102+
max_send_buffer_size: usize::MAX,
103+
current_send_buffer_size: 0,
96104
}
97105
}
98106

107+
pub fn set_max_send_buffer_size(&mut self, max: usize, store: &mut Store, counts: &mut Counts) {
108+
self.max_send_buffer_size = max;
109+
110+
self.assign_connection_capacity(0, store, counts);
111+
}
112+
99113
/// Queue a frame to be sent to the remote
100114
pub fn queue_frame<B>(
101115
&mut self,
@@ -175,6 +189,8 @@ impl Prioritize {
175189
self.try_assign_capacity(stream);
176190
}
177191

192+
self.current_send_buffer_size += sz as usize;
193+
178194
if frame.is_end_stream() {
179195
stream.state.send_close();
180196
self.reserve_capacity(0, stream, counts);
@@ -350,7 +366,7 @@ impl Prioritize {
350366
self.flow.assign_capacity(inc);
351367

352368
// Assign newly acquired capacity to streams pending capacity.
353-
while self.flow.available() > 0 {
369+
while self.available() > 0 {
354370
let stream = match self.pending_capacity.pop(store) {
355371
Some(stream) => stream,
356372
None => return,
@@ -373,6 +389,17 @@ impl Prioritize {
373389
}
374390
}
375391

392+
fn available(&self) -> WindowSize {
393+
cmp::min(
394+
self.flow.available().as_size() as usize,
395+
cmp::min(
396+
self.max_send_buffer_size
397+
.saturating_sub(self.current_send_buffer_size),
398+
WindowSize::MAX as usize,
399+
),
400+
) as WindowSize
401+
}
402+
376403
/// Request capacity to send data
377404
fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
378405
let total_requested = stream.requested_send_capacity;
@@ -395,7 +422,8 @@ impl Prioritize {
395422
additional,
396423
buffered = stream.buffered_send_data,
397424
window = stream.send_flow.window_size(),
398-
conn = %self.flow.available()
425+
conn_window = %self.flow.available(),
426+
conn = self.available(),
399427
);
400428

401429
if additional == 0 {
@@ -413,7 +441,7 @@ impl Prioritize {
413441
);
414442

415443
// The amount of currently available capacity on the connection
416-
let conn_available = self.flow.available().as_size();
444+
let conn_available = self.available();
417445

418446
// First check if capacity is immediately available
419447
if conn_available > 0 {
@@ -509,6 +537,8 @@ impl Prioritize {
509537

510538
// Because, always try to reclaim...
511539
self.reclaim_frame(buffer, store, dst);
540+
541+
self.assign_connection_capacity(0, store, counts);
512542
}
513543
None => {
514544
// Try to flush the codec.
@@ -630,6 +660,8 @@ impl Prioritize {
630660
tracing::trace!(?frame, "dropping");
631661
}
632662

663+
self.current_send_buffer_size -= stream.buffered_send_data;
664+
633665
stream.buffered_send_data = 0;
634666
stream.requested_send_capacity = 0;
635667
if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
@@ -736,6 +768,8 @@ impl Prioritize {
736768
tracing::trace_span!("updating stream flow").in_scope(|| {
737769
stream.send_flow.send_data(len);
738770

771+
self.current_send_buffer_size -= len as usize;
772+
739773
// Decrement the stream's buffered data counter
740774
debug_assert!(stream.buffered_send_data >= len as usize);
741775
stream.buffered_send_data -= len as usize;

src/proto/streams/send.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ impl Send {
118118
Ok(())
119119
}
120120

121+
pub fn set_max_send_buffer_size(&mut self, max: usize, store: &mut Store, counts: &mut Counts) {
122+
self.prioritize.set_max_send_buffer_size(max, store, counts);
123+
}
124+
121125
pub fn send_headers<B>(
122126
&mut self,
123127
frame: frame::Headers,

src/proto/streams/streams.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,15 @@ where
127127
.set_target_connection_window(size, &mut me.actions.task)
128128
}
129129

130+
pub fn set_max_send_buffer_size(&mut self, max: usize) {
131+
let mut me = self.inner.lock().unwrap();
132+
let me = &mut *me;
133+
134+
me.actions
135+
.send
136+
.set_max_send_buffer_size(max, &mut me.store, &mut me.counts);
137+
}
138+
130139
pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
131140
let mut me = self.inner.lock().unwrap();
132141
let me = &mut *me;

src/server.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,9 @@ pub struct Builder {
245245

246246
/// Initial target window size for new connections.
247247
initial_target_connection_window_size: Option<u32>,
248+
249+
/// Max send buffer size.
250+
max_send_buffer_size: Option<usize>,
248251
}
249252

250253
/// Send a response back to the client
@@ -451,6 +454,11 @@ where
451454
self.connection.set_target_window_size(size);
452455
}
453456

457+
/// Sets the max send buffer size for the whole connection.
458+
pub fn set_max_send_buffer_size(&mut self, max: usize) {
459+
self.connection.set_max_send_buffer_size(max);
460+
}
461+
454462
/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
455463
/// flow control for received data.
456464
///
@@ -620,6 +628,7 @@ impl Builder {
620628
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
621629
settings: Settings::default(),
622630
initial_target_connection_window_size: None,
631+
max_send_buffer_size: None,
623632
}
624633
}
625634

@@ -763,6 +772,35 @@ impl Builder {
763772
self
764773
}
765774

775+
/// Sets the max size of the send buffer.
776+
///
777+
/// This setting is also used to limit the maximum amount of data
778+
/// buffered to be sent.
779+
///
780+
/// # Examples
781+
///
782+
/// ```
783+
/// # use tokio::io::{AsyncRead, AsyncWrite};
784+
/// # use h2::server::*;
785+
/// #
786+
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
787+
/// # -> Handshake<T>
788+
/// # {
789+
/// // `server_fut` is a future representing the completion of the HTTP/2
790+
/// // handshake.
791+
/// let server_fut = Builder::new()
792+
/// .max_send_buffer_size(16 * 1024)
793+
/// .handshake(my_io);
794+
/// # server_fut
795+
/// # }
796+
/// #
797+
/// # pub fn main() {}
798+
/// ```
799+
pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
800+
self.max_send_buffer_size = Some(max);
801+
self
802+
}
803+
766804
/// Sets the maximum number of concurrent streams.
767805
///
768806
/// The maximum concurrent streams setting only controls the maximum number
@@ -1280,6 +1318,9 @@ where
12801318
if let Some(sz) = self.builder.initial_target_connection_window_size {
12811319
c.set_target_window_size(sz);
12821320
}
1321+
if let Some(sz) = self.builder.max_send_buffer_size {
1322+
c.set_max_send_buffer_size(sz);
1323+
}
12831324
Ok(c)
12841325
})
12851326
}

0 commit comments

Comments
 (0)