diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs similarity index 87% rename from timely/src/dataflow/operators/enterleave.rs rename to timely/src/dataflow/operators/core/enterleave.rs index 97ada7ae2..23da0e201 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -25,16 +25,14 @@ use crate::logging::{TimelyLogger, MessagesEvent}; use crate::progress::Timestamp; use crate::progress::timestamp::Refines; use crate::progress::{Source, Target}; -use crate::order::Product; use crate::{Container, Data}; use crate::communication::Push; use crate::dataflow::channels::pushers::{Counter, Tee}; use crate::dataflow::channels::{Bundle, Message}; use crate::worker::AsWorker; -use crate::dataflow::{StreamCore, Scope, Stream}; -use crate::dataflow::scopes::{Child, ScopeParent}; -use crate::dataflow::operators::delay::Delay; +use crate::dataflow::{StreamCore, Scope}; +use crate::dataflow::scopes::Child; /// Extension trait to move a `Stream` into a child of its current `Scope`. pub trait Enter, C: Container> { @@ -55,34 +53,6 @@ pub trait Enter, C: Container> { fn enter<'a>(&self, _: &Child<'a, G, T>) -> StreamCore, C>; } -use crate::dataflow::scopes::child::Iterative; - -/// Extension trait to move a `Stream` into a child of its current `Scope` setting the timestamp for each element. -pub trait EnterAt { - /// Moves the `Stream` argument into a child of its current `Scope` setting the timestamp for each element by `initial`. - /// - /// # Examples - /// ``` - /// use timely::dataflow::scopes::Scope; - /// use timely::dataflow::operators::{EnterAt, Leave, ToStream}; - /// - /// timely::example(|outer| { - /// let stream = (0..9u64).to_stream(outer); - /// let output = outer.iterative(|inner| { - /// stream.enter_at(inner, |x| *x).leave() - /// }); - /// }); - /// ``` - fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, initial: F) -> Stream, D> ; -} - -impl::Timestamp, T>, Vec>> EnterAt for E { - fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, mut initial: F) -> - Stream, D> { - self.enter(scope).delay(move |datum, time| Product::new(time.clone().to_outer(), initial(datum))) - } -} - impl, C: Data+Container> Enter for StreamCore { fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore, C> { diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs index 758eb5231..d81a5c625 100644 --- a/timely/src/dataflow/operators/core/mod.rs +++ b/timely/src/dataflow/operators/core/mod.rs @@ -2,6 +2,7 @@ //! are independent of specific container types. pub mod concat; +pub mod enterleave; pub mod exchange; pub mod feedback; pub mod inspect; @@ -10,6 +11,7 @@ pub mod rc; pub mod reclock; pub use concat::{Concat, Concatenate}; +pub use enterleave::{Enter, Leave}; pub use exchange::Exchange; pub use feedback::{Feedback, LoopVariable, ConnectLoop}; pub use inspect::{Inspect, InspectCore}; diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 29f62299b..458566ddc 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -8,7 +8,6 @@ //! operators whose behavior can be supplied using closures accepting input and output handles. //! Most of the operators in this module are defined using these two general operators. -pub use self::enterleave::{Enter, EnterAt, Leave}; pub use self::input::Input; pub use self::unordered_input::{UnorderedInput, UnorderedInputCore}; pub use self::partition::Partition; @@ -32,7 +31,7 @@ pub use self::count::Accumulate; pub mod core; -pub mod enterleave; +pub use self::core::enterleave::{self, Enter, Leave}; pub mod input; pub mod flow_controlled; pub mod unordered_input;