Skip to content

Commit df27e02

Browse files
committed
Wire up Trailers frames
1 parent 27afa19 commit df27e02

File tree

4 files changed

+52
-23
lines changed

4 files changed

+52
-23
lines changed

src/frame/headers.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,10 @@ impl Headers {
198198
self.flags.set_end_stream()
199199
}
200200

201+
pub fn into_fields(self) -> HeaderMap<HeaderValue> {
202+
self.fields
203+
}
204+
201205
pub fn into_response(self) -> response::Head {
202206
let mut response = response::Head::default();
203207

src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ use bytes::Bytes;
4242

4343
pub type FrameSize = u32;
4444

45+
pub type HeaderMap = http::HeaderMap<http::header::HeaderValue>;
46+
4547
/// An H2 connection frame
4648
#[derive(Debug)]
4749
pub enum Frame<T, B = Bytes> {
@@ -57,7 +59,7 @@ pub enum Frame<T, B = Bytes> {
5759
},
5860
Trailers {
5961
id: StreamId,
60-
headers: (),
62+
headers: HeaderMap,
6163
},
6264
PushPromise {
6365
id: StreamId,

src/proto/connection.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use {ConnectionError, Frame, Peer};
1+
use {ConnectionError, HeaderMap, Frame, Peer};
22
use frame::{self, StreamId};
33
use client::Client;
44
use server::Server;
@@ -110,6 +110,17 @@ impl<T, P, B> Connection<T, P, B>
110110
})
111111
}
112112

113+
pub fn send_trailers(self,
114+
id: StreamId,
115+
headers: HeaderMap)
116+
-> sink::Send<Self>
117+
{
118+
self.send(Frame::Trailers {
119+
id,
120+
headers,
121+
})
122+
}
123+
113124
// ===== Private =====
114125

115126
/// Returns `Ready` when the `Connection` is ready to receive a frame from
@@ -165,7 +176,7 @@ impl<T, P, B> Connection<T, P, B>
165176
Some(Headers(frame)) => {
166177
trace!("recv HEADERS; frame={:?}", frame);
167178
// Update stream state while ensuring that the headers frame
168-
// can be received
179+
// can be received.
169180
if let Some(frame) = try!(self.streams.recv_headers(frame)) {
170181
let frame = Self::convert_poll_message(frame);
171182
return Ok(Some(frame).into());
@@ -222,8 +233,10 @@ impl<T, P, B> Connection<T, P, B>
222233

223234
fn convert_poll_message(frame: frame::Headers) -> Frame<P::Poll> {
224235
if frame.is_trailers() {
225-
// TODO: return trailers
226-
unimplemented!();
236+
Frame::Trailers {
237+
id: frame.stream_id(),
238+
headers: frame.into_fields()
239+
}
227240
} else {
228241
Frame::Headers {
229242
id: frame.stream_id(),
@@ -338,10 +351,16 @@ impl<T, P, B> Sink for Connection<T, P, B>
338351

339352
Frame::Reset { id, error } => frame::Reset::new(id, error).into(),
340353

341-
/*
342354
Frame::Trailers { id, headers } => {
343-
unimplemented!();
355+
let mut f = frame::Headers::new(id, frame::Pseudo::default(), headers);
356+
f.set_end_stream();
357+
358+
self.streams.send_headers(&f)?;
359+
360+
frame::Frame::Headers(f)
344361
}
362+
363+
/*
345364
Frame::PushPromise { id, promise } => {
346365
unimplemented!();
347366
}

src/proto/streams.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ impl Streams {
133133
};
134134

135135
if frame.is_trailers() {
136-
try!(self.inner.recv_trailers(id, state, frame.is_end_stream()));
136+
try!(self.inner.recv_eos(id, state));
137137
} else {
138138
try!(self.inner.recv_headers(id, state, frame.is_end_stream()));
139139
}
@@ -222,7 +222,7 @@ impl Streams {
222222
};
223223

224224
if frame.is_trailers() {
225-
try!(self.inner.send_trailers(id, state, frame.is_end_stream()));
225+
try!(self.inner.send_eos(id, state));
226226
} else {
227227
try!(self.inner.send_headers(id, state, frame.is_end_stream()));
228228
}
@@ -326,10 +326,16 @@ impl Inner {
326326
Ok(())
327327
}
328328

329-
fn recv_trailers(&mut self, _id: StreamId, _state: &mut state::Stream, _eos: bool)
329+
fn recv_eos(&mut self, id: StreamId, state: &mut state::Stream)
330330
-> Result<(), ConnectionError>
331331
{
332-
unimplemented!();
332+
try!(state.recv_close());
333+
334+
if state.is_closed() {
335+
self.stream_closed(id)
336+
}
337+
338+
Ok(())
333339
}
334340

335341
fn recv_data(&mut self,
@@ -356,11 +362,7 @@ impl Inner {
356362
}
357363

358364
if eos {
359-
try!(state.recv_close());
360-
361-
if state.is_closed() {
362-
self.stream_closed(id)
363-
}
365+
self.recv_eos(id, state)?
364366
}
365367

366368
Ok(())
@@ -396,10 +398,16 @@ impl Inner {
396398
Ok(())
397399
}
398400

399-
fn send_trailers(&mut self, _id: StreamId, _state: &mut state::Stream, _eos: bool)
401+
fn send_eos(&mut self, id: StreamId, state: &mut state::Stream)
400402
-> Result<(), ConnectionError>
401403
{
402-
unimplemented!();
404+
try!(state.send_close());
405+
406+
if state.is_closed() {
407+
self.stream_closed(id);
408+
}
409+
410+
Ok(())
403411
}
404412

405413
fn send_data(&mut self,
@@ -435,11 +443,7 @@ impl Inner {
435443
}
436444

437445
if eos {
438-
try!(state.send_close());
439-
440-
if state.is_closed() {
441-
self.stream_closed(id)
442-
}
446+
self.send_eos(id, state)?;
443447
}
444448

445449
Ok(())

0 commit comments

Comments
 (0)