Skip to content

Start hooking up data receiving #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use proto::{self, Connection};
use error::Reason::*;

use http::{self, Request, Response};
use futures::{Future, Poll, Sink, AsyncSink};
use futures::{self, Future, Poll, Sink, AsyncSink};
use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Bytes, IntoBuf};

Expand All @@ -23,12 +23,21 @@ pub struct Client<T, B: IntoBuf> {
connection: Connection<T, Peer, B>,
}

/// Client half of an active HTTP/2.0 stream.
#[derive(Debug)]
pub struct Stream<B: IntoBuf> {
inner: proto::StreamRef<Peer, B::Buf>,
}

#[derive(Debug)]
pub struct Body<B: IntoBuf> {
inner: proto::StreamRef<Peer, B::Buf>,
}

#[derive(Debug)]
pub struct Chunk<B: IntoBuf> {
inner: proto::Chunk<Peer, B::Buf>,
}

impl<T> Client<T, Bytes>
where T: AsyncRead + AsyncWrite + 'static,
{
Expand Down Expand Up @@ -140,15 +149,18 @@ impl<T, B> fmt::Debug for Handshake<T, B>

impl<B: IntoBuf> Stream<B> {
/// Receive the HTTP/2.0 response, if it is ready.
pub fn poll_response(&mut self) -> Poll<Response<()>, ConnectionError> {
self.inner.poll_response()
pub fn poll_response(&mut self) -> Poll<Response<Body<B>>, ConnectionError> {
let (parts, _) = try_ready!(self.inner.poll_response()).into_parts();
let body = Body { inner: self.inner.clone() };

Ok(Response::from_parts(parts, body).into())
}

/// Send data
pub fn send_data(&mut self, data: B, end_of_stream: bool)
-> Result<(), ConnectionError>
{
unimplemented!();
self.inner.send_data(data.into_buf(), end_of_stream)
}

/// Send trailers
Expand All @@ -160,14 +172,36 @@ impl<B: IntoBuf> Stream<B> {
}

impl<B: IntoBuf> Future for Stream<B> {
type Item = Response<()>;
type Item = Response<Body<B>>;
type Error = ConnectionError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.poll_response()
}
}

// ===== impl Body =====

impl<B: IntoBuf> futures::Stream for Body<B> {
type Item = Chunk<B>;
type Error = ConnectionError;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let chunk = try_ready!(self.inner.poll_data())
.map(|inner| Chunk { inner });

Ok(chunk.into())
}
}

// ===== impl Chunk =====

impl<B: IntoBuf> Chunk<B> {
pub fn pop_bytes(&mut self) -> Option<Bytes> {
self.inner.pop_bytes()
}
}

// ===== impl Peer =====

impl proto::Peer for Peer {
Expand Down
9 changes: 9 additions & 0 deletions src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ pub enum Frame<T = Bytes> {
}

impl<T> Frame<T> {
/// Returns true if the frame is a DATA frame.
pub fn is_data(&self) -> bool {
use self::Frame::*;

match *self {
Data(..) => true,
_ => false,
}
}
}

impl<T> fmt::Debug for Frame<T> {
Expand Down
13 changes: 1 addition & 12 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,8 @@ impl<T, P, B> Connection<T, P, B>
*/
}
Some(Data(frame)) => {
unimplemented!();
/*
trace!("recv DATA; frame={:?}", frame);
try!(self.streams.recv_data(&frame));

let frame = Frame::Data {
id: frame.stream_id(),
end_of_stream: frame.is_end_stream(),
data: frame.into_payload(),
};

return Ok(Some(frame).into());
*/
try!(self.streams.recv_data(frame));
}
Some(Reset(frame)) => {
unimplemented!();
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod settings;
mod streams;

pub use self::connection::Connection;
pub use self::streams::{Streams, StreamRef};
pub use self::streams::{Streams, StreamRef, Chunk};

use self::framed_read::FramedRead;
use self::framed_write::FramedWrite;
Expand Down
49 changes: 49 additions & 0 deletions src/proto/streams/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,53 @@ impl<B> Deque<B> {
None => None,
}
}

pub fn take_while<F>(&mut self, buf: &mut Buffer<B>, mut f: F) -> Self
where F: FnMut(&Frame<B>) -> bool
{
match self.indices {
Some(mut idxs) => {
if !f(&buf.slab[idxs.head].frame) {
return Deque::new();
}

let head = idxs.head;
let mut tail = idxs.head;

loop {
let next = match buf.slab[tail].next {
Some(next) => next,
None => {
self.indices = None;
return Deque {
indices: Some(idxs),
_p: PhantomData,
};
}
};

if !f(&buf.slab[next].frame) {
// Split the linked list
buf.slab[tail].next = None;

self.indices = Some(Indices {
head: next,
tail: idxs.tail,
});

return Deque {
indices: Some(Indices {
head: head,
tail: tail,
}),
_p: PhantomData,
}
}

tail = next;
}
}
None => Deque::new(),
}
}
}
2 changes: 1 addition & 1 deletion src/proto/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod store;
mod stream;
mod streams;

pub use self::streams::{Streams, StreamRef};
pub use self::streams::{Streams, StreamRef, Chunk};

use self::buffer::Buffer;
use self::flow_control::FlowControl;
Expand Down
45 changes: 43 additions & 2 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ pub(super) struct Recv<P, B> {
_p: PhantomData<(P, B)>,
}

#[derive(Debug)]
pub(super) struct Chunk {
/// Data frames pending receival
pub pending_recv: buffer::Deque<Bytes>,
}

impl<P, B> Recv<P, B>
where P: Peer,
B: Buf,
Expand Down Expand Up @@ -67,7 +73,7 @@ impl<P, B> Recv<P, B>
// Increment the number of remote initiated streams
self.num_streams += 1;

Ok(Some(Stream::new()))
Ok(Some(Stream::new(id)))
}

/// Transition the stream state based on receiving headers
Expand Down Expand Up @@ -98,7 +104,7 @@ impl<P, B> Recv<P, B>
}

pub fn recv_data(&mut self,
frame: &frame::Data,
frame: frame::Data,
stream: &mut Stream<B>)
-> Result<(), ConnectionError>
{
Expand Down Expand Up @@ -130,6 +136,10 @@ impl<P, B> Recv<P, B>
try!(stream.state.recv_close());
}

// Push the frame onto the recv buffer
stream.pending_recv.push_back(&mut self.buffer, frame.into());
stream.notify_recv();

Ok(())
}

Expand Down Expand Up @@ -218,6 +228,37 @@ impl<P, B> Recv<P, B>
Ok(().into())
}


pub fn poll_chunk(&mut self, stream: &mut Stream<B>)
-> Poll<Option<Chunk>, ConnectionError>
{
let frames = stream.pending_recv
.take_while(&mut self.buffer, |frame| frame.is_data());

if frames.is_empty() {
if stream.state.is_recv_closed() {
Ok(None.into())
} else {
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
}
} else {
Ok(Some(Chunk {
pending_recv: frames,
}).into())
}
}

pub fn pop_bytes(&mut self, chunk: &mut Chunk) -> Option<Bytes> {
match chunk.pending_recv.pop_front(&mut self.buffer) {
Some(Frame::Data(frame)) => {
Some(frame.into_payload())
}
None => None,
_ => panic!("unexpected frame type"),
}
}

/// Send stream level window update
pub fn send_stream_window_update<T>(&mut self,
streams: &mut Store<B>,
Expand Down
10 changes: 6 additions & 4 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<P, B> Send<P, B>
/// Update state reflecting a new, locally opened stream
///
/// Returns the stream state if successful. `None` if refused
pub fn open(&mut self) -> Result<(StreamId, Stream<B>), ConnectionError> {
pub fn open(&mut self) -> Result<Stream<B>, ConnectionError> {
try!(self.ensure_can_open());

if let Some(max) = self.max_streams {
Expand All @@ -76,7 +76,7 @@ impl<P, B> Send<P, B>
}
}

let ret = (self.next_stream_id, Stream::new());
let ret = Stream::new(self.next_stream_id);

// Increment the number of locally initiated streams
self.num_streams += 1;
Expand Down Expand Up @@ -106,8 +106,8 @@ impl<P, B> Send<P, B>
}

pub fn send_data(&mut self,
frame: &frame::Data<B>,
stream: &mut Stream<B>)
frame: frame::Data<B>,
stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{
let sz = frame.payload().remaining();
Expand Down Expand Up @@ -148,6 +148,8 @@ impl<P, B> Send<P, B>
try!(stream.state.send_close());
}

self.prioritize.queue_frame(frame.into(), stream);

Ok(())
}

Expand Down
7 changes: 7 additions & 0 deletions src/proto/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ impl State {
}
}

pub fn is_recv_closed(&self) -> bool {
match self.inner {
Closed(..) | HalfClosedRemote(..) => true,
_ => false,
}
}

pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> {
match self.inner {
Open { ref mut remote, .. } |
Expand Down
6 changes: 5 additions & 1 deletion src/proto/streams/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use super::*;

#[derive(Debug)]
pub(super) struct Stream<B> {
/// The h2 stream identifier
pub id: StreamId,

/// Current state of the stream
pub state: State,

Expand All @@ -22,8 +25,9 @@ pub(super) struct Stream<B> {
}

impl<B> Stream<B> {
pub fn new() -> Stream<B> {
pub fn new(id: StreamId) -> Stream<B> {
Stream {
id,
state: State::default(),
pending_recv: buffer::Deque::new(),
recv_task: None,
Expand Down
Loading