Skip to content

Commit eaf32f0

Browse files
jfourie1seanmonstar
authored andcommitted
fix(http2): Fix race condition in client dispatcher (#3041)
There exists a race condition in ClientTask::poll() when the request that is sent via h2::client::send_request() is pending open. A task will be spawned to wait for send capacity on the sendstream. Because this same stream is also stored in the pending member of h2::client::SendRequest the next iteration of the poll() loop can call poll_ready() and call wait_send() on the same stream passed into the spawned task. Fix this by always calling poll_ready() after send_request(). If this call to poll_ready() returns Pending save the necessary context in ClientTask and only spawn the task that will eventually resolve to the response after poll_ready() returns Ok.
1 parent 81e25fa commit eaf32f0

File tree

1 file changed

+143
-81
lines changed

1 file changed

+143
-81
lines changed

src/proto/h2/client.rs

Lines changed: 143 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,23 @@ use futures_channel::{mpsc, oneshot};
66
use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
77
use futures_util::stream::StreamExt as _;
88
use h2::client::{Builder, SendRequest};
9+
use h2::SendStream;
910
use http::{Method, StatusCode};
1011
use tokio::io::{AsyncRead, AsyncWrite};
1112
use tracing::{debug, trace, warn};
1213

1314
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
1415
use crate::body::{Body, Incoming as IncomingBody};
1516
use crate::common::time::Time;
17+
use crate::client::dispatch::Callback;
1618
use crate::common::{exec::Exec, task, Future, Never, Pin, Poll};
1719
use crate::ext::Protocol;
1820
use crate::headers;
1921
use crate::proto::h2::UpgradedSendStream;
2022
use crate::proto::Dispatched;
2123
use crate::upgrade::Upgraded;
2224
use crate::{Request, Response};
25+
use h2::client::ResponseFuture;
2326

2427
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
2528

@@ -161,6 +164,7 @@ where
161164
executor: exec,
162165
h2_tx,
163166
req_rx,
167+
fut_ctx: None,
164168
})
165169
}
166170

@@ -184,6 +188,20 @@ where
184188
}
185189
}
186190

191+
struct FutCtx<B>
192+
where
193+
B: Body,
194+
{
195+
is_connect: bool,
196+
eos: bool,
197+
fut: ResponseFuture,
198+
body_tx: SendStream<SendBuf<B::Data>>,
199+
body: B,
200+
cb: Callback<Request<B>, Response<IncomingBody>>,
201+
}
202+
203+
impl<B: Body> Unpin for FutCtx<B> {}
204+
187205
pub(crate) struct ClientTask<B>
188206
where
189207
B: Body,
@@ -194,6 +212,7 @@ where
194212
executor: Exec,
195213
h2_tx: SendRequest<SendBuf<B::Data>>,
196214
req_rx: ClientRx<B>,
215+
fut_ctx: Option<FutCtx<B>>,
197216
}
198217

199218
impl<B> ClientTask<B>
@@ -205,6 +224,99 @@ where
205224
}
206225
}
207226

227+
impl<B> ClientTask<B>
228+
where
229+
B: Body + Send + 'static,
230+
B::Data: Send,
231+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
232+
{
233+
fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut task::Context<'_>) {
234+
let ping = self.ping.clone();
235+
let send_stream = if !f.is_connect {
236+
if !f.eos {
237+
let mut pipe = Box::pin(PipeToSendStream::new(f.body, f.body_tx)).map(|res| {
238+
if let Err(e) = res {
239+
debug!("client request body error: {}", e);
240+
}
241+
});
242+
243+
// eagerly see if the body pipe is ready and
244+
// can thus skip allocating in the executor
245+
match Pin::new(&mut pipe).poll(cx) {
246+
Poll::Ready(_) => (),
247+
Poll::Pending => {
248+
let conn_drop_ref = self.conn_drop_ref.clone();
249+
// keep the ping recorder's knowledge of an
250+
// "open stream" alive while this body is
251+
// still sending...
252+
let ping = ping.clone();
253+
let pipe = pipe.map(move |x| {
254+
drop(conn_drop_ref);
255+
drop(ping);
256+
x
257+
});
258+
// Clear send task
259+
self.executor.execute(pipe);
260+
}
261+
}
262+
}
263+
264+
None
265+
} else {
266+
Some(f.body_tx)
267+
};
268+
269+
let fut = f.fut.map(move |result| match result {
270+
Ok(res) => {
271+
// record that we got the response headers
272+
ping.record_non_data();
273+
274+
let content_length = headers::content_length_parse_all(res.headers());
275+
if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
276+
if content_length.map_or(false, |len| len != 0) {
277+
warn!("h2 connect response with non-zero body not supported");
278+
279+
send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
280+
return Err((
281+
crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
282+
None,
283+
));
284+
}
285+
let (parts, recv_stream) = res.into_parts();
286+
let mut res = Response::from_parts(parts, IncomingBody::empty());
287+
288+
let (pending, on_upgrade) = crate::upgrade::pending();
289+
let io = H2Upgraded {
290+
ping,
291+
send_stream: unsafe { UpgradedSendStream::new(send_stream) },
292+
recv_stream,
293+
buf: Bytes::new(),
294+
};
295+
let upgraded = Upgraded::new(io, Bytes::new());
296+
297+
pending.fulfill(upgraded);
298+
res.extensions_mut().insert(on_upgrade);
299+
300+
Ok(res)
301+
} else {
302+
let res = res.map(|stream| {
303+
let ping = ping.for_stream(&stream);
304+
IncomingBody::h2(stream, content_length.into(), ping)
305+
});
306+
Ok(res)
307+
}
308+
}
309+
Err(err) => {
310+
ping.ensure_not_timed_out().map_err(|e| (e, None))?;
311+
312+
debug!("client response error: {}", err);
313+
Err((crate::Error::new_h2(err), None))
314+
}
315+
});
316+
self.executor.execute(f.cb.send_when(fut));
317+
}
318+
}
319+
208320
impl<B> Future for ClientTask<B>
209321
where
210322
B: Body + Send + 'static,
@@ -228,6 +340,16 @@ where
228340
}
229341
};
230342

343+
match self.fut_ctx.take() {
344+
// If we were waiting on pending open
345+
// continue where we left off.
346+
Some(f) => {
347+
self.poll_pipe(f, cx);
348+
continue;
349+
}
350+
None => (),
351+
}
352+
231353
match self.req_rx.poll_recv(cx) {
232354
Poll::Ready(Some((req, cb))) => {
233355
// check that future hasn't been canceled already
@@ -246,7 +368,6 @@ where
246368

247369
let is_connect = req.method() == Method::CONNECT;
248370
let eos = body.is_end_stream();
249-
let ping = self.ping.clone();
250371

251372
if is_connect {
252373
if headers::content_length_parse_all(req.headers())
@@ -274,90 +395,31 @@ where
274395
}
275396
};
276397

277-
let send_stream = if !is_connect {
278-
if !eos {
279-
let mut pipe =
280-
Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| {
281-
if let Err(e) = res {
282-
debug!("client request body error: {}", e);
283-
}
284-
});
285-
286-
// eagerly see if the body pipe is ready and
287-
// can thus skip allocating in the executor
288-
match Pin::new(&mut pipe).poll(cx) {
289-
Poll::Ready(_) => (),
290-
Poll::Pending => {
291-
let conn_drop_ref = self.conn_drop_ref.clone();
292-
// keep the ping recorder's knowledge of an
293-
// "open stream" alive while this body is
294-
// still sending...
295-
let ping = ping.clone();
296-
let pipe = pipe.map(move |x| {
297-
drop(conn_drop_ref);
298-
drop(ping);
299-
x
300-
});
301-
self.executor.execute(pipe);
302-
}
303-
}
304-
}
305-
306-
None
307-
} else {
308-
Some(body_tx)
398+
let f = FutCtx {
399+
is_connect,
400+
eos,
401+
fut,
402+
body_tx,
403+
body,
404+
cb,
309405
};
310406

311-
let fut = fut.map(move |result| match result {
312-
Ok(res) => {
313-
// record that we got the response headers
314-
ping.record_non_data();
315-
316-
let content_length = headers::content_length_parse_all(res.headers());
317-
if let (Some(mut send_stream), StatusCode::OK) =
318-
(send_stream, res.status())
319-
{
320-
if content_length.map_or(false, |len| len != 0) {
321-
warn!("h2 connect response with non-zero body not supported");
322-
323-
send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
324-
return Err((
325-
crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
326-
None,
327-
));
328-
}
329-
let (parts, recv_stream) = res.into_parts();
330-
let mut res = Response::from_parts(parts, IncomingBody::empty());
331-
332-
let (pending, on_upgrade) = crate::upgrade::pending();
333-
let io = H2Upgraded {
334-
ping,
335-
send_stream: unsafe { UpgradedSendStream::new(send_stream) },
336-
recv_stream,
337-
buf: Bytes::new(),
338-
};
339-
let upgraded = Upgraded::new(io, Bytes::new());
340-
341-
pending.fulfill(upgraded);
342-
res.extensions_mut().insert(on_upgrade);
343-
344-
Ok(res)
345-
} else {
346-
let res = res.map(|stream| {
347-
let ping = ping.for_stream(&stream);
348-
IncomingBody::h2(stream, content_length.into(), ping)
349-
});
350-
Ok(res)
351-
}
407+
// Check poll_ready() again.
408+
// If the call to send_request() resulted in the new stream being pending open
409+
// we have to wait for the open to complete before accepting new requests.
410+
match self.h2_tx.poll_ready(cx) {
411+
Poll::Pending => {
412+
// Save Context
413+
self.fut_ctx = Some(f);
414+
return Poll::Pending;
352415
}
353-
Err(err) => {
354-
ping.ensure_not_timed_out().map_err(|e| (e, None))?;
355-
356-
debug!("client response error: {}", err);
357-
Err((crate::Error::new_h2(err), None))
416+
Poll::Ready(Ok(())) => (),
417+
Poll::Ready(Err(err)) => {
418+
f.cb.send(Err((crate::Error::new_h2(err), None)));
419+
continue;
358420
}
359-
});
360-
self.executor.execute(cb.send_when(fut));
421+
}
422+
self.poll_pipe(f, cx);
361423
continue;
362424
}
363425

0 commit comments

Comments
 (0)