diff --git a/futures-util/src/sink/mod.rs b/futures-util/src/sink/mod.rs index e5b515b64a..7843df9df6 100644 --- a/futures-util/src/sink/mod.rs +++ b/futures-util/src/sink/mod.rs @@ -9,7 +9,7 @@ use crate::future::{assert_future, Either}; use core::pin::Pin; use futures_core::future::Future; -use futures_core::stream::{Stream, TryStream}; +use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; #[cfg(feature = "compat")] @@ -248,15 +248,12 @@ pub trait SinkExt: Sink { /// Doing `sink.send_all(stream)` is roughly equivalent to /// `stream.forward(sink)`. The returned future will exhaust all items from /// `stream` and send them to `self`. - fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St> + fn send_all(&mut self, stream: St) -> SendAll<'_, Self, St> where - St: TryStream + Stream + Unpin + ?Sized, - // St: Stream> + Unpin + ?Sized, + St: Stream>, Self: Unpin, { - // TODO: type mismatch resolving `::Item == std::result::Result>::Error>` - // assert_future::, _>(SendAll::new(self, stream)) - SendAll::new(self, stream) + assert_future::, _>(SendAll::new(self, stream)) } /// Wrap this sink in an `Either` sink, making it the left-hand variant diff --git a/futures-util/src/sink/send_all.rs b/futures-util/src/sink/send_all.rs index 6a33459be0..21892fe88c 100644 --- a/futures-util/src/sink/send_all.rs +++ b/futures-util/src/sink/send_all.rs @@ -1,4 +1,4 @@ -use crate::stream::{StreamExt, TryStreamExt, Fuse}; +use crate::stream::{StreamExt, Fuse}; use core::fmt; use core::pin::Pin; use futures_core::future::Future; @@ -6,24 +6,28 @@ use futures_core::ready; use futures_core::stream::{TryStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; +use pin_project_lite::pin_project; -/// Future for the [`send_all`](super::SinkExt::send_all) method. -#[allow(explicit_outlives_requirements)] // https://github.com/rust-lang/rust/issues/60993 -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct SendAll<'a, Si, St> -where - Si: ?Sized, - St: ?Sized + TryStream, -{ - sink: &'a mut Si, - stream: Fuse<&'a mut St>, - buffered: Option, +pin_project! { + /// Future for the [`send_all`](super::SinkExt::send_all) method. + #[allow(explicit_outlives_requirements)] // https://github.com/rust-lang/rust/issues/60993 + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct SendAll<'a, Si, St> + where + Si: ?Sized, + St: TryStream, + { + sink: &'a mut Si, + #[pin] + stream: Fuse, + buffered: Option, + } } impl fmt::Debug for SendAll<'_, Si, St> where Si: fmt::Debug + ?Sized, - St: fmt::Debug + ?Sized + TryStream, + St: fmt::Debug + TryStream, St::Ok: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -35,21 +39,14 @@ where } } -// Pinning is never projected to any fields -impl Unpin for SendAll<'_, Si, St> -where - Si: Unpin + ?Sized, - St: TryStream + Unpin + ?Sized, -{} - impl<'a, Si, St, Ok, Error> SendAll<'a, Si, St> where Si: Sink + Unpin + ?Sized, - St: TryStream + Stream + Unpin + ?Sized, + St: TryStream + Stream, { pub(super) fn new( sink: &'a mut Si, - stream: &'a mut St, + stream: St, ) -> Self { Self { sink, @@ -59,17 +56,18 @@ where } fn try_start_send( - &mut self, + self: Pin<&mut Self>, cx: &mut Context<'_>, item: St::Ok, ) -> Poll> { - debug_assert!(self.buffered.is_none()); - match Pin::new(&mut self.sink).poll_ready(cx)? { + let this = self.project(); + debug_assert!(this.buffered.is_none()); + match Pin::new(&mut *this.sink).poll_ready(cx)? { Poll::Ready(()) => { - Poll::Ready(Pin::new(&mut self.sink).start_send(item)) + Poll::Ready(Pin::new(&mut *this.sink).start_send(item)) } Poll::Pending => { - self.buffered = Some(item); + *this.buffered = Some(item); Poll::Pending } } @@ -79,7 +77,7 @@ where impl Future for SendAll<'_, Si, St> where Si: Sink + Unpin + ?Sized, - St: Stream> + Unpin + ?Sized, + St: Stream>, { type Output = Result<(), Error>; @@ -87,24 +85,24 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { - let this = &mut *self; // If we've got an item buffered already, we need to write it to the // sink before we can do anything else - if let Some(item) = this.buffered.take() { - ready!(this.try_start_send(cx, item))? + if let Some(item) = self.as_mut().project().buffered.take() { + ready!(self.as_mut().try_start_send(cx, item))? } loop { - match this.stream.try_poll_next_unpin(cx)? { + let this = self.as_mut().project(); + match this.stream.try_poll_next(cx)? { Poll::Ready(Some(item)) => { - ready!(this.try_start_send(cx, item))? + ready!(self.as_mut().try_start_send(cx, item))? } Poll::Ready(None) => { - ready!(Pin::new(&mut this.sink).poll_flush(cx))?; + ready!(Pin::new(this.sink).poll_flush(cx))?; return Poll::Ready(Ok(())) } Poll::Pending => { - ready!(Pin::new(&mut this.sink).poll_flush(cx))?; + ready!(Pin::new(this.sink).poll_flush(cx))?; return Poll::Pending } } diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 111fdf6388..935cf6a50f 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1018,7 +1018,7 @@ pub mod sink { assert_not_impl!(SendAll<'_, (), LocalTryStream>: Sync); assert_not_impl!(SendAll<'_, *const (), SyncTryStream<()>>: Sync); assert_impl!(SendAll<'_, (), UnpinTryStream>: Unpin); - assert_not_impl!(SendAll<'_, PhantomPinned, UnpinTryStream>: Unpin); + assert_impl!(SendAll<'_, PhantomPinned, UnpinTryStream>: Unpin); assert_not_impl!(SendAll<'_, (), PinnedTryStream>: Unpin); assert_impl!(SinkErrInto: Send);