Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 0.3.25 - 2022-10-20

* Fix soundness issue in `join!` and `try_join!` macros (#2649)
* Implement `Clone` for `sink::Drain` (#2650)

# 0.3.24 - 2022-08-29

* Fix incorrect termination of `select_with_strategy` streams (#2635)
Expand Down
6 changes: 3 additions & 3 deletions futures-channel/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-channel"
version = "0.3.24"
version = "0.3.25"
edition = "2018"
rust-version = "1.45"
license = "MIT OR Apache-2.0"
Expand All @@ -22,8 +22,8 @@ unstable = []
cfg-target-has-atomic = []

[dependencies]
futures-core = { path = "../futures-core", version = "0.3.24", default-features = false }
futures-sink = { path = "../futures-sink", version = "0.3.24", default-features = false, optional = true }
futures-core = { path = "../futures-core", version = "0.3.25", default-features = false }
futures-sink = { path = "../futures-sink", version = "0.3.25", default-features = false, optional = true }

[dev-dependencies]
futures = { path = "../futures", default-features = true }
Expand Down
2 changes: 1 addition & 1 deletion futures-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-core"
version = "0.3.24"
version = "0.3.25"
edition = "2018"
rust-version = "1.36"
license = "MIT OR Apache-2.0"
Expand Down
8 changes: 4 additions & 4 deletions futures-executor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-executor"
version = "0.3.24"
version = "0.3.25"
edition = "2018"
rust-version = "1.45"
license = "MIT OR Apache-2.0"
Expand All @@ -16,9 +16,9 @@ std = ["futures-core/std", "futures-task/std", "futures-util/std"]
thread-pool = ["std", "num_cpus"]

[dependencies]
futures-core = { path = "../futures-core", version = "0.3.24", default-features = false }
futures-task = { path = "../futures-task", version = "0.3.24", default-features = false }
futures-util = { path = "../futures-util", version = "0.3.24", default-features = false }
futures-core = { path = "../futures-core", version = "0.3.25", default-features = false }
futures-task = { path = "../futures-task", version = "0.3.25", default-features = false }
futures-util = { path = "../futures-util", version = "0.3.25", default-features = false }
num_cpus = { version = "1.8.0", optional = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion futures-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-io"
version = "0.3.24"
version = "0.3.25"
edition = "2018"
rust-version = "1.36"
license = "MIT OR Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion futures-macro/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-macro"
version = "0.3.24"
version = "0.3.25"
edition = "2018"
rust-version = "1.45"
license = "MIT OR Apache-2.0"
Expand Down
13 changes: 7 additions & 6 deletions futures-macro/src/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ fn bind_futures(fut_exprs: Vec<Expr>, span: Span) -> (Vec<TokenStream2>, Vec<Ide
// Move future into a local so that it is pinned in one place and
// is no longer accessible by the end user.
let mut #name = __futures_crate::future::maybe_done(#expr);
let mut #name = unsafe { __futures_crate::Pin::new_unchecked(&mut #name) };
});
name
})
Expand All @@ -58,12 +59,12 @@ pub(crate) fn join(input: TokenStream) -> TokenStream {
let poll_futures = future_names.iter().map(|fut| {
quote! {
__all_done &= __futures_crate::future::Future::poll(
unsafe { __futures_crate::Pin::new_unchecked(&mut #fut) }, __cx).is_ready();
#fut.as_mut(), __cx).is_ready();
}
});
let take_outputs = future_names.iter().map(|fut| {
quote! {
unsafe { __futures_crate::Pin::new_unchecked(&mut #fut) }.take_output().unwrap(),
#fut.as_mut().take_output().unwrap(),
}
});

Expand Down Expand Up @@ -96,17 +97,17 @@ pub(crate) fn try_join(input: TokenStream) -> TokenStream {
let poll_futures = future_names.iter().map(|fut| {
quote! {
if __futures_crate::future::Future::poll(
unsafe { __futures_crate::Pin::new_unchecked(&mut #fut) }, __cx).is_pending()
#fut.as_mut(), __cx).is_pending()
{
__all_done = false;
} else if unsafe { __futures_crate::Pin::new_unchecked(&mut #fut) }.output_mut().unwrap().is_err() {
} else if #fut.as_mut().output_mut().unwrap().is_err() {
// `.err().unwrap()` rather than `.unwrap_err()` so that we don't introduce
// a `T: Debug` bound.
// Also, for an error type of ! any code after `err().unwrap()` is unreachable.
#[allow(unreachable_code)]
return __futures_crate::task::Poll::Ready(
__futures_crate::Err(
unsafe { __futures_crate::Pin::new_unchecked(&mut #fut) }.take_output().unwrap().err().unwrap()
#fut.as_mut().take_output().unwrap().err().unwrap()
)
);
}
Expand All @@ -118,7 +119,7 @@ pub(crate) fn try_join(input: TokenStream) -> TokenStream {
// an `E: Debug` bound.
// Also, for an ok type of ! any code after `ok().unwrap()` is unreachable.
#[allow(unreachable_code)]
unsafe { __futures_crate::Pin::new_unchecked(&mut #fut) }.take_output().unwrap().ok().unwrap(),
#fut.as_mut().take_output().unwrap().ok().unwrap(),
}
});

Expand Down
2 changes: 1 addition & 1 deletion futures-sink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-sink"
version = "0.3.24"
version = "0.3.25"
edition = "2018"
rust-version = "1.36"
license = "MIT OR Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion futures-task/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-task"
version = "0.3.24"
version = "0.3.25"
edition = "2018"
rust-version = "1.45"
license = "MIT OR Apache-2.0"
Expand Down
16 changes: 8 additions & 8 deletions futures-test/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-test"
version = "0.3.24"
version = "0.3.25"
edition = "2018"
rust-version = "1.45"
license = "MIT OR Apache-2.0"
Expand All @@ -11,13 +11,13 @@ Common utilities for testing components built off futures-rs.
"""

[dependencies]
futures-core = { version = "0.3.24", path = "../futures-core", default-features = false }
futures-task = { version = "0.3.24", path = "../futures-task", default-features = false }
futures-io = { version = "0.3.24", path = "../futures-io", default-features = false }
futures-util = { version = "0.3.24", path = "../futures-util", default-features = false }
futures-executor = { version = "0.3.24", path = "../futures-executor", default-features = false }
futures-sink = { version = "0.3.24", path = "../futures-sink", default-features = false }
futures-macro = { version = "=0.3.24", path = "../futures-macro", default-features = false }
futures-core = { version = "0.3.25", path = "../futures-core", default-features = false }
futures-task = { version = "0.3.25", path = "../futures-task", default-features = false }
futures-io = { version = "0.3.25", path = "../futures-io", default-features = false }
futures-util = { version = "0.3.25", path = "../futures-util", default-features = false }
futures-executor = { version = "0.3.25", path = "../futures-executor", default-features = false }
futures-sink = { version = "0.3.25", path = "../futures-sink", default-features = false }
futures-macro = { version = "=0.3.25", path = "../futures-macro", default-features = false }
pin-utils = { version = "0.1.0", default-features = false }
pin-project = "1.0.11"

Expand Down
14 changes: 7 additions & 7 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-util"
version = "0.3.24"
version = "0.3.25"
edition = "2018"
rust-version = "1.45"
license = "MIT OR Apache-2.0"
Expand Down Expand Up @@ -34,12 +34,12 @@ write-all-vectored = ["io"]
cfg-target-has-atomic = []

[dependencies]
futures-core = { path = "../futures-core", version = "0.3.24", default-features = false }
futures-task = { path = "../futures-task", version = "0.3.24", default-features = false }
futures-channel = { path = "../futures-channel", version = "0.3.24", default-features = false, features = ["std"], optional = true }
futures-io = { path = "../futures-io", version = "0.3.24", default-features = false, features = ["std"], optional = true }
futures-sink = { path = "../futures-sink", version = "0.3.24", default-features = false, optional = true }
futures-macro = { path = "../futures-macro", version = "=0.3.24", default-features = false, optional = true }
futures-core = { path = "../futures-core", version = "0.3.25", default-features = false }
futures-task = { path = "../futures-task", version = "0.3.25", default-features = false }
futures-channel = { path = "../futures-channel", version = "0.3.25", default-features = false, features = ["std"], optional = true }
futures-io = { path = "../futures-io", version = "0.3.25", default-features = false, features = ["std"], optional = true }
futures-sink = { path = "../futures-sink", version = "0.3.25", default-features = false, optional = true }
futures-macro = { path = "../futures-macro", version = "=0.3.25", default-features = false, optional = true }
slab = { version = "0.4.2", optional = true }
memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
Expand Down
6 changes: 6 additions & 0 deletions futures-util/src/sink/drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ pub fn drain<T>() -> Drain<T> {

impl<T> Unpin for Drain<T> {}

impl<T> Clone for Drain<T> {
fn clone(&self) -> Self {
drain()
}
}

impl<T> Sink<T> for Drain<T> {
type Error = Never;

Expand Down
1 change: 1 addition & 0 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
mod abort;

mod iter;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/102352
pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef};

mod task;
Expand Down
6 changes: 1 addition & 5 deletions futures-util/src/stream/stream/buffer_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ where
St: Stream,
St::Item: Future,
{
pub(super) fn new(stream: St, n: usize) -> Self
where
St: Stream,
St::Item: Future,
{
pub(super) fn new(stream: St, n: usize) -> Self {
Self {
stream: super::Fuse::new(stream),
in_progress_queue: FuturesUnordered::new(),
Expand Down
7 changes: 2 additions & 5 deletions futures-util/src/stream/stream/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ pin_project! {
}
}

impl<St: Stream> Chunks<St>
where
St: Stream,
{
impl<St: Stream> Chunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);

Expand Down Expand Up @@ -77,7 +74,7 @@ impl<St: Stream> Stream for Chunks<St> {
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ where
}

fn size_hint(&self) -> (usize, Option<usize>) {
let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/filter_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where
}

fn size_hint(&self) -> (usize, Option<usize>) {
let pending_len = if self.pending.is_some() { 1 } else { 0 };
let pending_len = usize::from(self.pending.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl<S: Stream> Stream for Peekable<S> {
}

fn size_hint(&self) -> (usize, Option<usize>) {
let peek_len = if self.peeked.is_some() { 1 } else { 0 };
let peek_len = usize::from(self.peeked.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(peek_len);
let upper = match upper {
Expand Down
7 changes: 2 additions & 5 deletions futures-util/src/stream/stream/ready_chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ pin_project! {
}
}

impl<St: Stream> ReadyChunks<St>
where
St: Stream,
{
impl<St: Stream> ReadyChunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);

Expand Down Expand Up @@ -85,7 +82,7 @@ impl<St: Stream> Stream for ReadyChunks<St> {
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/skip_while.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
if self.done_skipping {
self.stream.size_hint()
} else {
let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/stream/stream/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ where

let (lower, upper) = self.stream.size_hint();

let lower = cmp::min(lower, self.remaining as usize);
let lower = cmp::min(lower, self.remaining);

let upper = match upper {
Some(x) if x < self.remaining as usize => Some(x),
_ => Some(self.remaining as usize),
Some(x) if x < self.remaining => Some(x),
_ => Some(self.remaining),
};

(lower, upper)
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/take_while.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ where
return (0, Some(0));
}

let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ where
}

fn size_hint(&self) -> (usize, Option<usize>) {
let future_len = if self.future.is_some() { 1 } else { 0 };
let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/stream/stream/zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ where
}

fn size_hint(&self) -> (usize, Option<usize>) {
let queued1_len = if self.queued1.is_some() { 1 } else { 0 };
let queued2_len = if self.queued2.is_some() { 1 } else { 0 };
let queued1_len = usize::from(self.queued1.is_some());
let queued2_len = usize::from(self.queued2.is_some());
let (stream1_lower, stream1_upper) = self.stream1.size_hint();
let (stream2_lower, stream2_upper) = self.stream2.size_hint();

Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
}

fn size_hint(&self) -> (usize, Option<usize>) {
let future_len = if self.future.is_some() { 1 } else { 0 };
let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/or_else.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where
}

fn size_hint(&self) -> (usize, Option<usize>) {
let future_len = if self.future.is_some() { 1 } else { 0 };
let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/try_chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<St: TryStream> Stream for TryChunks<St> {
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/try_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ where
}

fn size_hint(&self) -> (usize, Option<usize>) {
let pending_len = if self.pending_fut.is_some() { 1 } else { 0 };
let pending_len = usize::from(self.pending_fut.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/try_filter_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ where
}

fn size_hint(&self) -> (usize, Option<usize>) {
let pending_len = if self.pending.is_some() { 1 } else { 0 };
let pending_len = usize::from(self.pending.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
Expand Down
Loading