Skip to content

Commit 1b0c922

Browse files
committed
feat(rt): replace IO traits with hyper::rt ones
1 parent d977f20 commit 1b0c922

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1007
-284
lines changed

benches/end_to_end.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
extern crate test;
55
mod support;
66

7-
// TODO: Reimplement Opts::bench using hyper::server::conn and hyper::client::conn
8-
// (instead of Server and HttpClient).
7+
// TODO: Reimplement parallel for HTTP/1
98

109
use std::convert::Infallible;
1110
use std::net::SocketAddr;
@@ -315,7 +314,8 @@ impl Opts {
315314

316315
let mut client = rt.block_on(async {
317316
if self.http2 {
318-
let io = tokio::net::TcpStream::connect(&addr).await.unwrap();
317+
let tcp = tokio::net::TcpStream::connect(&addr).await.unwrap();
318+
let io = support::TokioIo::new(tcp);
319319
let (tx, conn) = hyper::client::conn::http2::Builder::new(support::TokioExecutor)
320320
.initial_stream_window_size(self.http2_stream_window)
321321
.initial_connection_window_size(self.http2_conn_window)
@@ -328,7 +328,8 @@ impl Opts {
328328
} else if self.parallel_cnt > 1 {
329329
todo!("http/1 parallel >1");
330330
} else {
331-
let io = tokio::net::TcpStream::connect(&addr).await.unwrap();
331+
let tcp = tokio::net::TcpStream::connect(&addr).await.unwrap();
332+
let io = support::TokioIo::new(tcp);
332333
let (tx, conn) = hyper::client::conn::http1::Builder::new()
333334
.handshake(io)
334335
.await
@@ -414,14 +415,15 @@ fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) -> SocketAddr {
414415
let opts = opts.clone();
415416
rt.spawn(async move {
416417
while let Ok((sock, _)) = listener.accept().await {
418+
let io = support::TokioIo::new(sock);
417419
if opts.http2 {
418420
tokio::spawn(
419421
hyper::server::conn::http2::Builder::new(support::TokioExecutor)
420422
.initial_stream_window_size(opts.http2_stream_window)
421423
.initial_connection_window_size(opts.http2_conn_window)
422424
.adaptive_window(opts.http2_adaptive_window)
423425
.serve_connection(
424-
sock,
426+
io,
425427
service_fn(move |req: Request<hyper::body::Incoming>| async move {
426428
let mut req_body = req.into_body();
427429
while let Some(_chunk) = req_body.frame().await {}
@@ -433,7 +435,7 @@ fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) -> SocketAddr {
433435
);
434436
} else {
435437
tokio::spawn(hyper::server::conn::http1::Builder::new().serve_connection(
436-
sock,
438+
io,
437439
service_fn(move |req: Request<hyper::body::Incoming>| async move {
438440
let mut req_body = req.into_body();
439441
while let Some(_chunk) = req_body.frame().await {}

benches/pipeline.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
extern crate test;
55

6+
mod support;
7+
68
use std::convert::Infallible;
79
use std::io::{Read, Write};
810
use std::net::{SocketAddr, TcpStream};
@@ -40,11 +42,12 @@ fn hello_world_16(b: &mut test::Bencher) {
4042
rt.spawn(async move {
4143
loop {
4244
let (stream, _addr) = listener.accept().await.expect("accept");
45+
let io = support::TokioIo::new(stream);
4346

4447
http1::Builder::new()
4548
.pipeline_flush(true)
4649
.serve_connection(
47-
stream,
50+
io,
4851
service_fn(|_| async {
4952
Ok::<_, Infallible>(Response::new(Full::new(Bytes::from(
5053
"Hello, World!",

benches/server.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
extern crate test;
55

6+
mod support;
7+
68
use std::io::{Read, Write};
79
use std::net::{SocketAddr, TcpListener, TcpStream};
810
use std::sync::mpsc;
@@ -38,10 +40,11 @@ macro_rules! bench_server {
3840
rt.spawn(async move {
3941
loop {
4042
let (stream, _) = listener.accept().await.expect("accept");
43+
let io = support::TokioIo::new(stream);
4144

4245
http1::Builder::new()
4346
.serve_connection(
44-
stream,
47+
io,
4548
service_fn(|_| async {
4649
Ok::<_, hyper::Error>(
4750
Response::builder()

benches/support/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
mod tokiort;
2-
pub use tokiort::{TokioExecutor, TokioTimer};
2+
pub use tokiort::{TokioExecutor, TokioIo, TokioTimer};

benches/support/tokiort.rs

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,149 @@ impl Future for TokioSleep {
7979
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
8080

8181
impl Sleep for TokioSleep {}
82+
83+
pin_project! {
84+
#[derive(Debug)]
85+
pub struct TokioIo<T> {
86+
#[pin]
87+
inner: T,
88+
}
89+
}
90+
91+
impl<T> TokioIo<T> {
92+
pub fn new(inner: T) -> Self {
93+
Self { inner }
94+
}
95+
96+
pub fn inner(self) -> T {
97+
self.inner
98+
}
99+
}
100+
101+
impl<T> hyper::rt::Read for TokioIo<T>
102+
where
103+
T: tokio::io::AsyncRead,
104+
{
105+
fn poll_read(
106+
self: Pin<&mut Self>,
107+
cx: &mut Context<'_>,
108+
mut buf: hyper::rt::ReadBufCursor<'_>,
109+
) -> Poll<Result<(), std::io::Error>> {
110+
let n = unsafe {
111+
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
112+
match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
113+
Poll::Ready(Ok(())) => tbuf.filled().len(),
114+
other => return other,
115+
}
116+
};
117+
118+
unsafe {
119+
buf.advance(n);
120+
}
121+
Poll::Ready(Ok(()))
122+
}
123+
}
124+
125+
impl<T> hyper::rt::Write for TokioIo<T>
126+
where
127+
T: tokio::io::AsyncWrite,
128+
{
129+
fn poll_write(
130+
self: Pin<&mut Self>,
131+
cx: &mut Context<'_>,
132+
buf: &[u8],
133+
) -> Poll<Result<usize, std::io::Error>> {
134+
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
135+
}
136+
137+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
138+
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
139+
}
140+
141+
fn poll_shutdown(
142+
self: Pin<&mut Self>,
143+
cx: &mut Context<'_>,
144+
) -> Poll<Result<(), std::io::Error>> {
145+
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
146+
}
147+
148+
fn is_write_vectored(&self) -> bool {
149+
tokio::io::AsyncWrite::is_write_vectored(&self.inner)
150+
}
151+
152+
fn poll_write_vectored(
153+
self: Pin<&mut Self>,
154+
cx: &mut Context<'_>,
155+
bufs: &[std::io::IoSlice<'_>],
156+
) -> Poll<Result<usize, std::io::Error>> {
157+
tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
158+
}
159+
}
160+
161+
impl<T> tokio::io::AsyncRead for TokioIo<T>
162+
where
163+
T: hyper::rt::Read,
164+
{
165+
fn poll_read(
166+
self: Pin<&mut Self>,
167+
cx: &mut Context<'_>,
168+
tbuf: &mut tokio::io::ReadBuf<'_>,
169+
) -> Poll<Result<(), std::io::Error>> {
170+
//let init = tbuf.initialized().len();
171+
let filled = tbuf.filled().len();
172+
let sub_filled = unsafe {
173+
let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
174+
175+
match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
176+
Poll::Ready(Ok(())) => buf.filled().len(),
177+
other => return other,
178+
}
179+
};
180+
181+
let n_filled = filled + sub_filled;
182+
// At least sub_filled bytes had to have been initialized.
183+
let n_init = sub_filled;
184+
unsafe {
185+
tbuf.assume_init(n_init);
186+
tbuf.set_filled(n_filled);
187+
}
188+
189+
Poll::Ready(Ok(()))
190+
}
191+
}
192+
193+
impl<T> tokio::io::AsyncWrite for TokioIo<T>
194+
where
195+
T: hyper::rt::Write,
196+
{
197+
fn poll_write(
198+
self: Pin<&mut Self>,
199+
cx: &mut Context<'_>,
200+
buf: &[u8],
201+
) -> Poll<Result<usize, std::io::Error>> {
202+
hyper::rt::Write::poll_write(self.project().inner, cx, buf)
203+
}
204+
205+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
206+
hyper::rt::Write::poll_flush(self.project().inner, cx)
207+
}
208+
209+
fn poll_shutdown(
210+
self: Pin<&mut Self>,
211+
cx: &mut Context<'_>,
212+
) -> Poll<Result<(), std::io::Error>> {
213+
hyper::rt::Write::poll_shutdown(self.project().inner, cx)
214+
}
215+
216+
fn is_write_vectored(&self) -> bool {
217+
hyper::rt::Write::is_write_vectored(&self.inner)
218+
}
219+
220+
fn poll_write_vectored(
221+
self: Pin<&mut Self>,
222+
cx: &mut Context<'_>,
223+
bufs: &[std::io::IoSlice<'_>],
224+
) -> Poll<Result<usize, std::io::Error>> {
225+
hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
226+
}
227+
}

examples/client.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ use hyper::Request;
88
use tokio::io::{self, AsyncWriteExt as _};
99
use tokio::net::TcpStream;
1010

11+
#[path = "../benches/support/mod.rs"]
12+
mod support;
13+
use support::TokioIo;
14+
1115
// A simple type alias so as to DRY.
1216
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
1317

@@ -40,8 +44,9 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> {
4044
let port = url.port_u16().unwrap_or(80);
4145
let addr = format!("{}:{}", host, port);
4246
let stream = TcpStream::connect(addr).await?;
47+
let io = TokioIo::new(stream);
4348

44-
let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
49+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
4550
tokio::task::spawn(async move {
4651
if let Err(err) = conn.await {
4752
println!("Connection failed: {:?}", err);

examples/client_json.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ use hyper::{body::Buf, Request};
77
use serde::Deserialize;
88
use tokio::net::TcpStream;
99

10+
#[path = "../benches/support/mod.rs"]
11+
mod support;
12+
use support::TokioIo;
13+
1014
// A simple type alias so as to DRY.
1115
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
1216

@@ -29,8 +33,9 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
2933
let addr = format!("{}:{}", host, port);
3034

3135
let stream = TcpStream::connect(addr).await?;
36+
let io = TokioIo::new(stream);
3237

33-
let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
38+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
3439
tokio::task::spawn(async move {
3540
if let Err(err) = conn.await {
3641
println!("Connection failed: {:?}", err);

examples/echo.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ use hyper::service::service_fn;
1010
use hyper::{body::Body, Method, Request, Response, StatusCode};
1111
use tokio::net::TcpListener;
1212

13+
#[path = "../benches/support/mod.rs"]
14+
mod support;
15+
use support::TokioIo;
16+
1317
/// This is our service handler. It receives a Request, routes on its
1418
/// path, and returns a Future of a Response.
1519
async fn echo(
@@ -92,10 +96,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
9296
println!("Listening on http://{}", addr);
9397
loop {
9498
let (stream, _) = listener.accept().await?;
99+
let io = TokioIo::new(stream);
95100

96101
tokio::task::spawn(async move {
97102
if let Err(err) = http1::Builder::new()
98-
.serve_connection(stream, service_fn(echo))
103+
.serve_connection(io, service_fn(echo))
99104
.await
100105
{
101106
println!("Error serving connection: {:?}", err);

examples/gateway.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ use hyper::{server::conn::http1, service::service_fn};
44
use std::net::SocketAddr;
55
use tokio::net::{TcpListener, TcpStream};
66

7+
#[path = "../benches/support/mod.rs"]
8+
mod support;
9+
use support::TokioIo;
10+
711
#[tokio::main]
812
async fn main() -> Result<(), Box<dyn std::error::Error>> {
913
pretty_env_logger::init();
@@ -20,6 +24,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2024

2125
loop {
2226
let (stream, _) = listener.accept().await?;
27+
let io = TokioIo::new(stream);
2328

2429
// This is the `Service` that will handle the connection.
2530
// `service_fn` is a helper to convert a function that
@@ -42,9 +47,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4247

4348
async move {
4449
let client_stream = TcpStream::connect(addr).await.unwrap();
50+
let io = TokioIo::new(client_stream);
4551

46-
let (mut sender, conn) =
47-
hyper::client::conn::http1::handshake(client_stream).await?;
52+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
4853
tokio::task::spawn(async move {
4954
if let Err(err) = conn.await {
5055
println!("Connection failed: {:?}", err);
@@ -56,10 +61,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5661
});
5762

5863
tokio::task::spawn(async move {
59-
if let Err(err) = http1::Builder::new()
60-
.serve_connection(stream, service)
61-
.await
62-
{
64+
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
6365
println!("Failed to serve the connection: {:?}", err);
6466
}
6567
});

examples/hello.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ use hyper::service::service_fn;
1010
use hyper::{Request, Response};
1111
use tokio::net::TcpListener;
1212

13+
#[path = "../benches/support/mod.rs"]
14+
mod support;
15+
use support::TokioIo;
16+
1317
// An async function that consumes a request, does nothing with it and returns a
1418
// response.
1519
async fn hello(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
@@ -35,7 +39,10 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
3539
// has work to do. In this case, a connection arrives on the port we are listening on and
3640
// the task is woken up, at which point the task is then put back on a thread, and is
3741
// driven forward by the runtime, eventually yielding a TCP stream.
38-
let (stream, _) = listener.accept().await?;
42+
let (tcp, _) = listener.accept().await?;
43+
// Use an adapter to access something implementing `tokio::io` traits as if they implement
44+
// `hyper::rt` IO traits.
45+
let io = TokioIo::new(tcp);
3946

4047
// Spin up a new task in Tokio so we can continue to listen for new TCP connection on the
4148
// current task without waiting for the processing of the HTTP1 connection we just received
@@ -44,7 +51,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4451
// Handle the connection from the client using HTTP1 and pass any
4552
// HTTP requests received on that connection to the `hello` function
4653
if let Err(err) = http1::Builder::new()
47-
.serve_connection(stream, service_fn(hello))
54+
.serve_connection(io, service_fn(hello))
4855
.await
4956
{
5057
println!("Error serving connection: {:?}", err);

0 commit comments

Comments
 (0)