Skip to content

Commit a3f01c1

Browse files
authored
perf: Improve throughput when vectored IO is not enabled (#712)
As discussed in #711, the current implementation of sending data is suboptimal when vectored I/O is not enabled: data frame's head is likely to be sent in a separate TCP segment, whose payload is of only 9 bytes. This PR adds some specialized implementaton for non-vectored I/O case. In short, it sets a larget chain threhold, and also makes sure a data frame's head is sent along with the beginning part of the real data payload. All existing unit tests passed. Also I take a look at the e2e https://github.com/hyperium/hyper/blob/0.14.x/benches/end_to_end.rs but realize that all the benchmarks there are for the case of vectored I/O if the OS supports vectored I/O. There isn't a specific case for non-vectored I/O so I am not sure how to proceed with benchmark for performance evaluations.
1 parent 62cf7a6 commit a3f01c1

File tree

2 files changed

+37
-49
lines changed

2 files changed

+37
-49
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ members = [
4444
futures-core = { version = "0.3", default-features = false }
4545
futures-sink = { version = "0.3", default-features = false }
4646
futures-util = { version = "0.3", default-features = false }
47-
tokio-util = { version = "0.7.1", features = ["codec"] }
47+
tokio-util = { version = "0.7.1", features = ["codec", "io"] }
4848
tokio = { version = "1", features = ["io-util"] }
4949
bytes = "1"
5050
http = "0.2"

src/codec/framed_write.rs

Lines changed: 36 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use bytes::{Buf, BufMut, BytesMut};
77
use std::pin::Pin;
88
use std::task::{Context, Poll};
99
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
10+
use tokio_util::io::poll_write_buf;
1011

11-
use std::io::{self, Cursor, IoSlice};
12+
use std::io::{self, Cursor};
1213

1314
// A macro to get around a method needing to borrow &mut self
1415
macro_rules! limited_write_buf {
@@ -45,8 +46,11 @@ struct Encoder<B> {
4546
/// Max frame size, this is specified by the peer
4647
max_frame_size: FrameSize,
4748

48-
/// Whether or not the wrapped `AsyncWrite` supports vectored IO.
49-
is_write_vectored: bool,
49+
/// Chain payloads bigger than this.
50+
chain_threshold: usize,
51+
52+
/// Min buffer required to attempt to write a frame
53+
min_buffer_capacity: usize,
5054
}
5155

5256
#[derive(Debug)]
@@ -61,22 +65,28 @@ enum Next<B> {
6165
/// frame that big.
6266
const DEFAULT_BUFFER_CAPACITY: usize = 16 * 1_024;
6367

64-
/// Min buffer required to attempt to write a frame
65-
const MIN_BUFFER_CAPACITY: usize = frame::HEADER_LEN + CHAIN_THRESHOLD;
66-
67-
/// Chain payloads bigger than this. The remote will never advertise a max frame
68-
/// size less than this (well, the spec says the max frame size can't be less
69-
/// than 16kb, so not even close).
68+
/// Chain payloads bigger than this when vectored I/O is enabled. The remote
69+
/// will never advertise a max frame size less than this (well, the spec says
70+
/// the max frame size can't be less than 16kb, so not even close).
7071
const CHAIN_THRESHOLD: usize = 256;
7172

73+
/// Chain payloads bigger than this when vectored I/O is **not** enabled.
74+
/// A larger value in this scenario will reduce the number of small and
75+
/// fragmented data being sent, and hereby improve the throughput.
76+
const CHAIN_THRESHOLD_WITHOUT_VECTORED_IO: usize = 1024;
77+
7278
// TODO: Make generic
7379
impl<T, B> FramedWrite<T, B>
7480
where
7581
T: AsyncWrite + Unpin,
7682
B: Buf,
7783
{
7884
pub fn new(inner: T) -> FramedWrite<T, B> {
79-
let is_write_vectored = inner.is_write_vectored();
85+
let chain_threshold = if inner.is_write_vectored() {
86+
CHAIN_THRESHOLD
87+
} else {
88+
CHAIN_THRESHOLD_WITHOUT_VECTORED_IO
89+
};
8090
FramedWrite {
8191
inner,
8292
encoder: Encoder {
@@ -85,7 +95,8 @@ where
8595
next: None,
8696
last_data_frame: None,
8797
max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
88-
is_write_vectored,
98+
chain_threshold,
99+
min_buffer_capacity: chain_threshold + frame::HEADER_LEN,
89100
},
90101
}
91102
}
@@ -126,23 +137,17 @@ where
126137
Some(Next::Data(ref mut frame)) => {
127138
tracing::trace!(queued_data_frame = true);
128139
let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut());
129-
ready!(write(
130-
&mut self.inner,
131-
self.encoder.is_write_vectored,
132-
&mut buf,
133-
cx,
134-
))?
140+
ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))?
135141
}
136142
_ => {
137143
tracing::trace!(queued_data_frame = false);
138-
ready!(write(
139-
&mut self.inner,
140-
self.encoder.is_write_vectored,
141-
&mut self.encoder.buf,
144+
ready!(poll_write_buf(
145+
Pin::new(&mut self.inner),
142146
cx,
147+
&mut self.encoder.buf
143148
))?
144149
}
145-
}
150+
};
146151
}
147152

148153
match self.encoder.unset_frame() {
@@ -165,30 +170,6 @@ where
165170
}
166171
}
167172

168-
fn write<T, B>(
169-
writer: &mut T,
170-
is_write_vectored: bool,
171-
buf: &mut B,
172-
cx: &mut Context<'_>,
173-
) -> Poll<io::Result<()>>
174-
where
175-
T: AsyncWrite + Unpin,
176-
B: Buf,
177-
{
178-
// TODO(eliza): when tokio-util 0.5.1 is released, this
179-
// could just use `poll_write_buf`...
180-
const MAX_IOVS: usize = 64;
181-
let n = if is_write_vectored {
182-
let mut bufs = [IoSlice::new(&[]); MAX_IOVS];
183-
let cnt = buf.chunks_vectored(&mut bufs);
184-
ready!(Pin::new(writer).poll_write_vectored(cx, &bufs[..cnt]))?
185-
} else {
186-
ready!(Pin::new(writer).poll_write(cx, buf.chunk()))?
187-
};
188-
buf.advance(n);
189-
Ok(()).into()
190-
}
191-
192173
#[must_use]
193174
enum ControlFlow {
194175
Continue,
@@ -240,12 +221,17 @@ where
240221
return Err(PayloadTooBig);
241222
}
242223

243-
if len >= CHAIN_THRESHOLD {
224+
if len >= self.chain_threshold {
244225
let head = v.head();
245226

246227
// Encode the frame head to the buffer
247228
head.encode(len, self.buf.get_mut());
248229

230+
if self.buf.get_ref().remaining() < self.chain_threshold {
231+
let extra_bytes = self.chain_threshold - self.buf.remaining();
232+
self.buf.get_mut().put(v.payload_mut().take(extra_bytes));
233+
}
234+
249235
// Save the data frame
250236
self.next = Some(Next::Data(v));
251237
} else {
@@ -305,7 +291,9 @@ where
305291
}
306292

307293
fn has_capacity(&self) -> bool {
308-
self.next.is_none() && self.buf.get_ref().remaining_mut() >= MIN_BUFFER_CAPACITY
294+
self.next.is_none()
295+
&& (self.buf.get_ref().capacity() - self.buf.get_ref().len()
296+
>= self.min_buffer_capacity)
309297
}
310298

311299
fn is_empty(&self) -> bool {

0 commit comments

Comments
 (0)