From 5113024ce5483176da67ace9909a510e3f384aa8 Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Tue, 4 Nov 2025 08:54:19 +0000 Subject: [PATCH 1/3] Dedup `write::Decoder::poll_write` --- .../src/generic/write/decoder.rs | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/crates/async-compression/src/generic/write/decoder.rs b/crates/async-compression/src/generic/write/decoder.rs index 757ad475..7c2353c5 100644 --- a/crates/async-compression/src/generic/write/decoder.rs +++ b/crates/async-compression/src/generic/write/decoder.rs @@ -27,7 +27,7 @@ impl Default for Decoder { } impl Decoder { - pub fn do_poll_write( + fn do_poll_write( &mut self, cx: &mut Context<'_>, input: &mut PartialBuffer<&[u8]>, @@ -73,6 +73,25 @@ impl Decoder { } } + pub fn poll_write( + &mut self, + cx: &mut Context<'_>, + buf: &[u8], + writer: Pin<&mut dyn AsyncBufWrite>, + decoder: &mut impl Decode, + ) -> Poll> { + if buf.is_empty() { + return Poll::Ready(Ok(0)); + } + + let mut input = PartialBuffer::new(buf); + + match self.do_poll_write(cx, &mut input, writer, decoder)? { + Poll::Pending if input.written().is_empty() => Poll::Pending, + _ => Poll::Ready(Ok(input.written().len())), + } + } + pub fn do_poll_flush( &mut self, cx: &mut Context<'_>, @@ -169,17 +188,6 @@ macro_rules! impl_decoder { } impl Decoder { - fn do_poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - input: &mut PartialBuffer<&[u8]>, - ) -> Poll> { - let mut this = self.project(); - - this.inner - .do_poll_write(cx, input, this.writer, this.decoder) - } - fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); @@ -193,16 +201,9 @@ macro_rules! impl_decoder { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut input = PartialBuffer::new(buf); + let mut this = self.project(); - match self.do_poll_write(cx, &mut input)? { - Poll::Pending if input.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(input.written().len())), - } + this.inner.poll_write(cx, buf, this.writer, this.decoder) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { From 3d3b52fd6d9f92cd600e1218a79f94786222970c Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Tue, 4 Nov 2025 14:44:10 +0000 Subject: [PATCH 2/3] Dedup `write::Encoder` impl --- .../src/futures/write/generic/encoder.rs | 190 +------------- .../src/generic/write/encoder.rs | 237 ++++++++++++++++++ .../src/generic/write/mod.rs | 2 + .../src/tokio/write/generic/encoder.rs | 204 +-------------- 4 files changed, 244 insertions(+), 389 deletions(-) create mode 100644 crates/async-compression/src/generic/write/encoder.rs diff --git a/crates/async-compression/src/futures/write/generic/encoder.rs b/crates/async-compression/src/futures/write/generic/encoder.rs index 4d7c4489..98bb7bda 100644 --- a/crates/async-compression/src/futures/write/generic/encoder.rs +++ b/crates/async-compression/src/futures/write/generic/encoder.rs @@ -1,186 +1,12 @@ +use crate::{futures::write::BufWriter, generic::write::impl_encoder}; +use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSliceMut}; use std::{ io, pin::Pin, task::{Context, Poll}, }; -use crate::codecs::Encode; -use crate::core::util::PartialBuffer; -use crate::futures::write::{AsyncBufWrite, BufWriter}; -use futures_core::ready; -use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSliceMut}; -use pin_project_lite::pin_project; - -#[derive(Debug)] -enum State { - Encoding, - Finishing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Encoder { - #[pin] - writer: BufWriter, - encoder: E, - state: State, - } -} - -impl Encoder { - pub fn get_ref(&self) -> &W { - self.writer.get_ref() - } - - pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().writer.get_pin_mut() - } - - pub(crate) fn get_encoder_ref(&self) -> &E { - &self.encoder - } - - pub fn into_inner(self) -> W { - self.writer.into_inner() - } -} - -impl Encoder { - pub fn new(writer: W, encoder: E) -> Self { - Self { - writer: BufWriter::new(writer), - encoder, - state: State::Encoding, - } - } - - pub fn with_capacity(writer: W, encoder: E, cap: usize) -> Self { - Self { - writer: BufWriter::with_capacity(cap, writer), - encoder, - state: State::Encoding, - } - } - - fn do_poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - input: &mut PartialBuffer<&[u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Encoding => { - this.encoder.encode(input, &mut output)?; - State::Encoding - } - - State::Finishing | State::Done => { - return Poll::Ready(Err(io::Error::other("Write after close"))) - } - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if input.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - let done = match this.state { - State::Encoding => this.encoder.flush(&mut output)?, - - State::Finishing | State::Done => { - return Poll::Ready(Err(io::Error::other("Flush after close"))) - } - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if done { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Encoding | State::Finishing => { - if this.encoder.finish(&mut output)? { - State::Done - } else { - State::Finishing - } - } - - State::Done => State::Done, - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if let State::Done = this.state { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncWrite for Encoder { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut input = PartialBuffer::new(buf); - - match self.do_poll_write(cx, &mut input)? { - Poll::Pending if input.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(input.written().len())), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_flush(cx))?; - ready!(self.project().writer.as_mut().poll_flush(cx))?; - Poll::Ready(Ok(())) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_close(cx))?; - ready!(self.project().writer.as_mut().poll_close(cx))?; - Poll::Ready(Ok(())) - } -} +impl_encoder!(poll_close); impl AsyncRead for Encoder { fn poll_read( @@ -199,13 +25,3 @@ impl AsyncRead for Encoder { self.get_pin_mut().poll_read_vectored(cx, bufs) } } - -impl AsyncBufRead for Encoder { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_pin_mut().poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.get_pin_mut().consume(amt) - } -} diff --git a/crates/async-compression/src/generic/write/encoder.rs b/crates/async-compression/src/generic/write/encoder.rs new file mode 100644 index 00000000..faac492a --- /dev/null +++ b/crates/async-compression/src/generic/write/encoder.rs @@ -0,0 +1,237 @@ +use std::{ + io, + pin::Pin, + task::{Context, Poll}, +}; + +use crate::{codecs::Encode, core::util::PartialBuffer, generic::write::AsyncBufWrite}; +use futures_core::ready; + +#[derive(Debug)] +enum State { + Encoding, + Finishing, + Done, +} + +#[derive(Debug)] +pub struct Encoder { + state: State, +} + +impl Default for Encoder { + fn default() -> Self { + Self { + state: State::Encoding, + } + } +} + +impl Encoder { + fn do_poll_write( + &mut self, + cx: &mut Context<'_>, + input: &mut PartialBuffer<&[u8]>, + mut writer: Pin<&mut dyn AsyncBufWrite>, + encoder: &mut impl Encode, + ) -> Poll> { + loop { + let output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?; + let mut output = PartialBuffer::new(output); + + self.state = match self.state { + State::Encoding => { + encoder.encode(input, &mut output)?; + State::Encoding + } + + State::Finishing | State::Done => { + break Poll::Ready(Err(io::Error::other("Write after close"))) + } + }; + + let produced = output.written().len(); + writer.as_mut().produce(produced); + + if input.unwritten().is_empty() { + break Poll::Ready(Ok(())); + } + } + } + + pub fn poll_write( + &mut self, + cx: &mut Context<'_>, + buf: &[u8], + writer: Pin<&mut dyn AsyncBufWrite>, + encoder: &mut impl Encode, + ) -> Poll> { + if buf.is_empty() { + return Poll::Ready(Ok(0)); + } + + let mut input = PartialBuffer::new(buf); + + match self.do_poll_write(cx, &mut input, writer, encoder)? { + Poll::Pending if input.written().is_empty() => Poll::Pending, + _ => Poll::Ready(Ok(input.written().len())), + } + } + + pub fn do_poll_flush( + &mut self, + cx: &mut Context<'_>, + mut writer: Pin<&mut dyn AsyncBufWrite>, + encoder: &mut impl Encode, + ) -> Poll> { + loop { + let output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?; + let mut output = PartialBuffer::new(output); + + let done = match self.state { + State::Encoding => encoder.flush(&mut output)?, + + State::Finishing | State::Done => { + break Poll::Ready(Err(io::Error::other("Flush after close"))) + } + }; + + let produced = output.written().len(); + writer.as_mut().produce(produced); + + if done { + break Poll::Ready(Ok(())); + } + } + } + + pub fn do_poll_close( + &mut self, + cx: &mut Context<'_>, + mut writer: Pin<&mut dyn AsyncBufWrite>, + encoder: &mut impl Encode, + ) -> Poll> { + loop { + let output = ready!(writer.as_mut().poll_partial_flush_buf(cx))?; + let mut output = PartialBuffer::new(output); + + self.state = match self.state { + State::Encoding | State::Finishing => { + if encoder.finish(&mut output)? { + State::Done + } else { + State::Finishing + } + } + + State::Done => State::Done, + }; + + let produced = output.written().len(); + writer.as_mut().produce(produced); + + if let State::Done = self.state { + break Poll::Ready(Ok(())); + } + } + } +} + +macro_rules! impl_encoder { + ($poll_close: tt) => { + use crate::{codecs::Encode, generic::write::Encoder as GenericEncoder}; + use futures_core::ready; + use pin_project_lite::pin_project; + + pin_project! { + #[derive(Debug)] + pub struct Encoder { + #[pin] + writer: BufWriter, + encoder: E, + inner: GenericEncoder, + } + } + + impl Encoder { + pub fn get_ref(&self) -> &W { + self.writer.get_ref() + } + + pub fn get_mut(&mut self) -> &mut W { + self.writer.get_mut() + } + + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { + self.project().writer.get_pin_mut() + } + + pub(crate) fn get_encoder_ref(&self) -> &E { + &self.encoder + } + + pub fn into_inner(self) -> W { + self.writer.into_inner() + } + + pub fn new(writer: W, encoder: E) -> Self { + Self { + writer: BufWriter::new(writer), + encoder, + inner: Default::default(), + } + } + + pub fn with_capacity(writer: W, encoder: E, cap: usize) -> Self { + Self { + writer: BufWriter::with_capacity(cap, writer), + encoder, + inner: Default::default(), + } + } + } + + impl AsyncWrite for Encoder { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.project(); + this.inner.poll_write(cx, buf, this.writer, this.encoder) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + ready!(this + .inner + .do_poll_flush(cx, this.writer.as_mut(), this.encoder))?; + this.writer.poll_flush(cx) + } + + fn $poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + ready!(this + .inner + .do_poll_close(cx, this.writer.as_mut(), this.encoder))?; + this.writer.$poll_close(cx) + } + } + + impl AsyncBufRead for Encoder { + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.get_pin_mut().poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.get_pin_mut().consume(amt) + } + } + }; +} +pub(crate) use impl_encoder; diff --git a/crates/async-compression/src/generic/write/mod.rs b/crates/async-compression/src/generic/write/mod.rs index cf72319e..7037596b 100644 --- a/crates/async-compression/src/generic/write/mod.rs +++ b/crates/async-compression/src/generic/write/mod.rs @@ -1,7 +1,9 @@ mod buf_write; mod buf_writer; mod decoder; +mod encoder; pub(crate) use buf_write::*; pub(crate) use buf_writer::*; pub(crate) use decoder::*; +pub(crate) use encoder::*; diff --git a/crates/async-compression/src/tokio/write/generic/encoder.rs b/crates/async-compression/src/tokio/write/generic/encoder.rs index 72420dc8..c50358db 100644 --- a/crates/async-compression/src/tokio/write/generic/encoder.rs +++ b/crates/async-compression/src/tokio/write/generic/encoder.rs @@ -1,8 +1,4 @@ -use crate::codecs::Encode; -use crate::core::util::PartialBuffer; -use crate::tokio::write::{AsyncBufWrite, BufWriter}; -use futures_core::ready; -use pin_project_lite::pin_project; +use crate::{generic::write::impl_encoder, tokio::write::BufWriter}; use std::{ io, pin::Pin, @@ -10,193 +6,7 @@ use std::{ }; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; -#[derive(Debug)] -enum State { - Encoding, - Flushing, - Finishing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Encoder { - #[pin] - writer: BufWriter, - encoder: E, - state: State, - } -} - -impl Encoder { - pub fn new(writer: W, encoder: E) -> Self { - Self { - writer: BufWriter::new(writer), - encoder, - state: State::Encoding, - } - } - - pub fn with_capacity(writer: W, encoder: E, cap: usize) -> Self { - Self { - writer: BufWriter::with_capacity(cap, writer), - encoder, - state: State::Encoding, - } - } -} - -impl Encoder { - pub fn get_ref(&self) -> &W { - self.writer.get_ref() - } - - pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().writer.get_pin_mut() - } - - pub(crate) fn get_encoder_ref(&self) -> &E { - &self.encoder - } - - pub fn into_inner(self) -> W { - self.writer.into_inner() - } -} - -impl Encoder { - fn do_poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - input: &mut PartialBuffer<&[u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Encoding => { - this.encoder.encode(input, &mut output)?; - State::Encoding - } - - // Once a flush has been started, it must be completed. - State::Flushing => match this.encoder.flush(&mut output)? { - true => State::Encoding, - false => State::Flushing, - }, - - State::Finishing | State::Done => { - return Poll::Ready(Err(io::Error::other("Write after shutdown"))) - } - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if input.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - let done = match this.state { - State::Encoding | State::Flushing => this.encoder.flush(&mut output)?, - - State::Finishing | State::Done => { - return Poll::Ready(Err(io::Error::other("Flush after shutdown"))) - } - }; - *this.state = State::Flushing; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if done { - *this.state = State::Encoding; - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Encoding | State::Finishing => { - if this.encoder.finish(&mut output)? { - State::Done - } else { - State::Finishing - } - } - - // Once a flush has been started, it must be completed. - State::Flushing => match this.encoder.flush(&mut output)? { - true => State::Finishing, - false => State::Flushing, - }, - - State::Done => State::Done, - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if let State::Done = this.state { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncWrite for Encoder { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut input = PartialBuffer::new(buf); - - match self.do_poll_write(cx, &mut input)? { - Poll::Pending if input.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(input.written().len())), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_flush(cx))?; - ready!(self.project().writer.as_mut().poll_flush(cx))?; - Poll::Ready(Ok(())) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_shutdown(cx))?; - ready!(self.project().writer.as_mut().poll_shutdown(cx))?; - Poll::Ready(Ok(())) - } -} +impl_encoder!(poll_shutdown); impl AsyncRead for Encoder { fn poll_read( @@ -207,13 +17,3 @@ impl AsyncRead for Encoder { self.get_pin_mut().poll_read(cx, buf) } } - -impl AsyncBufRead for Encoder { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_pin_mut().poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.get_pin_mut().consume(amt) - } -} From f1dcaa3c32fe9fff8c00d8537ce96998c9bc7387 Mon Sep 17 00:00:00 2001 From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Date: Tue, 4 Nov 2025 14:48:05 +0000 Subject: [PATCH 3/3] Simplify `generic::write::Decoder` impl - Do not use `ready!` or `?` propagation on last `poll` call - Do not use `as_mut()` for last `poll` call --- crates/async-compression/src/generic/write/decoder.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/async-compression/src/generic/write/decoder.rs b/crates/async-compression/src/generic/write/decoder.rs index 7c2353c5..ec3035b0 100644 --- a/crates/async-compression/src/generic/write/decoder.rs +++ b/crates/async-compression/src/generic/write/decoder.rs @@ -208,8 +208,7 @@ macro_rules! impl_decoder { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().do_poll_flush(cx))?; - ready!(self.project().writer.as_mut().poll_flush(cx))?; - Poll::Ready(Ok(())) + self.project().writer.poll_flush(cx) } fn $poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -219,8 +218,7 @@ macro_rules! impl_decoder { let this = self.project(); if this.inner.is_done() { - ready!(this.writer.$poll_close(cx))?; - Poll::Ready(Ok(())) + this.writer.$poll_close(cx) } else { Poll::Ready(Err(io::Error::other( "Attempt to close before finishing input",