From e4a8100b9c977ccf6bd9ac9f1ffecda64b89f450 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 29 Dec 2020 11:56:13 +0100 Subject: [PATCH 1/2] Provide stream functions to handle Result streams Add a `ResultStream` trait providing functions modeled after the native Result type, if their stream counterpart is meaningful. This includes functions to extract and map `Ok` or `Err` variants, as well as unwrapping, and chaining results with `and_then`. Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/operators/mod.rs | 2 + timely/src/dataflow/operators/result.rs | 212 ++++++++++++++++++++++++ 2 files changed, 214 insertions(+) create mode 100644 timely/src/dataflow/operators/result.rs diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 0f07a4fb8..2c4bbc99e 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -26,6 +26,7 @@ pub use self::to_stream::ToStream; pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; pub use self::ok_err::OkErr; +pub use self::result::ResultStream; pub use self::generic::Operator; pub use self::generic::{Notificator, FrontierNotificator}; @@ -51,6 +52,7 @@ pub mod to_stream; pub mod capture; pub mod branch; pub mod ok_err; +pub mod result; pub mod aggregation; pub mod generic; diff --git a/timely/src/dataflow/operators/result.rs b/timely/src/dataflow/operators/result.rs new file mode 100644 index 000000000..a233dba8d --- /dev/null +++ b/timely/src/dataflow/operators/result.rs @@ -0,0 +1,212 @@ +//! Extension methods for `Stream` containing `Result`s. + +use crate::Data; +use crate::dataflow::channels::pact::Pipeline; +use crate::dataflow::operators::generic::Operator; +use crate::dataflow::operators::Map; +use crate::dataflow::{Scope, Stream}; + +/// Extension trait for `Stream`. +pub trait ResultStream { + /// Returns a new instance of `self` containing only `ok` records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(0), Err(())].to_stream(scope) + /// .ok() + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn ok(&self) -> Stream; + + /// Returns a new instance of `self` containing only `err` records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(0), Err(())].to_stream(scope) + /// .err() + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn err(&self) -> Stream; + + /// Returns a new instance of `self` applying `logic` on all `Ok` records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(0), Err(())].to_stream(scope) + /// .map_ok(|x| x + 1) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn map_ok T2 + 'static>(&self, logic: L) -> Stream>; + + /// Returns a new instance of `self` applying `logic` on all `Err` records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(0), Err(())].to_stream(scope) + /// .map_err(|_| 1) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn map_err E2 + 'static>(&self, logic: L) -> Stream>; + + /// Returns a new instance of `self` applying `logic` on all `Ok` records, passes through `Err` + /// records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(0), Err(())].to_stream(scope) + /// .and_then(|x| Ok(1 + 1)) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn and_then Result + 'static>( + &self, + logic: L, + ) -> Stream>; + + /// Returns a new instance of `self` applying `logic` on all `Ok` records. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream}; + /// + /// timely::example(|scope| { + /// vec![Ok(1), Err(())].to_stream(scope) + /// .unwrap_or_else(|_| 0) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn unwrap_or_else T + 'static>(&self, logic: L) -> Stream; +} + +impl ResultStream for Stream> { + fn ok(&self) -> Stream { + let mut buffer = vec![]; + self.unary(Pipeline, "Result::ok", move |_, _| move |input, output| { + input.for_each(|time, data| { + data.swap(&mut buffer); + buffer.retain(Result::is_ok); + if !buffer.is_empty() { + output + .session(&time) + .give_iterator(buffer.drain(..).map(|e| e.ok().unwrap()), + ); + } + }) + }) + } + + fn err(&self) -> Stream { + let mut buffer = vec![]; + self.unary(Pipeline, "Result::err", move |_, _| move |input, output| { + input.for_each(|time, data| { + data.swap(&mut buffer); + buffer.retain(Result::is_err); + if !buffer.is_empty() { + output + .session(&time) + .give_iterator(buffer.drain(..).map(|r| r.err().unwrap())); + } + }) + }) + } + + fn map_ok T2 + 'static>(&self, mut logic: L) -> Stream> { + self.map(move |r| r.map(|x| logic(x))) + } + + fn map_err E2 + 'static>(&self, mut logic: L) -> Stream> { + self.map(move |r| r.map_err(|x| logic(x))) + } + + fn and_then Result + 'static>(&self, mut logic: L) -> Stream> { + self.map(move |r| r.and_then(|x| logic(x))) + } + + fn unwrap_or_else T + 'static>(&self, mut logic: L) -> Stream { + self.map(move |r| r.unwrap_or_else(|err| logic(err))) + } +} + +#[cfg(test)] +mod tests { + use crate::dataflow::operators::{ToStream, ResultStream, Capture, capture::Extract}; + + #[test] + fn test_ok() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .ok() + .capture() + }); + assert_eq!(output.extract()[0].1, vec![0]); + } + + #[test] + fn test_err() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .err() + .capture() + }); + assert_eq!(output.extract()[0].1, vec![()]); + } + + #[test] + fn test_map_ok() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .map_ok(|_| 10) + .capture() + }); + assert_eq!(output.extract()[0].1, vec![Ok(10), Err(())]); + } + + #[test] + fn test_map_err() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .map_err(|_| 10) + .capture() + }); + assert_eq!(output.extract()[0].1, vec![Ok(0), Err(10)]); + } + + #[test] + fn test_and_then() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .and_then(|_| Ok(1)) + .capture() + }); + assert_eq!(output.extract()[0].1, vec![Ok(1), Err(())]); + } + + #[test] + fn test_unwrap_or_else() { + let output = crate::example(|scope| { + vec![Ok(0), Err(())].to_stream(scope) + .unwrap_or_else(|_| 10) + .capture() + }); + assert_eq!(output.extract()[0].1, vec![0, 10]); + } +} From 8e78df924153da02615f45c1cc42154df2ce54f4 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 29 Dec 2020 12:24:51 +0100 Subject: [PATCH 2/2] Define ResultStream err/ok in terms of flat_map Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/operators/result.rs | 29 ++----------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/timely/src/dataflow/operators/result.rs b/timely/src/dataflow/operators/result.rs index a233dba8d..3607f9f8e 100644 --- a/timely/src/dataflow/operators/result.rs +++ b/timely/src/dataflow/operators/result.rs @@ -1,8 +1,6 @@ //! Extension methods for `Stream` containing `Result`s. use crate::Data; -use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::operators::generic::Operator; use crate::dataflow::operators::Map; use crate::dataflow::{Scope, Stream}; @@ -99,34 +97,11 @@ pub trait ResultStream { impl ResultStream for Stream> { fn ok(&self) -> Stream { - let mut buffer = vec![]; - self.unary(Pipeline, "Result::ok", move |_, _| move |input, output| { - input.for_each(|time, data| { - data.swap(&mut buffer); - buffer.retain(Result::is_ok); - if !buffer.is_empty() { - output - .session(&time) - .give_iterator(buffer.drain(..).map(|e| e.ok().unwrap()), - ); - } - }) - }) + self.flat_map(Result::ok) } fn err(&self) -> Stream { - let mut buffer = vec![]; - self.unary(Pipeline, "Result::err", move |_, _| move |input, output| { - input.for_each(|time, data| { - data.swap(&mut buffer); - buffer.retain(Result::is_err); - if !buffer.is_empty() { - output - .session(&time) - .give_iterator(buffer.drain(..).map(|r| r.err().unwrap())); - } - }) - }) + self.flat_map(Result::err) } fn map_ok T2 + 'static>(&self, mut logic: L) -> Stream> {