diff --git a/crates/async-compression/src/futures/bufread/generic/decoder.rs b/crates/async-compression/src/futures/bufread/generic/decoder.rs index 7bce212..802eaa5 100644 --- a/crates/async-compression/src/futures/bufread/generic/decoder.rs +++ b/crates/async-compression/src/futures/bufread/generic/decoder.rs @@ -1,153 +1,14 @@ -use crate::codecs::Decode; -use crate::core::util::PartialBuffer; +use crate::{codecs::Decode, core::util::PartialBuffer, generic::bufread::impl_do_poll_read}; use core::{ pin::Pin, task::{Context, Poll}, }; -use futures_core::ready; -use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; -use pin_project_lite::pin_project; use std::io::{IoSlice, Result}; -#[derive(Debug)] -enum State { - Decoding, - Flushing, - Done, - Next, -} - -pin_project! { - #[derive(Debug)] - pub struct Decoder { - #[pin] - reader: R, - decoder: D, - state: State, - multiple_members: bool, - } -} - -impl Decoder { - pub fn new(reader: R, decoder: D) -> Self { - Self { - reader, - decoder, - state: State::Decoding, - multiple_members: false, - } - } -} - -impl Decoder { - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub fn into_inner(self) -> R { - self.reader - } - - pub fn multiple_members(&mut self, enabled: bool) { - self.multiple_members = enabled; - } -} - -impl Decoder { - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - - let mut first = true; - - loop { - *this.state = match this.state { - State::Decoding => { - let input = if first { - &[][..] - } else { - ready!(this.reader.as_mut().poll_fill_buf(cx))? - }; - - if input.is_empty() && !first { - // Avoid attempting to reinitialise the decoder if the - // reader has returned EOF. - *this.multiple_members = false; - - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - let res = this.decoder.decode(&mut input, output).or_else(|err| { - // ignore the first error, occurs when input is empty - // but we need to run decode to flush - if first { - Ok(false) - } else { - Err(err) - } - }); - - if !first { - let len = input.written().len(); - this.reader.as_mut().consume(len); - } - - first = false; - - if res? { - State::Flushing - } else { - State::Decoding - } - } - } - - State::Flushing => { - if this.decoder.finish(output)? { - if *this.multiple_members { - this.decoder.reinit()?; - State::Next - } else { - State::Done - } - } else { - State::Flushing - } - } - - State::Done => State::Done, - - State::Next => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Done - } else { - State::Decoding - } - } - }; +use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; - if let State::Done = *this.state { - return Poll::Ready(Ok(())); - } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } -} +impl_do_poll_read!(); impl AsyncRead for Decoder { fn poll_read( diff --git a/crates/async-compression/src/generic/bufread/decoder.rs b/crates/async-compression/src/generic/bufread/decoder.rs new file mode 100644 index 0000000..be45625 --- /dev/null +++ b/crates/async-compression/src/generic/bufread/decoder.rs @@ -0,0 +1,198 @@ +use crate::codecs::Decode; +use crate::core::util::PartialBuffer; + +use std::{io::Result, ops::ControlFlow}; + +#[derive(Debug)] +enum State { + Decoding, + Flushing, + Done, + Next, +} + +#[derive(Debug)] +pub struct Decoder { + state: State, + multiple_members: bool, +} + +impl Default for Decoder { + fn default() -> Self { + Self { + state: State::Decoding, + multiple_members: false, + } + } +} + +impl Decoder { + pub fn multiple_members(&mut self, enabled: bool) { + self.multiple_members = enabled; + } + + pub fn do_poll_read( + &mut self, + output: &mut PartialBuffer<&mut [u8]>, + decoder: &mut D, + input: &mut PartialBuffer<&[u8]>, + mut first: bool, + ) -> ControlFlow> { + loop { + self.state = match self.state { + State::Decoding => { + if input.unwritten().is_empty() && !first { + // Avoid attempting to reinitialise the decoder if the + // reader has returned EOF. + self.multiple_members = false; + + State::Flushing + } else { + match decoder.decode(input, output) { + Ok(true) => State::Flushing, + // ignore the first error, occurs when input is empty + // but we need to run decode to flush + Err(err) if !first => return ControlFlow::Break(Err(err)), + // poll for more data for the next decode + _ => break, + } + } + } + + State::Flushing => { + match decoder.finish(output) { + Ok(true) => { + if self.multiple_members { + if let Err(err) = decoder.reinit() { + return ControlFlow::Break(Err(err)); + } + + // The decode stage might consume all the input, + // the next stage might need to poll again if it's empty. + first = true; + State::Next + } else { + State::Done + } + } + Ok(false) => State::Flushing, + Err(err) => return ControlFlow::Break(Err(err)), + } + } + + State::Done => return ControlFlow::Break(Ok(())), + + State::Next => { + if input.unwritten().is_empty() { + if first { + // poll for more data to check if there's another stream + break; + } + State::Done + } else { + State::Decoding + } + } + }; + + if output.unwritten().is_empty() { + return ControlFlow::Break(Ok(())); + } + } + + if output.unwritten().is_empty() { + ControlFlow::Break(Ok(())) + } else { + ControlFlow::Continue(()) + } + } +} + +macro_rules! impl_do_poll_read { + () => { + use crate::generic::bufread::Decoder as GenericDecoder; + + use std::ops::ControlFlow; + + use futures_core::ready; + use pin_project_lite::pin_project; + + pin_project! { + #[derive(Debug)] + pub struct Decoder { + #[pin] + reader: R, + decoder: D, + inner: GenericDecoder, + } + } + + impl Decoder { + pub fn new(reader: R, decoder: D) -> Self { + Self { + reader, + decoder, + inner: GenericDecoder::default(), + } + } + } + + impl Decoder { + pub fn get_ref(&self) -> &R { + &self.reader + } + + pub fn get_mut(&mut self) -> &mut R { + &mut self.reader + } + + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { + self.project().reader + } + + pub fn into_inner(self) -> R { + self.reader + } + + pub fn multiple_members(&mut self, enabled: bool) { + self.inner.multiple_members(enabled); + } + } + + impl Decoder { + fn do_poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + output: &mut PartialBuffer<&mut [u8]>, + ) -> Poll> { + let mut this = self.project(); + + if let ControlFlow::Break(res) = this.inner.do_poll_read( + output, + this.decoder, + &mut PartialBuffer::new(&[][..]), + true, + ) { + return Poll::Ready(res); + } + + loop { + let mut input = + PartialBuffer::new(ready!(this.reader.as_mut().poll_fill_buf(cx))?); + + let control_flow = + this.inner + .do_poll_read(output, this.decoder, &mut input, false); + + let bytes_read = input.written().len(); + this.reader.as_mut().consume(bytes_read); + + if let ControlFlow::Break(res) = control_flow { + break Poll::Ready(res); + } + } + } + } + }; +} +pub(crate) use impl_do_poll_read; diff --git a/crates/async-compression/src/generic/bufread/mod.rs b/crates/async-compression/src/generic/bufread/mod.rs new file mode 100644 index 0000000..83ce047 --- /dev/null +++ b/crates/async-compression/src/generic/bufread/mod.rs @@ -0,0 +1,3 @@ +mod decoder; + +pub(crate) use decoder::*; diff --git a/crates/async-compression/src/generic/mod.rs b/crates/async-compression/src/generic/mod.rs new file mode 100644 index 0000000..b480f00 --- /dev/null +++ b/crates/async-compression/src/generic/mod.rs @@ -0,0 +1 @@ +pub(crate) mod bufread; diff --git a/crates/async-compression/src/lib.rs b/crates/async-compression/src/lib.rs index ad64425..d4efbfd 100644 --- a/crates/async-compression/src/lib.rs +++ b/crates/async-compression/src/lib.rs @@ -149,6 +149,9 @@ #[macro_use] mod macros; +/// Generic, async runtime agonistc implementation of en/decoders +mod generic; + #[cfg(feature = "futures-io")] pub mod futures; #[cfg(feature = "tokio")] diff --git a/crates/async-compression/src/tokio/bufread/generic/decoder.rs b/crates/async-compression/src/tokio/bufread/generic/decoder.rs index f93b2bf..1a00792 100644 --- a/crates/async-compression/src/tokio/bufread/generic/decoder.rs +++ b/crates/async-compression/src/tokio/bufread/generic/decoder.rs @@ -1,152 +1,14 @@ -use crate::codecs::Decode; -use crate::core::util::PartialBuffer; +use crate::{codecs::Decode, core::util::PartialBuffer, generic::bufread::impl_do_poll_read}; + use core::{ pin::Pin, task::{Context, Poll}, }; -use futures_core::ready; -use pin_project_lite::pin_project; use std::io::{IoSlice, Result}; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; - -#[derive(Debug)] -enum State { - Decoding, - Flushing, - Done, - Next, -} - -pin_project! { - #[derive(Debug)] - pub struct Decoder { - #[pin] - reader: R, - decoder: D, - state: State, - multiple_members: bool, - } -} - -impl Decoder { - pub fn new(reader: R, decoder: D) -> Self { - Self { - reader, - decoder, - state: State::Decoding, - multiple_members: false, - } - } -} -impl Decoder { - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub fn into_inner(self) -> R { - self.reader - } - - pub fn multiple_members(&mut self, enabled: bool) { - self.multiple_members = enabled; - } -} - -impl Decoder { - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - - let mut first = true; - - loop { - *this.state = match this.state { - State::Decoding => { - let input = if first { - &[][..] - } else { - ready!(this.reader.as_mut().poll_fill_buf(cx))? - }; - - if input.is_empty() && !first { - // Avoid attempting to reinitialise the decoder if the reader - // has returned EOF. - *this.multiple_members = false; - - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - let res = this.decoder.decode(&mut input, output).or_else(|err| { - // ignore the first error, occurs when input is empty - // but we need to run decode to flush - if first { - Ok(false) - } else { - Err(err) - } - }); - - if !first { - let len = input.written().len(); - this.reader.as_mut().consume(len); - } - - first = false; - - if res? { - State::Flushing - } else { - State::Decoding - } - } - } - - State::Flushing => { - if this.decoder.finish(output)? { - if *this.multiple_members { - this.decoder.reinit()?; - State::Next - } else { - State::Done - } - } else { - State::Flushing - } - } - - State::Done => State::Done, - - State::Next => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Done - } else { - State::Decoding - } - } - }; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; - if let State::Done = *this.state { - return Poll::Ready(Ok(())); - } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } -} +impl_do_poll_read!(); impl AsyncRead for Decoder { fn poll_read(