Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions timely/src/dataflow/operators/branch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::{Scope, Stream};
use crate::Data;
use crate::dataflow::{Scope, Stream, StreamCore};
use crate::{Container, Data};

/// Extension trait for `Stream`.
pub trait Branch<S: Scope, D: Data> {
Expand Down Expand Up @@ -71,7 +71,7 @@ impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
}

/// Extension trait for `Stream`.
pub trait BranchWhen<S: Scope, D: Data> {
pub trait BranchWhen<T>: Sized {
/// Takes one input stream and splits it into two output streams.
/// For each time, the supplied closure is called. If it returns true,
/// the records for that will be sent to the second returned stream, otherwise
Expand All @@ -91,17 +91,11 @@ pub trait BranchWhen<S: Scope, D: Data> {
/// after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
/// });
/// ```
fn branch_when(
&self,
condition: impl Fn(&S::Timestamp) -> bool + 'static,
) -> (Stream<S, D>, Stream<S, D>);
fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
}

impl<S: Scope, D: Data> BranchWhen<S, D> for Stream<S, D> {
fn branch_when(
&self,
condition: impl Fn(&S::Timestamp) -> bool + 'static,
) -> (Stream<S, D>, Stream<S, D>) {
impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());

let mut input = builder.new_input(self, Pipeline);
Expand All @@ -110,19 +104,19 @@ impl<S: Scope, D: Data> BranchWhen<S, D> for Stream<S, D> {

builder.build(move |_| {

let mut vector = Vec::new();
let mut container = Default::default();
move |_frontiers| {
let mut output1_handle = output1.activate();
let mut output2_handle = output2.activate();

input.for_each(|time, data| {
data.swap(&mut vector);
data.swap(&mut container);
let mut out = if condition(&time.time()) {
output2_handle.session(&time)
} else {
output1_handle.session(&time)
};
out.give_vec(&mut vector);
out.give_container(&mut container);
});
}
});
Expand Down