From 336f8cb21de37ff9795d81bf300edb6762b4d047 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 15 Jul 2022 17:54:49 +0200 Subject: [PATCH] Container-invariant BranchWhen operator As the title says, this changes the BranchWhen operator to work on streams of arbitrary containers. Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/operators/branch.rs | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index 5d2000a18..70e087abd 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -2,8 +2,8 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; -use crate::dataflow::{Scope, Stream}; -use crate::Data; +use crate::dataflow::{Scope, Stream, StreamCore}; +use crate::{Container, Data}; /// Extension trait for `Stream`. pub trait Branch { @@ -71,7 +71,7 @@ impl Branch for Stream { } /// Extension trait for `Stream`. -pub trait BranchWhen { +pub trait BranchWhen: Sized { /// Takes one input stream and splits it into two output streams. /// For each time, the supplied closure is called. If it returns true, /// the records for that will be sent to the second returned stream, otherwise @@ -91,17 +91,11 @@ pub trait BranchWhen { /// after_five.inspect(|x| println!("Times 5 and later: {:?}", x)); /// }); /// ``` - fn branch_when( - &self, - condition: impl Fn(&S::Timestamp) -> bool + 'static, - ) -> (Stream, Stream); + fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self); } -impl BranchWhen for Stream { - fn branch_when( - &self, - condition: impl Fn(&S::Timestamp) -> bool + 'static, - ) -> (Stream, Stream) { +impl BranchWhen for StreamCore { + fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); let mut input = builder.new_input(self, Pipeline); @@ -110,19 +104,19 @@ impl BranchWhen for Stream { builder.build(move |_| { - let mut vector = Vec::new(); + let mut container = Default::default(); move |_frontiers| { let mut output1_handle = output1.activate(); let mut output2_handle = output2.activate(); input.for_each(|time, data| { - data.swap(&mut vector); + data.swap(&mut container); let mut out = if condition(&time.time()) { output2_handle.session(&time) } else { output1_handle.session(&time) }; - out.give_vec(&mut vector); + out.give_container(&mut container); }); } });