diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 000000000..00642f837 --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1 @@ +github: seanmonstar diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 2cff15cff..437e6e088 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -13,7 +13,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Rust uses: dtolnay/rust-toolchain@stable @@ -31,12 +31,11 @@ jobs: strategy: matrix: rust: - - nightly - beta - stable steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Rust (${{ matrix.rust }}) uses: dtolnay/rust-toolchain@master @@ -61,9 +60,12 @@ jobs: run: ./ci/h2spec.sh if: matrix.rust == 'stable' - - name: Check minimal versions - run: cargo clean; cargo update -Zminimal-versions; cargo check - if: matrix.rust == 'nightly' + #clippy_check: + # runs-on: ubuntu-latest + # steps: + # - uses: actions/checkout@v4 + # - name: Run Clippy + # run: cargo clippy --all-targets --all-features msrv: name: Check MSRV @@ -73,17 +75,27 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Get MSRV from package metadata - id: metadata - run: | - cargo metadata --no-deps --format-version 1 | - jq -r '"msrv=" + (.packages[] | select(.name == "h2")).rust_version' >> $GITHUB_OUTPUT + id: msrv + run: grep rust-version Cargo.toml | cut -d '"' -f2 | sed 's/^/version=/' >> $GITHUB_OUTPUT - name: Install Rust (${{ steps.metadata.outputs.msrv }}) + id: msrv-toolchain uses: dtolnay/rust-toolchain@master with: - toolchain: ${{ steps.metadata.outputs.msrv }} + toolchain: ${{ steps.msrv.outputs.version }} - - run: cargo check + - run: cargo check -p h2 + + minimal-versions: + runs-on: ubuntu-latest + needs: [style] + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@nightly + - uses: 
dtolnay/rust-toolchain@stable + - uses: taiki-e/install-action@cargo-hack + - uses: taiki-e/install-action@cargo-minimal-versions + - run: cargo minimal-versions --ignore-private check diff --git a/CHANGELOG.md b/CHANGELOG.md index 31852daff..f5dc74672 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,42 @@ +# 0.3.26 (April 3, 2024) + +* Limit number of CONTINUATION frames for misbehaving connections. + +# 0.3.25 (March 15, 2024) + +* Improve performance decoding many headers. + +# 0.3.24 (January 17, 2024) + +* Limit error resets for misbehaving connections. + +# 0.3.23 (January 10, 2024) + +* Backport fix from 0.4.1 for stream capacity assignment. + +# 0.3.22 (November 15, 2023) + +* Add `header_table_size(usize)` option to client and server builders. +* Improve throughput when vectored IO is not available. +* Update indexmap to 2. + +# 0.3.21 (August 21, 2023) + +* Fix opening of new streams over peer's max concurrent limit. +* Fix `RecvStream` to return data even if it has received a `CANCEL` stream error. +* Update MSRV to 1.63. + +# 0.3.20 (June 26, 2023) + +* Fix panic if a server received a request with a `:status` pseudo header in the 1xx range. +* Fix panic if a reset stream had pending push promises that were more than allowed. +* Fix potential flow control overflow by subtraction, instead returning a connection error. + +# 0.3.19 (May 12, 2023) + +* Fix counting reset streams when triggered by a GOAWAY. +* Send `too_many_resets` in opaque debug data of GOAWAY when too many resets received. + # 0.3.18 (April 17, 2023) * Fix panic because of opposite check in `is_remote_local()`. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 10e74bf29..8af0abcc7 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -5,7 +5,7 @@ ## Getting Help ## If you have a question about the h2 library or have encountered problems using it, you may -[file an issue][issue] or ask ask a question on the [Tokio Gitter][gitter]. 
+[file an issue][issue] or ask a question on the [Tokio Gitter][gitter]. ## Submitting a Pull Request ## @@ -15,7 +15,7 @@ Do you have an improvement? 2. We will try to respond to your issue promptly. 3. Fork this repo, develop and test your code changes. See the project's [README](README.md) for further information about working in this repository. 4. Submit a pull request against this repo's `master` branch. -6. Your branch may be merged once all configured checks pass, including: +5. Your branch may be merged once all configured checks pass, including: - Code review has been completed. - The branch has passed tests in CI. diff --git a/Cargo.toml b/Cargo.toml index 767961d0a..a84beebc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ name = "h2" # - html_root_url. # - Update CHANGELOG.md. # - Create git tag -version = "0.3.18" +version = "0.3.26" license = "MIT" authors = [ "Carl Lerche ", @@ -19,7 +19,7 @@ keywords = ["http", "async", "non-blocking"] categories = ["asynchronous", "web-programming", "network-programming"] exclude = ["fixtures/**", "ci/**"] edition = "2018" -rust-version = "1.56" +rust-version = "1.63" [features] # Enables `futures::Stream` implementations for various types. 
@@ -44,14 +44,14 @@ members = [ futures-core = { version = "0.3", default-features = false } futures-sink = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false } -tokio-util = { version = "0.7.1", features = ["codec"] } +tokio-util = { version = "0.7.1", features = ["codec", "io"] } tokio = { version = "1", features = ["io-util"] } bytes = "1" http = "0.2" -tracing = { version = "0.1.21", default-features = false, features = ["std"] } +tracing = { version = "0.1.35", default-features = false, features = ["std"] } fnv = "1.0.5" slab = "0.4.2" -indexmap = { version = "1.5.2", features = ["std"] } +indexmap = { version = "2", features = ["std"] } [dev-dependencies] @@ -67,9 +67,9 @@ serde_json = "1.0.0" # Examples tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] } -env_logger = { version = "0.9", default-features = false } -tokio-rustls = "0.23.2" -webpki-roots = "0.22.2" +env_logger = { version = "0.10", default-features = false } +tokio-rustls = "0.24" +webpki-roots = "0.25" [package.metadata.docs.rs] features = ["stream"] diff --git a/examples/akamai.rs b/examples/akamai.rs index 1d0b17baf..788bf3005 100644 --- a/examples/akamai.rs +++ b/examples/akamai.rs @@ -17,7 +17,7 @@ pub async fn main() -> Result<(), Box> { let tls_client_config = std::sync::Arc::new({ let mut root_store = RootCertStore::empty(); - root_store.add_server_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| { + root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| { OwnedTrustAnchor::from_subject_spki_name_constraints( ta.subject, ta.spki, diff --git a/src/client.rs b/src/client.rs index 4147e8a46..29dd0ef2c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -336,6 +336,12 @@ pub struct Builder { /// The stream ID of the first (lowest) stream. Subsequent streams will use /// monotonically increasing stream IDs. 
stream_id: StreamId, + + /// Maximum number of locally reset streams due to protocol error across + /// the lifetime of the connection. + /// + /// When this gets exceeded, we issue GOAWAYs. + local_max_error_reset_streams: Option, } #[derive(Debug)] @@ -510,8 +516,10 @@ where self.inner .send_request(request, end_of_stream, self.pending.as_ref()) .map_err(Into::into) - .map(|stream| { - if stream.is_pending_open() { + .map(|(stream, is_full)| { + if stream.is_pending_open() && is_full { + // Only prevent sending another request when the request queue + // is not full. self.pending = Some(stream.clone_to_opaque()); } @@ -643,6 +651,7 @@ impl Builder { initial_max_send_streams: usize::MAX, settings: Default::default(), stream_id: 1.into(), + local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX), } } @@ -971,6 +980,23 @@ impl Builder { self } + /// Sets the maximum number of local resets due to protocol errors made by the remote end. + /// + /// Invalid frames and many other protocol errors will lead to resets being generated for those streams. + /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers. + /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate. + /// + /// When the number of local resets exceeds this threshold, the client will close the connection. + /// + /// If you really want to disable this, supply [`Option::None`] here. + /// Disabling this is not recommended and may expose you to DOS attacks. + /// + /// The default value is currently 1024, but could change. + pub fn max_local_error_reset_streams(&mut self, max: Option) -> &mut Self { + self.local_max_error_reset_streams = max; + self + } + /// Sets the maximum number of pending-accept remotely-reset streams. 
/// /// Streams that have been received by the peer, but not accepted by the @@ -1021,7 +1047,7 @@ impl Builder { /// stream have been written to the connection, the send buffer capacity /// will be freed up again. /// - /// The default is currently ~400MB, but may change. + /// The default is currently ~400KB, but may change. /// /// # Panics /// @@ -1070,6 +1096,39 @@ impl Builder { self } + /// Sets the header table size. + /// + /// This setting informs the peer of the maximum size of the header compression + /// table used to encode header blocks, in octets. The encoder may select any value + /// equal to or less than the header table size specified by the sender. + /// + /// The default value is 4,096. + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::client::*; + /// # use bytes::Bytes; + /// # + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> + /// # { + /// // `client_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let client_fut = Builder::new() + /// .header_table_size(1_000_000) + /// .handshake(my_io); + /// # client_fut.await + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn header_table_size(&mut self, size: u32) -> &mut Self { + self.settings.set_header_table_size(Some(size)); + self + } + /// Sets the first stream ID to something other than 1. 
#[cfg(feature = "unstable")] pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self { @@ -1258,6 +1317,7 @@ where reset_stream_duration: builder.reset_stream_duration, reset_stream_max: builder.reset_stream_max, remote_reset_stream_max: builder.pending_accept_reset_stream_max, + local_error_reset_streams_max: builder.local_max_error_reset_streams, settings: builder.settings.clone(), }, ); @@ -1571,9 +1631,11 @@ impl proto::Peer for Peer { proto::DynPeer::Client } + /* fn is_server() -> bool { false } + */ fn convert_poll_message( pseudo: Pseudo, diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index a874d7732..9270a8635 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -30,6 +30,8 @@ pub struct FramedRead { max_header_list_size: usize, + max_continuation_frames: usize, + partial: Option, } @@ -41,6 +43,8 @@ struct Partial { /// Partial header payload buf: BytesMut, + + continuation_frames_count: usize, } #[derive(Debug)] @@ -51,10 +55,14 @@ enum Continuable { impl FramedRead { pub fn new(inner: InnerFramedRead) -> FramedRead { + let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE; + let max_continuation_frames = + calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length()); FramedRead { inner, hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE), - max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE, + max_header_list_size, + max_continuation_frames, partial: None, } } @@ -68,7 +76,6 @@ impl FramedRead { } /// Returns the current max frame size setting - #[cfg(feature = "unstable")] #[inline] pub fn max_frame_size(&self) -> usize { self.inner.decoder().max_frame_length() @@ -80,14 +87,33 @@ impl FramedRead { #[inline] pub fn set_max_frame_size(&mut self, val: usize) { assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize); - self.inner.decoder_mut().set_max_frame_length(val) + self.inner.decoder_mut().set_max_frame_length(val); + 
// Update max CONTINUATION frames too, since it's based on this + self.max_continuation_frames = calc_max_continuation_frames(self.max_header_list_size, val); } /// Update the max header list size setting. #[inline] pub fn set_max_header_list_size(&mut self, val: usize) { self.max_header_list_size = val; + // Update max CONTINUATION frames too, since it's based on this + self.max_continuation_frames = calc_max_continuation_frames(val, self.max_frame_size()); } + + /// Update the header table size setting. + #[inline] + pub fn set_header_table_size(&mut self, val: usize) { + self.hpack.queue_size_update(val); + } +} + +fn calc_max_continuation_frames(header_max: usize, frame_max: usize) -> usize { + // At least this many frames needed to use max header list size + let min_frames_for_list = (header_max / frame_max).max(1); + // Some padding for imperfectly packed frames + // 25% without floats + let padding = min_frames_for_list >> 2; + min_frames_for_list.saturating_add(padding).max(5) } /// Decodes a frame. 
@@ -96,6 +122,7 @@ impl FramedRead { fn decode_frame( hpack: &mut hpack::Decoder, max_header_list_size: usize, + max_continuation_frames: usize, partial_inout: &mut Option, mut bytes: BytesMut, ) -> Result, Error> { @@ -163,6 +190,7 @@ fn decode_frame( *partial_inout = Some(Partial { frame: Continuable::$frame(frame), buf: payload, + continuation_frames_count: 0, }); return Ok(None); @@ -267,6 +295,22 @@ fn decode_frame( return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } + // Check for CONTINUATION flood + if is_end_headers { + partial.continuation_frames_count = 0; + } else { + let cnt = partial.continuation_frames_count + 1; + if cnt > max_continuation_frames { + tracing::debug!("too_many_continuations, max = {}", max_continuation_frames); + return Err(Error::library_go_away_data( + Reason::ENHANCE_YOUR_CALM, + "too_many_continuations", + )); + } else { + partial.continuation_frames_count = cnt; + } + } + // Extend the buf if partial.buf.is_empty() { partial.buf = bytes.split_off(frame::HEADER_LEN); @@ -348,9 +392,16 @@ where ref mut hpack, max_header_list_size, ref mut partial, + max_continuation_frames, .. } = *self; - if let Some(frame) = decode_frame(hpack, max_header_list_size, partial, bytes)? { + if let Some(frame) = decode_frame( + hpack, + max_header_list_size, + max_continuation_frames, + partial, + bytes, + )? { tracing::debug!(?frame, "received"); return Poll::Ready(Some(Ok(frame))); } diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index 4b1b4accc..c88af02da 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -7,8 +7,9 @@ use bytes::{Buf, BufMut, BytesMut}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio_util::io::poll_write_buf; -use std::io::{self, Cursor, IoSlice}; +use std::io::{self, Cursor}; // A macro to get around a method needing to borrow &mut self macro_rules! 
limited_write_buf { @@ -45,8 +46,11 @@ struct Encoder { /// Max frame size, this is specified by the peer max_frame_size: FrameSize, - /// Whether or not the wrapped `AsyncWrite` supports vectored IO. - is_write_vectored: bool, + /// Chain payloads bigger than this. + chain_threshold: usize, + + /// Min buffer required to attempt to write a frame + min_buffer_capacity: usize, } #[derive(Debug)] @@ -61,14 +65,16 @@ enum Next { /// frame that big. const DEFAULT_BUFFER_CAPACITY: usize = 16 * 1_024; -/// Min buffer required to attempt to write a frame -const MIN_BUFFER_CAPACITY: usize = frame::HEADER_LEN + CHAIN_THRESHOLD; - -/// Chain payloads bigger than this. The remote will never advertise a max frame -/// size less than this (well, the spec says the max frame size can't be less -/// than 16kb, so not even close). +/// Chain payloads bigger than this when vectored I/O is enabled. The remote +/// will never advertise a max frame size less than this (well, the spec says +/// the max frame size can't be less than 16kb, so not even close). const CHAIN_THRESHOLD: usize = 256; +/// Chain payloads bigger than this when vectored I/O is **not** enabled. +/// A larger value in this scenario will reduce the number of small and +/// fragmented data being sent, and hereby improve the throughput. 
+const CHAIN_THRESHOLD_WITHOUT_VECTORED_IO: usize = 1024; + // TODO: Make generic impl FramedWrite where @@ -76,7 +82,11 @@ where B: Buf, { pub fn new(inner: T) -> FramedWrite { - let is_write_vectored = inner.is_write_vectored(); + let chain_threshold = if inner.is_write_vectored() { + CHAIN_THRESHOLD + } else { + CHAIN_THRESHOLD_WITHOUT_VECTORED_IO + }; FramedWrite { inner, encoder: Encoder { @@ -85,7 +95,8 @@ where next: None, last_data_frame: None, max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, - is_write_vectored, + chain_threshold, + min_buffer_capacity: chain_threshold + frame::HEADER_LEN, }, } } @@ -126,23 +137,17 @@ where Some(Next::Data(ref mut frame)) => { tracing::trace!(queued_data_frame = true); let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut()); - ready!(write( - &mut self.inner, - self.encoder.is_write_vectored, - &mut buf, - cx, - ))? + ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))? } _ => { tracing::trace!(queued_data_frame = false); - ready!(write( - &mut self.inner, - self.encoder.is_write_vectored, - &mut self.encoder.buf, + ready!(poll_write_buf( + Pin::new(&mut self.inner), cx, + &mut self.encoder.buf ))? } - } + }; } match self.encoder.unset_frame() { @@ -165,30 +170,6 @@ where } } -fn write( - writer: &mut T, - is_write_vectored: bool, - buf: &mut B, - cx: &mut Context<'_>, -) -> Poll> -where - T: AsyncWrite + Unpin, - B: Buf, -{ - // TODO(eliza): when tokio-util 0.5.1 is released, this - // could just use `poll_write_buf`... - const MAX_IOVS: usize = 64; - let n = if is_write_vectored { - let mut bufs = [IoSlice::new(&[]); MAX_IOVS]; - let cnt = buf.chunks_vectored(&mut bufs); - ready!(Pin::new(writer).poll_write_vectored(cx, &bufs[..cnt]))? - } else { - ready!(Pin::new(writer).poll_write(cx, buf.chunk()))? 
- }; - buf.advance(n); - Ok(()).into() -} - #[must_use] enum ControlFlow { Continue, @@ -240,12 +221,17 @@ where return Err(PayloadTooBig); } - if len >= CHAIN_THRESHOLD { + if len >= self.chain_threshold { let head = v.head(); // Encode the frame head to the buffer head.encode(len, self.buf.get_mut()); + if self.buf.get_ref().remaining() < self.chain_threshold { + let extra_bytes = self.chain_threshold - self.buf.remaining(); + self.buf.get_mut().put(v.payload_mut().take(extra_bytes)); + } + // Save the data frame self.next = Some(Next::Data(v)); } else { @@ -305,7 +291,9 @@ where } fn has_capacity(&self) -> bool { - self.next.is_none() && self.buf.get_ref().remaining_mut() >= MIN_BUFFER_CAPACITY + self.next.is_none() + && (self.buf.get_ref().capacity() - self.buf.get_ref().len() + >= self.min_buffer_capacity) } fn is_empty(&self) -> bool { diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 359adf6e4..6cbdc1e18 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -95,6 +95,11 @@ impl Codec { self.framed_write().set_header_table_size(val) } + /// Set the decoder header table size. + pub fn set_recv_header_table_size(&mut self, val: usize) { + self.inner.set_header_table_size(val) + } + /// Set the max header list size that can be received. pub fn set_max_recv_header_list_size(&mut self, val: usize) { self.inner.set_max_header_list_size(val); } diff --git a/src/error.rs b/src/error.rs index 1b1438e48..96a471bcb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,6 +25,7 @@ pub struct Error { #[derive(Debug)] enum Kind { /// A RST_STREAM frame was received or sent. + #[allow(dead_code)] Reset(StreamId, Reason, Initiator), /// A GO_AWAY frame was received or sent. @@ -104,7 +105,7 @@ impl Error { ) } - /// Returns true if the error was created by `h2. + /// Returns true if the error was created by `h2`. /// /// Such as noticing some protocol error and sending a GOAWAY or RST_STREAM. 
pub fn is_library(&self) -> bool { diff --git a/src/frame/data.rs b/src/frame/data.rs index d0cdf5f69..5ed3c31b5 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -148,7 +148,7 @@ impl Data { /// /// Panics if `dst` cannot contain the data frame. pub(crate) fn encode_chunk(&mut self, dst: &mut U) { - let len = self.data.remaining() as usize; + let len = self.data.remaining(); assert!(dst.remaining_mut() >= len); diff --git a/src/frame/go_away.rs b/src/frame/go_away.rs index 91d9c4c6b..99330e981 100644 --- a/src/frame/go_away.rs +++ b/src/frame/go_away.rs @@ -8,7 +8,6 @@ use crate::frame::{self, Error, Head, Kind, Reason, StreamId}; pub struct GoAway { last_stream_id: StreamId, error_code: Reason, - #[allow(unused)] debug_data: Bytes, } @@ -21,6 +20,14 @@ impl GoAway { } } + pub fn with_debug_data(last_stream_id: StreamId, reason: Reason, debug_data: Bytes) -> Self { + Self { + last_stream_id, + error_code: reason, + debug_data, + } + } + pub fn last_stream_id(&self) -> StreamId { self.last_stream_id } @@ -52,9 +59,10 @@ impl GoAway { pub fn encode(&self, dst: &mut B) { tracing::trace!("encoding GO_AWAY; code={:?}", self.error_code); let head = Head::new(Kind::GoAway, 0, StreamId::zero()); - head.encode(8, dst); + head.encode(8 + self.debug_data.len(), dst); dst.put_u32(self.last_stream_id.into()); dst.put_u32(self.error_code.into()); + dst.put(self.debug_data.slice(..)); } } diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 9d5c8cefe..fb8d6b146 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -12,6 +12,7 @@ use std::fmt; use std::io::Cursor; type EncodeBuf<'a> = bytes::buf::Limit<&'a mut BytesMut>; + /// Header frame /// /// This could be either a request or a response. @@ -87,6 +88,9 @@ struct HeaderBlock { /// The decoded header fields fields: HeaderMap, + /// Precomputed size of all of our header fields, for perf reasons + field_size: usize, + /// Set to true if decoding went over the max header list size. 
is_over_size: bool, @@ -115,6 +119,7 @@ impl Headers { stream_id, stream_dep: None, header_block: HeaderBlock { + field_size: calculate_headermap_size(&fields), fields, is_over_size: false, pseudo, @@ -131,6 +136,7 @@ impl Headers { stream_id, stream_dep: None, header_block: HeaderBlock { + field_size: calculate_headermap_size(&fields), fields, is_over_size: false, pseudo: Pseudo::default(), @@ -196,6 +202,7 @@ impl Headers { stream_dep, header_block: HeaderBlock { fields: HeaderMap::new(), + field_size: 0, is_over_size: false, pseudo: Pseudo::default(), }, @@ -350,6 +357,7 @@ impl PushPromise { PushPromise { flags: PushPromiseFlag::default(), header_block: HeaderBlock { + field_size: calculate_headermap_size(&fields), fields, is_over_size: false, pseudo, @@ -441,6 +449,7 @@ impl PushPromise { flags, header_block: HeaderBlock { fields: HeaderMap::new(), + field_size: 0, is_over_size: false, pseudo: Pseudo::default(), }, @@ -892,6 +901,8 @@ impl HeaderBlock { headers_size += decoded_header_size(name.as_str().len(), value.len()); if headers_size < max_header_list_size { + self.field_size += + decoded_header_size(name.as_str().len(), value.len()); self.fields.append(name, value); } else if !self.is_over_size { tracing::trace!("load_hpack; header list size over max"); @@ -958,14 +969,16 @@ impl HeaderBlock { + pseudo_size!(status) + pseudo_size!(authority) + pseudo_size!(path) - + self - .fields - .iter() - .map(|(name, value)| decoded_header_size(name.as_str().len(), value.len())) - .sum::() + + self.field_size } } +fn calculate_headermap_size(map: &HeaderMap) -> usize { + map.iter() + .map(|(name, value)| decoded_header_size(name.as_str().len(), value.len())) + .sum::() +} + fn decoded_header_size(name: usize, value: usize) -> usize { name + value + 32 } @@ -974,8 +987,6 @@ fn decoded_header_size(name: usize, value: usize) -> usize { mod test { use std::iter::FromIterator; - use http::HeaderValue; - use super::*; use crate::frame; use crate::hpack::{huffman, 
Encoder}; diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 570a162a8..0e8e7035c 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -69,7 +69,7 @@ pub use crate::hpack::BytesStr; pub use self::settings::{ DEFAULT_INITIAL_WINDOW_SIZE, DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, - MAX_INITIAL_WINDOW_SIZE, MAX_MAX_FRAME_SIZE, + MAX_MAX_FRAME_SIZE, }; pub type FrameSize = u32; diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 0c913f059..484498a9d 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -121,11 +121,9 @@ impl Settings { self.header_table_size } - /* pub fn set_header_table_size(&mut self, size: Option) { self.header_table_size = size; } - */ pub fn load(head: Head, payload: &[u8]) -> Result { use self::Setting::*; diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index b45c37927..e48976c36 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -447,7 +447,7 @@ fn decode_int(buf: &mut B, prefix_size: u8) -> Result(buf: &mut B) -> Option { +fn peek_u8(buf: &B) -> Option { if buf.has_remaining() { Some(buf.chunk()[0]) } else { @@ -829,15 +829,14 @@ pub fn get_static(idx: usize) -> Header { #[cfg(test)] mod test { use super::*; - use crate::hpack::Header; #[test] fn test_peek_u8() { let b = 0xff; let mut buf = Cursor::new(vec![b]); - assert_eq!(peek_u8(&mut buf), Some(b)); + assert_eq!(peek_u8(&buf), Some(b)); assert_eq!(buf.get_u8(), b); - assert_eq!(peek_u8(&mut buf), None); + assert_eq!(peek_u8(&buf), None); } #[test] diff --git a/src/hpack/encoder.rs b/src/hpack/encoder.rs index d121a2aaf..bd49056f6 100644 --- a/src/hpack/encoder.rs +++ b/src/hpack/encoder.rs @@ -299,7 +299,6 @@ fn position(buf: &BytesMut) -> usize { #[cfg(test)] mod test { use super::*; - use crate::hpack::Header; use http::*; #[test] diff --git a/src/hpack/table.rs b/src/hpack/table.rs index a1a780451..3e45f413b 100644 --- a/src/hpack/table.rs +++ b/src/hpack/table.rs @@ -319,7 +319,7 @@ impl Table { let mut probe = 
probe + 1; probe_loop!(probe < self.indices.len(), { - let pos = &mut self.indices[probe as usize]; + let pos = &mut self.indices[probe]; prev = match mem::replace(pos, Some(prev)) { Some(p) => p, @@ -656,12 +656,12 @@ fn to_raw_capacity(n: usize) -> usize { #[inline] fn desired_pos(mask: usize, hash: HashValue) -> usize { - (hash.0 & mask) as usize + hash.0 & mask } #[inline] fn probe_distance(mask: usize, hash: HashValue, current: usize) -> usize { - current.wrapping_sub(desired_pos(mask, hash)) & mask as usize + current.wrapping_sub(desired_pos(mask, hash)) & mask } fn hash_header(header: &Header) -> HashValue { diff --git a/src/hpack/test/fixture.rs b/src/hpack/test/fixture.rs index 0d33ca2de..d3f76e3bf 100644 --- a/src/hpack/test/fixture.rs +++ b/src/hpack/test/fixture.rs @@ -100,7 +100,7 @@ fn test_story(story: Value) { let mut input: Vec<_> = case .expect .iter() - .map(|&(ref name, ref value)| { + .map(|(name, value)| { Header::new(name.clone().into(), value.clone().into()) .unwrap() .into() diff --git a/src/lib.rs b/src/lib.rs index 420e0fee1..91fa322f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,10 +78,14 @@ //! [`server::handshake`]: server/fn.handshake.html //! [`client::handshake`]: client/fn.handshake.html -#![doc(html_root_url = "https://docs.rs/h2/0.3.18")] -#![deny(missing_debug_implementations, missing_docs)] -#![cfg_attr(test, deny(warnings))] +#![deny( + missing_debug_implementations, + missing_docs, + clippy::missing_safety_doc, + clippy::undocumented_unsafe_blocks +)] #![allow(clippy::type_complexity, clippy::manual_range_contains)] +#![cfg_attr(test, deny(warnings))] macro_rules! 
proto_err { (conn: $($msg:tt)+) => { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 619973df8..1fef38408 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,18 +1,18 @@ use crate::codec::UserError; use crate::frame::{Reason, StreamId}; -use crate::{client, frame, server}; +use crate::{client, server}; use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE; use crate::proto::*; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use futures_core::Stream; use std::io; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::AsyncRead; /// An H2 connection #[derive(Debug)] @@ -81,6 +81,7 @@ pub(crate) struct Config { pub reset_stream_duration: Duration, pub reset_stream_max: usize, pub remote_reset_stream_max: usize, + pub local_error_reset_streams_max: Option, pub settings: frame::Settings, } @@ -125,6 +126,7 @@ where .settings .max_concurrent_streams() .map(|max| max as usize), + local_max_error_reset_streams: config.local_error_reset_streams_max, } } let streams = Streams::new(streams_config(&config)); @@ -145,7 +147,9 @@ where /// connection flow control pub(crate) fn set_target_window_size(&mut self, size: WindowSize) { - self.inner.streams.set_target_connection_window_size(size); + let _res = self.inner.streams.set_target_connection_window_size(size); + // TODO: proper error handling + debug_assert!(_res.is_ok()); } /// Send a new SETTINGS frame with an updated initial window size. 
@@ -398,6 +402,12 @@ where self.go_away.go_away_now(frame); } + fn go_away_now_data(&mut self, e: Reason, data: Bytes) { + let last_processed_id = self.streams.last_processed_id(); + let frame = frame::GoAway::with_debug_data(last_processed_id, e, data); + self.go_away.go_away_now(frame); + } + fn go_away_from_user(&mut self, e: Reason) { let last_processed_id = self.streams.last_processed_id(); let frame = frame::GoAway::new(last_processed_id, e); @@ -418,7 +428,7 @@ where // error. This is handled by setting a GOAWAY frame followed by // terminating the connection. Err(Error::GoAway(debug_data, reason, initiator)) => { - let e = Error::GoAway(debug_data, reason, initiator); + let e = Error::GoAway(debug_data.clone(), reason, initiator); tracing::debug!(error = ?e, "Connection::poll; connection error"); // We may have already sent a GOAWAY for this error, @@ -435,7 +445,7 @@ where // Reset all active streams self.streams.handle_error(e); - self.go_away_now(reason); + self.go_away_now_data(reason, debug_data); Ok(()) } // Attempting to read a frame resulted in a stream level error. diff --git a/src/proto/error.rs b/src/proto/error.rs index 2c00c7ea6..ad023317e 100644 --- a/src/proto/error.rs +++ b/src/proto/error.rs @@ -40,6 +40,10 @@ impl Error { Self::GoAway(Bytes::new(), reason, Initiator::Library) } + pub(crate) fn library_go_away_data(reason: Reason, debug_data: impl Into) -> Self { + Self::GoAway(debug_data.into(), reason, Initiator::Library) + } + pub(crate) fn remote_reset(stream_id: StreamId, reason: Reason) -> Self { Self::Reset(stream_id, reason, Initiator::Remote) } diff --git a/src/proto/go_away.rs b/src/proto/go_away.rs index 759427878..d52252cd7 100644 --- a/src/proto/go_away.rs +++ b/src/proto/go_away.rs @@ -26,10 +26,6 @@ pub(super) struct GoAway { /// were a `frame::GoAway`, it might appear like we eventually wanted to /// serialize it. We **only** want to be able to look up these fields at a /// later time. 
-/// -/// (Technically, `frame::GoAway` should gain an opaque_debug_data field as -/// well, and we wouldn't want to save that here to accidentally dump in logs, -/// or waste struct space.) #[derive(Debug)] pub(crate) struct GoingAway { /// Stores the highest stream ID of a GOAWAY that has been sent. diff --git a/src/proto/mod.rs b/src/proto/mod.rs index d71ee9c42..560927598 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -30,8 +30,9 @@ pub type PingPayload = [u8; 8]; pub type WindowSize = u32; // Constants -pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; +pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; // i32::MAX as u32 pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20; +pub const DEFAULT_LOCAL_RESET_COUNT_MAX: usize = 1024; pub const DEFAULT_RESET_STREAM_MAX: usize = 10; pub const DEFAULT_RESET_STREAM_SECS: u64 = 30; pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400; diff --git a/src/proto/peer.rs b/src/proto/peer.rs index d62d9e24e..cbe7fb289 100644 --- a/src/proto/peer.rs +++ b/src/proto/peer.rs @@ -14,7 +14,7 @@ pub(crate) trait Peer { fn r#dyn() -> Dyn; - fn is_server() -> bool; + //fn is_server() -> bool; fn convert_poll_message( pseudo: Pseudo, @@ -22,10 +22,12 @@ pub(crate) trait Peer { stream_id: StreamId, ) -> Result; + /* fn is_local_init(id: StreamId) -> bool { assert!(!id.is_zero()); Self::is_server() == id.is_server_initiated() } + */ } /// A dynamic representation of `Peer`. 
diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 6cc617209..93949d4f5 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,6 +1,5 @@ use crate::codec::UserError; use crate::error::Reason; -use crate::frame; use crate::proto::*; use std::task::{Context, Poll}; @@ -60,6 +59,10 @@ impl Settings { codec.set_max_recv_header_list_size(max as usize); } + if let Some(val) = local.header_table_size() { + codec.set_recv_header_table_size(val as usize); + } + streams.apply_local_settings(local)?; self.local = Local::Synced; Ok(()) diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index 6a5aa9ccd..710d42c8d 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -31,6 +31,16 @@ pub(super) struct Counts { /// Current number of "pending accept" streams that were remotely reset num_remote_reset_streams: usize, + + /// Maximum number of locally reset streams due to protocol error across + /// the lifetime of the connection. + /// + /// When this gets exceeded, we issue GOAWAYs. + max_local_error_reset_streams: Option, + + /// Total number of locally reset streams due to protocol error across the + /// lifetime of the connection. + num_local_error_reset_streams: usize, } impl Counts { @@ -46,9 +56,19 @@ impl Counts { num_local_reset_streams: 0, max_remote_reset_streams: config.remote_reset_max, num_remote_reset_streams: 0, + max_local_error_reset_streams: config.local_max_error_reset_streams, + num_local_error_reset_streams: 0, } } + /// Returns true when the next opened stream will reach capacity of outbound streams + /// + /// The number of client send streams is incremented in prioritize; send_request has to guess if + /// it should wait before allowing another request to be sent. 
+ pub fn next_send_stream_will_reach_capacity(&self) -> bool { + self.max_send_streams <= (self.num_send_streams + 1) + } + /// Returns the current peer pub fn peer(&self) -> peer::Dyn { self.peer @@ -58,6 +78,26 @@ impl Counts { self.num_send_streams != 0 || self.num_recv_streams != 0 } + /// Returns true if we can issue another local reset due to protocol error. + pub fn can_inc_num_local_error_resets(&self) -> bool { + if let Some(max) = self.max_local_error_reset_streams { + max > self.num_local_error_reset_streams + } else { + true + } + } + + pub fn inc_num_local_error_resets(&mut self) { + assert!(self.can_inc_num_local_error_resets()); + + // Increment the number of locally reset streams due to protocol error + self.num_local_error_reset_streams += 1; + } + + pub(crate) fn max_local_error_resets(&self) -> Option { + self.max_local_error_reset_streams + } + /// Returns true if the receive stream concurrency can be incremented pub fn can_inc_num_recv_streams(&self) -> bool { self.max_recv_streams > self.num_recv_streams diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index 73a7754db..57a935825 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -75,12 +75,12 @@ impl FlowControl { self.window_size > self.available } - pub fn claim_capacity(&mut self, capacity: WindowSize) { - self.available -= capacity; + pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> { + self.available.decrease_by(capacity) } - pub fn assign_capacity(&mut self, capacity: WindowSize) { - self.available += capacity; + pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> { + self.available.increase_by(capacity) } /// If a WINDOW_UPDATE frame should be sent, returns a positive number @@ -136,22 +136,23 @@ impl FlowControl { /// /// This is called after receiving a SETTINGS frame with a lower /// INITIAL_WINDOW_SIZE value. 
- pub fn dec_send_window(&mut self, sz: WindowSize) { + pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> { tracing::trace!( "dec_window; sz={}; window={}, available={}", sz, self.window_size, self.available ); - // This should not be able to overflow `window_size` from the bottom. - self.window_size -= sz; + // ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can. + self.window_size.decrease_by(sz)?; + Ok(()) } /// Decrement the recv-side window size. /// /// This is called after receiving a SETTINGS ACK frame with a lower /// INITIAL_WINDOW_SIZE value. - pub fn dec_recv_window(&mut self, sz: WindowSize) { + pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> { tracing::trace!( "dec_recv_window; sz={}; window={}, available={}", sz, @@ -159,13 +160,14 @@ impl FlowControl { self.available ); // This should not be able to overflow `window_size` from the bottom. - self.window_size -= sz; - self.available -= sz; + self.window_size.decrease_by(sz)?; + self.available.decrease_by(sz)?; + Ok(()) } /// Decrements the window reflecting data has actually been sent. The caller /// must ensure that the window has capacity. 
- pub fn send_data(&mut self, sz: WindowSize) { + pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> { tracing::trace!( "send_data; sz={}; window={}; available={}", sz, @@ -176,12 +178,13 @@ impl FlowControl { // If send size is zero it's meaningless to update flow control window if sz > 0 { // Ensure that the argument is correct - assert!(self.window_size >= sz as usize); + assert!(self.window_size.0 >= sz as i32); // Update values - self.window_size -= sz; - self.available -= sz; + self.window_size.decrease_by(sz)?; + self.available.decrease_by(sz)?; } + Ok(()) } } @@ -208,6 +211,29 @@ impl Window { assert!(self.0 >= 0, "negative Window"); self.0 as WindowSize } + + pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> { + if let Some(v) = self.0.checked_sub(other as i32) { + self.0 = v; + Ok(()) + } else { + Err(Reason::FLOW_CONTROL_ERROR) + } + } + + pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> { + let other = self.add(other)?; + self.0 = other.0; + Ok(()) + } + + pub fn add(&self, other: WindowSize) -> Result { + if let Some(v) = self.0.checked_add(other as i32) { + Ok(Self(v)) + } else { + Err(Reason::FLOW_CONTROL_ERROR) + } + } } impl PartialEq for Window { @@ -230,25 +256,6 @@ impl PartialOrd for Window { } } -impl ::std::ops::SubAssign for Window { - fn sub_assign(&mut self, other: WindowSize) { - self.0 -= other as i32; - } -} - -impl ::std::ops::Add for Window { - type Output = Self; - fn add(self, other: WindowSize) -> Self::Output { - Window(self.0 + other as i32) - } -} - -impl ::std::ops::AddAssign for Window { - fn add_assign(&mut self, other: WindowSize) { - self.0 += other as i32; - } -} - impl fmt::Display for Window { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fmt::Display::fmt(&self.0, f) diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index fbe32c7b0..b347442af 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -69,4 +69,10 @@ pub struct 
Config { /// Maximum number of remote initiated streams pub remote_max_initiated: Option, + + /// Maximum number of locally reset streams due to protocol error across + /// the lifetime of the connection. + /// + /// When this gets exceeded, we issue GOAWAYs. + pub local_max_error_reset_streams: Option, } diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 88204ddcc..14b37e223 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -1,12 +1,12 @@ use super::store::Resolve; use super::*; -use crate::frame::{Reason, StreamId}; +use crate::frame::Reason; use crate::codec::UserError; use crate::codec::UserError::*; -use bytes::buf::{Buf, Take}; +use bytes::buf::Take; use std::{ cmp::{self, Ordering}, fmt, io, mem, @@ -87,7 +87,9 @@ impl Prioritize { flow.inc_window(config.remote_init_window_sz) .expect("invalid initial window size"); - flow.assign_capacity(config.remote_init_window_sz); + // TODO: proper error handling + let _res = flow.assign_capacity(config.remote_init_window_sz); + debug_assert!(_res.is_ok()); tracing::trace!("Prioritize::new; flow={:?}", flow); @@ -182,7 +184,15 @@ impl Prioritize { stream.requested_send_capacity = cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; - self.try_assign_capacity(stream); + // `try_assign_capacity` will queue the stream to `pending_capacity` if the capacity + // cannot be assigned at the time it is called. 
+ // + // Streams over the max concurrent count will still call `send_data` so we should be + // careful not to put it into `pending_capacity` as it will starve the connection + // capacity for other streams + if !stream.is_pending_open { + self.try_assign_capacity(stream); + } } if frame.is_end_stream() { @@ -253,7 +263,9 @@ impl Prioritize { if available as usize > capacity { let diff = available - capacity as WindowSize; - stream.send_flow.claim_capacity(diff); + // TODO: proper error handling + let _res = stream.send_flow.claim_capacity(diff); + debug_assert!(_res.is_ok()); self.assign_connection_capacity(diff, stream, counts); } @@ -324,7 +336,9 @@ impl Prioritize { pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { let available = stream.send_flow.available().as_size(); if available > 0 { - stream.send_flow.claim_capacity(available); + // TODO: proper error handling + let _res = stream.send_flow.claim_capacity(available); + debug_assert!(_res.is_ok()); // Re-assign all capacity to the connection self.assign_connection_capacity(available, stream, counts); } @@ -337,7 +351,9 @@ impl Prioritize { if stream.requested_send_capacity as usize > stream.buffered_send_data { let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize; - stream.send_flow.claim_capacity(reserved); + // TODO: proper error handling + let _res = stream.send_flow.claim_capacity(reserved); + debug_assert!(_res.is_ok()); self.assign_connection_capacity(reserved, stream, counts); } } @@ -363,7 +379,9 @@ impl Prioritize { let span = tracing::trace_span!("assign_connection_capacity", inc); let _e = span.enter(); - self.flow.assign_capacity(inc); + // TODO: proper error handling + let _res = self.flow.assign_capacity(inc); + debug_assert!(_res.is_ok()); // Assign newly acquired capacity to streams pending capacity. 
while self.flow.available() > 0 { @@ -443,7 +461,9 @@ impl Prioritize { stream.assign_capacity(assign, self.max_buffer_size); // Claim the capacity from the connection - self.flow.claim_capacity(assign); + // TODO: proper error handling + let _res = self.flow.claim_capacity(assign); + debug_assert!(_res.is_ok()); } tracing::trace!( @@ -508,7 +528,10 @@ impl Prioritize { tracing::trace!("poll_complete"); loop { - self.schedule_pending_open(store, counts); + if let Some(mut stream) = self.pop_pending_open(store, counts) { + self.pending_send.push_front(&mut stream); + self.try_assign_capacity(&mut stream); + } match self.pop_frame(buffer, store, max_frame_len, counts) { Some(frame) => { @@ -763,12 +786,16 @@ impl Prioritize { // Assign the capacity back to the connection that // was just consumed from the stream in the previous // line. - self.flow.assign_capacity(len); + // TODO: proper error handling + let _res = self.flow.assign_capacity(len); + debug_assert!(_res.is_ok()); }); let (eos, len) = tracing::trace_span!("updating connection flow") .in_scope(|| { - self.flow.send_data(len); + // TODO: proper error handling + let _res = self.flow.send_data(len); + debug_assert!(_res.is_ok()); // Wrap the frame's data payload to ensure that the // correct amount of data gets written. 
@@ -858,20 +885,24 @@ impl Prioritize { } } - fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { + fn pop_pending_open<'s>( + &mut self, + store: &'s mut Store, + counts: &mut Counts, + ) -> Option> { tracing::trace!("schedule_pending_open"); // check for any pending open streams - while counts.can_inc_num_send_streams() { + if counts.can_inc_num_send_streams() { if let Some(mut stream) = self.pending_open.pop(store) { tracing::trace!("schedule_pending_open; stream={:?}", stream.id); counts.inc_num_send_streams(&mut stream); - self.pending_send.push(&mut stream); stream.notify_send(); - } else { - return; + return Some(stream); } } + + None } } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 0fe2bdd57..71ec7901c 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,14 +1,14 @@ use super::*; use crate::codec::UserError; -use crate::frame::{self, PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; -use crate::proto::{self, Error}; +use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; +use crate::proto; use http::{HeaderMap, Request, Response}; use std::cmp::Ordering; use std::io; use std::task::{Context, Poll, Waker}; -use std::time::{Duration, Instant}; +use std::time::Instant; #[derive(Debug)] pub(super) struct Recv { @@ -90,7 +90,7 @@ impl Recv { // settings flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE) .expect("invalid initial remote window size"); - flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE); + flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap(); Recv { init_window_sz: config.local_init_window_sz, @@ -229,6 +229,11 @@ impl Recv { return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); } + if pseudo.status.is_some() && counts.peer().is_server() { + proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); + } + if 
!pseudo.is_informational() { let message = counts .peer() @@ -239,12 +244,14 @@ impl Recv { .pending_recv .push_back(&mut self.buffer, Event::Headers(message)); stream.notify_recv(); - } - // Only servers can receive a headers frame that initiates the stream. - // This is verified in `Streams` before calling this function. - if counts.peer().is_server() { - self.pending_accept.push(stream); + // Only servers can receive a headers frame that initiates the stream. + // This is verified in `Streams` before calling this function. + if counts.peer().is_server() { + // Correctness: never push a stream to `pending_accept` without having the + // corresponding headers frame pushed to `stream.pending_recv`. + self.pending_accept.push(stream); + } } Ok(()) @@ -252,13 +259,16 @@ impl Recv { /// Called by the server to get the request /// - /// TODO: Should this fn return `Result`? + /// # Panics + /// + /// Panics if `stream.pending_recv` has no `Event::Headers` queued. + /// pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> { use super::peer::PollMessage::*; match stream.pending_recv.pop_front(&mut self.buffer) { Some(Event::Headers(Server(request))) => request, - _ => panic!(), + _ => unreachable!("server stream queue must start with Headers"), } } @@ -308,7 +318,13 @@ impl Recv { Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)), Some(_) => panic!("poll_response called after response returned"), None => { - stream.state.ensure_recv_open()?; + if !stream.state.ensure_recv_open()? 
{ + proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id); + return Poll::Ready(Err(Error::library_reset( + stream.id, + Reason::PROTOCOL_ERROR, + ))); + } stream.recv_task = Some(cx.waker().clone()); Poll::Pending @@ -353,7 +369,9 @@ impl Recv { self.in_flight_data -= capacity; // Assign capacity to connection - self.flow.assign_capacity(capacity); + // TODO: proper error handling + let _res = self.flow.assign_capacity(capacity); + debug_assert!(_res.is_ok()); if self.flow.unclaimed_capacity().is_some() { if let Some(task) = task.take() { @@ -381,7 +399,9 @@ impl Recv { stream.in_flight_recv_data -= capacity; // Assign capacity to stream - stream.recv_flow.assign_capacity(capacity); + // TODO: proper error handling + let _res = stream.recv_flow.assign_capacity(capacity); + debug_assert!(_res.is_ok()); if stream.recv_flow.unclaimed_capacity().is_some() { // Queue the stream for sending the WINDOW_UPDATE frame. @@ -427,7 +447,11 @@ impl Recv { /// /// The `task` is an optional parked task for the `Connection` that might /// be blocked on needing more window capacity. - pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option) { + pub fn set_target_connection_window( + &mut self, + target: WindowSize, + task: &mut Option, + ) -> Result<(), Reason> { tracing::trace!( "set_target_connection_window; target={}; available={}, reserved={}", target, @@ -440,11 +464,15 @@ impl Recv { // // Update the flow controller with the difference between the new // target and the current target. - let current = (self.flow.available() + self.in_flight_data).checked_size(); + let current = self + .flow + .available() + .add(self.in_flight_data)? 
+ .checked_size(); if target > current { - self.flow.assign_capacity(target - current); + self.flow.assign_capacity(target - current)?; } else { - self.flow.claim_capacity(current - target); + self.flow.claim_capacity(current - target)?; } // If changing the target capacity means we gained a bunch of capacity, @@ -455,6 +483,7 @@ impl Recv { task.wake(); } } + Ok(()) } pub(crate) fn apply_local_settings( @@ -494,9 +523,13 @@ impl Recv { let dec = old_sz - target; tracing::trace!("decrementing all windows; dec={}", dec); - store.for_each(|mut stream| { - stream.recv_flow.dec_recv_window(dec); - }) + store.try_for_each(|mut stream| { + stream + .recv_flow + .dec_recv_window(dec) + .map_err(proto::Error::library_go_away)?; + Ok::<_, proto::Error>(()) + })?; } Ordering::Greater => { // We must increase the (local) window on every open stream. @@ -509,7 +542,10 @@ impl Recv { .recv_flow .inc_window(inc) .map_err(proto::Error::library_go_away)?; - stream.recv_flow.assign_capacity(inc); + stream + .recv_flow + .assign_capacity(inc) + .map_err(proto::Error::library_go_away)?; Ok::<_, proto::Error>(()) })?; } @@ -537,7 +573,7 @@ impl Recv { let sz = sz as WindowSize; - let is_ignoring_frame = stream.state.is_local_reset(); + let is_ignoring_frame = stream.state.is_local_error(); if !is_ignoring_frame && !stream.state.is_recv_streaming() { // TODO: There are cases where this can be a stream error of @@ -616,7 +652,10 @@ impl Recv { } // Update stream level flow control - stream.recv_flow.send_data(sz); + stream + .recv_flow + .send_data(sz) + .map_err(proto::Error::library_go_away)?; // Track the data as in-flight stream.in_flight_recv_data += sz; @@ -657,7 +696,7 @@ impl Recv { } // Update connection level flow control - self.flow.send_data(sz); + self.flow.send_data(sz).map_err(Error::library_go_away)?; // Track the data as in-flight self.in_flight_data += sz; @@ -763,7 +802,10 @@ impl Recv { "recv_reset; remotely-reset pending-accept streams reached limit ({:?})", 
counts.max_remote_reset_streams(), ); - return Err(Error::library_go_away(Reason::ENHANCE_YOUR_CALM)); + return Err(Error::library_go_away_data( + Reason::ENHANCE_YOUR_CALM, + "too_many_resets", + )); } } @@ -850,21 +892,12 @@ impl Recv { /// Add a locally reset stream to queue to be eventually reaped. pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { - if !stream.state.is_local_reset() || stream.is_pending_reset_expiration() { + if !stream.state.is_local_error() || stream.is_pending_reset_expiration() { return; } tracing::trace!("enqueue_reset_expiration; {:?}", stream.id); - if !counts.can_inc_num_reset_streams() { - // try to evict 1 stream if possible - // if max allow is 0, this won't be able to evict, - // and then we'll just bail after - if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) { - counts.transition_after(evicted, true); - } - } - if counts.can_inc_num_reset_streams() { counts.inc_num_reset_streams(); self.pending_reset_expired.push(stream); diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 20aba38d4..626e61a33 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -4,7 +4,7 @@ use super::{ }; use crate::codec::UserError; use crate::frame::{self, Reason}; -use crate::proto::{Error, Initiator}; +use crate::proto::{self, Error, Initiator}; use bytes::Buf; use tokio::io::AsyncWrite; @@ -143,22 +143,27 @@ impl Send { // Update the state stream.state.send_open(end_stream)?; - if counts.peer().is_local_init(frame.stream_id()) { - // If we're waiting on a PushPromise anyway - // handle potentially queueing the stream at that point - if !stream.is_pending_push { - if counts.can_inc_num_send_streams() { - counts.inc_num_send_streams(stream); - } else { - self.prioritize.queue_open(stream); - } - } + let mut pending_open = false; + if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push { + self.prioritize.queue_open(stream); + 
pending_open = true; } // Queue the frame for sending + // + // This call expects that, since new streams are in the open queue, new + // streams won't be pushed on pending_send. self.prioritize .queue_frame(frame.into(), buffer, stream, task); + // Need to notify the connection when pushing onto pending_open since + // queue_frame only notifies for pending_send. + if pending_open { + if let Some(task) = task.take() { + task.wake(); + } + } + Ok(()) } @@ -458,10 +463,21 @@ impl Send { tracing::trace!("decrementing all windows; dec={}", dec); let mut total_reclaimed = 0; - store.for_each(|mut stream| { + store.try_for_each(|mut stream| { let stream = &mut *stream; - stream.send_flow.dec_send_window(dec); + tracing::trace!( + "decrementing stream window; id={:?}; decr={}; flow={:?}", + stream.id, + dec, + stream.send_flow + ); + + // TODO: this decrement can underflow based on received frames! + stream + .send_flow + .dec_send_window(dec) + .map_err(proto::Error::library_go_away)?; // It's possible that decreasing the window causes // `window_size` (the stream-specific window) to fall below @@ -474,7 +490,10 @@ impl Send { let reclaimed = if available > window_size { // Drop down to `window_size`. let reclaim = available - window_size; - stream.send_flow.claim_capacity(reclaim); + stream + .send_flow + .claim_capacity(reclaim) + .map_err(proto::Error::library_go_away)?; total_reclaimed += reclaim; reclaim } else { @@ -492,7 +511,9 @@ impl Send { // TODO: Should this notify the producer when the capacity // of a stream is reduced? Maybe it should if the capacity // is reduced to zero, allowing the producer to stop work. 
- }); + + Ok::<_, proto::Error>(()) + })?; self.prioritize .assign_connection_capacity(total_reclaimed, store, counts); diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 76638fc87..5256f09cf 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -64,8 +64,9 @@ enum Inner { Closed(Cause), } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Default)] enum Peer { + #[default] AwaitingHeaders, Streaming, } @@ -352,7 +353,7 @@ impl State { matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..))) } - pub fn is_local_reset(&self) -> bool { + pub fn is_local_error(&self) -> bool { match self.inner { Closed(Cause::Error(ref e)) => e.is_local(), Closed(Cause::ScheduledLibraryReset(..)) => true, @@ -361,10 +362,10 @@ impl State { } pub fn is_remote_reset(&self) -> bool { - match self.inner { - Closed(Cause::Error(ref e)) => !e.is_local(), - _ => false, - } + matches!( + self.inner, + Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote))) + ) } /// Returns true if the stream is already reset. @@ -466,9 +467,3 @@ impl Default for State { State { inner: Inner::Idle } } } - -impl Default for Peer { - fn default() -> Self { - AwaitingHeaders - } -} diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index d33a01cce..35fd6f25e 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -127,6 +127,7 @@ impl Store { } } + #[allow(clippy::blocks_in_conditions)] pub(crate) fn for_each(&mut self, mut f: F) where F: FnMut(Ptr), @@ -256,7 +257,7 @@ where /// /// If the stream is already contained by the list, return `false`. pub fn push(&mut self, stream: &mut store::Ptr) -> bool { - tracing::trace!("Queue::push"); + tracing::trace!("Queue::push_back"); if N::is_queued(stream) { tracing::trace!(" -> already queued"); @@ -292,6 +293,46 @@ where true } + /// Queue the stream + /// + /// If the stream is already contained by the list, return `false`. 
+ pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool { + tracing::trace!("Queue::push_front"); + + if N::is_queued(stream) { + tracing::trace!(" -> already queued"); + return false; + } + + N::set_queued(stream, true); + + // The next pointer shouldn't be set + debug_assert!(N::next(stream).is_none()); + + // Queue the stream + match self.indices { + Some(ref mut idxs) => { + tracing::trace!(" -> existing entries"); + + // Update the provided stream to point to the head node + let head_key = stream.resolve(idxs.head).key(); + N::set_next(stream, Some(head_key)); + + // Update the head pointer + idxs.head = stream.key(); + } + None => { + tracing::trace!(" -> first entry"); + self.indices = Some(store::Indices { + head: stream.key(), + tail: stream.key(), + }); + } + } + + true + } + pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option> where R: Resolve, diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 2888d744b..43e313647 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -146,7 +146,9 @@ impl Stream { recv_flow .inc_window(init_recv_window) .expect("invalid initial receive window"); - recv_flow.assign_capacity(init_recv_window); + // TODO: proper error handling? 
+ let _res = recv_flow.assign_capacity(init_recv_window); + debug_assert!(_res.is_ok()); send_flow .inc_window(init_send_window) @@ -275,7 +277,9 @@ impl Stream { pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { let prev_capacity = self.capacity(max_buffer_size); debug_assert!(capacity > 0); - self.send_flow.assign_capacity(capacity); + // TODO: proper error handling + let _res = self.send_flow.assign_capacity(capacity); + debug_assert!(_res.is_ok()); tracing::trace!( " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", @@ -294,7 +298,9 @@ impl Stream { pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) { let prev_capacity = self.capacity(max_buffer_size); - self.send_flow.send_data(len); + // TODO: proper error handling + let _res = self.send_flow.send_data(len); + debug_assert!(_res.is_ok()); // Decrement the stream's buffered data counter debug_assert!(self.buffered_send_data >= len as usize); diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index dbaebfa7a..7c00cd517 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -118,7 +118,7 @@ where } } - pub fn set_target_connection_window_size(&mut self, size: WindowSize) { + pub fn set_target_connection_window_size(&mut self, size: WindowSize) -> Result<(), Reason> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -216,7 +216,7 @@ where mut request: Request<()>, end_of_stream: bool, pending: Option<&OpaqueStreamRef>, - ) -> Result, SendError> { + ) -> Result<(StreamRef, bool), SendError> { use super::stream::ContentLength; use http::Method; @@ -298,10 +298,14 @@ where // the lock, so it can't. 
me.refs += 1; - Ok(StreamRef { - opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream), - send_buffer: self.send_buffer.clone(), - }) + let is_full = me.counts.next_send_stream_will_reach_capacity(); + Ok(( + StreamRef { + opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream), + send_buffer: self.send_buffer.clone(), + }, + is_full, + )) } pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { @@ -448,7 +452,7 @@ impl Inner { let stream = self.store.resolve(key); - if stream.state.is_local_reset() { + if stream.state.is_local_error() { // Locally reset streams must ignore frames "for some time". // This is because the remote may have sent trailers before // receiving the RST_STREAM frame. @@ -726,7 +730,11 @@ impl Inner { } // The stream must be receive open - stream.state.ensure_recv_open()?; + if !stream.state.ensure_recv_open()? { + proto_err!(conn: "recv_push_promise: initiating stream is not opened"); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + stream.key() } None => { @@ -1534,10 +1542,24 @@ impl Actions { ) -> Result<(), Error> { if let Err(Error::Reset(stream_id, reason, initiator)) = res { debug_assert_eq!(stream_id, stream.id); - // Reset the stream. - self.send - .send_reset(reason, initiator, buffer, stream, counts, &mut self.task); - Ok(()) + + if counts.can_inc_num_local_error_resets() { + counts.inc_num_local_error_resets(); + + // Reset the stream. 
+ self.send + .send_reset(reason, initiator, buffer, stream, counts, &mut self.task); + Ok(()) + } else { + tracing::warn!( + "reset_on_recv_stream_err; locally-reset streams reached limit ({:?})", + counts.max_local_error_resets().unwrap(), + ); + Err(Error::library_go_away_data( + Reason::ENHANCE_YOUR_CALM, + "too_many_internal_resets", + )) + } } else { res } diff --git a/src/server.rs b/src/server.rs index 9ab64ad93..65d2d8301 100644 --- a/src/server.rs +++ b/src/server.rs @@ -252,6 +252,12 @@ pub struct Builder { /// Maximum amount of bytes to "buffer" for writing per stream. max_send_buffer_size: usize, + + /// Maximum number of locally reset streams due to protocol error across + /// the lifetime of the connection. + /// + /// When this gets exceeded, we issue GOAWAYs. + local_max_error_reset_streams: Option, } /// Send a response back to the client @@ -650,6 +656,8 @@ impl Builder { settings: Settings::default(), initial_target_connection_window_size: None, max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, + + local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX), } } @@ -887,6 +895,24 @@ impl Builder { self } + /// Sets the maximum number of local resets due to protocol errors made by the remote end. + /// + /// Invalid frames and many other protocol errors will lead to resets being generated for those streams. + /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers. + /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate. + /// + /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of + /// `ENHANCE_YOUR_CALM` to the client. + /// + /// If you really want to disable this, supply [`Option::None`] here. + /// Disabling this is not recommended and may expose you to DOS attacks. + /// + /// The default value is currently 1024, but could change. 
+ pub fn max_local_error_reset_streams(&mut self, max: Option) -> &mut Self { + self.local_max_error_reset_streams = max; + self + } + /// Sets the maximum number of pending-accept remotely-reset streams. /// /// Streams that have been received by the peer, but not accepted by the @@ -937,7 +963,7 @@ impl Builder { /// stream have been written to the connection, the send buffer capacity /// will be freed up again. /// - /// The default is currently ~400MB, but may change. + /// The default is currently ~400KB, but may change. /// /// # Panics /// @@ -1361,6 +1387,9 @@ where reset_stream_duration: self.builder.reset_stream_duration, reset_stream_max: self.builder.reset_stream_max, remote_reset_stream_max: self.builder.pending_accept_reset_stream_max, + local_error_reset_streams_max: self + .builder + .local_max_error_reset_streams, settings: self.builder.settings.clone(), }, ); @@ -1472,9 +1501,11 @@ impl proto::Peer for Peer { const NAME: &'static str = "Server"; + /* fn is_server() -> bool { true } + */ fn r#dyn() -> proto::DynPeer { proto::DynPeer::Server diff --git a/tests/h2-support/Cargo.toml b/tests/h2-support/Cargo.toml index f178178eb..522d904cb 100644 --- a/tests/h2-support/Cargo.toml +++ b/tests/h2-support/Cargo.toml @@ -2,6 +2,7 @@ name = "h2-support" version = "0.1.0" authors = ["Carl Lerche "] +publish = false edition = "2018" [dependencies] diff --git a/tests/h2-support/src/frames.rs b/tests/h2-support/src/frames.rs index bc4e2e708..858bf770b 100644 --- a/tests/h2-support/src/frames.rs +++ b/tests/h2-support/src/frames.rs @@ -2,7 +2,7 @@ use std::convert::TryInto; use std::fmt; use bytes::Bytes; -use http::{self, HeaderMap, StatusCode}; +use http::{HeaderMap, StatusCode}; use h2::{ ext::Protocol, @@ -305,8 +305,23 @@ impl Mock { self.reason(frame::Reason::NO_ERROR) } + pub fn data(self, debug_data: I) -> Self + where + I: Into, + { + Mock(frame::GoAway::with_debug_data( + self.0.last_stream_id(), + self.0.reason(), + debug_data.into(), + )) + } + pub 
fn reason(self, reason: frame::Reason) -> Self { - Mock(frame::GoAway::new(self.0.last_stream_id(), reason)) + Mock(frame::GoAway::with_debug_data( + self.0.last_stream_id(), + reason, + self.0.debug_data().clone(), + )) } } @@ -376,6 +391,11 @@ impl Mock { self.0.set_enable_connect_protocol(Some(val)); self } + + pub fn header_table_size(mut self, val: u32) -> Self { + self.0.set_header_table_size(Some(val)); + self + } } impl From> for frame::Settings { diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index 18d084841..30824943c 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -2,7 +2,7 @@ use crate::SendFrame; use h2::frame::{self, Frame}; use h2::proto::Error; -use h2::{self, SendError}; +use h2::SendError; use futures::future::poll_fn; use futures::{ready, Stream, StreamExt}; diff --git a/tests/h2-support/src/util.rs b/tests/h2-support/src/util.rs index aa7fb2c54..02b6450d0 100644 --- a/tests/h2-support/src/util.rs +++ b/tests/h2-support/src/util.rs @@ -1,5 +1,3 @@ -use h2; - use bytes::{BufMut, Bytes}; use futures::ready; use std::future::Future; diff --git a/tests/h2-tests/Cargo.toml b/tests/h2-tests/Cargo.toml index 33436f3c4..6afdf9053 100644 --- a/tests/h2-tests/Cargo.toml +++ b/tests/h2-tests/Cargo.toml @@ -11,4 +11,4 @@ edition = "2018" h2-support = { path = "../h2-support" } tracing = "0.1.13" futures = { version = "0.3", default-features = false, features = ["alloc"] } -tokio = { version = "1", features = ["macros", "net", "rt", "io-util"] } +tokio = { version = "1", features = ["macros", "net", "rt", "io-util", "rt-multi-thread"] } diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index aff39f5c1..88c7df464 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -239,6 +239,8 @@ async fn request_over_max_concurrent_streams_errors() { // first request is allowed let (resp1, mut stream1) = 
client.send_request(request, false).unwrap(); + // as long as we let the connection internals tick + client = h2.drive(client.ready()).await.unwrap(); let request = Request::builder() .method(Method::POST) @@ -284,6 +286,90 @@ async fn request_over_max_concurrent_streams_errors() { join(srv, h2).await; } +#[tokio::test] +async fn recv_decrement_max_concurrent_streams_when_requests_queued() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame( + frames::headers(1) + .request("POST", "https://example.com/") + .eos(), + ) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + + srv.ping_pong([0; 8]).await; + + // limit this server later in life + srv.send_frame(frames::settings().max_concurrent_streams(1)) + .await; + srv.recv_frame(frames::settings_ack()).await; + srv.recv_frame( + frames::headers(3) + .request("POST", "https://example.com/") + .eos(), + ) + .await; + srv.ping_pong([1; 8]).await; + srv.send_frame(frames::headers(3).response(200).eos()).await; + + srv.recv_frame( + frames::headers(5) + .request("POST", "https://example.com/") + .eos(), + ) + .await; + srv.send_frame(frames::headers(5).response(200).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.expect("handshake"); + // we send a simple req here just to drive the connection so we can + // receive the server settings. 
+ let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + // first request is allowed + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).await.unwrap(); + + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + + // first request is allowed + let (resp1, _) = client.send_request(request, true).unwrap(); + + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + + // second request is put into pending_open + let (resp2, _) = client.send_request(request, true).unwrap(); + + h2.drive(async move { + resp1.await.expect("req"); + }) + .await; + join(async move { h2.await.unwrap() }, async move { + resp2.await.unwrap() + }) + .await; + }; + + join(srv, h2).await; +} + #[tokio::test] async fn send_request_poll_ready_when_connection_error() { h2_support::trace_init!(); @@ -336,6 +422,8 @@ async fn send_request_poll_ready_when_connection_error() { // first request is allowed let (resp1, _) = client.send_request(request, true).unwrap(); + // as long as we let the connection internals tick + client = h2.drive(client.ready()).await.unwrap(); let request = Request::builder() .method(Method::POST) @@ -1454,6 +1542,125 @@ async fn extended_connect_request() { join(srv, h2).await; } +#[tokio::test] +async fn rogue_server_odd_headers() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.send_frame(frames::headers(1)).await; + srv.recv_frame(frames::go_away(0).protocol_error()).await; + }; + + let h2 = async move { + let (_client, h2) = client::handshake(io).await.unwrap(); + + let err = h2.await.unwrap_err(); + assert!(err.is_go_away()); + assert_eq!(err.reason(), Some(Reason::PROTOCOL_ERROR)); + }; + + join(srv, h2).await; +} + 
+#[tokio::test] +async fn rogue_server_even_headers() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.send_frame(frames::headers(2)).await; + srv.recv_frame(frames::go_away(0).protocol_error()).await; + }; + + let h2 = async move { + let (_client, h2) = client::handshake(io).await.unwrap(); + + let err = h2.await.unwrap_err(); + assert!(err.is_go_away()); + assert_eq!(err.reason(), Some(Reason::PROTOCOL_ERROR)); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn rogue_server_reused_headers() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + + srv.recv_frame( + frames::headers(1) + .request("GET", "https://camembert.fromage") + .eos(), + ) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + srv.send_frame(frames::headers(1)).await; + srv.recv_frame(frames::reset(1).stream_closed()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + + h2.drive(async { + let request = Request::builder() + .method(Method::GET) + .uri("https://camembert.fromage") + .body(()) + .unwrap(); + let _res = client.send_request(request, true).unwrap().0.await.unwrap(); + }) + .await; + + h2.await.unwrap(); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn client_builder_header_table_size() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + let mut settings = frame::Settings::default(); + + settings.set_header_table_size(Some(10000)); + + let srv = async move { + let recv_settings = srv.assert_client_handshake().await; + assert_frame_eq(recv_settings, settings); + + srv.recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + 
srv.send_frame(frames::headers(1).response(200).eos()).await; + }; + + let mut builder = client::Builder::new(); + builder.header_table_size(10000); + + let h2 = async move { + let (mut client, mut h2) = builder.handshake::<_, Bytes>(io).await.unwrap(); + let request = Request::get("https://example.com/").body(()).unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).await.unwrap(); + }; + + join(srv, h2).await; +} + const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 5caa2ec3a..dbb933286 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1858,3 +1858,139 @@ async fn poll_capacity_wakeup_after_window_update() { join(srv, h2).await; } + +#[tokio::test] +async fn window_size_decremented_past_zero() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + // let _ = client.assert_server_handshake().await; + + // preface + client.write_preface().await; + + // the following http 2 bytes are fuzzer-generated + client.send_bytes(&[0, 0, 0, 4, 0, 0, 0, 0, 0]).await; + client + .send_bytes(&[ + 0, 0, 23, 1, 1, 0, 249, 255, 191, 131, 1, 1, 1, 70, 1, 1, 1, 1, 65, 1, 1, 65, 1, 1, + 65, 1, 1, 1, 1, 1, 1, 190, + ]) + .await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client + .send_bytes(&[ + 0, 0, 9, 247, 0, 121, 255, 255, 184, 1, 65, 1, 1, 1, 1, 1, 1, 190, + ]) + .await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + 
client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client + .send_bytes(&[0, 0, 3, 0, 1, 0, 249, 255, 191, 1, 1, 190]) + .await; + client + .send_bytes(&[0, 0, 2, 50, 107, 0, 0, 0, 1, 0, 0]) + .await; + client + .send_bytes(&[0, 0, 5, 2, 0, 0, 0, 0, 1, 128, 0, 55, 0, 0]) + .await; + client + .send_bytes(&[ + 0, 0, 12, 4, 0, 0, 0, 0, 0, 126, 4, 39, 184, 171, 125, 33, 0, 3, 107, 50, 98, + ]) + .await; + client + .send_bytes(&[0, 0, 6, 4, 0, 0, 0, 0, 0, 3, 4, 76, 255, 71, 131]) + .await; + client + .send_bytes(&[ + 0, 0, 12, 4, 0, 0, 0, 0, 0, 0, 4, 39, 184, 171, 74, 33, 0, 3, 107, 50, 98, + ]) + .await; + client + .send_bytes(&[ + 0, 0, 30, 4, 0, 0, 0, 0, 0, 0, 4, 56, 184, 171, 125, 65, 0, 35, 65, 65, 65, 61, + 232, 87, 115, 89, 116, 0, 4, 0, 58, 33, 125, 33, 79, 3, 107, 49, 98, + ]) + .await; + client + .send_bytes(&[ + 0, 0, 12, 4, 0, 0, 0, 0, 0, 0, 4, 39, 184, 171, 125, 33, 0, 3, 107, 50, 98, + ]) + .await; + client.send_bytes(&[0, 0, 0, 4, 0, 0, 0, 0, 0]).await; + client + .send_bytes(&[ + 0, 0, 12, 4, 0, 0, 0, 0, 0, 126, 4, 39, 184, 171, 125, 33, 0, 3, 107, 50, 98, + ]) + .await; + client + .send_bytes(&[ + 0, 0, 177, 1, 44, 0, 0, 0, 1, 67, 67, 67, 67, 67, 67, 131, 134, 5, 61, 67, 67, 67, + 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, + 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, + 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 115, 102, 1, 3, 48, 43, + 101, 64, 31, 37, 99, 99, 97, 97, 97, 97, 49, 97, 54, 97, 97, 97, 97, 49, 97, 54, + 97, 99, 54, 53, 53, 51, 53, 99, 99, 97, 97, 99, 97, 97, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, + ]) + .await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 
1]).await; + client + .send_bytes(&[ + 0, 0, 12, 4, 0, 0, 0, 0, 0, 0, 4, 0, 58, 171, 125, 33, 79, 3, 107, 49, 98, + ]) + .await; + client + .send_bytes(&[0, 0, 6, 4, 0, 0, 0, 0, 0, 0, 4, 87, 115, 89, 116]) + .await; + client + .send_bytes(&[ + 0, 0, 12, 4, 0, 0, 0, 0, 0, 126, 4, 39, 184, 171, 125, 33, 0, 3, 107, 50, 98, + ]) + .await; + client + .send_bytes(&[ + 0, 0, 129, 1, 44, 0, 0, 0, 1, 67, 67, 67, 67, 67, 67, 131, 134, 5, 18, 67, 67, 61, + 67, 67, 67, 67, 67, 67, 67, 67, 67, 67, 48, 54, 53, 55, 114, 1, 4, 97, 49, 51, 116, + 64, 2, 117, 115, 4, 103, 101, 110, 116, 64, 8, 57, 111, 110, 116, 101, 110, 115, + 102, 7, 43, 43, 49, 48, 48, 43, 101, 192, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]) + .await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client.send_bytes(&[0, 0, 0, 0, 0, 0, 0, 0, 1]).await; + client + .send_bytes(&[ + 0, 0, 12, 4, 0, 0, 0, 0, 0, 0, 4, 0, 58, 171, 125, 33, 79, 3, 107, 49, 98, + ]) + .await; + + // TODO: is CANCEL the right error code to expect here? 
+ // client.recv_frame(frames::reset(1).protocol_error()).await; + }; + + let srv = async move { + let builder = server::Builder::new(); + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + + // just keep it open + let res = poll_fn(move |cx| srv.poll_closed(cx)).await; + tracing::debug!("{:?}", res); + }; + + join(client, srv).await; +} diff --git a/tests/h2-tests/tests/hammer.rs b/tests/h2-tests/tests/hammer.rs index a5cba3dfa..4b5d04341 100644 --- a/tests/h2-tests/tests/hammer.rs +++ b/tests/h2-tests/tests/hammer.rs @@ -8,7 +8,6 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, - thread, }; use tokio::net::{TcpListener, TcpStream}; diff --git a/tests/h2-tests/tests/prioritization.rs b/tests/h2-tests/tests/prioritization.rs index 7c2681068..11d2c2ccf 100644 --- a/tests/h2-tests/tests/prioritization.rs +++ b/tests/h2-tests/tests/prioritization.rs @@ -1,5 +1,6 @@ -use futures::future::join; -use futures::{FutureExt, StreamExt}; +use futures::future::{join, select}; +use futures::{pin_mut, FutureExt, StreamExt}; + use h2_support::prelude::*; use h2_support::DEFAULT_WINDOW_SIZE; use std::task::Context; @@ -408,3 +409,95 @@ async fn send_data_receive_window_update() { join(mock, h2).await; } + +#[tokio::test] +async fn stream_count_over_max_stream_limit_does_not_starve_capacity() { + use tokio::sync::oneshot; + + h2_support::trace_init!(); + + let (io, mut srv) = mock::new(); + + let (tx, rx) = oneshot::channel(); + + let srv = async move { + let _ = srv + .assert_client_handshake_with_settings( + frames::settings() + // super tiny server + .max_concurrent_streams(1), + ) + .await; + srv.recv_frame(frames::headers(1).request("POST", "http://example.com/")) + .await; + + srv.recv_frame(frames::data(1, vec![0; 16384])).await; + srv.recv_frame(frames::data(1, vec![0; 16384])).await; + srv.recv_frame(frames::data(1, vec![0; 16384])).await; + srv.recv_frame(frames::data(1, vec![0; 16383]).eos()).await; + 
srv.send_frame(frames::headers(1).response(200).eos()).await; + + // All of these connection capacities should be assigned to stream 3 + srv.send_frame(frames::window_update(0, 16384)).await; + srv.send_frame(frames::window_update(0, 16384)).await; + srv.send_frame(frames::window_update(0, 16384)).await; + srv.send_frame(frames::window_update(0, 16383)).await; + + // StreamId(3) should be able to send all of its request with the conn capacity + srv.recv_frame(frames::headers(3).request("POST", "http://example.com/")) + .await; + srv.recv_frame(frames::data(3, vec![0; 16384])).await; + srv.recv_frame(frames::data(3, vec![0; 16384])).await; + srv.recv_frame(frames::data(3, vec![0; 16384])).await; + srv.recv_frame(frames::data(3, vec![0; 16383]).eos()).await; + srv.send_frame(frames::headers(3).response(200).eos()).await; + + // Then all the future stream is guaranteed to be send-able by induction + tx.send(()).unwrap(); + }; + + fn request() -> Request<()> { + Request::builder() + .method(Method::POST) + .uri("http://example.com/") + .body(()) + .unwrap() + } + + let client = async move { + let (mut client, mut conn) = client::Builder::new() + .handshake::<_, Bytes>(io) + .await + .expect("handshake"); + + let (req1, mut send1) = client.send_request(request(), false).unwrap(); + let (req2, mut send2) = client.send_request(request(), false).unwrap(); + + // Use up the connection window. + send1.send_data(vec![0; 65535].into(), true).unwrap(); + // Queue up for more connection window. 
+ send2.send_data(vec![0; 65535].into(), true).unwrap(); + + // Queue up more pending open streams + for _ in 0..5 { + let (_, mut send) = client.send_request(request(), false).unwrap(); + send.send_data(vec![0; 65535].into(), true).unwrap(); + } + + let response = conn.drive(req1).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let response = conn.drive(req2).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let _ = rx.await; + }; + + let task = join(srv, client); + pin_mut!(task); + + let t = tokio::time::sleep(Duration::from_secs(5)).map(|_| panic!("time out")); + pin_mut!(t); + + select(task, t).await; +} diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index c8c1c9d1c..49bd8fcfd 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -1,6 +1,6 @@ #![deny(warnings)] -use futures::future::{join, poll_fn}; +use futures::future::join; use futures::StreamExt; use h2_support::prelude::*; use tokio::io::AsyncWriteExt; @@ -296,10 +296,10 @@ async fn push_request_against_concurrency() { .await; client.recv_frame(frames::data(2, &b""[..]).eos()).await; client - .recv_frame(frames::headers(1).response(200).eos()) + .recv_frame(frames::headers(4).response(200).eos()) .await; client - .recv_frame(frames::headers(4).response(200).eos()) + .recv_frame(frames::headers(1).response(200).eos()) .await; }; @@ -504,11 +504,21 @@ async fn recv_invalid_authority() { let settings = client.assert_server_handshake().await; assert_default_settings!(settings); client.send_frame(bad_headers).await; - client.recv_frame(frames::reset(1).protocol_error()).await; + // grpc-uds patch: must now still be accepted + client + .recv_frame(frames::headers(1).response(200).eos()) + .await; }; let srv = async move { let mut srv = server::handshake(io).await.expect("handshake"); + + // grpc-uds patch: must now still be accepted + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + 
assert_eq!(req.method(), &http::Method::GET); + let rsp = http::Response::builder().status(200).body(()).unwrap(); + stream.send_response(rsp, true).unwrap(); + assert!(srv.next().await.is_none()); }; @@ -553,7 +563,7 @@ async fn recv_connection_header() { } #[tokio::test] -async fn sends_reset_cancel_when_req_body_is_dropped() { +async fn sends_reset_no_error_when_req_body_is_dropped() { h2_support::trace_init!(); let (io, mut client) = mock::new(); @@ -563,8 +573,11 @@ async fn sends_reset_cancel_when_req_body_is_dropped() { client .send_frame(frames::headers(1).request("POST", "https://example.com/")) .await; + // server responded with data before consuming POST-request's body, resulting in `RST_STREAM(NO_ERROR)`. + client.recv_frame(frames::headers(1).response(200)).await; + client.recv_frame(frames::data(1, vec![0; 16384])).await; client - .recv_frame(frames::headers(1).response(200).eos()) + .recv_frame(frames::data(1, vec![0; 16384]).eos()) .await; client .recv_frame(frames::reset(1).reason(Reason::NO_ERROR)) @@ -578,7 +591,8 @@ async fn sends_reset_cancel_when_req_body_is_dropped() { assert_eq!(req.method(), &http::Method::POST); let rsp = http::Response::builder().status(200).body(()).unwrap(); - stream.send_response(rsp, true).unwrap(); + let mut tx = stream.send_response(rsp, false).unwrap(); + tx.send_data(vec![0; 16384 * 2].into(), true).unwrap(); } assert!(srv.next().await.is_none()); }; @@ -879,6 +893,55 @@ async fn too_big_headers_sends_reset_after_431_if_not_eos() { join(client, srv).await; } +#[tokio::test] +async fn too_many_continuation_frames_sends_goaway() { + h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_frame_eq(settings, frames::settings().max_header_list_size(1024 * 32)); + + // the mock impl automatically splits into CONTINUATION frames if the + // headers are too big for one frame. 
So without a max header list size + // set, we'll send a bunch of headers that will eventually get nuked. + client + .send_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .field("a".repeat(10_000), "b".repeat(10_000)) + .field("c".repeat(10_000), "d".repeat(10_000)) + .field("e".repeat(10_000), "f".repeat(10_000)) + .field("g".repeat(10_000), "h".repeat(10_000)) + .field("i".repeat(10_000), "j".repeat(10_000)) + .field("k".repeat(10_000), "l".repeat(10_000)) + .field("m".repeat(10_000), "n".repeat(10_000)) + .field("o".repeat(10_000), "p".repeat(10_000)) + .field("y".repeat(10_000), "z".repeat(10_000)), + ) + .await; + client + .recv_frame(frames::go_away(0).calm().data("too_many_continuations")) + .await; + }; + + let srv = async move { + let mut srv = server::Builder::new() + // should mean ~3 continuation + .max_header_list_size(1024 * 32) + .handshake::<_, Bytes>(io) + .await + .expect("handshake"); + + let err = srv.next().await.unwrap().expect_err("server"); + assert!(err.is_go_away()); + assert!(err.is_library()); + assert_eq!(err.reason(), Some(Reason::ENHANCE_YOUR_CALM)); + }; + + join(client, srv).await; +} + #[tokio::test] async fn pending_accept_recv_illegal_content_length_data() { h2_support::trace_init!(); @@ -1378,3 +1441,36 @@ async fn reject_non_authority_target_on_connect_request() { join(client, srv).await; } + +#[tokio::test] +async fn reject_informational_status_header_in_request() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + let client = async move { + let _ = client.assert_server_handshake().await; + + let status_code = 128; + assert!(StatusCode::from_u16(status_code) + .unwrap() + .is_informational()); + + client + .send_frame(frames::headers(1).response(status_code)) + .await; + + client.recv_frame(frames::reset(1).protocol_error()).await; + }; + + let srv = async move { + let builder = server::Builder::new(); + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + 
+ poll_fn(move |cx| srv.poll_closed(cx)) + .await + .expect("server"); + }; + + join(client, srv).await; +} diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index 138328efa..1159815e3 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -1,6 +1,6 @@ #![deny(warnings)] -use futures::future::{join, join3, lazy, poll_fn, try_join}; +use futures::future::{join, join3, lazy, try_join}; use futures::{FutureExt, StreamExt, TryStreamExt}; use h2_support::prelude::*; use h2_support::util::yield_once; @@ -211,9 +211,14 @@ async fn reset_streams_dont_grow_memory_continuously() { .await; client.send_frame(frames::reset(n).protocol_error()).await; } + tokio::time::timeout( std::time::Duration::from_secs(1), - client.recv_frame(frames::go_away((MAX * 2 + 1) as u32).calm()), + client.recv_frame( + frames::go_away((MAX * 2 + 1) as u32) + .data("too_many_resets") + .calm(), + ), ) .await .expect("client goaway"); @@ -235,6 +240,53 @@ async fn reset_streams_dont_grow_memory_continuously() { join(srv, client).await; } +#[tokio::test] +async fn go_away_with_pending_accepting() { + // h2_support::trace_init!(); + let (io, mut client) = mock::new(); + + let (sent_go_away_tx, sent_go_away_rx) = oneshot::channel(); + let (recv_go_away_tx, recv_go_away_rx) = oneshot::channel(); + + let client = async move { + let settings = client.assert_server_handshake().await; + assert_default_settings!(settings); + + client + .send_frame(frames::headers(1).request("GET", "https://baguette/").eos()) + .await; + + client + .send_frame(frames::headers(3).request("GET", "https://campagne/").eos()) + .await; + client.send_frame(frames::go_away(1).protocol_error()).await; + + sent_go_away_tx.send(()).unwrap(); + + recv_go_away_rx.await.unwrap(); + }; + + let srv = async move { + let mut srv = server::Builder::new() + .max_pending_accept_reset_streams(1) + .handshake::<_, Bytes>(io) + .await + .expect("handshake"); + + let 
(_req_1, _send_response_1) = srv.accept().await.unwrap().unwrap(); + + poll_fn(|cx| srv.poll_closed(cx)) + .drive(sent_go_away_rx) + .await + .unwrap(); + + let (_req_2, _send_response_2) = srv.accept().await.unwrap().unwrap(); + + recv_go_away_tx.send(()).unwrap(); + }; + join(srv, client).await; +} + #[tokio::test] async fn pending_accept_reset_streams_decrement_too() { h2_support::trace_init!(); @@ -456,6 +508,7 @@ async fn recv_goaway_with_higher_last_processed_id() { } #[tokio::test] +#[ignore = "authority validation disabled by grpc-uds"] async fn recv_next_stream_id_updated_by_malformed_headers() { h2_support::trace_init!(); let (io, mut client) = mock::new(); @@ -698,14 +751,14 @@ async fn rst_stream_max() { srv.recv_frame(frames::reset(1).cancel()).await; srv.recv_frame(frames::reset(3).cancel()).await; // sending frame after canceled! - // newer streams trump older streams - // 3 is still being ignored - srv.send_frame(frames::data(3, vec![0; 16]).eos()).await; + // olders streams trump newer streams + // 1 is still being ignored + srv.send_frame(frames::data(1, vec![0; 16]).eos()).await; // ping pong to be sure of no goaway srv.ping_pong([1; 8]).await; - // 1 has been evicted, will get a reset - srv.send_frame(frames::data(1, vec![0; 16]).eos()).await; - srv.recv_frame(frames::reset(1).stream_closed()).await; + // 3 has been evicted, will get a reset + srv.send_frame(frames::data(3, vec![0; 16]).eos()).await; + srv.recv_frame(frames::reset(3).stream_closed()).await; }; let client = async move {