From 50d62add5ecb8a386e86259b5bdfbe138a8a6cf1 Mon Sep 17 00:00:00 2001 From: NobodyXu Date: Sun, 24 Aug 2025 18:19:05 +1000 Subject: [PATCH 1/4] Refactor: Extract duplicate trait AsyncBufWrite --- src/{futures/write => }/buf_write.rs | 0 src/futures/write/buf_writer.rs | 2 +- src/futures/write/mod.rs | 3 +-- src/lib.rs | 3 +++ src/tokio/write/buf_write.rs | 32 ---------------------------- src/tokio/write/buf_writer.rs | 2 +- src/tokio/write/mod.rs | 3 +-- 7 files changed, 7 insertions(+), 38 deletions(-) rename src/{futures/write => }/buf_write.rs (100%) delete mode 100644 src/tokio/write/buf_write.rs diff --git a/src/futures/write/buf_write.rs b/src/buf_write.rs similarity index 100% rename from src/futures/write/buf_write.rs rename to src/buf_write.rs diff --git a/src/futures/write/buf_writer.rs b/src/futures/write/buf_writer.rs index 13f23f16..fc9b1d03 100644 --- a/src/futures/write/buf_writer.rs +++ b/src/futures/write/buf_writer.rs @@ -2,7 +2,7 @@ // the `AsyncBufWrite` impl can access its internals, and changed a bit to make it more efficient // with those methods. -use super::AsyncBufWrite; +use crate::AsyncBufWrite; use futures_core::ready; use futures_io::{AsyncSeek, AsyncWrite, SeekFrom}; use pin_project_lite::pin_project; diff --git a/src/futures/write/mod.rs b/src/futures/write/mod.rs index 831328d6..09fa0321 100644 --- a/src/futures/write/mod.rs +++ b/src/futures/write/mod.rs @@ -5,11 +5,10 @@ mod macros; mod generic; -mod buf_write; mod buf_writer; +use crate::AsyncBufWrite; use self::{ - buf_write::AsyncBufWrite, buf_writer::BufWriter, generic::{Decoder, Encoder}, }; diff --git a/src/lib.rs b/src/lib.rs index 0c1e0a5d..6e0b594e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -155,6 +155,9 @@ #[macro_use] mod macros; +mod buf_write; +use buf_write::AsyncBufWrite; + #[cfg(feature = "futures-io")] pub mod futures; #[cfg(feature = "tokio")] diff --git a/src/tokio/write/buf_write.rs b/src/tokio/write/buf_write.rs deleted file mode 100644 index 5ca99731..00000000 --- a/src/tokio/write/buf_write.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::{ - io, - pin::Pin, - task::{Context, Poll}, -}; - -pub(crate) trait AsyncBufWrite { - /// Attempt to return an internal buffer to write to, flushing data out to the inner reader if - /// it is full. - /// - /// On success, returns `Poll::Ready(Ok(buf))`. - /// - /// If the buffer is full and cannot be flushed, the method returns `Poll::Pending` and - /// arranges for the current task context (`cx`) to receive a notification when the object - /// becomes readable or is closed. - fn poll_partial_flush_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>; - - /// Tells this buffer that `amt` bytes have been written to its buffer, so they should be - /// written out to the underlying IO when possible. - /// - /// This function is a lower-level call. It needs to be paired with the `poll_flush_buf` method to - /// function properly. This function does not perform any I/O, it simply informs this object - /// that some amount of its buffer, returned from `poll_flush_buf`, has been written to and should - /// be sent. As such, this function may do odd things if `poll_flush_buf` isn't - /// called before calling it. - /// - /// The `amt` must be `<=` the number of bytes in the buffer returned by `poll_flush_buf`. - fn produce(self: Pin<&mut Self>, amt: usize); -} diff --git a/src/tokio/write/buf_writer.rs b/src/tokio/write/buf_writer.rs index c56c7e64..42c8e5dc 100644 --- a/src/tokio/write/buf_writer.rs +++ b/src/tokio/write/buf_writer.rs @@ -2,7 +2,7 @@ // the `AsyncBufWrite` impl can access its internals, and changed a bit to make it more efficient // with those methods. -use super::AsyncBufWrite; +use crate::AsyncBufWrite; use futures_core::ready; use pin_project_lite::pin_project; use std::{ diff --git a/src/tokio/write/mod.rs b/src/tokio/write/mod.rs index 409cd670..db0a756f 100644 --- a/src/tokio/write/mod.rs +++ b/src/tokio/write/mod.rs @@ -5,11 +5,10 @@ mod macros; mod generic; -mod buf_write; mod buf_writer; +use crate::AsyncBufWrite; use self::{ - buf_write::AsyncBufWrite, buf_writer::BufWriter, generic::{Decoder, Encoder}, }; From 398c05ed1a07e7056b9cddd82e6e84fead0cffa8 Mon Sep 17 00:00:00 2001 From: NobodyXu Date: Sun, 24 Aug 2025 18:45:11 +1000 Subject: [PATCH 2/4] Refactor: Extract dup buf_writer.rs --- src/buf_write.rs | 57 ++++---- src/buf_writer.rs | 252 ++++++++++++++++++++++++++++++++ src/futures/write/buf_writer.rs | 230 ----------------------------- src/futures/write/mod.rs | 9 +- src/lib.rs | 4 +- src/tokio/write/buf_writer.rs | 216 --------------------------- src/tokio/write/mod.rs | 9 +- 7 files changed, 291 insertions(+), 486 deletions(-) create mode 100644 src/buf_writer.rs delete mode 100644 src/futures/write/buf_writer.rs delete mode 100644 src/tokio/write/buf_writer.rs diff --git a/src/buf_write.rs b/src/buf_write.rs index 5ca99731..c86da09a 100644 --- a/src/buf_write.rs +++ b/src/buf_write.rs @@ -4,29 +4,36 @@ use std::{ task::{Context, Poll}, }; -pub(crate) trait AsyncBufWrite { - /// Attempt to return an internal buffer to write to, flushing data out to the inner reader if - /// it is full. - /// - /// On success, returns `Poll::Ready(Ok(buf))`. - /// - /// If the buffer is full and cannot be flushed, the method returns `Poll::Pending` and - /// arranges for the current task context (`cx`) to receive a notification when the object - /// becomes readable or is closed. - fn poll_partial_flush_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>; - - /// Tells this buffer that `amt` bytes have been written to its buffer, so they should be - /// written out to the underlying IO when possible. - /// - /// This function is a lower-level call. It needs to be paired with the `poll_flush_buf` method to - /// function properly. This function does not perform any I/O, it simply informs this object - /// that some amount of its buffer, returned from `poll_flush_buf`, has been written to and should - /// be sent. As such, this function may do odd things if `poll_flush_buf` isn't - /// called before calling it. - /// - /// The `amt` must be `<=` the number of bytes in the buffer returned by `poll_flush_buf`. - fn produce(self: Pin<&mut Self>, amt: usize); +macro_rules! impl_async_buf_write { + ($AsyncBufWrite:tt) => { + pub(crate) trait $AsyncBufWrite { + /// Attempt to return an internal buffer to write to, flushing data out to the inner reader if + /// it is full. + /// + /// On success, returns `Poll::Ready(Ok(buf))`. + /// + /// If the buffer is full and cannot be flushed, the method returns `Poll::Pending` and + /// arranges for the current task context (`cx`) to receive a notification when the object + /// becomes readable or is closed. + fn poll_partial_flush_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>; + + /// Tells this buffer that `amt` bytes have been written to its buffer, so they should be + /// written out to the underlying IO when possible. + /// + /// This function is a lower-level call. It needs to be paired with the `poll_flush_buf` method to + /// function properly. This function does not perform any I/O, it simply informs this object + /// that some amount of its buffer, returned from `poll_flush_buf`, has been written to and should + /// be sent. As such, this function may do odd things if `poll_flush_buf` isn't + /// called before calling it. + /// + /// The `amt` must be `<=` the number of bytes in the buffer returned by `poll_flush_buf`. + fn produce(self: Pin<&mut Self>, amt: usize); + } + } } + +impl_async_buf_write!(AsyncBufWriteTokio); +impl_async_buf_write!(AsyncBufWriteFuturesIo); \ No newline at end of file diff --git a/src/buf_writer.rs b/src/buf_writer.rs new file mode 100644 index 00000000..b7a51776 --- /dev/null +++ b/src/buf_writer.rs @@ -0,0 +1,252 @@ +// Originally sourced from `futures_util::io::buf_writer`, needs to be redefined locally so that +// the `AsyncBufWrite` impl can access its internals, and changed a bit to make it more efficient +// with those methods. + +use futures_core::ready; +use pin_project_lite::pin_project; +use std::{ + cmp::min, + fmt, io, + pin::Pin, + task::{Context, Poll}, +}; + +const DEFAULT_BUF_SIZE: usize = 8192; + +pin_project! { + pub struct BufWriter { + #[pin] + inner: W, + buf: Box<[u8]>, + written: usize, + buffered: usize, + } +} + +impl BufWriter { + /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + pub fn new(inner: W) -> Self { + Self::with_capacity(DEFAULT_BUF_SIZE, inner) + } + + /// Creates a new `BufWriter` with the specified buffer capacity. + pub fn with_capacity(cap: usize, inner: W) -> Self { + Self { + inner, + buf: vec![0; cap].into(), + written: 0, + buffered: 0, + } + } + + /// Gets a reference to the underlying writer. + pub fn get_ref(&self) -> &W { + &self.inner + } + + /// Gets a mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + pub fn get_mut(&mut self) -> &mut W { + &mut self.inner + } + + /// Gets a pinned mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { + self.project().inner + } + + /// Consumes this `BufWriter`, returning the underlying writer. + /// + /// Note that any leftover data in the internal buffer is lost. + pub fn into_inner(self) -> W { + self.inner + } +} + +impl fmt::Debug for BufWriter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BufWriter") + .field("writer", &self.inner) + .field( + "buffer", + &format_args!("{}/{}", self.buffered, self.buf.len()), + ) + .field("written", &self.written) + .finish() + } +} + + +macro_rules! impl_traits { + ($partial_flush_buf:tt, $flush_buf:tt, $shutdown_fn:tt) => { + impl BufWriter { + fn $partial_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + let mut ret = Ok(()); + while *this.written < *this.buffered { + match this + .inner + .as_mut() + .poll_write(cx, &this.buf[*this.written..*this.buffered]) + { + Poll::Pending => { + break; + } + Poll::Ready(Ok(0)) => { + ret = Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write the buffered data", + )); + break; + } + Poll::Ready(Ok(n)) => *this.written += n, + Poll::Ready(Err(e)) => { + ret = Err(e); + break; + } + } + } + + if *this.written > 0 { + this.buf.copy_within(*this.written..*this.buffered, 0); + *this.buffered -= *this.written; + *this.written = 0; + + Poll::Ready(ret) + } else if *this.buffered == 0 { + Poll::Ready(ret) + } else { + ret?; + Poll::Pending + } + } + + fn $flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + let mut ret = Ok(()); + while *this.written < *this.buffered { + match ready!(this + .inner + .as_mut() + .poll_write(cx, &this.buf[*this.written..*this.buffered])) + { + Ok(0) => { + ret = Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write the buffered data", + )); + break; + } + Ok(n) => *this.written += n, + Err(e) => { + ret = Err(e); + break; + } + } + } + this.buf.copy_within(*this.written..*this.buffered, 0); + *this.buffered -= *this.written; + *this.written = 0; + Poll::Ready(ret) + } + } + + + impl AsyncWrite for BufWriter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.as_mut().project(); + if *this.buffered + buf.len() > this.buf.len() { + ready!(self.as_mut().$partial_flush_buf(cx))?; + } + + let this = self.as_mut().project(); + if buf.len() >= this.buf.len() { + if *this.buffered == 0 { + this.inner.poll_write(cx, buf) + } else { + // The only way that `partial_flush_buf` would have returned with + // `this.buffered != 0` is if it were Pending, so our waker was already queued + Poll::Pending + } + } else { + let len = min(this.buf.len() - *this.buffered, buf.len()); + this.buf[*this.buffered..*this.buffered + len].copy_from_slice(&buf[..len]); + *this.buffered += len; + Poll::Ready(Ok(len)) + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.as_mut().$flush_buf(cx))?; + self.project().inner.poll_flush(cx) + } + + fn $shutdown_fn(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.as_mut().$flush_buf(cx))?; + self.project().inner.$shutdown_fn(cx) + } + } + + impl AsyncBufWrite for BufWriter { + fn poll_partial_flush_buf( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + ready!(self.as_mut().$partial_flush_buf(cx))?; + let this = self.project(); + Poll::Ready(Ok(&mut this.buf[*this.buffered..])) + } + + fn produce(self: Pin<&mut Self>, amt: usize) { + let this = self.project(); + debug_assert!( + *this.buffered + amt <= this.buf.len(), + "produce called with amt exceeding buffer capacity" + ); + *this.buffered += amt; + } + } + }; +} + +#[cfg(feature = "tokio")] +mod tokio_impl { + use tokio::io::AsyncWrite; + use crate::buf_write::AsyncBufWriteTokio as AsyncBufWrite; + use super::*; + + impl_traits!(partial_flush_buf_tokio, flush_buf_tokio, poll_shutdown); +} + +#[cfg(feature = "futures-io")] +mod futures_io_impl { + use crate::buf_write::AsyncBufWriteFuturesIo as AsyncBufWrite; + use super::*; + use futures_io::{AsyncSeek, AsyncWrite, SeekFrom}; + + impl_traits!(partial_flush_buf_futures_io, flush_buf_futures_io, poll_close); + + impl AsyncSeek for BufWriter { + /// Seek to the offset, in bytes, in the underlying writer. + /// + /// Seeking always writes out the internal buffer before seeking. + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + ready!(self.as_mut().flush_buf_futures_io(cx))?; + self.project().inner.poll_seek(cx, pos) + } + } +} \ No newline at end of file diff --git a/src/futures/write/buf_writer.rs b/src/futures/write/buf_writer.rs deleted file mode 100644 index fc9b1d03..00000000 --- a/src/futures/write/buf_writer.rs +++ /dev/null @@ -1,230 +0,0 @@ -// Originally sourced from `futures_util::io::buf_writer`, needs to be redefined locally so that -// the `AsyncBufWrite` impl can access its internals, and changed a bit to make it more efficient -// with those methods. - -use crate::AsyncBufWrite; -use futures_core::ready; -use futures_io::{AsyncSeek, AsyncWrite, SeekFrom}; -use pin_project_lite::pin_project; -use std::{ - cmp::min, - fmt, io, - pin::Pin, - task::{Context, Poll}, -}; - -const DEFAULT_BUF_SIZE: usize = 8192; - -pin_project! { - pub struct BufWriter { - #[pin] - inner: W, - buf: Box<[u8]>, - written: usize, - buffered: usize, - } -} - -impl BufWriter { - /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, - /// but may change in the future. - pub fn new(inner: W) -> Self { - Self::with_capacity(DEFAULT_BUF_SIZE, inner) - } - - /// Creates a new `BufWriter` with the specified buffer capacity. - pub fn with_capacity(cap: usize, inner: W) -> Self { - Self { - inner, - buf: vec![0; cap].into(), - written: 0, - buffered: 0, - } - } - - fn partial_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - let mut ret = Ok(()); - while *this.written < *this.buffered { - match this - .inner - .as_mut() - .poll_write(cx, &this.buf[*this.written..*this.buffered]) - { - Poll::Pending => { - break; - } - Poll::Ready(Ok(0)) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Poll::Ready(Ok(n)) => *this.written += n, - Poll::Ready(Err(e)) => { - ret = Err(e); - break; - } - } - } - - if *this.written > 0 { - this.buf.copy_within(*this.written..*this.buffered, 0); - *this.buffered -= *this.written; - *this.written = 0; - - Poll::Ready(ret) - } else if *this.buffered == 0 { - Poll::Ready(ret) - } else { - ret?; - Poll::Pending - } - } - - fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - let mut ret = Ok(()); - while *this.written < *this.buffered { - match ready!(this - .inner - .as_mut() - .poll_write(cx, &this.buf[*this.written..*this.buffered])) - { - Ok(0) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Ok(n) => *this.written += n, - Err(e) => { - ret = Err(e); - break; - } - } - } - this.buf.copy_within(*this.written..*this.buffered, 0); - *this.buffered -= *this.written; - *this.written = 0; - Poll::Ready(ret) - } -} - -impl BufWriter { - /// Gets a reference to the underlying writer. - pub fn get_ref(&self) -> &W { - &self.inner - } - - /// Gets a mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_mut(&mut self) -> &mut W { - &mut self.inner - } - - /// Gets a pinned mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().inner - } - - /// Consumes this `BufWriter`, returning the underlying writer. - /// - /// Note that any leftover data in the internal buffer is lost. - pub fn into_inner(self) -> W { - self.inner - } -} - -impl AsyncWrite for BufWriter { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let this = self.as_mut().project(); - if *this.buffered + buf.len() > this.buf.len() { - ready!(self.as_mut().partial_flush_buf(cx))?; - } - - let this = self.as_mut().project(); - if buf.len() >= this.buf.len() { - if *this.buffered == 0 { - this.inner.poll_write(cx, buf) - } else { - // The only way that `partial_flush_buf` would have returned with - // `this.buffered != 0` is if it were Pending, so our waker was already queued - Poll::Pending - } - } else { - let len = min(this.buf.len() - *this.buffered, buf.len()); - this.buf[*this.buffered..*this.buffered + len].copy_from_slice(&buf[..len]); - *this.buffered += len; - Poll::Ready(Ok(len)) - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.project().inner.poll_flush(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.project().inner.poll_close(cx) - } -} - -impl AsyncBufWrite for BufWriter { - fn poll_partial_flush_buf( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - ready!(self.as_mut().partial_flush_buf(cx))?; - let this = self.project(); - Poll::Ready(Ok(&mut this.buf[*this.buffered..])) - } - - fn produce(self: Pin<&mut Self>, amt: usize) { - let this = self.project(); - debug_assert!( - *this.buffered + amt <= this.buf.len(), - "produce called with amt exceeding buffer capacity" - ); - *this.buffered += amt; - } -} - -impl fmt::Debug for BufWriter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BufWriter") - .field("writer", &self.inner) - .field( - "buffer", - &format_args!("{}/{}", self.buffered, self.buf.len()), - ) - .field("written", &self.written) - .finish() - } -} - -impl AsyncSeek for BufWriter { - /// Seek to the offset, in bytes, in the underlying writer. - /// - /// Seeking always writes out the internal buffer before seeking. - fn poll_seek( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - pos: SeekFrom, - ) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.project().inner.poll_seek(cx, pos) - } -} diff --git a/src/futures/write/mod.rs b/src/futures/write/mod.rs index 09fa0321..84397b37 100644 --- a/src/futures/write/mod.rs +++ b/src/futures/write/mod.rs @@ -5,12 +5,7 @@ mod macros; mod generic; -mod buf_writer; - -use crate::AsyncBufWrite; -use self::{ - buf_writer::BufWriter, - generic::{Decoder, Encoder}, -}; +use crate::{buf_write::AsyncBufWriteFuturesIo as AsyncBufWrite, BufWriter}; +use self::generic::{Decoder, Encoder}; algos!(futures::write); diff --git a/src/lib.rs b/src/lib.rs index 6e0b594e..e55ffb27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -156,7 +156,9 @@ mod macros; mod buf_write; -use buf_write::AsyncBufWrite; +mod buf_writer; + +use buf_writer::BufWriter; #[cfg(feature = "futures-io")] pub mod futures; diff --git a/src/tokio/write/buf_writer.rs b/src/tokio/write/buf_writer.rs deleted file mode 100644 index 42c8e5dc..00000000 --- a/src/tokio/write/buf_writer.rs +++ /dev/null @@ -1,216 +0,0 @@ -// Originally sourced from `futures_util::io::buf_writer`, needs to be redefined locally so that -// the `AsyncBufWrite` impl can access its internals, and changed a bit to make it more efficient -// with those methods. - -use crate::AsyncBufWrite; -use futures_core::ready; -use pin_project_lite::pin_project; -use std::{ - cmp::min, - fmt, io, - pin::Pin, - task::{Context, Poll}, -}; -use tokio::io::AsyncWrite; - -const DEFAULT_BUF_SIZE: usize = 8192; - -pin_project! { - pub struct BufWriter { - #[pin] - inner: W, - buf: Box<[u8]>, - written: usize, - buffered: usize, - } -} - -impl BufWriter { - /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, - /// but may change in the future. - pub fn new(inner: W) -> Self { - Self::with_capacity(DEFAULT_BUF_SIZE, inner) - } - - /// Creates a new `BufWriter` with the specified buffer capacity. - pub fn with_capacity(cap: usize, inner: W) -> Self { - Self { - inner, - buf: vec![0; cap].into(), - written: 0, - buffered: 0, - } - } - - fn partial_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - let mut ret = Ok(()); - while *this.written < *this.buffered { - match this - .inner - .as_mut() - .poll_write(cx, &this.buf[*this.written..*this.buffered]) - { - Poll::Pending => { - break; - } - Poll::Ready(Ok(0)) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Poll::Ready(Ok(n)) => *this.written += n, - Poll::Ready(Err(e)) => { - ret = Err(e); - break; - } - } - } - - if *this.written > 0 { - this.buf.copy_within(*this.written..*this.buffered, 0); - *this.buffered -= *this.written; - *this.written = 0; - - Poll::Ready(ret) - } else if *this.buffered == 0 { - Poll::Ready(ret) - } else { - ret?; - Poll::Pending - } - } - - fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - let mut ret = Ok(()); - while *this.written < *this.buffered { - match ready!(this - .inner - .as_mut() - .poll_write(cx, &this.buf[*this.written..*this.buffered])) - { - Ok(0) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Ok(n) => *this.written += n, - Err(e) => { - ret = Err(e); - break; - } - } - } - this.buf.copy_within(*this.written..*this.buffered, 0); - *this.buffered -= *this.written; - *this.written = 0; - Poll::Ready(ret) - } -} - -impl BufWriter { - /// Gets a reference to the underlying writer. - pub fn get_ref(&self) -> &W { - &self.inner - } - - /// Gets a mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_mut(&mut self) -> &mut W { - &mut self.inner - } - - /// Gets a pinned mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().inner - } - - /// Consumes this `BufWriter`, returning the underlying writer. - /// - /// Note that any leftover data in the internal buffer is lost. - pub fn into_inner(self) -> W { - self.inner - } -} - -impl AsyncWrite for BufWriter { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let this = self.as_mut().project(); - if *this.buffered + buf.len() > this.buf.len() { - ready!(self.as_mut().partial_flush_buf(cx))?; - } - - let this = self.as_mut().project(); - if buf.len() >= this.buf.len() { - if *this.buffered == 0 { - this.inner.poll_write(cx, buf) - } else { - // The only way that `partial_flush_buf` would have returned with - // `this.buffered != 0` is if it were Pending, so our waker was already queued - Poll::Pending - } - } else { - let len = min(this.buf.len() - *this.buffered, buf.len()); - this.buf[*this.buffered..*this.buffered + len].copy_from_slice(&buf[..len]); - *this.buffered += len; - Poll::Ready(Ok(len)) - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.project().inner.poll_shutdown(cx) - } -} - -impl AsyncBufWrite for BufWriter { - fn poll_partial_flush_buf( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - ready!(self.as_mut().partial_flush_buf(cx))?; - let this = self.project(); - Poll::Ready(Ok(&mut this.buf[*this.buffered..])) - } - - fn produce(self: Pin<&mut Self>, amt: usize) { - let this = self.project(); - debug_assert!( - *this.buffered + amt <= this.buf.len(), - "produce called with amt exceeding buffer capacity" - ); - *this.buffered += amt; - } -} - -impl fmt::Debug for BufWriter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BufWriter") - .field("writer", &self.inner) - .field( - "buffer", - &format_args!("{}/{}", self.buffered, self.buf.len()), - ) - .field("written", &self.written) - .finish() - } -} diff --git a/src/tokio/write/mod.rs b/src/tokio/write/mod.rs index db0a756f..ef65b2b2 100644 --- a/src/tokio/write/mod.rs +++ b/src/tokio/write/mod.rs @@ -5,12 +5,7 @@ mod macros; mod generic; -mod buf_writer; - -use crate::AsyncBufWrite; -use self::{ - buf_writer::BufWriter, - generic::{Decoder, Encoder}, -}; +use crate::{buf_write::AsyncBufWriteTokio as AsyncBufWrite, BufWriter}; +use self::generic::{Decoder, Encoder}; algos!(tokio::write); From 54f80cf42bf4db34d2e637fb1b1eadf2b29c3918 Mon Sep 17 00:00:00 2001 From: NobodyXu Date: Sun, 24 Aug 2025 18:49:52 +1000 Subject: [PATCH 3/4] Rm unnecessary `std::cmp::min` import --- src/buf_writer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/buf_writer.rs b/src/buf_writer.rs index b7a51776..1c200d60 100644 --- a/src/buf_writer.rs +++ b/src/buf_writer.rs @@ -5,7 +5,6 @@ use futures_core::ready; use pin_project_lite::pin_project; use std::{ - cmp::min, fmt, io, pin::Pin, task::{Context, Poll}, @@ -179,7 +178,7 @@ macro_rules! impl_traits { Poll::Pending } } else { - let len = min(this.buf.len() - *this.buffered, buf.len()); + let len = buf.len().min(this.buf.len() - *this.buffered); this.buf[*this.buffered..*this.buffered + len].copy_from_slice(&buf[..len]); *this.buffered += len; Poll::Ready(Ok(len)) From 39274e59dd3945d633675e05ce62d3bddab4d8c1 Mon Sep 17 00:00:00 2001 From: NobodyXu Date: Sun, 24 Aug 2025 18:54:28 +1000 Subject: [PATCH 4/4] `cargo fmt --all` --- src/buf_write.rs | 6 +++--- src/buf_writer.rs | 42 ++++++++++++++++++++++++---------------- src/futures/write/mod.rs | 2 +- src/tokio/write/mod.rs | 2 +- 4 files changed, 30 insertions(+), 22 deletions(-) diff --git a/src/buf_write.rs b/src/buf_write.rs index c86da09a..62c1dcf9 100644 --- a/src/buf_write.rs +++ b/src/buf_write.rs @@ -19,7 +19,7 @@ macro_rules! impl_async_buf_write { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>; - + /// Tells this buffer that `amt` bytes have been written to its buffer, so they should be /// written out to the underlying IO when possible. /// @@ -32,8 +32,8 @@ macro_rules! impl_async_buf_write { /// The `amt` must be `<=` the number of bytes in the buffer returned by `poll_flush_buf`. fn produce(self: Pin<&mut Self>, amt: usize); } - } + }; } impl_async_buf_write!(AsyncBufWriteTokio); -impl_async_buf_write!(AsyncBufWriteFuturesIo); \ No newline at end of file +impl_async_buf_write!(AsyncBufWriteFuturesIo); diff --git a/src/buf_writer.rs b/src/buf_writer.rs index 1c200d60..7c723f7b 100644 --- a/src/buf_writer.rs +++ b/src/buf_writer.rs @@ -79,13 +79,15 @@ impl fmt::Debug for BufWriter { } } - macro_rules! impl_traits { ($partial_flush_buf:tt, $flush_buf:tt, $shutdown_fn:tt) => { impl BufWriter { - fn $partial_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn $partial_flush_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { let mut this = self.project(); - + let mut ret = Ok(()); while *this.written < *this.buffered { match this @@ -110,12 +112,12 @@ macro_rules! impl_traits { } } } - + if *this.written > 0 { this.buf.copy_within(*this.written..*this.buffered, 0); *this.buffered -= *this.written; *this.written = 0; - + Poll::Ready(ret) } else if *this.buffered == 0 { Poll::Ready(ret) @@ -124,10 +126,10 @@ macro_rules! impl_traits { Poll::Pending } } - + fn $flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - + let mut ret = Ok(()); while *this.written < *this.buffered { match ready!(this @@ -156,7 +158,6 @@ macro_rules! impl_traits { } } - impl AsyncWrite for BufWriter { fn poll_write( mut self: Pin<&mut Self>, @@ -167,7 +168,7 @@ macro_rules! impl_traits { if *this.buffered + buf.len() > this.buf.len() { ready!(self.as_mut().$partial_flush_buf(cx))?; } - + let this = self.as_mut().project(); if buf.len() >= this.buf.len() { if *this.buffered == 0 { @@ -184,13 +185,16 @@ macro_rules! impl_traits { Poll::Ready(Ok(len)) } } - + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().$flush_buf(cx))?; self.project().inner.poll_flush(cx) } - fn $shutdown_fn(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn $shutdown_fn( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { ready!(self.as_mut().$flush_buf(cx))?; self.project().inner.$shutdown_fn(cx) } @@ -205,7 +209,7 @@ macro_rules! impl_traits { let this = self.project(); Poll::Ready(Ok(&mut this.buf[*this.buffered..])) } - + fn produce(self: Pin<&mut Self>, amt: usize) { let this = self.project(); debug_assert!( @@ -220,20 +224,24 @@ macro_rules! impl_traits { #[cfg(feature = "tokio")] mod tokio_impl { - use tokio::io::AsyncWrite; - use crate::buf_write::AsyncBufWriteTokio as AsyncBufWrite; use super::*; + use crate::buf_write::AsyncBufWriteTokio as AsyncBufWrite; + use tokio::io::AsyncWrite; impl_traits!(partial_flush_buf_tokio, flush_buf_tokio, poll_shutdown); } #[cfg(feature = "futures-io")] mod futures_io_impl { - use crate::buf_write::AsyncBufWriteFuturesIo as AsyncBufWrite; use super::*; + use crate::buf_write::AsyncBufWriteFuturesIo as AsyncBufWrite; use futures_io::{AsyncSeek, AsyncWrite, SeekFrom}; - impl_traits!(partial_flush_buf_futures_io, flush_buf_futures_io, poll_close); + impl_traits!( + partial_flush_buf_futures_io, + flush_buf_futures_io, + poll_close + ); impl AsyncSeek for BufWriter { /// Seek to the offset, in bytes, in the underlying writer. @@ -248,4 +256,4 @@ mod futures_io_impl { self.project().inner.poll_seek(cx, pos) } } -} \ No newline at end of file +} diff --git a/src/futures/write/mod.rs b/src/futures/write/mod.rs index 84397b37..271bbfa2 100644 --- a/src/futures/write/mod.rs +++ b/src/futures/write/mod.rs @@ -5,7 +5,7 @@ mod macros; mod generic; -use crate::{buf_write::AsyncBufWriteFuturesIo as AsyncBufWrite, BufWriter}; use self::generic::{Decoder, Encoder}; +use crate::{buf_write::AsyncBufWriteFuturesIo as AsyncBufWrite, BufWriter}; algos!(futures::write); diff --git a/src/tokio/write/mod.rs b/src/tokio/write/mod.rs index ef65b2b2..c3f21909 100644 --- a/src/tokio/write/mod.rs +++ b/src/tokio/write/mod.rs @@ -5,7 +5,7 @@ mod macros; mod generic; -use crate::{buf_write::AsyncBufWriteTokio as AsyncBufWrite, BufWriter}; use self::generic::{Decoder, Encoder}; +use crate::{buf_write::AsyncBufWriteTokio as AsyncBufWrite, BufWriter}; algos!(tokio::write);