Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions futures-util/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use crate::future::{assert_future, Either};
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{Stream, TryStream};
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};

#[cfg(feature = "compat")]
Expand Down Expand Up @@ -248,15 +248,12 @@ pub trait SinkExt<Item>: Sink<Item> {
/// Doing `sink.send_all(stream)` is roughly equivalent to
/// `stream.forward(sink)`. The returned future will exhaust all items from
/// `stream` and send them to `self`.
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
fn send_all<St>(&mut self, stream: St) -> SendAll<'_, Self, St>
where
St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
// St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized,
St: Stream<Item = Result<Item, Self::Error>>,
Self: Unpin,
{
// TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>`
// assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream))
SendAll::new(self, stream)
assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream))
}

/// Wrap this sink in an `Either` sink, making it the left-hand variant
Expand Down
68 changes: 33 additions & 35 deletions futures-util/src/sink/send_all.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
use crate::stream::{StreamExt, TryStreamExt, Fuse};
use crate::stream::{StreamExt, Fuse};
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::stream::{TryStream, Stream};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
use pin_project_lite::pin_project;

/// Future for the [`send_all`](super::SinkExt::send_all) method.
#[allow(explicit_outlives_requirements)] // https://github.com/rust-lang/rust/issues/60993
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SendAll<'a, Si, St>
where
Si: ?Sized,
St: ?Sized + TryStream,
{
sink: &'a mut Si,
stream: Fuse<&'a mut St>,
buffered: Option<St::Ok>,
pin_project! {
/// Future for the [`send_all`](super::SinkExt::send_all) method.
#[allow(explicit_outlives_requirements)] // https://github.com/rust-lang/rust/issues/60993
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SendAll<'a, Si, St>
where
Si: ?Sized,
St: TryStream,
{
sink: &'a mut Si,
#[pin]
stream: Fuse<St>,
buffered: Option<St::Ok>,
}
}

impl<Si, St> fmt::Debug for SendAll<'_, Si, St>
where
Si: fmt::Debug + ?Sized,
St: fmt::Debug + ?Sized + TryStream,
St: fmt::Debug + TryStream,
St::Ok: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -35,21 +39,14 @@ where
}
}

// Pinning is never projected to any fields
impl<Si, St> Unpin for SendAll<'_, Si, St>
where
Si: Unpin + ?Sized,
St: TryStream + Unpin + ?Sized,
{}

impl<'a, Si, St, Ok, Error> SendAll<'a, Si, St>
where
Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
St: TryStream<Ok = Ok, Error = Error> + Stream + Unpin + ?Sized,
St: TryStream<Ok = Ok, Error = Error> + Stream,
{
pub(super) fn new(
sink: &'a mut Si,
stream: &'a mut St,
stream: St,
) -> Self {
Self {
sink,
Expand All @@ -59,17 +56,18 @@ where
}

fn try_start_send(
&mut self,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
item: St::Ok,
) -> Poll<Result<(), Si::Error>> {
debug_assert!(self.buffered.is_none());
match Pin::new(&mut self.sink).poll_ready(cx)? {
let this = self.project();
debug_assert!(this.buffered.is_none());
match Pin::new(&mut *this.sink).poll_ready(cx)? {
Poll::Ready(()) => {
Poll::Ready(Pin::new(&mut self.sink).start_send(item))
Poll::Ready(Pin::new(&mut *this.sink).start_send(item))
}
Poll::Pending => {
self.buffered = Some(item);
*this.buffered = Some(item);
Poll::Pending
}
}
Expand All @@ -79,32 +77,32 @@ where
impl<Si, St, Ok, Error> Future for SendAll<'_, Si, St>
where
Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
St: Stream<Item = Result<Ok, Error>> + Unpin + ?Sized,
St: Stream<Item = Result<Ok, Error>>,
{
type Output = Result<(), Error>;

fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let this = &mut *self;
// If we've got an item buffered already, we need to write it to the
// sink before we can do anything else
if let Some(item) = this.buffered.take() {
ready!(this.try_start_send(cx, item))?
if let Some(item) = self.as_mut().project().buffered.take() {
ready!(self.as_mut().try_start_send(cx, item))?
}

loop {
match this.stream.try_poll_next_unpin(cx)? {
let this = self.as_mut().project();
match this.stream.try_poll_next(cx)? {
Poll::Ready(Some(item)) => {
ready!(this.try_start_send(cx, item))?
ready!(self.as_mut().try_start_send(cx, item))?
}
Poll::Ready(None) => {
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
ready!(Pin::new(this.sink).poll_flush(cx))?;
return Poll::Ready(Ok(()))
}
Poll::Pending => {
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
ready!(Pin::new(this.sink).poll_flush(cx))?;
return Poll::Pending
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures/tests/auto_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ pub mod sink {
assert_not_impl!(SendAll<'_, (), LocalTryStream>: Sync);
assert_not_impl!(SendAll<'_, *const (), SyncTryStream<()>>: Sync);
assert_impl!(SendAll<'_, (), UnpinTryStream>: Unpin);
assert_not_impl!(SendAll<'_, PhantomPinned, UnpinTryStream>: Unpin);
assert_impl!(SendAll<'_, PhantomPinned, UnpinTryStream>: Unpin);
assert_not_impl!(SendAll<'_, (), PinnedTryStream>: Unpin);

assert_impl!(SinkErrInto<SendSink, *const (), *const ()>: Send);
Expand Down