Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions futures-util/src/future/fuse.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{LocalWaker, Poll};
use pin_utils::unsafe_pinned;

Expand All @@ -22,9 +22,7 @@ impl<Fut: Future> Fuse<Fut> {
unsafe_pinned!(future: Option<Fut>);

pub(super) fn new(f: Fut) -> Fuse<Fut> {
Fuse {
future: Some(f),
}
Fuse { future: Some(f) }
}
}

Expand All @@ -44,13 +42,13 @@ impl<Fut: Future> Future for Fuse<Fut> {
// safety: this re-pinned future will never move before being dropped
match fut.poll(lw) {
Poll::Pending => return Poll::Pending,
Poll::Ready(v) => v
Poll::Ready(v) => v,
}
}
None => return Poll::Pending,
};

Pin::set(self.as_mut().future(), None);
Pin::set(&mut self.as_mut().future(), None);
Poll::Ready(v)
}
}
21 changes: 11 additions & 10 deletions futures-util/src/future/into_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,35 @@ use pin_utils::unsafe_pinned;
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct IntoStream<Fut: Future> {
future: Option<Fut>
future: Option<Fut>,
}

impl<Fut: Future> IntoStream<Fut> {
unsafe_pinned!(future: Option<Fut>);

pub(super) fn new(future: Fut) -> IntoStream<Fut> {
IntoStream {
future: Some(future)
future: Some(future),
}
}
}

impl<Fut: Future> Stream for IntoStream<Fut> {
type Item = Fut::Output;

fn poll_next(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
fn poll_next(
mut self: Pin<&mut Self>,
lw: &LocalWaker,
) -> Poll<Option<Self::Item>> {
let v = match self.as_mut().future().as_pin_mut() {
Some(fut) => {
match fut.poll(lw) {
Poll::Pending => return Poll::Pending,
Poll::Ready(v) => v
}
}
Some(fut) => match fut.poll(lw) {
Poll::Pending => return Poll::Pending,
Poll::Ready(v) => v,
},
None => return Poll::Ready(None),
};

Pin::set(self.as_mut().future(), None);
Pin::set(&mut self.as_mut().future(), None);
Poll::Ready(Some(v))
}
}
2 changes: 1 addition & 1 deletion futures-util/src/future/maybe_done.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl<Fut: Future> Future for MaybeDone<Fut> {
MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
}
};
Pin::set(self, MaybeDone::Done(res));
Pin::set(&mut self, MaybeDone::Done(res));
Poll::Ready(())
}
}
15 changes: 10 additions & 5 deletions futures-util/src/sink/with_flat_map.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use core::marker::{Unpin, PhantomData};
use core::marker::{PhantomData, Unpin};
use core::pin::Pin;
use futures_core::stream::Stream;
use futures_core::task::{LocalWaker, Poll};
Expand Down Expand Up @@ -28,7 +28,8 @@ where
Si: Sink + Unpin,
F: FnMut(U) -> St,
St: Stream<Item = Result<Si::SinkItem, Si::SinkError>> + Unpin,
{}
{
}

impl<Si, U, St, F> WithFlatMap<Si, U, St, F>
where
Expand Down Expand Up @@ -78,8 +79,12 @@ where
self: Pin<&mut Self>,
lw: &LocalWaker,
) -> Poll<Result<(), Si::SinkError>> {
let WithFlatMap { sink, stream, buffer, .. } =
unsafe { Pin::get_unchecked_mut(self) };
let WithFlatMap {
sink,
stream,
buffer,
..
} = unsafe { Pin::get_unchecked_mut(self) };
let mut sink = unsafe { Pin::new_unchecked(sink) };
let mut stream = unsafe { Pin::new_unchecked(stream) };

Expand All @@ -102,7 +107,7 @@ where
};
}
}
Pin::set(stream, None);
Pin::set(&mut stream, None);
Poll::Ready(Ok(()))
}
}
Expand Down
18 changes: 10 additions & 8 deletions futures-util/src/stream/unfold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ use pin_utils::{unsafe_pinned, unsafe_unpinned};
/// assert_eq!(result, vec![0, 2, 4]);
/// ```
pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(It, T)>>,
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(It, T)>>,
{
Unfold {
f,
Expand Down Expand Up @@ -85,28 +86,29 @@ impl<T, F, Fut> FusedStream for Unfold<T, F, Fut> {
}

impl<T, F, Fut, It> Stream for Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(It, T)>>,
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(It, T)>>,
{
type Item = It;

fn poll_next(
mut self: Pin<&mut Self>,
lw: &LocalWaker
lw: &LocalWaker,
) -> Poll<Option<It>> {
if let Some(state) = self.as_mut().state().take() {
let fut = (self.as_mut().f())(state);
Pin::set(self.as_mut().fut(), Some(fut));
Pin::set(&mut self.as_mut().fut(), Some(fut));
}

let step = ready!(self.as_mut().fut().as_pin_mut().unwrap().poll(lw));
self.as_mut().fut().set(None);

if let Some((item, next_state)) = step {
*self.as_mut().state() = Some(next_state);
return Poll::Ready(Some(item))
return Poll::Ready(Some(item));
} else {
return Poll::Ready(None)
return Poll::Ready(None);
}
}
}
6 changes: 3 additions & 3 deletions futures-util/src/try_future/flatten_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where

#[allow(clippy::needless_lifetimes)] // https://github.com/rust-lang/rust/issues/52675
fn project_pin<'a>(
self: Pin<&'a mut Self>
self: Pin<&'a mut Self>,
) -> State<Pin<&'a mut Fut>, Pin<&'a mut Si>> {
unsafe {
match &mut Pin::get_unchecked_mut(self).0 {
Expand Down Expand Up @@ -59,7 +59,7 @@ where
Waiting(f) => try_ready!(f.try_poll(lw)),
Closed => panic!("poll_ready called after eof"),
};
Pin::set(self.as_mut(), FlattenSink(Ready(resolved_stream)));
Pin::set(&mut self.as_mut(), FlattenSink(Ready(resolved_stream)));
if let Ready(resolved_stream) = self.project_pin() {
resolved_stream.poll_ready(lw)
} else {
Expand Down Expand Up @@ -99,7 +99,7 @@ where
Waiting(_) | Closed => Poll::Ready(Ok(())),
};
if res.is_ready() {
Pin::set(self, FlattenSink(Closed));
Pin::set(&mut self, FlattenSink(Closed));
}
res
}
Expand Down
18 changes: 10 additions & 8 deletions futures-util/src/try_stream/try_for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ pub struct TryForEach<St, Fut, F> {
impl<St: Unpin, Fut: Unpin, F> Unpin for TryForEach<St, Fut, F> {}

impl<St, Fut, F> TryForEach<St, Fut, F>
where St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = St::Error>,
where
St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = St::Error>,
{
unsafe_pinned!(stream: St);
unsafe_unpinned!(f: F);
Expand All @@ -38,9 +39,10 @@ where St: TryStream,
}

impl<St, Fut, F> Future for TryForEach<St, Fut, F>
where St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = St::Error>,
where
St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = St::Error>,
{
type Output = Result<(), St::Error>;

Expand All @@ -49,12 +51,12 @@ impl<St, Fut, F> Future for TryForEach<St, Fut, F>
if let Some(future) = self.as_mut().future().as_pin_mut() {
try_ready!(future.try_poll(lw));
}
Pin::set(self.as_mut().future(), None);
Pin::set(&mut self.as_mut().future(), None);

match ready!(self.as_mut().stream().try_poll_next(lw)) {
Some(Ok(e)) => {
let future = (self.as_mut().f())(e);
Pin::set(self.as_mut().future(), Some(future));
Pin::set(&mut self.as_mut().future(), Some(future));
}
Some(Err(e)) => return Poll::Ready(Err(e)),
None => return Poll::Ready(Ok(())),
Expand Down