From ae27733a7a99c7c1e58dbfee6c55a3fbce8ca064 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 8 May 2023 17:33:58 +0100 Subject: [PATCH] feat!: remove tokio-03 crate feature --- CHANGELOG.md | 2 + Cargo.lock | 68 +----- Cargo.toml | 8 +- src/lib.rs | 11 - src/stream/mod.rs | 36 --- src/tokio_03/bufread/generic/decoder.rs | 145 ------------ src/tokio_03/bufread/generic/encoder.rs | 117 ---------- src/tokio_03/bufread/generic/mod.rs | 4 - src/tokio_03/bufread/macros/decoder.rs | 84 ------- src/tokio_03/bufread/macros/encoder.rs | 76 ------- src/tokio_03/bufread/macros/mod.rs | 4 - src/tokio_03/bufread/mod.rs | 10 - src/tokio_03/mod.rs | 4 - src/tokio_03/write/buf_write.rs | 32 --- src/tokio_03/write/buf_writer.rs | 209 ------------------ src/tokio_03/write/generic/decoder.rs | 175 --------------- src/tokio_03/write/generic/encoder.rs | 163 -------------- src/tokio_03/write/generic/mod.rs | 4 - src/tokio_03/write/macros/decoder.rs | 91 -------- src/tokio_03/write/macros/encoder.rs | 90 -------- src/tokio_03/write/macros/mod.rs | 4 - src/tokio_03/write/mod.rs | 17 -- tests/proptest.rs | 3 - tests/utils/algos.rs | 3 - tests/utils/impls.rs | 70 ------ tests/utils/mod.rs | 2 - tests/utils/test_cases.rs | 3 - tests/utils/tokio_03_ext/copy_buf.rs | 52 ----- .../utils/tokio_03_ext/interleave_pending.rs | 66 ------ tests/utils/tokio_03_ext/limited.rs | 35 --- tests/utils/tokio_03_ext/mod.rs | 23 -- tests/utils/track_closed.rs | 24 -- tests/utils/track_eof.rs | 43 ---- 33 files changed, 4 insertions(+), 1674 deletions(-) delete mode 100644 src/tokio_03/bufread/generic/decoder.rs delete mode 100644 src/tokio_03/bufread/generic/encoder.rs delete mode 100644 src/tokio_03/bufread/generic/mod.rs delete mode 100644 src/tokio_03/bufread/macros/decoder.rs delete mode 100644 src/tokio_03/bufread/macros/encoder.rs delete mode 100644 src/tokio_03/bufread/macros/mod.rs delete mode 100644 src/tokio_03/bufread/mod.rs delete mode 100644 src/tokio_03/mod.rs delete mode 100644 src/tokio_03/write/buf_write.rs delete mode 100644 src/tokio_03/write/buf_writer.rs delete mode 100644 src/tokio_03/write/generic/decoder.rs delete mode 100644 src/tokio_03/write/generic/encoder.rs delete mode 100644 src/tokio_03/write/generic/mod.rs delete mode 100644 src/tokio_03/write/macros/decoder.rs delete mode 100644 src/tokio_03/write/macros/encoder.rs delete mode 100644 src/tokio_03/write/macros/mod.rs delete mode 100644 src/tokio_03/write/mod.rs delete mode 100644 tests/utils/tokio_03_ext/copy_buf.rs delete mode 100644 tests/utils/tokio_03_ext/interleave_pending.rs delete mode 100644 tests/utils/tokio_03_ext/limited.rs delete mode 100644 tests/utils/tokio_03_ext/mod.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 963b6481..6d751b45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0), ## Unreleased +- Remove Tokio v0.3.x support (`tokio-03` crate feature). + ## 0.3.15 - 2022-10-08 - `Level::Default::into_zstd()` now returns libzstd's default value `3`. diff --git a/Cargo.lock b/Cargo.lock index 85f9c595..4b380147 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -29,7 +29,6 @@ version = "0.3.15" dependencies = [ "brotli", "bytes 0.5.6", - "bytes 0.6.0", "bytes 1.1.0", "bzip2", "flate2", @@ -44,12 +43,8 @@ dependencies = [ "proptest-derive", "rand", "tokio 0.2.25", - "tokio 0.3.7", "tokio 1.24.2", - "tokio-util 0.3.1", - "tokio-util 0.4.0", - "tokio-util 0.5.1", - "tokio-util 0.6.9", + "tokio-util", "xz2", "zstd", "zstd-safe", @@ -115,12 +110,6 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" -[[package]] -name = "bytes" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" - [[package]] name = "bytes" version = "1.1.0" @@ -776,19 +765,6 @@ dependencies = [ "tokio-macros", ] -[[package]] -name = "tokio" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46409491c9375a693ce7032101970a54f8a2010efb77e13f70788f0d84489e39" -dependencies = [ - "autocfg", - "bytes 0.6.0", - "futures-core", - "memchr", - "pin-project-lite 0.2.8", -] - [[package]] name = "tokio" version = "1.24.2" @@ -813,48 +789,6 @@ dependencies = [ "syn 1.0.86", ] -[[package]] -name = "tokio-util" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" -dependencies = [ - "bytes 0.5.6", - "futures-core", - "futures-sink", - "log", - "pin-project-lite 0.1.12", - "tokio 0.2.25", -] - -[[package]] -name = "tokio-util" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24793699f4665ba0416ed287dc794fe6b11a4aa5e4e95b58624f45f6c46b97d4" -dependencies = [ - "bytes 0.5.6", - "futures-core", - "futures-sink", - "log", - "pin-project-lite 0.1.12", - "tokio 0.3.7", -] - -[[package]] -name = "tokio-util" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3137de2b078e95274b696cc522e87f22c9a753fe3ef3344116ffb94f104f10a3" -dependencies = [ - "bytes 0.6.0", - "futures-core", - "futures-sink", - "log", - "pin-project-lite 0.2.8", - "tokio 0.3.7", -] - [[package]] name = "tokio-util" version = "0.6.9" diff --git a/Cargo.toml b/Cargo.toml index 8e9b0823..f24adb36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ rustdoc-args = ["--cfg", "docsrs"] # groups default = [] all = ["all-implementations", "all-algorithms"] -all-implementations = ["futures-io", "stream", "tokio-02", "tokio-03", "tokio"] +all-implementations = ["futures-io", "stream", "tokio-02", "tokio"] all-algorithms = ["brotli", "bzip2", "deflate", "gzip", "lzma", "xz", "zlib", "zstd"] # algorithms @@ -48,7 +48,6 @@ libzstd = { package = "zstd", version = "0.11.1", optional = true, default-featu zstd-safe = { version = "5.0.1", optional = true, default-features = false } memchr = "2.2.1" tokio-02 = { package = "tokio", version = "0.2.21", optional = true, default-features = false } -tokio-03 = { package = "tokio", version = "0.3.0", optional = true, default-features = false } tokio = { version = "1.0.0", optional = true, default-features = false } [dev-dependencies] @@ -59,14 +58,9 @@ futures = "0.3.5" futures-test = "0.3.5" ntest = "0.8.1" bytes-05 = { package = "bytes", version = "0.5.0" } -bytes-06 = { package = "bytes", version = "0.6.0" } bytes = "1.0.0" tokio-02 = { package = "tokio", version = "0.2.21", default-features = false, features = ["io-util", "stream", "macros", "io-std"] } -tokio-03 = { package = "tokio", version = "0.3.0", default-features = false, features = ["io-util", "stream"] } tokio = { version = "1.0.0", default-features = false, features = ["io-util"] } -tokio-util-03 = { package = "tokio-util", version = "0.3.0", default-features = false, features = ["codec"] } -tokio-util-04 = { package = "tokio-util", version = "0.4.0", default-features = false, features = ["io"] } -tokio-util-05 = { package = "tokio-util", version = "0.5.0", default-features = false, features = ["io"] } tokio-util-06 = { package = "tokio-util", version = "0.6.0", default-features = false, features = ["io"] } [[test]] diff --git a/src/lib.rs b/src/lib.rs index 108cb859..1ce78287 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,14 +61,6 @@ not(feature = "tokio-02"), doc = "`tokio-02` (*inactive*) | `tokio::io::AsyncBufRead`, `tokio::io::AsyncWrite`" )] -#![cfg_attr( - feature = "tokio-03", - doc = "[`tokio-03`](crate::tokio_03) | [`tokio::io::AsyncBufRead`](::tokio_03::io::AsyncBufRead), [`tokio::io::AsyncWrite`](::tokio_03::io::AsyncWrite)" -)] -#![cfg_attr( - not(feature = "tokio-03"), - doc = "`tokio-03` (*inactive*) | `tokio::io::AsyncBufRead`, `tokio::io::AsyncWrite`" -)] #![cfg_attr( feature = "tokio", doc = "[`tokio`](crate::tokio) | [`tokio::io::AsyncBufRead`](::tokio::io::AsyncBufRead), [`tokio::io::AsyncWrite`](::tokio::io::AsyncWrite)" @@ -178,9 +170,6 @@ pub mod tokio; #[cfg(feature = "tokio-02")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio-02")))] pub mod tokio_02; -#[cfg(feature = "tokio-03")] -#[cfg_attr(docsrs, doc(cfg(feature = "tokio-03")))] -pub mod tokio_03; mod unshared; mod util; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 7dae314f..fe2e7473 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -40,28 +40,6 @@ //! ).map_ok(|bytes| bytes.freeze()) //! } //! -//! /// Upgrade replacement with `tokio` v0.3 and `bytes` v0.5 using `tokio-util` v0.4 -//! fn tokio_03_bytes_05( -//! input: impl Stream>, -//! ) -> impl Stream> { -//! tokio_util_04::io::ReaderStream::new( -//! async_compression::tokio_03::bufread::GzipEncoder::new( -//! tokio_util_04::io::StreamReader::new(input), -//! ), -//! ) -//! } -//! -//! /// Upgrade replacement with `tokio` v0.3 and `bytes` v0.6 using `tokio-util` v0.5 -//! fn tokio_03_bytes_06( -//! input: impl Stream>, -//! ) -> impl Stream> { -//! tokio_util_05::io::ReaderStream::new( -//! async_compression::tokio_03::bufread::GzipEncoder::new( -//! tokio_util_05::io::StreamReader::new(input), -//! ), -//! ) -//! } -//! //! /// Upgrade replacement with `tokio` v1.0 and `bytes` v1.0 using `tokio-util` v0.6 //! fn tokio_bytes( //! input: impl Stream>, @@ -109,20 +87,6 @@ //! # ); //! # assert_eq!( //! # expected, -//! # tokio_03_bytes_05(data().map_ok(bytes_05::Bytes::from)) -//! # .map_ok(|bytes| bytes.as_ref().into()) -//! # .try_collect::>>() -//! # .await?, -//! # ); -//! # assert_eq!( -//! # expected, -//! # tokio_03_bytes_06(data().map_ok(bytes_06::Bytes::from)) -//! # .map_ok(|bytes| bytes.as_ref().into()) -//! # .try_collect::>>() -//! # .await?, -//! # ); -//! # assert_eq!( -//! # expected, //! # tokio_bytes(data().map_ok(bytes::Bytes::from)) //! # .map_ok(|bytes| bytes.as_ref().into()) //! # .try_collect::>>() diff --git a/src/tokio_03/bufread/generic/decoder.rs b/src/tokio_03/bufread/generic/decoder.rs deleted file mode 100644 index 796218c9..00000000 --- a/src/tokio_03/bufread/generic/decoder.rs +++ /dev/null @@ -1,145 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::Result; - -use crate::{codec::Decode, util::PartialBuffer}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_03::io::{AsyncBufRead, AsyncRead, ReadBuf}; - -#[derive(Debug)] -enum State { - Decoding, - Flushing, - Done, - Next, -} - -pin_project! { - #[derive(Debug)] - pub struct Decoder { - #[pin] - reader: R, - decoder: D, - state: State, - multiple_members: bool, - } -} - -impl Decoder { - pub fn new(reader: R, decoder: D) -> Self { - Self { - reader, - decoder, - state: State::Decoding, - multiple_members: false, - } - } - - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub fn into_inner(self) -> R { - self.reader - } - - pub fn multiple_members(&mut self, enabled: bool) { - self.multiple_members = enabled; - } - - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - *this.state = match this.state { - State::Decoding => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - // Avoid attempting to reinitialise the decoder if the reader - // has returned EOF. - *this.multiple_members = false; - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - let done = this.decoder.decode(&mut input, output)?; - let len = input.written().len(); - this.reader.as_mut().consume(len); - if done { - State::Flushing - } else { - State::Decoding - } - } - } - - State::Flushing => { - if this.decoder.finish(output)? { - if *this.multiple_members { - this.decoder.reinit()?; - State::Next - } else { - State::Done - } - } else { - State::Flushing - } - } - - State::Done => State::Done, - - State::Next => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Done - } else { - State::Decoding - } - } - }; - - if let State::Done = *this.state { - return Poll::Ready(Ok(())); - } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncRead for Decoder { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - if buf.remaining() == 0 { - return Poll::Ready(Ok(())); - } - - let mut output = PartialBuffer::new(buf.initialize_unfilled()); - match self.do_poll_read(cx, &mut output)? { - Poll::Pending if output.written().is_empty() => Poll::Pending, - _ => { - let len = output.written().len(); - buf.advance(len); - Poll::Ready(Ok(())) - } - } - } -} diff --git a/src/tokio_03/bufread/generic/encoder.rs b/src/tokio_03/bufread/generic/encoder.rs deleted file mode 100644 index 0c47ce2b..00000000 --- a/src/tokio_03/bufread/generic/encoder.rs +++ /dev/null @@ -1,117 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::Result; - -use crate::{codec::Encode, util::PartialBuffer}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_03::io::{AsyncBufRead, AsyncRead, ReadBuf}; - -#[derive(Debug)] -enum State { - Encoding, - Flushing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Encoder { - #[pin] - reader: R, - encoder: E, - state: State, - } -} - -impl Encoder { - pub fn new(reader: R, encoder: E) -> Self { - Self { - reader, - encoder, - state: State::Encoding, - } - } - - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub fn into_inner(self) -> R { - self.reader - } - - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - *this.state = match this.state { - State::Encoding => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - this.encoder.encode(&mut input, output)?; - let len = input.written().len(); - this.reader.as_mut().consume(len); - State::Encoding - } - } - - State::Flushing => { - if this.encoder.finish(output)? { - State::Done - } else { - State::Flushing - } - } - - State::Done => State::Done, - }; - - if let State::Done = *this.state { - return Poll::Ready(Ok(())); - } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncRead for Encoder { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - if buf.remaining() == 0 { - return Poll::Ready(Ok(())); - } - - let mut output = PartialBuffer::new(buf.initialize_unfilled()); - match self.do_poll_read(cx, &mut output)? { - Poll::Pending if output.written().is_empty() => Poll::Pending, - _ => { - let len = output.written().len(); - buf.advance(len); - Poll::Ready(Ok(())) - } - } - } -} diff --git a/src/tokio_03/bufread/generic/mod.rs b/src/tokio_03/bufread/generic/mod.rs deleted file mode 100644 index dbe1e3e2..00000000 --- a/src/tokio_03/bufread/generic/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod decoder; -mod encoder; - -pub use self::{decoder::Decoder, encoder::Encoder}; diff --git a/src/tokio_03/bufread/macros/decoder.rs b/src/tokio_03/bufread/macros/decoder.rs deleted file mode 100644 index 55affc12..00000000 --- a/src/tokio_03/bufread/macros/decoder.rs +++ /dev/null @@ -1,84 +0,0 @@ -macro_rules! decoder { - ($(#[$attr:meta])* $name:ident) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncRead`](tokio_03::io::AsyncRead) interface and will - /// read compressed data from an underlying stream and emit a stream of uncompressed data. - pub struct $name { - #[pin] - inner: crate::tokio_03::bufread::Decoder, - } - } - - impl $name { - /// Creates a new decoder which will read compressed data from the given stream and - /// emit a uncompressed stream. - pub fn new(read: R) -> $name { - $name { - inner: crate::tokio_03::bufread::Decoder::new(read, crate::codec::$name::new()), - } - } - - /// Configure multi-member/frame decoding, if enabled this will reset the decoder state - /// when reaching the end of a compressed member/frame and expect either EOF or another - /// compressed member/frame to follow it in the stream. - pub fn multiple_members(&mut self, enabled: bool) { - self.inner.multiple_members(enabled); - } - - /// Acquires a reference to the underlying reader that this decoder is wrapping. - pub fn get_ref(&self) -> &R { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_mut(&mut self) -> &mut R { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut R> { - self.project().inner.get_pin_mut() - } - - /// Consumes this decoder returning the underlying reader. - /// - /// Note that this may discard internal state of this decoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> R { - self.inner.into_inner() - } - } - - impl tokio_03::io::AsyncRead for $name { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio_03::io::ReadBuf<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_read(cx, buf) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_03::io::AsyncBufRead; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_03/bufread/macros/encoder.rs b/src/tokio_03/bufread/macros/encoder.rs deleted file mode 100644 index 3204d15d..00000000 --- a/src/tokio_03/bufread/macros/encoder.rs +++ /dev/null @@ -1,76 +0,0 @@ -macro_rules! encoder { - ($(#[$attr:meta])* $name:ident<$inner:ident> $({ $($constructor:tt)* })*) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncRead`](tokio_03::io::AsyncRead) interface and will - /// read uncompressed data from an underlying stream and emit a stream of compressed data. - pub struct $name<$inner> { - #[pin] - inner: crate::tokio_03::bufread::Encoder<$inner, crate::codec::$name>, - } - } - - impl<$inner: tokio_03::io::AsyncBufRead> $name<$inner> { - $( - /// Creates a new encoder which will read uncompressed data from the given stream - /// and emit a compressed stream. - /// - $($constructor)* - )* - - /// Acquires a reference to the underlying reader that this encoder is wrapping. - pub fn get_ref(&self) -> &$inner { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying reader that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this encoder. - pub fn get_mut(&mut self) -> &mut $inner { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying reader that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this encoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut $inner> { - self.project().inner.get_pin_mut() - } - - /// Consumes this encoder returning the underlying reader. - /// - /// Note that this may discard internal state of this encoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> $inner { - self.inner.into_inner() - } - } - - impl<$inner: tokio_03::io::AsyncBufRead> tokio_03::io::AsyncRead for $name<$inner> { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio_03::io::ReadBuf<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_read(cx, buf) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_03::io::AsyncBufRead; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_03/bufread/macros/mod.rs b/src/tokio_03/bufread/macros/mod.rs deleted file mode 100644 index 31e1010b..00000000 --- a/src/tokio_03/bufread/macros/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -#[macro_use] -mod decoder; -#[macro_use] -mod encoder; diff --git a/src/tokio_03/bufread/mod.rs b/src/tokio_03/bufread/mod.rs deleted file mode 100644 index 9dc41344..00000000 --- a/src/tokio_03/bufread/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -//! Types which operate over [`AsyncBufRead`](::tokio_03::io::AsyncBufRead) streams, both encoders and -//! decoders for various formats. - -#[macro_use] -mod macros; -mod generic; - -pub(crate) use generic::{Decoder, Encoder}; - -algos!(tokio_03::bufread); diff --git a/src/tokio_03/mod.rs b/src/tokio_03/mod.rs deleted file mode 100644 index 45f3e0d9..00000000 --- a/src/tokio_03/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -//! Implementations for IO traits exported by [`tokio` v0.3](::tokio_03). - -pub mod bufread; -pub mod write; diff --git a/src/tokio_03/write/buf_write.rs b/src/tokio_03/write/buf_write.rs deleted file mode 100644 index 5ca99731..00000000 --- a/src/tokio_03/write/buf_write.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::{ - io, - pin::Pin, - task::{Context, Poll}, -}; - -pub(crate) trait AsyncBufWrite { - /// Attempt to return an internal buffer to write to, flushing data out to the inner reader if - /// it is full. - /// - /// On success, returns `Poll::Ready(Ok(buf))`. - /// - /// If the buffer is full and cannot be flushed, the method returns `Poll::Pending` and - /// arranges for the current task context (`cx`) to receive a notification when the object - /// becomes readable or is closed. - fn poll_partial_flush_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>; - - /// Tells this buffer that `amt` bytes have been written to its buffer, so they should be - /// written out to the underlying IO when possible. - /// - /// This function is a lower-level call. It needs to be paired with the `poll_flush_buf` method to - /// function properly. This function does not perform any I/O, it simply informs this object - /// that some amount of its buffer, returned from `poll_flush_buf`, has been written to and should - /// be sent. As such, this function may do odd things if `poll_flush_buf` isn't - /// called before calling it. - /// - /// The `amt` must be `<=` the number of bytes in the buffer returned by `poll_flush_buf`. - fn produce(self: Pin<&mut Self>, amt: usize); -} diff --git a/src/tokio_03/write/buf_writer.rs b/src/tokio_03/write/buf_writer.rs deleted file mode 100644 index f10f7074..00000000 --- a/src/tokio_03/write/buf_writer.rs +++ /dev/null @@ -1,209 +0,0 @@ -// Originally sourced from `futures_util::io::buf_writer`, needs to be redefined locally so that -// the `AsyncBufWrite` impl can access its internals, and changed a bit to make it more efficient -// with those methods. - -use super::AsyncBufWrite; -use futures_core::ready; -use pin_project_lite::pin_project; -use std::{ - cmp::min, - fmt, io, - pin::Pin, - task::{Context, Poll}, -}; -use tokio_03::io::AsyncWrite; - -const DEFAULT_BUF_SIZE: usize = 8192; - -pin_project! { - pub struct BufWriter { - #[pin] - inner: W, - buf: Box<[u8]>, - written: usize, - buffered: usize, - } -} - -impl BufWriter { - /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, - /// but may change in the future. - pub fn new(inner: W) -> Self { - Self::with_capacity(DEFAULT_BUF_SIZE, inner) - } - - /// Creates a new `BufWriter` with the specified buffer capacity. - pub fn with_capacity(cap: usize, inner: W) -> Self { - Self { - inner, - buf: vec![0; cap].into(), - written: 0, - buffered: 0, - } - } - - fn partial_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - let mut ret = Ok(()); - while *this.written < *this.buffered { - match this - .inner - .as_mut() - .poll_write(cx, &this.buf[*this.written..*this.buffered]) - { - Poll::Pending => { - break; - } - Poll::Ready(Ok(0)) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Poll::Ready(Ok(n)) => *this.written += n, - Poll::Ready(Err(e)) => { - ret = Err(e); - break; - } - } - } - - if *this.written > 0 { - this.buf.copy_within(*this.written..*this.buffered, 0); - *this.buffered -= *this.written; - *this.written = 0; - - Poll::Ready(ret) - } else if *this.buffered == 0 { - Poll::Ready(ret) - } else { - ret?; - Poll::Pending - } - } - - fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - let mut ret = Ok(()); - while *this.written < *this.buffered { - match ready!(this - .inner - .as_mut() - .poll_write(cx, &this.buf[*this.written..*this.buffered])) - { - Ok(0) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Ok(n) => *this.written += n, - Err(e) => { - ret = Err(e); - break; - } - } - } - this.buf.copy_within(*this.written..*this.buffered, 0); - *this.buffered -= *this.written; - *this.written = 0; - Poll::Ready(ret) - } - - /// Gets a reference to the underlying writer. - pub fn get_ref(&self) -> &W { - &self.inner - } - - /// Gets a mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_mut(&mut self) -> &mut W { - &mut self.inner - } - - /// Gets a pinned mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().inner - } - - /// Consumes this `BufWriter`, returning the underlying writer. - /// - /// Note that any leftover data in the internal buffer is lost. - pub fn into_inner(self) -> W { - self.inner - } -} - -impl AsyncWrite for BufWriter { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let this = self.as_mut().project(); - if *this.buffered + buf.len() > this.buf.len() { - ready!(self.as_mut().partial_flush_buf(cx))?; - } - - let this = self.as_mut().project(); - if buf.len() >= this.buf.len() { - if *this.buffered == 0 { - this.inner.poll_write(cx, buf) - } else { - // The only way that `partial_flush_buf` would have returned with - // `this.buffered != 0` is if it were Pending, so our waker was already queued - Poll::Pending - } - } else { - let len = min(this.buf.len() - *this.buffered, buf.len()); - this.buf[*this.buffered..*this.buffered + len].copy_from_slice(&buf[..len]); - *this.buffered += len; - Poll::Ready(Ok(len)) - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.project().inner.poll_shutdown(cx) - } -} - -impl AsyncBufWrite for BufWriter { - fn poll_partial_flush_buf( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - ready!(self.as_mut().partial_flush_buf(cx))?; - let this = self.project(); - Poll::Ready(Ok(&mut this.buf[*this.buffered..])) - } - - fn produce(self: Pin<&mut Self>, amt: usize) { - *self.project().buffered += amt; - } -} - -impl fmt::Debug for BufWriter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BufWriter") - .field("writer", &self.inner) - .field( - "buffer", - &format_args!("{}/{}", self.buffered, self.buf.len()), - ) - .field("written", &self.written) - .finish() - } -} diff --git a/src/tokio_03/write/generic/decoder.rs b/src/tokio_03/write/generic/decoder.rs deleted file mode 100644 index 944b35b8..00000000 --- a/src/tokio_03/write/generic/decoder.rs +++ /dev/null @@ -1,175 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::{Error, ErrorKind, Result}; - -use crate::{ - codec::Decode, - tokio_03::write::{AsyncBufWrite, BufWriter}, - util::PartialBuffer, -}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_03::io::AsyncWrite; - -#[derive(Debug)] -enum State { - Decoding, - Finishing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Decoder { - #[pin] - writer: BufWriter, - decoder: D, - state: State, - } -} - -impl Decoder { - pub fn new(writer: W, decoder: D) -> Self { - Self { - writer: BufWriter::new(writer), - decoder, - state: State::Decoding, - } - } - - pub fn get_ref(&self) -> &W { - self.writer.get_ref() - } - - pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().writer.get_pin_mut() - } - - pub fn into_inner(self) -> W { - self.writer.into_inner() - } - - fn do_poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - input: &mut PartialBuffer<&[u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Decoding => { - if this.decoder.decode(input, &mut output)? { - State::Finishing - } else { - State::Decoding - } - } - - State::Finishing => { - if this.decoder.finish(&mut output)? { - State::Done - } else { - State::Finishing - } - } - - State::Done => panic!("Write after end of stream"), - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if let State::Done = this.state { - return Poll::Ready(Ok(())); - } - - if input.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - let (state, done) = match this.state { - State::Decoding => { - let done = this.decoder.flush(&mut output)?; - (State::Decoding, done) - } - - State::Finishing => { - if this.decoder.finish(&mut output)? { - (State::Done, false) - } else { - (State::Finishing, false) - } - } - - State::Done => (State::Done, true), - }; - - *this.state = state; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if done { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncWrite for Decoder { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut input = PartialBuffer::new(buf); - - match self.do_poll_write(cx, &mut input)? { - Poll::Pending if input.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(input.written().len())), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_flush(cx))?; - ready!(self.project().writer.as_mut().poll_flush(cx))?; - Poll::Ready(Ok(())) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let State::Decoding = self.as_mut().project().state { - *self.as_mut().project().state = State::Finishing; - } - - ready!(self.as_mut().do_poll_flush(cx))?; - - if let State::Done = self.as_mut().project().state { - ready!(self.as_mut().project().writer.as_mut().poll_shutdown(cx))?; - Poll::Ready(Ok(())) - } else { - Poll::Ready(Err(Error::new( - ErrorKind::Other, - "Attempt to shutdown before finishing input", - ))) - } - } -} diff --git a/src/tokio_03/write/generic/encoder.rs b/src/tokio_03/write/generic/encoder.rs deleted file mode 100644 index 2121ce1d..00000000 --- a/src/tokio_03/write/generic/encoder.rs +++ /dev/null @@ -1,163 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::Result; - -use crate::{ - codec::Encode, - tokio_03::write::{AsyncBufWrite, BufWriter}, - util::PartialBuffer, -}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_03::io::AsyncWrite; - -#[derive(Debug)] -enum State { - Encoding, - Finishing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Encoder { - #[pin] - writer: BufWriter, - encoder: E, - state: State, - } -} - -impl Encoder { - pub fn new(writer: W, encoder: E) -> Self { - Self { - writer: BufWriter::new(writer), - encoder, - state: State::Encoding, - } - } - - pub fn get_ref(&self) -> &W { - self.writer.get_ref() - } - - pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().writer.get_pin_mut() - } - - pub fn into_inner(self) -> W { - self.writer.into_inner() - } - - fn do_poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - input: &mut PartialBuffer<&[u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Encoding => { - this.encoder.encode(input, &mut output)?; - State::Encoding - } - - State::Finishing | State::Done => panic!("Write after shutdown"), - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if input.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - let done = match this.state { - State::Encoding => this.encoder.flush(&mut output)?, - - State::Finishing | State::Done => panic!("Flush after shutdown"), - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if done { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Encoding | State::Finishing => { - if this.encoder.finish(&mut output)? { - State::Done - } else { - State::Finishing - } - } - - State::Done => State::Done, - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if let State::Done = this.state { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncWrite for Encoder { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut input = PartialBuffer::new(buf); - - match self.do_poll_write(cx, &mut input)? { - Poll::Pending if input.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(input.written().len())), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_flush(cx))?; - ready!(self.project().writer.as_mut().poll_flush(cx))?; - Poll::Ready(Ok(())) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_shutdown(cx))?; - ready!(self.project().writer.as_mut().poll_shutdown(cx))?; - Poll::Ready(Ok(())) - } -} diff --git a/src/tokio_03/write/generic/mod.rs b/src/tokio_03/write/generic/mod.rs deleted file mode 100644 index dbe1e3e2..00000000 --- a/src/tokio_03/write/generic/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod decoder; -mod encoder; - -pub use self::{decoder::Decoder, encoder::Encoder}; diff --git a/src/tokio_03/write/macros/decoder.rs b/src/tokio_03/write/macros/decoder.rs deleted file mode 100644 index c573fe2e..00000000 --- a/src/tokio_03/write/macros/decoder.rs +++ /dev/null @@ -1,91 +0,0 @@ -macro_rules! decoder { - ($(#[$attr:meta])* $name:ident) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncWrite`](tokio_03::io::AsyncWrite) interface and will - /// take in compressed data and write it uncompressed to an underlying stream. - pub struct $name { - #[pin] - inner: crate::tokio_03::write::Decoder, - } - } - - impl $name { - /// Creates a new decoder which will take in compressed data and write it uncompressedd - /// to the given stream. - pub fn new(read: W) -> $name { - $name { - inner: crate::tokio_03::write::Decoder::new(read, crate::codec::$name::new()), - } - } - - /// Acquires a reference to the underlying reader that this decoder is wrapping. - pub fn get_ref(&self) -> &W { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_mut(&mut self) -> &mut W { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut W> { - self.project().inner.get_pin_mut() - } - - /// Consumes this decoder returning the underlying reader. - /// - /// Note that this may discard internal state of this decoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> W { - self.inner.into_inner() - } - } - - impl tokio_03::io::AsyncWrite for $name { - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - self.project().inner.poll_write(cx, buf) - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_shutdown(cx) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_03::io::AsyncWrite; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_03/write/macros/encoder.rs b/src/tokio_03/write/macros/encoder.rs deleted file mode 100644 index 449813c6..00000000 --- a/src/tokio_03/write/macros/encoder.rs +++ /dev/null @@ -1,90 +0,0 @@ -macro_rules! encoder { - ($(#[$attr:meta])* $name:ident<$inner:ident> $({ $($constructor:tt)* })*) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncWrite`](tokio_03::io::AsyncWrite) interface and will - /// take in uncompressed data and write it compressed to an underlying stream. - pub struct $name<$inner> { - #[pin] - inner: crate::tokio_03::write::Encoder<$inner, crate::codec::$name>, - } - } - - impl<$inner: tokio_03::io::AsyncWrite> $name<$inner> { - $( - /// Creates a new encoder which will take in uncompressed data and write it - /// compressed to the given stream. - /// - $($constructor)* - )* - - /// Acquires a reference to the underlying writer that this encoder is wrapping. - pub fn get_ref(&self) -> &$inner { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying writer that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the writer which - /// may otherwise confuse this encoder. - pub fn get_mut(&mut self) -> &mut $inner { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying writer that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the writer which - /// may otherwise confuse this encoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut $inner> { - self.project().inner.get_pin_mut() - } - - /// Consumes this encoder returning the underlying writer. - /// - /// Note that this may discard internal state of this encoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> $inner { - self.inner.into_inner() - } - } - - impl<$inner: tokio_03::io::AsyncWrite> tokio_03::io::AsyncWrite for $name<$inner> { - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - self.project().inner.poll_write(cx, buf) - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_shutdown(cx) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_03::io::AsyncWrite; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_03/write/macros/mod.rs b/src/tokio_03/write/macros/mod.rs deleted file mode 100644 index 31e1010b..00000000 --- a/src/tokio_03/write/macros/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -#[macro_use] -mod decoder; -#[macro_use] -mod encoder; diff --git a/src/tokio_03/write/mod.rs b/src/tokio_03/write/mod.rs deleted file mode 100644 index 5387da70..00000000 --- a/src/tokio_03/write/mod.rs +++ /dev/null @@ -1,17 +0,0 @@ -//! Types which operate over [`AsyncWrite`](tokio_03::io::AsyncWrite) streams, both encoders and -//! decoders for various formats. - -#[macro_use] -mod macros; -mod generic; - -mod buf_write; -mod buf_writer; - -use self::{ - buf_write::AsyncBufWrite, - buf_writer::BufWriter, - generic::{Decoder, Encoder}, -}; - -algos!(tokio_03::write); diff --git a/tests/proptest.rs b/tests/proptest.rs index 1a0e23ee..02f80d62 100644 --- a/tests/proptest.rs +++ b/tests/proptest.rs @@ -155,9 +155,6 @@ macro_rules! tests { #[cfg(feature = "tokio-02")] io_tests!(tokio_02, $variant); - #[cfg(feature = "tokio-03")] - io_tests!(tokio_03, $variant); - #[cfg(feature = "tokio")] io_tests!(tokio, $variant); } diff --git a/tests/utils/algos.rs b/tests/utils/algos.rs index 8cdfeb66..e1589708 100644 --- a/tests/utils/algos.rs +++ b/tests/utils/algos.rs @@ -81,9 +81,6 @@ macro_rules! algos { #[cfg(feature = "tokio-02")] io_algo!(tokio_02, $name($encoder, $decoder)); - #[cfg(feature = "tokio-03")] - io_algo!(tokio_03, $name($encoder, $decoder)); - #[cfg(feature = "tokio")] io_algo!(tokio, $name($encoder, $decoder)); } diff --git a/tests/utils/impls.rs b/tests/utils/impls.rs index 18cea2eb..edd8fd65 100644 --- a/tests/utils/impls.rs +++ b/tests/utils/impls.rs @@ -166,76 +166,6 @@ pub mod tokio_02 { } } -#[cfg(feature = "tokio-03")] -pub mod tokio_03 { - pub mod bufread { - use crate::utils::{InputStream, TrackEof}; - pub use tokio_03::io::AsyncBufRead; - use tokio_util_04::io::StreamReader; - - pub fn from(input: &InputStream) -> impl AsyncBufRead { - // By using the stream here we ensure that each chunk will require a separate - // read/poll_fill_buf call to process to help test reading multiple chunks. - TrackEof::new(StreamReader::new(input.bytes_05_stream())) - } - } - - pub mod read { - use crate::utils::{block_on, pin_mut, tokio_03_ext::copy_buf}; - use std::io::Cursor; - use tokio_03::io::{AsyncRead, AsyncReadExt, BufReader}; - - pub fn to_vec(read: impl AsyncRead) -> Vec { - let mut output = Cursor::new(vec![0; 102_400]); - pin_mut!(read); - let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap(); - let mut output = output.into_inner(); - output.truncate(len as usize); - output - } - - pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result { - pin_mut!(reader); - block_on(reader.read(output)) - } - } - - pub mod write { - use crate::utils::{ - block_on, tokio_03_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin, - }; - use std::io::Cursor; - use tokio_03::io::{AsyncWrite, AsyncWriteExt as _}; - - pub fn to_vec( - input: &[Vec], - create_writer: impl for<'a> FnOnce( - &'a mut (dyn AsyncWrite + Unpin), - ) -> Pin>, - limit: usize, - ) -> Vec { - let mut output = Cursor::new(Vec::new()); - { - let mut test_writer = TrackClosed::new( - (&mut output) - .limited_write(limit) - .interleave_pending_write(), - ); - { - let mut writer = create_writer(&mut test_writer); - for chunk in input { - block_on(writer.write_all(chunk)).unwrap(); - block_on(writer.flush()).unwrap(); - } - block_on(writer.shutdown()).unwrap(); - } - assert!(test_writer.is_closed()); - } - output.into_inner() - } - } -} - #[cfg(feature = "tokio")] pub mod tokio { pub mod bufread { diff --git a/tests/utils/mod.rs b/tests/utils/mod.rs index 0f6de461..6b315174 100644 --- a/tests/utils/mod.rs +++ b/tests/utils/mod.rs @@ -3,8 +3,6 @@ mod input_stream; #[cfg(feature = "tokio-02")] mod tokio_02_ext; -#[cfg(feature = "tokio-03")] -mod tokio_03_ext; #[cfg(feature = "tokio")] mod tokio_ext; mod track_closed; diff --git a/tests/utils/test_cases.rs b/tests/utils/test_cases.rs index fc61d78a..73ef4827 100644 --- a/tests/utils/test_cases.rs +++ b/tests/utils/test_cases.rs @@ -697,9 +697,6 @@ macro_rules! test_cases { #[cfg(feature = "tokio-02")] io_test_cases!(tokio_02, $variant); - #[cfg(feature = "tokio-03")] - io_test_cases!(tokio_03, $variant); - #[cfg(feature = "tokio")] io_test_cases!(tokio, $variant); } diff --git a/tests/utils/tokio_03_ext/copy_buf.rs b/tests/utils/tokio_03_ext/copy_buf.rs deleted file mode 100644 index d144fd19..00000000 --- a/tests/utils/tokio_03_ext/copy_buf.rs +++ /dev/null @@ -1,52 +0,0 @@ -use core::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; -use futures::ready; -use tokio_03::io::{AsyncBufRead, AsyncWrite}; - -pub fn copy_buf(reader: R, writer: &mut W) -> CopyBuf<'_, R, W> -where - R: AsyncBufRead + Unpin, - W: AsyncWrite + Unpin + ?Sized, -{ - CopyBuf { - reader, - writer, - amt: 0, - } -} - -#[derive(Debug)] -pub struct CopyBuf<'a, R, W: ?Sized> { - reader: R, - writer: &'a mut W, - amt: u64, -} - -impl Future for CopyBuf<'_, R, W> -where - R: AsyncBufRead + Unpin, - W: AsyncWrite + Unpin + ?Sized, -{ - type Output = std::io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = &mut *self; - loop { - let buffer = ready!(Pin::new(&mut this.reader).poll_fill_buf(cx))?; - if buffer.is_empty() { - ready!(Pin::new(&mut this.writer).poll_flush(cx))?; - return Poll::Ready(Ok(this.amt)); - } - - let i = ready!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; - if i == 0 { - return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); - } - this.amt += i as u64; - Pin::new(&mut this.reader).consume(i); - } - } -} diff --git a/tests/utils/tokio_03_ext/interleave_pending.rs b/tests/utils/tokio_03_ext/interleave_pending.rs deleted file mode 100644 index 4948b6e0..00000000 --- a/tests/utils/tokio_03_ext/interleave_pending.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -pub struct InterleavePending { - inner: T, - pended: bool, -} - -impl InterleavePending { - pub(crate) fn new(inner: T) -> Self { - Self { - inner, - pended: false, - } - } -} - -impl tokio_03::io::AsyncWrite for InterleavePending { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if self.pended { - let next = Pin::new(&mut self.inner).poll_write(cx, buf); - if next.is_ready() { - self.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - self.pended = true; - Poll::Pending - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.pended { - let next = Pin::new(&mut self.inner).poll_flush(cx); - if next.is_ready() { - self.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - self.pended = true; - Poll::Pending - } - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.pended { - let next = Pin::new(&mut self.inner).poll_shutdown(cx); - if next.is_ready() { - self.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - self.pended = true; - Poll::Pending - } - } -} diff --git a/tests/utils/tokio_03_ext/limited.rs b/tests/utils/tokio_03_ext/limited.rs deleted file mode 100644 index b943f470..00000000 --- a/tests/utils/tokio_03_ext/limited.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -#[derive(Debug)] -pub struct Limited { - io: Io, - limit: usize, -} - -impl Limited { - pub(crate) fn new(io: Io, limit: usize) -> Limited { - Limited { io, limit } - } -} - -impl tokio_03::io::AsyncWrite for Limited { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let limit = self.limit; - Pin::new(&mut self.io).poll_write(cx, &buf[..std::cmp::min(limit, buf.len())]) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.io).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.io).poll_shutdown(cx) - } -} diff --git a/tests/utils/tokio_03_ext/mod.rs b/tests/utils/tokio_03_ext/mod.rs deleted file mode 100644 index 73d8f414..00000000 --- a/tests/utils/tokio_03_ext/mod.rs +++ /dev/null @@ -1,23 +0,0 @@ -mod copy_buf; -mod interleave_pending; -mod limited; - -pub use copy_buf::copy_buf; - -pub trait AsyncWriteTestExt: tokio_03::io::AsyncWrite { - fn interleave_pending_write(self) -> interleave_pending::InterleavePending - where - Self: Sized + Unpin, - { - interleave_pending::InterleavePending::new(self) - } - - fn limited_write(self, limit: usize) -> limited::Limited - where - Self: Sized + Unpin, - { - limited::Limited::new(self, limit) - } -} - -impl AsyncWriteTestExt for T {} diff --git a/tests/utils/track_closed.rs b/tests/utils/track_closed.rs index 081f3831..b3e5af03 100644 --- a/tests/utils/track_closed.rs +++ b/tests/utils/track_closed.rs @@ -80,30 +80,6 @@ impl tokio_02::io::AsyncWrite for TrackClos } } -#[cfg(feature = "tokio-03")] -impl tokio_03::io::AsyncWrite for TrackClosed { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - assert!(!self.closed); - Pin::new(&mut self.inner).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - assert!(!self.closed); - Pin::new(&mut self.inner).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - assert!(!self.closed); - match Pin::new(&mut self.inner).poll_shutdown(cx) { - Poll::Ready(Ok(())) => { - self.closed = true; - Poll::Ready(Ok(())) - } - other => other, - } - } -} - #[cfg(feature = "tokio")] impl tokio::io::AsyncWrite for TrackClosed { fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { diff --git a/tests/utils/track_eof.rs b/tests/utils/track_eof.rs index d2101d13..e8483d3b 100644 --- a/tests/utils/track_eof.rs +++ b/tests/utils/track_eof.rs @@ -97,49 +97,6 @@ impl tokio_02::io::AsyncBufRead for Track } } -#[cfg(feature = "tokio-03")] -impl tokio_03::io::AsyncRead for TrackEof { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut tokio_03::io::ReadBuf, - ) -> Poll> { - let (inner, eof) = self.project(); - assert!(!*eof); - let len = buf.filled().len(); - match inner.poll_read(cx, buf) { - Poll::Ready(Ok(())) => { - if buf.filled().len() == len && buf.remaining() > 0 { - *eof = true; - } - Poll::Ready(Ok(())) - } - other => other, - } - } -} - -#[cfg(feature = "tokio-03")] -impl tokio_03::io::AsyncBufRead for TrackEof { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let (inner, eof) = self.project(); - assert!(!*eof); - match inner.poll_fill_buf(cx) { - Poll::Ready(Ok(buf)) => { - if buf.is_empty() { - *eof = true; - } - Poll::Ready(Ok(buf)) - } - other => other, - } - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.project().0.consume(amt) - } -} - #[cfg(feature = "tokio")] impl tokio::io::AsyncRead for TrackEof { fn poll_read(