Skip to content

Commit 8442e15

Browse files
authored
Container-invariant Reclock operator (#474)
The reclock operator delays batches of data based on a clock input. With this change, it can not only hold back vector-based container but any type of container. Signed-off-by: Moritz Hoffmann <[email protected]> Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 0fce4ab commit 8442e15

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

timely/src/dataflow/operators/reclock.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
//! Extension methods for `Stream` based on record-by-record transformation.
22
3-
use crate::Data;
3+
use crate::Container;
44
use crate::order::PartialOrder;
5-
use crate::dataflow::{Stream, Scope};
5+
use crate::dataflow::{Scope, StreamCore};
66
use crate::dataflow::channels::pact::Pipeline;
77
use crate::dataflow::operators::generic::operator::Operator;
88

99
/// Extension trait for reclocking a stream.
10-
pub trait Reclock<S: Scope, D: Data> {
10+
pub trait Reclock<S: Scope> {
1111
/// Delays records until an input is observed on the `clock` input.
1212
///
1313
/// The source stream is buffered until a record is seen on the clock input,
@@ -45,19 +45,19 @@ pub trait Reclock<S: Scope, D: Data> {
4545
/// assert_eq!(extracted[1], (5, vec![4,5]));
4646
/// assert_eq!(extracted[2], (8, vec![6,7,8]));
4747
/// ```
48-
fn reclock(&self, clock: &Stream<S, ()>) -> Stream<S, D>;
48+
fn reclock<TC: Container<Item=()>>(&self, clock: &StreamCore<S, TC>) -> Self;
4949
}
5050

51-
impl<S: Scope, D: Data> Reclock<S, D> for Stream<S, D> {
52-
fn reclock(&self, clock: &Stream<S, ()>) -> Stream<S, D> {
51+
impl<S: Scope, C: Container> Reclock<S> for StreamCore<S, C> {
52+
fn reclock<TC: Container<Item=()>>(&self, clock: &StreamCore<S, TC>) -> StreamCore<S, C> {
5353

5454
let mut stash = vec![];
5555

5656
self.binary_notify(clock, Pipeline, Pipeline, "Reclock", vec![], move |input1, input2, output, notificator| {
5757

5858
// stash each data input with its timestamp.
5959
input1.for_each(|cap, data| {
60-
stash.push((cap.time().clone(), data.replace(Vec::new())));
60+
stash.push((cap.time().clone(), data.replace(Default::default())));
6161
});
6262

6363
// request notification at time, to flush stash.
@@ -70,7 +70,7 @@ impl<S: Scope, D: Data> Reclock<S, D> for Stream<S, D> {
7070
let mut session = output.session(&cap);
7171
for &mut (ref t, ref mut data) in &mut stash {
7272
if t.less_equal(cap.time()) {
73-
session.give_vec(data);
73+
session.give_container(data);
7474
}
7575
}
7676
stash.retain(|x| !x.0.less_equal(cap.time()));

0 commit comments

Comments
 (0)