From ef0ab81daac4cebb97c7f80fea0b247ec87438d1 Mon Sep 17 00:00:00 2001 From: Julian Antonielli Date: Mon, 17 Oct 2022 15:56:26 +0100 Subject: [PATCH 1/3] Port over failing miri test --- Cargo.toml | 1 + src/lib.rs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index dcfd3c3da9..3f1665c9f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ include = [ [dependencies] bytes = "1" +bytes-utils = "0.1" futures-core = { version = "0.3", default-features = false } futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } diff --git a/src/lib.rs b/src/lib.rs index 3a2202dff6..5c50a746ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,3 +107,54 @@ cfg_feature! { #[doc(no_inline)] pub use crate::server::Server; } + +use pin_project_lite::pin_project; + +pin_project! { + #[derive(Debug)] + struct Inner { + #[pin] + body: Body, + } +} + +use bytes::Bytes; +use bytes_utils::SegmentedBuf; + +use crate::body::HttpBody; + +impl Inner { + fn new(body: Body) -> Self { + Self { body } + } + + async fn collect(self) -> std::result::Result, crate::Error> { + let mut output = SegmentedBuf::new(); + let body = self.body; + futures_util::pin_mut!(body); + while let Some(buf) = body.data().await { + output.push(buf?); + } + Ok(output) + } +} + +use bytes::Buf; + +#[tokio::test] +async fn read_from_channel_body() { + let (mut sender, body) = Body::channel(); + let byte_stream = Inner::new(body); + tokio::spawn(async move { + sender.send_data(Bytes::from("data 1")).await.unwrap(); + sender.send_data(Bytes::from("data 2")).await.unwrap(); + sender.send_data(Bytes::from("data 3")).await.unwrap(); + }); + + let mut aggregated_bytes: SegmentedBuf = byte_stream.collect().await.expect("no errors"); + + assert_eq!( + aggregated_bytes.copy_to_bytes(aggregated_bytes.remaining()), + Bytes::from("data 1data 2data 3") + ); +} From 9f63c171fc00d93e0f0eefc04c4ef66035d9f5b8 Mon Sep 17 00:00:00 2001 From: Julian Antonielli Date: Mon, 17 Oct 2022 16:03:22 +0100 Subject: [PATCH 2/3] Move miri failure to a more appropriate module --- src/body/body.rs | 50 +++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 51 ------------------------------------------------ 2 files changed, 50 insertions(+), 51 deletions(-) diff --git a/src/body/body.rs b/src/body/body.rs index 9dc1a034f9..86600f6a9e 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -733,6 +733,56 @@ mod tests { assert!(rx.data().await.is_none()); } + use pin_project_lite::pin_project; + + pin_project! { + #[derive(Debug)] + struct Inner { + #[pin] + body: Body, + } + } + + use bytes::Bytes; + use bytes_utils::SegmentedBuf; + + impl Inner { + fn new(body: Body) -> Self { + Self { body } + } + + async fn collect(self) -> std::result::Result, crate::Error> { + let mut output = SegmentedBuf::new(); + let body = self.body; + futures_util::pin_mut!(body); + while let Some(buf) = body.data().await { + output.push(buf?); + } + Ok(output) + } + } + + use bytes::Buf; + + #[tokio::test] + async fn read_from_channel_body() { + let (mut sender, body) = Body::channel(); + let byte_stream = Inner::new(body); + tokio::spawn(async move { + sender.send_data(Bytes::from("data 1")).await.unwrap(); + sender.send_data(Bytes::from("data 2")).await.unwrap(); + sender.send_data(Bytes::from("data 3")).await.unwrap(); + }); + + let mut aggregated_bytes: SegmentedBuf = + byte_stream.collect().await.expect("no errors"); + + assert_eq!( + aggregated_bytes.copy_to_bytes(aggregated_bytes.remaining()), + Bytes::from("data 1data 2data 3") + ); + } + #[test] fn channel_ready() { let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false); diff --git a/src/lib.rs b/src/lib.rs index 5c50a746ca..3a2202dff6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,54 +107,3 @@ cfg_feature! { #[doc(no_inline)] pub use crate::server::Server; } - -use pin_project_lite::pin_project; - -pin_project! { - #[derive(Debug)] - struct Inner { - #[pin] - body: Body, - } -} - -use bytes::Bytes; -use bytes_utils::SegmentedBuf; - -use crate::body::HttpBody; - -impl Inner { - fn new(body: Body) -> Self { - Self { body } - } - - async fn collect(self) -> std::result::Result, crate::Error> { - let mut output = SegmentedBuf::new(); - let body = self.body; - futures_util::pin_mut!(body); - while let Some(buf) = body.data().await { - output.push(buf?); - } - Ok(output) - } -} - -use bytes::Buf; - -#[tokio::test] -async fn read_from_channel_body() { - let (mut sender, body) = Body::channel(); - let byte_stream = Inner::new(body); - tokio::spawn(async move { - sender.send_data(Bytes::from("data 1")).await.unwrap(); - sender.send_data(Bytes::from("data 2")).await.unwrap(); - sender.send_data(Bytes::from("data 3")).await.unwrap(); - }); - - let mut aggregated_bytes: SegmentedBuf = byte_stream.collect().await.expect("no errors"); - - assert_eq!( - aggregated_bytes.copy_to_bytes(aggregated_bytes.remaining()), - Bytes::from("data 1data 2data 3") - ); -} From 5522a71e83597cf301e3b57e6a2fba02ffa3044d Mon Sep 17 00:00:00 2001 From: Julian Antonielli Date: Tue, 18 Oct 2022 09:22:24 +0100 Subject: [PATCH 3/3] Remove bytes-utils dependency --- Cargo.toml | 1 - src/body/body.rs | 9 ++++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3f1665c9f9..dcfd3c3da9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,6 @@ include = [ [dependencies] bytes = "1" -bytes-utils = "0.1" futures-core = { version = "0.3", default-features = false } futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } diff --git a/src/body/body.rs b/src/body/body.rs index 86600f6a9e..14c62573ac 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -743,16 +743,16 @@ mod tests { } } + use crate::common::buf::BufList; use bytes::Bytes; - use bytes_utils::SegmentedBuf; impl Inner { fn new(body: Body) -> Self { Self { body } } - async fn collect(self) -> std::result::Result, crate::Error> { - let mut output = SegmentedBuf::new(); + async fn collect(self) -> std::result::Result, crate::Error> { + let mut output = BufList::new(); let body = self.body; futures_util::pin_mut!(body); while let Some(buf) = body.data().await { @@ -774,8 +774,7 @@ mod tests { sender.send_data(Bytes::from("data 3")).await.unwrap(); }); - let mut aggregated_bytes: SegmentedBuf = - byte_stream.collect().await.expect("no errors"); + let mut aggregated_bytes: BufList = byte_stream.collect().await.expect("no errors"); assert_eq!( aggregated_bytes.copy_to_bytes(aggregated_bytes.remaining()),