Skip to content

Commit a3cbf2d

Browse files
committed
Wire up Trailers frame
1 parent 33bdc05 commit a3cbf2d

File tree

6 files changed

+50
-18
lines changed

6 files changed

+50
-18
lines changed

src/frame/headers.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use super::StreamId;
22
use hpack;
33
use frame::{self, Frame, Head, Kind, Error};
4+
use HeaderMap;
45

56
use http::{request, response, version, uri, Method, StatusCode, Uri};
6-
use http::header::{self, HeaderMap, HeaderName, HeaderValue};
7+
use http::header::{self, HeaderName, HeaderValue};
78

89
use bytes::{BytesMut, Bytes};
910
use byteorder::{BigEndian, ByteOrder};
@@ -23,7 +24,7 @@ pub struct Headers {
2324
stream_dep: Option<StreamDependency>,
2425

2526
/// The decoded header fields
26-
fields: HeaderMap<HeaderValue>,
27+
fields: HeaderMap,
2728

2829
/// Pseudo headers, these are broken out as they must be sent as part of the
2930
/// headers frame.
@@ -110,7 +111,7 @@ const ALL: u8 = END_STREAM
110111
// ===== impl Headers =====
111112

112113
impl Headers {
113-
pub fn new(stream_id: StreamId, pseudo: Pseudo, fields: HeaderMap<HeaderValue>) -> Self {
114+
pub fn new(stream_id: StreamId, pseudo: Pseudo, fields: HeaderMap) -> Self {
114115
Headers {
115116
stream_id: stream_id,
116117
stream_dep: None,
@@ -251,6 +252,10 @@ impl Headers {
251252
request
252253
}
253254

255+
pub fn into_fields(self) -> HeaderMap {
256+
self.fields
257+
}
258+
254259
pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut)
255260
-> Option<Continuation>
256261
{

src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ pub use proto::Connection;
4141
use bytes::Bytes;
4242

4343
pub type FrameSize = u32;
44+
// TODO: remove if carllerche/http#90 lands
45+
pub type HeaderMap = http::HeaderMap<http::header::HeaderValue>;
4446

4547
/// An H2 connection frame
4648
#[derive(Debug)]
@@ -57,7 +59,7 @@ pub enum Frame<T, B = Bytes> {
5759
},
5860
Trailers {
5961
id: StreamId,
60-
headers: (),
62+
headers: HeaderMap,
6163
},
6264
PushPromise {
6365
id: StreamId,

src/proto/connection.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use {ConnectionError, Frame, Peer};
2+
use HeaderMap;
23
use frame::{self, StreamId};
34
use client::Client;
45
use server::Server;
@@ -108,6 +109,17 @@ impl<T, P, B> Connection<T, P, B>
108109
})
109110
}
110111

112+
pub fn send_trailers(self,
113+
id: StreamId,
114+
headers: HeaderMap)
115+
-> sink::Send<Self>
116+
{
117+
self.send(Frame::Trailers {
118+
id,
119+
headers,
120+
})
121+
}
122+
111123
pub fn start_ping(&mut self, _body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
112124
unimplemented!();
113125
}
@@ -167,7 +179,7 @@ impl<T, P, B> Connection<T, P, B>
167179
Some(Headers(frame)) => {
168180
trace!("recv HEADERS; frame={:?}", frame);
169181
// Update stream state while ensuring that the headers frame
170-
// can be received
182+
// can be received.
171183
if let Some(frame) = try!(self.streams.recv_headers(frame)) {
172184
let frame = Self::convert_poll_message(frame);
173185
return Ok(Some(frame).into());
@@ -224,8 +236,10 @@ impl<T, P, B> Connection<T, P, B>
224236

225237
fn convert_poll_message(frame: frame::Headers) -> Frame<P::Poll> {
226238
if frame.is_trailers() {
227-
// TODO: return trailers
228-
unimplemented!();
239+
Frame::Trailers {
240+
id: frame.stream_id(),
241+
headers: frame.into_fields()
242+
}
229243
} else {
230244
Frame::Headers {
231245
id: frame.stream_id(),
@@ -328,7 +342,6 @@ impl<T, P, B> Sink for Connection<T, P, B>
328342

329343
frame::Frame::Headers(frame)
330344
}
331-
332345
Frame::Data { id, data, end_of_stream } => {
333346
let frame = frame::Data::from_buf(
334347
id, data.into_buf(), end_of_stream);
@@ -337,13 +350,20 @@ impl<T, P, B> Sink for Connection<T, P, B>
337350

338351
frame.into()
339352
}
340-
341353
Frame::Reset { id, error } => frame::Reset::new(id, error).into(),
342-
343-
/*
344354
Frame::Trailers { id, headers } => {
345-
unimplemented!();
355+
let mut frame = frame::Headers::new(
356+
id,
357+
frame::Pseudo::default(),
358+
headers);
359+
360+
frame.set_end_stream();
361+
362+
self.streams.send_headers(&frame)?;
363+
364+
frame::Frame::Headers(frame)
346365
}
366+
/*
347367
Frame::PushPromise { id, promise } => {
348368
unimplemented!();
349369
}

src/proto/streams/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,12 @@ impl<P: Peer> Streams<P> {
8787
};
8888

8989
if frame.is_trailers() {
90-
try!(self.inner.recv.recv_trailers(state, frame.is_end_stream()));
90+
if !frame.is_end_stream() {
91+
// TODO: What error should this return?
92+
unimplemented!();
93+
}
94+
95+
try!(self.inner.recv.recv_eos(state));
9196
} else {
9297
try!(self.inner.recv.recv_headers(state, frame.is_end_stream()));
9398
}
@@ -174,7 +179,7 @@ impl<P: Peer> Streams<P> {
174179
};
175180

176181
if frame.is_trailers() {
177-
try!(self.inner.send.send_trailers(state, frame.is_end_stream()));
182+
try!(self.inner.send.send_eos(state));
178183
} else {
179184
try!(self.inner.send.send_headers(state, frame.is_end_stream()));
180185
}

src/proto/streams/recv.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ impl<P: Peer> Recv<P> {
7070
state.recv_open(self.init_window_sz, eos)
7171
}
7272

73-
pub fn recv_trailers(&mut self, _state: &mut state::Stream, _eos: bool)
73+
pub fn recv_eos(&mut self, state: &mut state::Stream)
7474
-> Result<(), ConnectionError>
7575
{
76-
unimplemented!();
76+
state.recv_close()
7777
}
7878

7979
pub fn recv_data(&mut self,

src/proto/streams/send.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ impl<P: Peer> Send<P> {
7272
state.send_open(self.init_window_sz, eos)
7373
}
7474

75-
pub fn send_trailers(&mut self, _state: &mut state::Stream, _eos: bool)
75+
pub fn send_eos(&mut self, state: &mut state::Stream)
7676
-> Result<(), ConnectionError>
7777
{
78-
unimplemented!();
78+
state.send_close()
7979
}
8080

8181
pub fn send_data<B: Buf>(&mut self,

0 commit comments

Comments
 (0)