Skip to content

Commit ee04292

Browse files
seanmonstar and jwilm
authored
Fix opening new streams over max concurrent (#707)
There was a bug where opening streams over the max concurrent streams was possible if max_concurrent_streams were lowered beyond the current number of open streams and there were already new streams added to the pending_send queue. There were two mechanisms for streams to end up in that queue. 1. send_headers would push directly onto pending_send when below max_concurrent_streams 2. prioritize would pop from pending_open until max_concurrent_streams was reached. For case 1, a settings frame could be received after pushing many streams onto pending_send and before the socket was ready to write again. For case 2, the pending_send queue could have Headers frames queued going into a Not Ready state with the socket, a settings frame could be received, and then the headers would be written anyway after the ack. The fix is therefore also two fold. Fixing case 1 is as simple as letting Prioritize decide when to transition streams from `pending_open` to `pending_send`, since only it knows the readiness of the socket and whether the headers can be written immediately. This is slightly complicated by the fact that previously SendRequest would block when streams would be added as "pending open". That was addressed by guessing when to block based on max concurrent streams rather than the stream state. The fix for Prioritize was to conservatively pop streams from pending_open when the socket is immediately available for writing a headers frame. This required a change to queuing to support pushing on the front of pending_send to ensure headers frames don't linger in pending_send. The alternative to this was adding a check in pending_send of whether a new stream would exceed max concurrent. In that case, headers frames would need to carefully be reenqueued. This seemed to impose more complexity to ensure the ordering of stream IDs would be maintained. Closes #704 Closes #706 Co-authored-by: Joe Wilm <[email protected]>
1 parent 9bd62a2 commit ee04292

File tree

8 files changed

+179
-26
lines changed

8 files changed

+179
-26
lines changed

src/client.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,8 +510,10 @@ where
510510
self.inner
511511
.send_request(request, end_of_stream, self.pending.as_ref())
512512
.map_err(Into::into)
513-
.map(|stream| {
514-
if stream.is_pending_open() {
513+
.map(|(stream, is_full)| {
514+
if stream.is_pending_open() && is_full {
515+
// Only prevent sending another request when the request queue
516+
// is full (i.e. the guarded condition `is_full` holds).
515517
self.pending = Some(stream.clone_to_opaque());
516518
}
517519

src/proto/streams/counts.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ impl Counts {
4949
}
5050
}
5151

52+
/// Returns true when the next opened stream will reach capacity of outbound streams
53+
///
54+
/// The number of client send streams is incremented in prioritize; send_request has to guess if
55+
/// it should wait before allowing another request to be sent.
56+
pub fn next_send_stream_will_reach_capacity(&self) -> bool {
57+
self.max_send_streams <= (self.num_send_streams + 1)
58+
}
59+
5260
/// Returns the current peer
5361
pub fn peer(&self) -> peer::Dyn {
5462
self.peer

src/proto/streams/prioritize.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,9 @@ impl Prioritize {
520520
tracing::trace!("poll_complete");
521521

522522
loop {
523-
self.schedule_pending_open(store, counts);
523+
if let Some(mut stream) = self.pop_pending_open(store, counts) {
524+
self.pending_send.push_front(&mut stream);
525+
}
524526

525527
match self.pop_frame(buffer, store, max_frame_len, counts) {
526528
Some(frame) => {
@@ -874,20 +876,24 @@ impl Prioritize {
874876
}
875877
}
876878

877-
fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
879+
fn pop_pending_open<'s>(
880+
&mut self,
881+
store: &'s mut Store,
882+
counts: &mut Counts,
883+
) -> Option<store::Ptr<'s>> {
878884
tracing::trace!("schedule_pending_open");
879885
// check for any pending open streams
880-
while counts.can_inc_num_send_streams() {
886+
if counts.can_inc_num_send_streams() {
881887
if let Some(mut stream) = self.pending_open.pop(store) {
882888
tracing::trace!("schedule_pending_open; stream={:?}", stream.id);
883889

884890
counts.inc_num_send_streams(&mut stream);
885-
self.pending_send.push(&mut stream);
886891
stream.notify_send();
887-
} else {
888-
return;
892+
return Some(stream);
889893
}
890894
}
895+
896+
None
891897
}
892898
}
893899

src/proto/streams/send.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,22 +143,27 @@ impl Send {
143143
// Update the state
144144
stream.state.send_open(end_stream)?;
145145

146-
if counts.peer().is_local_init(frame.stream_id()) {
147-
// If we're waiting on a PushPromise anyway
148-
// handle potentially queueing the stream at that point
149-
if !stream.is_pending_push {
150-
if counts.can_inc_num_send_streams() {
151-
counts.inc_num_send_streams(stream);
152-
} else {
153-
self.prioritize.queue_open(stream);
154-
}
155-
}
146+
let mut pending_open = false;
147+
if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
148+
self.prioritize.queue_open(stream);
149+
pending_open = true;
156150
}
157151

158152
// Queue the frame for sending
153+
//
154+
// This call expects that, since new streams are in the open queue, new
155+
// streams won't be pushed on pending_send.
159156
self.prioritize
160157
.queue_frame(frame.into(), buffer, stream, task);
161158

159+
// Need to notify the connection when pushing onto pending_open since
160+
// queue_frame only notifies for pending_send.
161+
if pending_open {
162+
if let Some(task) = task.take() {
163+
task.wake();
164+
}
165+
}
166+
162167
Ok(())
163168
}
164169

src/proto/streams/store.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ where
256256
///
257257
/// If the stream is already contained by the list, return `false`.
258258
pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
259-
tracing::trace!("Queue::push");
259+
tracing::trace!("Queue::push_back");
260260

261261
if N::is_queued(stream) {
262262
tracing::trace!(" -> already queued");
@@ -292,6 +292,46 @@ where
292292
true
293293
}
294294

295+
/// Queue the stream
296+
///
297+
/// If the stream is already contained by the list, return `false`.
298+
pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
299+
tracing::trace!("Queue::push_front");
300+
301+
if N::is_queued(stream) {
302+
tracing::trace!(" -> already queued");
303+
return false;
304+
}
305+
306+
N::set_queued(stream, true);
307+
308+
// The next pointer shouldn't be set
309+
debug_assert!(N::next(stream).is_none());
310+
311+
// Queue the stream
312+
match self.indices {
313+
Some(ref mut idxs) => {
314+
tracing::trace!(" -> existing entries");
315+
316+
// Update the provided stream to point to the head node
317+
let head_key = stream.resolve(idxs.head).key();
318+
N::set_next(stream, Some(head_key));
319+
320+
// Update the head pointer
321+
idxs.head = stream.key();
322+
}
323+
None => {
324+
tracing::trace!(" -> first entry");
325+
self.indices = Some(store::Indices {
326+
head: stream.key(),
327+
tail: stream.key(),
328+
});
329+
}
330+
}
331+
332+
true
333+
}
334+
295335
pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
296336
where
297337
R: Resolve,

src/proto/streams/streams.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ where
216216
mut request: Request<()>,
217217
end_of_stream: bool,
218218
pending: Option<&OpaqueStreamRef>,
219-
) -> Result<StreamRef<B>, SendError> {
219+
) -> Result<(StreamRef<B>, bool), SendError> {
220220
use super::stream::ContentLength;
221221
use http::Method;
222222

@@ -298,10 +298,14 @@ where
298298
// the lock, so it can't.
299299
me.refs += 1;
300300

301-
Ok(StreamRef {
302-
opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
303-
send_buffer: self.send_buffer.clone(),
304-
})
301+
let is_full = me.counts.next_send_stream_will_reach_capacity();
302+
Ok((
303+
StreamRef {
304+
opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
305+
send_buffer: self.send_buffer.clone(),
306+
},
307+
is_full,
308+
))
305309
}
306310

307311
pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {

tests/h2-tests/tests/client_request.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,8 @@ async fn request_over_max_concurrent_streams_errors() {
239239

240240
// first request is allowed
241241
let (resp1, mut stream1) = client.send_request(request, false).unwrap();
242+
// as long as we let the connection internals tick
243+
client = h2.drive(client.ready()).await.unwrap();
242244

243245
let request = Request::builder()
244246
.method(Method::POST)
@@ -284,6 +286,90 @@ async fn request_over_max_concurrent_streams_errors() {
284286
join(srv, h2).await;
285287
}
286288

289+
#[tokio::test]
290+
async fn recv_decrement_max_concurrent_streams_when_requests_queued() {
291+
h2_support::trace_init!();
292+
let (io, mut srv) = mock::new();
293+
294+
let srv = async move {
295+
let settings = srv.assert_client_handshake().await;
296+
assert_default_settings!(settings);
297+
srv.recv_frame(
298+
frames::headers(1)
299+
.request("POST", "https://example.com/")
300+
.eos(),
301+
)
302+
.await;
303+
srv.send_frame(frames::headers(1).response(200).eos()).await;
304+
305+
srv.ping_pong([0; 8]).await;
306+
307+
// limit this server later in life
308+
srv.send_frame(frames::settings().max_concurrent_streams(1))
309+
.await;
310+
srv.recv_frame(frames::settings_ack()).await;
311+
srv.recv_frame(
312+
frames::headers(3)
313+
.request("POST", "https://example.com/")
314+
.eos(),
315+
)
316+
.await;
317+
srv.ping_pong([1; 8]).await;
318+
srv.send_frame(frames::headers(3).response(200).eos()).await;
319+
320+
srv.recv_frame(
321+
frames::headers(5)
322+
.request("POST", "https://example.com/")
323+
.eos(),
324+
)
325+
.await;
326+
srv.send_frame(frames::headers(5).response(200).eos()).await;
327+
};
328+
329+
let h2 = async move {
330+
let (mut client, mut h2) = client::handshake(io).await.expect("handshake");
331+
// we send a simple req here just to drive the connection so we can
332+
// receive the server settings.
333+
let request = Request::builder()
334+
.method(Method::POST)
335+
.uri("https://example.com/")
336+
.body(())
337+
.unwrap();
338+
// first request is allowed
339+
let (response, _) = client.send_request(request, true).unwrap();
340+
h2.drive(response).await.unwrap();
341+
342+
let request = Request::builder()
343+
.method(Method::POST)
344+
.uri("https://example.com/")
345+
.body(())
346+
.unwrap();
347+
348+
// first request is allowed
349+
let (resp1, _) = client.send_request(request, true).unwrap();
350+
351+
let request = Request::builder()
352+
.method(Method::POST)
353+
.uri("https://example.com/")
354+
.body(())
355+
.unwrap();
356+
357+
// second request is put into pending_open
358+
let (resp2, _) = client.send_request(request, true).unwrap();
359+
360+
h2.drive(async move {
361+
resp1.await.expect("req");
362+
})
363+
.await;
364+
join(async move { h2.await.unwrap() }, async move {
365+
resp2.await.unwrap()
366+
})
367+
.await;
368+
};
369+
370+
join(srv, h2).await;
371+
}
372+
287373
#[tokio::test]
288374
async fn send_request_poll_ready_when_connection_error() {
289375
h2_support::trace_init!();
@@ -336,6 +422,8 @@ async fn send_request_poll_ready_when_connection_error() {
336422

337423
// first request is allowed
338424
let (resp1, _) = client.send_request(request, true).unwrap();
425+
// as long as we let the connection internals tick
426+
client = h2.drive(client.ready()).await.unwrap();
339427

340428
let request = Request::builder()
341429
.method(Method::POST)

tests/h2-tests/tests/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,10 +296,10 @@ async fn push_request_against_concurrency() {
296296
.await;
297297
client.recv_frame(frames::data(2, &b""[..]).eos()).await;
298298
client
299-
.recv_frame(frames::headers(1).response(200).eos())
299+
.recv_frame(frames::headers(4).response(200).eos())
300300
.await;
301301
client
302-
.recv_frame(frames::headers(4).response(200).eos())
302+
.recv_frame(frames::headers(1).response(200).eos())
303303
.await;
304304
};
305305

0 commit comments

Comments
 (0)