Skip to content

Simplify control streams #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,8 @@ impl Peer for Client {
type Send = http::request::Head;
type Poll = http::response::Head;

fn is_valid_local_stream_id(id: StreamId) -> bool {
id.is_client_initiated()
}

fn is_valid_remote_stream_id(id: StreamId) -> bool {
id.is_server_initiated()
}

fn local_can_open() -> bool {
true
fn is_server() -> bool {
false
}

fn convert_send_message(
Expand Down
18 changes: 1 addition & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,7 @@ pub trait Peer {
/// Message type polled from the transport
type Poll;

/// Returns `true` if `id` is a valid StreamId for a stream initiated by the
/// local node.
fn is_valid_local_stream_id(id: StreamId) -> bool;

/// Returns `true` if `id` is a valid StreamId for a stream initiated by the
/// remote node.
fn is_valid_remote_stream_id(id: StreamId) -> bool;

fn local_can_open() -> bool;
fn remote_can_open() -> bool {
!Self::local_can_open()
}

//fn can_reserve_local_stream() -> bool;
// fn can_reserve_remote_stream() -> bool {
// !self.can_reserve_local_stream
// }
fn is_server() -> bool;

#[doc(hidden)]
fn convert_send_message(
Expand Down
4 changes: 2 additions & 2 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use std::marker::PhantomData;
/// An H2 connection
#[derive(Debug)]
pub struct Connection<T, P, B: IntoBuf = Bytes> {
inner: Transport<T, P, B::Buf>,
inner: Transport<T, B::Buf>,
// Set to `true` as long as the connection is in a valid state.
active: bool,
_phantom: PhantomData<(P, B)>,
}

pub fn new<T, P, B>(transport: Transport<T, P, B::Buf>)
pub fn new<T, P, B>(transport: Transport<T, B::Buf>)
-> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
Expand Down
194 changes: 6 additions & 188 deletions src/proto/control_streams.rs
Original file line number Diff line number Diff line change
@@ -1,204 +1,22 @@
use ConnectionError;
use proto::*;

/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up
/// to Connection).
pub trait ControlStreams {
/// Determines whether the given stream could theoretically be opened by the local
/// side of this connection.
fn local_valid_id(id: StreamId) -> bool;
fn streams(&self) -> &Streams;

/// Determines whether the given stream could theoretically be opened by the remote
/// side of this connection.
fn remote_valid_id(id: StreamId) -> bool;

/// Indicates whether this local endpoint may open streams (with HEADERS).
///
/// Implies that this endpoint is a client.
fn local_can_open() -> bool;

/// Indicates whether this remote endpoint may open streams (with HEADERS).
///
/// Implies that this endpoint is a server.
fn remote_can_open() -> bool {
!Self::local_can_open()
}

// TODO push promise
// fn local_can_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>;
// fn remote_can_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>;

/// Creates a new stream in the OPEN state from the local side (i.e. as a Client).
///
/// Must only be called when local_can_open returns true.
fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>;

/// Create a new stream in the OPEN state from the remote side (i.e. as a Server).
///
/// Must only be called when remote_can_open returns true.
fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>;

/// Prepare the receive side of a local stream to receive data from the remote.
///
/// Typically called when a client receives a response header.
fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>;

/// Prepare the send side of a remote stream to receive data from the local endpoint.
///
/// Typically called when a server sends a response header.
fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>;

// TODO push promise
// fn local_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>;
// fn remote_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>;

/// Closes the send half of a stream.
///
/// Fails with a ProtocolError if send half of the stream was not open.
fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError>;

/// Closes the recv half of a stream.
///
/// Fails with a ProtocolError if recv half of the stream was not open.
fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError>;

/// Resets the given stream.
///
/// If the stream was already reset, the stored cause is updated.
fn reset_stream(&mut self, id: StreamId, cause: Reason);

/// Get the reason the stream was reset, if it was reset.
fn get_reset(&self, id: StreamId) -> Option<Reason>;

/// Returns true if the given stream was opened by the local peer and is not yet
/// closed.
fn is_local_active(&self, id: StreamId) -> bool;

/// Returns true if the given stream was opened by the remote peer and is not yet
/// closed.
fn is_remote_active(&self, id: StreamId) -> bool;

/// Returns true if the given stream was opened and is not yet closed.
fn is_active(&self, id: StreamId) -> bool {
if Self::local_valid_id(id) {
self.is_local_active(id)
} else {
self.is_remote_active(id)
}
}

/// Returns the number of open streams initiated by the local peer.
fn local_active_len(&self) -> usize;

/// Returns the number of open streams initiated by the remote peer.
fn remote_active_len(&self) -> usize;

/// Returns true iff the recv half of the given stream is open.
fn is_recv_open(&mut self, id: StreamId) -> bool;

/// Returns true iff the send half of the given stream is open.
fn is_send_open(&mut self, id: StreamId) -> bool;

/// If the given stream ID is active and able to recv data, get its mutable recv flow
/// control state.
fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>;

/// If the given stream ID is active and able to send data, get its mutable send flow
/// control state.
fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>;

/// Updates the initial window size for the local peer.
fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize);

/// Updates the initial window size for the remote peer.
fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize);
fn streams_mut(&mut self) -> &mut Streams;
}

macro_rules! proxy_control_streams {
($outer:ident) => (
impl<T: ControlStreams> ControlStreams for $outer<T> {
fn local_valid_id(id: StreamId) -> bool {
T::local_valid_id(id)
}

fn remote_valid_id(id: StreamId) -> bool {
T::remote_valid_id(id)
}

fn local_can_open() -> bool {
T::local_can_open()
}

fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
self.inner.local_open(id, sz)
}

fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
self.inner.remote_open(id, sz)
}

fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
self.inner.local_open_recv_half(id, sz)
}

fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
self.inner.remote_open_send_half(id, sz)
}

fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.close_send_half(id)
}

fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.close_recv_half(id)
}

fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.inner.reset_stream(id, cause)
}

fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.get_reset(id)
}

fn is_local_active(&self, id: StreamId) -> bool {
self.inner.is_local_active(id)
}

fn is_remote_active(&self, id: StreamId) -> bool {
self.inner.is_remote_active(id)
}

fn local_active_len(&self) -> usize {
self.inner.local_active_len()
}

fn remote_active_len(&self) -> usize {
self.inner.remote_active_len()
}

fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) {
self.inner.update_inital_recv_window_size(old_sz, new_sz)
}

fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) {
self.inner.update_inital_send_window_size(old_sz, new_sz)
}

fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.recv_flow_controller(id)
}

fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.send_flow_controller(id)
}

fn is_send_open(&mut self, id: StreamId) -> bool {
self.inner.is_send_open(id)
fn streams(&self) -> &Streams {
self.inner.streams()
}

fn is_recv_open(&mut self, id: StreamId) -> bool {
self.inner.is_recv_open(id)
fn streams_mut(&mut self) -> &mut Streams {
self.inner.streams_mut()
}
}
)
Expand Down
13 changes: 7 additions & 6 deletions src/proto/flow_control_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<T, U> FlowControlRecv<T>
/// Exposes a public upward API for flow control.
impl<T: ControlStreams> ControlFlowRecv for FlowControlRecv<T> {
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
let added = match self.recv_flow_controller(id) {
let added = match self.streams_mut().recv_flow_controller(id) {
None => false,
Some(mut fc) => {
fc.expand_window(incr);
Expand All @@ -57,7 +57,7 @@ impl<T: ControlStreams> ControlFlowRecv for FlowControlRecv<T> {
self.pending_streams.push_back(id);
}
Ok(())
} else if let Some(rst) = self.inner.get_reset(id) {
} else if let Some(rst) = self.streams().get_reset(id) {
Err(error::User::StreamReset(rst).into())
} else {
Err(error::User::InvalidStreamId.into())
Expand All @@ -80,8 +80,8 @@ impl<T, U> FlowControlRecv<T>
}

while let Some(id) = self.pending_streams.pop_front() {
if self.inner.get_reset(id).is_none() {
let update = self.recv_flow_controller(id).and_then(|s| s.apply_window_update());
if self.streams().get_reset(id).is_none() {
let update = self.streams_mut().recv_flow_controller(id).and_then(|s| s.apply_window_update());
if let Some(incr) = update {
try_ready!(self.try_send(frame::WindowUpdate::new(id, incr)));
}
Expand Down Expand Up @@ -124,8 +124,9 @@ impl<T> Stream for FlowControlRecv<T>
return Err(error::Reason::FlowControlError.into());
}

let fc = self.inner.recv_flow_controller(id)
let fc = self.inner.streams_mut().recv_flow_controller(id)
.expect("receiving data with no flow controller");

if fc.claim_window(sz).is_err() {
// TODO this should cause a GO_AWAY
return Err(error::Reason::FlowControlError.into());
Expand Down Expand Up @@ -206,7 +207,7 @@ impl<T> ApplySettings for FlowControlRecv<T>
return Ok(());
}

self.inner.update_inital_recv_window_size(old_window_size, new_window_size);
self.streams_mut().update_inital_recv_window_size(old_window_size, new_window_size);
self.initial_window_size = new_window_size;
}
Ok(())
Expand Down
11 changes: 6 additions & 5 deletions src/proto/flow_control_send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl<T: ControlStreams> ControlFlowSend for FlowControlSend<T> {

// TODO this should probably account for stream priority?
while let Some(id) = self.pending_streams.pop_front() {
if let Some(mut flow) = self.send_flow_controller(id) {
if let Some(mut flow) = self.streams_mut().send_flow_controller(id) {
if let Some(incr) = flow.apply_window_update() {
return Ok(Async::Ready(WindowUpdate::new(id, incr)));
}
Expand Down Expand Up @@ -84,7 +84,7 @@ impl<T> Stream for FlowControlSend<T>
} else {
// The remote may send window updates for streams that the local
// now considers closed. It's okay.
if let Some(fc) = self.inner.send_flow_controller(id) {
if let Some(fc) = self.streams_mut().send_flow_controller(id) {
fc.expand_window(sz);
}
}
Expand All @@ -110,7 +110,7 @@ impl<T, U> Sink for FlowControlSend<T>
type SinkError = T::SinkError;

fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> {
debug_assert!(self.inner.get_reset(frame.stream_id()).is_none());
debug_assert!(self.streams().get_reset(frame.stream_id()).is_none());

// Ensures that the underlying transport is will accept the frame. It's important
// that this be checked before claiming capacity from the flow controllers.
Expand All @@ -130,8 +130,9 @@ impl<T, U> Sink for FlowControlSend<T>
}

// Ensure there's enough capacity on stream.
let mut fc = self.inner.send_flow_controller(v.stream_id())
let mut fc = self.inner.streams_mut().send_flow_controller(v.stream_id())
.expect("no remote stream for data frame");

if fc.claim_window(sz).is_err() {
return Err(error::User::FlowControlViolation.into())
}
Expand Down Expand Up @@ -195,7 +196,7 @@ impl<T> ApplySettings for FlowControlSend<T>
return Ok(());
}

self.inner.update_inital_send_window_size(old_window_size, new_window_size);
self.streams_mut().update_inital_send_window_size(old_window_size, new_window_size);
self.initial_window_size = new_window_size;
}

Expand Down
Loading