From bf4e41589701b96ad458645b18e06f3b4ec1f3b3 Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Thu, 2 Oct 2025 15:28:08 +0000 Subject: [PATCH 1/2] Deduplicate `generic::bufread::Decoder` impl Ref #384 --- .../src/futures/bufread/generic/decoder.rs | 109 +++------------ .../src/generic/bufread/decoder.rs | 126 ++++++++++++++++++ .../src/generic/bufread/mod.rs | 3 + crates/async-compression/src/generic/mod.rs | 1 + crates/async-compression/src/lib.rs | 3 + .../src/tokio/bufread/generic/decoder.rs | 100 +++----------- 6 files changed, 171 insertions(+), 171 deletions(-) create mode 100644 crates/async-compression/src/generic/bufread/decoder.rs create mode 100644 crates/async-compression/src/generic/bufread/mod.rs create mode 100644 crates/async-compression/src/generic/mod.rs diff --git a/crates/async-compression/src/futures/bufread/generic/decoder.rs b/crates/async-compression/src/futures/bufread/generic/decoder.rs index 7bce212..c31ba72 100644 --- a/crates/async-compression/src/futures/bufread/generic/decoder.rs +++ b/crates/async-compression/src/futures/bufread/generic/decoder.rs @@ -1,22 +1,16 @@ use crate::codecs::Decode; use crate::core::util::PartialBuffer; +use crate::generic::bufread::{AsyncBufRead as GenericAsyncBufRead, Decoder as GenericDecoder}; use core::{ pin::Pin, task::{Context, Poll}, }; +use std::io::{IoSlice, Result}; + use futures_core::ready; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; use pin_project_lite::pin_project; -use std::io::{IoSlice, Result}; - -#[derive(Debug)] -enum State { - Decoding, - Flushing, - Done, - Next, -} pin_project! { #[derive(Debug)] @@ -24,8 +18,7 @@ pin_project! { #[pin] reader: R, decoder: D, - state: State, - multiple_members: bool, + inner: GenericDecoder, } } @@ -34,8 +27,7 @@ impl Decoder { Self { reader, decoder, - state: State::Decoding, - multiple_members: false, + inner: GenericDecoder::default(), } } } @@ -58,94 +50,31 @@ impl Decoder { } pub fn multiple_members(&mut self, enabled: bool) { - self.multiple_members = enabled; + self.inner.multiple_members(enabled); } } impl Decoder { - fn do_poll_read( + pub(crate) fn do_poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, output: &mut PartialBuffer<&mut [u8]>, ) -> Poll> { - let mut this = self.project(); - - let mut first = true; - - loop { - *this.state = match this.state { - State::Decoding => { - let input = if first { - &[][..] - } else { - ready!(this.reader.as_mut().poll_fill_buf(cx))? - }; - - if input.is_empty() && !first { - // Avoid attempting to reinitialise the decoder if the - // reader has returned EOF. - *this.multiple_members = false; - - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - let res = this.decoder.decode(&mut input, output).or_else(|err| { - // ignore the first error, occurs when input is empty - // but we need to run decode to flush - if first { - Ok(false) - } else { - Err(err) - } - }); - - if !first { - let len = input.written().len(); - this.reader.as_mut().consume(len); - } - - first = false; - - if res? { - State::Flushing - } else { - State::Decoding - } - } - } - - State::Flushing => { - if this.decoder.finish(output)? { - if *this.multiple_members { - this.decoder.reinit()?; - State::Next - } else { - State::Done - } - } else { - State::Flushing - } - } - - State::Done => State::Done, - - State::Next => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Done - } else { - State::Decoding - } - } - }; - - if let State::Done = *this.state { - return Poll::Ready(Ok(())); + let this = self.project(); + + struct Reader<'a, R>(Pin<&'a mut R>); + + impl GenericAsyncBufRead for Reader<'_, R> { + fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.as_mut().poll_fill_buf(cx) } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); + fn consume(&mut self, bytes: usize) { + self.0.as_mut().consume(bytes) } } + + this.inner + .do_poll_read(cx, output, &mut Reader(this.reader), this.decoder) } } diff --git a/crates/async-compression/src/generic/bufread/decoder.rs b/crates/async-compression/src/generic/bufread/decoder.rs new file mode 100644 index 0000000..49520d9 --- /dev/null +++ b/crates/async-compression/src/generic/bufread/decoder.rs @@ -0,0 +1,126 @@ +use crate::codecs::Decode; +use crate::core::util::PartialBuffer; + +use core::task::{Context, Poll}; +use std::io::Result; + +use futures_core::ready; + +#[derive(Debug)] +enum State { + Decoding, + Flushing, + Done, + Next, +} + +pub(crate) trait AsyncBufRead { + fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll>; + fn consume(&mut self, bytes: usize); +} + +#[derive(Debug)] +pub struct Decoder { + state: State, + multiple_members: bool, +} + +impl Default for Decoder { + fn default() -> Self { + Self { + state: State::Decoding, + multiple_members: false, + } + } +} + +impl Decoder { + pub fn multiple_members(&mut self, enabled: bool) { + self.multiple_members = enabled; + } + + pub fn do_poll_read( + &mut self, + cx: &mut Context<'_>, + output: &mut PartialBuffer<&mut [u8]>, + reader: &mut dyn AsyncBufRead, + decoder: &mut D, + ) -> Poll> { + let mut first = true; + + loop { + self.state = match self.state { + State::Decoding => { + let input = if first { + &[][..] + } else { + ready!(reader.poll_fill_buf(cx))? + }; + + if input.is_empty() && !first { + // Avoid attempting to reinitialise the decoder if the + // reader has returned EOF. + self.multiple_members = false; + + State::Flushing + } else { + let mut input = PartialBuffer::new(input); + let res = decoder.decode(&mut input, output).or_else(|err| { + // ignore the first error, occurs when input is empty + // but we need to run decode to flush + if first { + Ok(false) + } else { + Err(err) + } + }); + + if !first { + let len = input.written().len(); + reader.consume(len); + } + + first = false; + + if res? { + State::Flushing + } else { + State::Decoding + } + } + } + + State::Flushing => { + if decoder.finish(output)? { + if self.multiple_members { + decoder.reinit()?; + State::Next + } else { + State::Done + } + } else { + State::Flushing + } + } + + State::Done => State::Done, + + State::Next => { + let input = ready!(reader.poll_fill_buf(cx))?; + if input.is_empty() { + State::Done + } else { + State::Decoding + } + } + }; + + if let State::Done = self.state { + return Poll::Ready(Ok(())); + } + if output.unwritten().is_empty() { + return Poll::Ready(Ok(())); + } + } + } +} diff --git a/crates/async-compression/src/generic/bufread/mod.rs b/crates/async-compression/src/generic/bufread/mod.rs new file mode 100644 index 0000000..83ce047 --- /dev/null +++ b/crates/async-compression/src/generic/bufread/mod.rs @@ -0,0 +1,3 @@ +mod decoder; + +pub(crate) use decoder::*; diff --git a/crates/async-compression/src/generic/mod.rs b/crates/async-compression/src/generic/mod.rs new file mode 100644 index 0000000..b480f00 --- /dev/null +++ b/crates/async-compression/src/generic/mod.rs @@ -0,0 +1 @@ +pub(crate) mod bufread; diff --git a/crates/async-compression/src/lib.rs b/crates/async-compression/src/lib.rs index ad64425..d4efbfd 100644 --- a/crates/async-compression/src/lib.rs +++ b/crates/async-compression/src/lib.rs @@ -149,6 +149,9 @@ #[macro_use] mod macros; +/// Generic, async runtime agonistc implementation of en/decoders +mod generic; + #[cfg(feature = "futures-io")] pub mod futures; #[cfg(feature = "tokio")] diff --git a/crates/async-compression/src/tokio/bufread/generic/decoder.rs b/crates/async-compression/src/tokio/bufread/generic/decoder.rs index f93b2bf..89f0984 100644 --- a/crates/async-compression/src/tokio/bufread/generic/decoder.rs +++ b/crates/async-compression/src/tokio/bufread/generic/decoder.rs @@ -1,12 +1,15 @@ use crate::codecs::Decode; use crate::core::util::PartialBuffer; +use crate::generic::bufread::{AsyncBufRead as GenericAsyncBufRead, Decoder as GenericDecoder}; + use core::{ pin::Pin, task::{Context, Poll}, }; +use std::io::{IoSlice, Result}; + use futures_core::ready; use pin_project_lite::pin_project; -use std::io::{IoSlice, Result}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; #[derive(Debug)] @@ -23,8 +26,7 @@ pin_project! { #[pin] reader: R, decoder: D, - state: State, - multiple_members: bool, + inner: GenericDecoder, } } @@ -33,8 +35,7 @@ impl Decoder { Self { reader, decoder, - state: State::Decoding, - multiple_members: false, + inner: GenericDecoder::default(), } } } @@ -57,7 +58,7 @@ impl Decoder { } pub fn multiple_members(&mut self, enabled: bool) { - self.multiple_members = enabled; + self.inner.multiple_members(enabled); } } @@ -67,84 +68,21 @@ impl Decoder { cx: &mut Context<'_>, output: &mut PartialBuffer<&mut [u8]>, ) -> Poll> { - let mut this = self.project(); - - let mut first = true; - - loop { - *this.state = match this.state { - State::Decoding => { - let input = if first { - &[][..] - } else { - ready!(this.reader.as_mut().poll_fill_buf(cx))? - }; - - if input.is_empty() && !first { - // Avoid attempting to reinitialise the decoder if the reader - // has returned EOF. - *this.multiple_members = false; - - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - let res = this.decoder.decode(&mut input, output).or_else(|err| { - // ignore the first error, occurs when input is empty - // but we need to run decode to flush - if first { - Ok(false) - } else { - Err(err) - } - }); - - if !first { - let len = input.written().len(); - this.reader.as_mut().consume(len); - } - - first = false; - - if res? { - State::Flushing - } else { - State::Decoding - } - } - } - - State::Flushing => { - if this.decoder.finish(output)? { - if *this.multiple_members { - this.decoder.reinit()?; - State::Next - } else { - State::Done - } - } else { - State::Flushing - } - } - - State::Done => State::Done, - - State::Next => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Done - } else { - State::Decoding - } - } - }; - - if let State::Done = *this.state { - return Poll::Ready(Ok(())); + let this = self.project(); + + struct Reader<'a, R>(Pin<&'a mut R>); + + impl GenericAsyncBufRead for Reader<'_, R> { + fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.as_mut().poll_fill_buf(cx) } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); + fn consume(&mut self, bytes: usize) { + self.0.as_mut().consume(bytes) } } + + this.inner + .do_poll_read(cx, output, &mut Reader(this.reader), this.decoder) } } From 2c24af2bf5ab611a11315223117afa5acd25c320 Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Sun, 5 Oct 2025 13:34:37 +0000 Subject: [PATCH 2/2] Use `ControlFlow` style API design to avoid trait object --- .../src/futures/bufread/generic/decoder.rs | 72 +------ .../src/generic/bufread/decoder.rs | 184 ++++++++++++------ .../src/tokio/bufread/generic/decoder.rs | 80 +------- 3 files changed, 132 insertions(+), 204 deletions(-) diff --git a/crates/async-compression/src/futures/bufread/generic/decoder.rs b/crates/async-compression/src/futures/bufread/generic/decoder.rs index c31ba72..802eaa5 100644 --- a/crates/async-compression/src/futures/bufread/generic/decoder.rs +++ b/crates/async-compression/src/futures/bufread/generic/decoder.rs @@ -1,6 +1,4 @@ -use crate::codecs::Decode; -use crate::core::util::PartialBuffer; -use crate::generic::bufread::{AsyncBufRead as GenericAsyncBufRead, Decoder as GenericDecoder}; +use crate::{codecs::Decode, core::util::PartialBuffer, generic::bufread::impl_do_poll_read}; use core::{ pin::Pin, @@ -8,75 +6,9 @@ use core::{ }; use std::io::{IoSlice, Result}; -use futures_core::ready; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; -use pin_project_lite::pin_project; -pin_project! { - #[derive(Debug)] - pub struct Decoder { - #[pin] - reader: R, - decoder: D, - inner: GenericDecoder, - } -} - -impl Decoder { - pub fn new(reader: R, decoder: D) -> Self { - Self { - reader, - decoder, - inner: GenericDecoder::default(), - } - } -} - -impl Decoder { - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub fn into_inner(self) -> R { - self.reader - } - - pub fn multiple_members(&mut self, enabled: bool) { - self.inner.multiple_members(enabled); - } -} - -impl Decoder { - pub(crate) fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let this = self.project(); - - struct Reader<'a, R>(Pin<&'a mut R>); - - impl GenericAsyncBufRead for Reader<'_, R> { - fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.as_mut().poll_fill_buf(cx) - } - fn consume(&mut self, bytes: usize) { - self.0.as_mut().consume(bytes) - } - } - - this.inner - .do_poll_read(cx, output, &mut Reader(this.reader), this.decoder) - } -} +impl_do_poll_read!(); impl AsyncRead for Decoder { fn poll_read( diff --git a/crates/async-compression/src/generic/bufread/decoder.rs b/crates/async-compression/src/generic/bufread/decoder.rs index 49520d9..be45625 100644 --- a/crates/async-compression/src/generic/bufread/decoder.rs +++ b/crates/async-compression/src/generic/bufread/decoder.rs @@ -1,10 +1,7 @@ use crate::codecs::Decode; use crate::core::util::PartialBuffer; -use core::task::{Context, Poll}; -use std::io::Result; - -use futures_core::ready; +use std::{io::Result, ops::ControlFlow}; #[derive(Debug)] enum State { @@ -14,11 +11,6 @@ enum State { Next, } -pub(crate) trait AsyncBufRead { - fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll>; - fn consume(&mut self, bytes: usize); -} - #[derive(Debug)] pub struct Decoder { state: State, @@ -41,73 +33,61 @@ impl Decoder { pub fn do_poll_read( &mut self, - cx: &mut Context<'_>, output: &mut PartialBuffer<&mut [u8]>, - reader: &mut dyn AsyncBufRead, decoder: &mut D, - ) -> Poll> { - let mut first = true; - + input: &mut PartialBuffer<&[u8]>, + mut first: bool, + ) -> ControlFlow> { loop { self.state = match self.state { State::Decoding => { - let input = if first { - &[][..] - } else { - ready!(reader.poll_fill_buf(cx))? - }; - - if input.is_empty() && !first { + if input.unwritten().is_empty() && !first { // Avoid attempting to reinitialise the decoder if the // reader has returned EOF. self.multiple_members = false; State::Flushing } else { - let mut input = PartialBuffer::new(input); - let res = decoder.decode(&mut input, output).or_else(|err| { + match decoder.decode(input, output) { + Ok(true) => State::Flushing, // ignore the first error, occurs when input is empty // but we need to run decode to flush - if first { - Ok(false) - } else { - Err(err) - } - }); - - if !first { - let len = input.written().len(); - reader.consume(len); - } - - first = false; - - if res? { - State::Flushing - } else { - State::Decoding + Err(err) if !first => return ControlFlow::Break(Err(err)), + // poll for more data for the next decode + _ => break, } } } State::Flushing => { - if decoder.finish(output)? { - if self.multiple_members { - decoder.reinit()?; - State::Next - } else { - State::Done + match decoder.finish(output) { + Ok(true) => { + if self.multiple_members { + if let Err(err) = decoder.reinit() { + return ControlFlow::Break(Err(err)); + } + + // The decode stage might consume all the input, + // the next stage might need to poll again if it's empty. + first = true; + State::Next + } else { + State::Done + } } - } else { - State::Flushing + Ok(false) => State::Flushing, + Err(err) => return ControlFlow::Break(Err(err)), } } - State::Done => State::Done, + State::Done => return ControlFlow::Break(Ok(())), State::Next => { - let input = ready!(reader.poll_fill_buf(cx))?; - if input.is_empty() { + if input.unwritten().is_empty() { + if first { + // poll for more data to check if there's another stream + break; + } State::Done } else { State::Decoding @@ -115,12 +95,104 @@ impl Decoder { } }; - if let State::Done = self.state { - return Poll::Ready(Ok(())); - } if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); + return ControlFlow::Break(Ok(())); } } + + if output.unwritten().is_empty() { + ControlFlow::Break(Ok(())) + } else { + ControlFlow::Continue(()) + } } } + +macro_rules! impl_do_poll_read { + () => { + use crate::generic::bufread::Decoder as GenericDecoder; + + use std::ops::ControlFlow; + + use futures_core::ready; + use pin_project_lite::pin_project; + + pin_project! { + #[derive(Debug)] + pub struct Decoder { + #[pin] + reader: R, + decoder: D, + inner: GenericDecoder, + } + } + + impl Decoder { + pub fn new(reader: R, decoder: D) -> Self { + Self { + reader, + decoder, + inner: GenericDecoder::default(), + } + } + } + + impl Decoder { + pub fn get_ref(&self) -> &R { + &self.reader + } + + pub fn get_mut(&mut self) -> &mut R { + &mut self.reader + } + + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { + self.project().reader + } + + pub fn into_inner(self) -> R { + self.reader + } + + pub fn multiple_members(&mut self, enabled: bool) { + self.inner.multiple_members(enabled); + } + } + + impl Decoder { + fn do_poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + output: &mut PartialBuffer<&mut [u8]>, + ) -> Poll> { + let mut this = self.project(); + + if let ControlFlow::Break(res) = this.inner.do_poll_read( + output, + this.decoder, + &mut PartialBuffer::new(&[][..]), + true, + ) { + return Poll::Ready(res); + } + + loop { + let mut input = + PartialBuffer::new(ready!(this.reader.as_mut().poll_fill_buf(cx))?); + + let control_flow = + this.inner + .do_poll_read(output, this.decoder, &mut input, false); + + let bytes_read = input.written().len(); + this.reader.as_mut().consume(bytes_read); + + if let ControlFlow::Break(res) = control_flow { + break Poll::Ready(res); + } + } + } + } + }; +} +pub(crate) use impl_do_poll_read; diff --git a/crates/async-compression/src/tokio/bufread/generic/decoder.rs b/crates/async-compression/src/tokio/bufread/generic/decoder.rs index 89f0984..1a00792 100644 --- a/crates/async-compression/src/tokio/bufread/generic/decoder.rs +++ b/crates/async-compression/src/tokio/bufread/generic/decoder.rs @@ -1,6 +1,4 @@ -use crate::codecs::Decode; -use crate::core::util::PartialBuffer; -use crate::generic::bufread::{AsyncBufRead as GenericAsyncBufRead, Decoder as GenericDecoder}; +use crate::{codecs::Decode, core::util::PartialBuffer, generic::bufread::impl_do_poll_read}; use core::{ pin::Pin, @@ -8,83 +6,9 @@ use core::{ }; use std::io::{IoSlice, Result}; -use futures_core::ready; -use pin_project_lite::pin_project; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; -#[derive(Debug)] -enum State { - Decoding, - Flushing, - Done, - Next, -} - -pin_project! { - #[derive(Debug)] - pub struct Decoder { - #[pin] - reader: R, - decoder: D, - inner: GenericDecoder, - } -} - -impl Decoder { - pub fn new(reader: R, decoder: D) -> Self { - Self { - reader, - decoder, - inner: GenericDecoder::default(), - } - } -} - -impl Decoder { - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub fn into_inner(self) -> R { - self.reader - } - - pub fn multiple_members(&mut self, enabled: bool) { - self.inner.multiple_members(enabled); - } -} - -impl Decoder { - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let this = self.project(); - - struct Reader<'a, R>(Pin<&'a mut R>); - - impl GenericAsyncBufRead for Reader<'_, R> { - fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.as_mut().poll_fill_buf(cx) - } - fn consume(&mut self, bytes: usize) { - self.0.as_mut().consume(bytes) - } - } - - this.inner - .do_poll_read(cx, output, &mut Reader(this.reader), this.decoder) - } -} +impl_do_poll_read!(); impl AsyncRead for Decoder { fn poll_read(