diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 5bced98cd..aab1e0b03 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -11,39 +11,39 @@ pub mod pullers; pub mod pact; /// The input to and output from timely dataflow communication channels. -pub type Bundle = crate::communication::Message>; +pub type Bundle = crate::communication::Message>; /// A serializable representation of timestamped data. #[derive(Clone, Abomonation, Serialize, Deserialize)] -pub struct Message { +pub struct Message { /// The timestamp associated with the message. pub time: T, /// The data in the message. - pub data: D, + pub data: C, /// The source worker. pub from: usize, /// A sequence number for this worker-to-worker stream. pub seq: usize, } -impl Message { +impl Message { /// Default buffer size. #[deprecated = "Use timely::buffer::default_capacity instead"] pub fn default_length() -> usize { - crate::container::buffer::default_capacity::() + crate::container::buffer::default_capacity::() } } -impl Message { +impl Message { /// Creates a new message instance from arguments. - pub fn new(time: T, data: D, from: usize, seq: usize) -> Self { + pub fn new(time: T, data: C, from: usize, seq: usize) -> Self { Message { time, data, from, seq } } /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher /// leaves in place, or the container's default element. #[inline] - pub fn push_at>>(buffer: &mut D, time: T, pusher: &mut P) { + pub fn push_at>>(buffer: &mut C, time: T, pusher: &mut P) { let data = ::std::mem::take(buffer); let message = Message::new(time, data, 0, 0); diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 3e0ba21f7..abf6fef3a 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -20,11 +20,11 @@ use crate::progress::Timestamp; use crate::worker::AsWorker; /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. -pub trait ParallelizationContract { +pub trait ParallelizationContract { /// Type implementing `Push` produced by this pact. - type Pusher: Push>+'static; + type Pusher: Push>+'static; /// Type implementing `Pull` produced by this pact. - type Puller: Pull>+'static; + type Puller: Pull>+'static; /// Allocates a matched pair of push and pull endpoints implementing the pact. fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller); } @@ -33,13 +33,11 @@ pub trait ParallelizationContract { #[derive(Debug)] pub struct Pipeline; -impl ParallelizationContract for Pipeline { - type Pusher = LogPusher>>; - type Puller = LogPuller>>; +impl ParallelizationContract for Pipeline { + type Pusher = LogPusher>>; + type Puller = LogPuller>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { - let (pusher, puller) = allocator.pipeline::>(identifier, address); - // // ignore `&mut A` and use thread allocator - // let (pusher, puller) = Thread::new::>>(); + let (pusher, puller) = allocator.pipeline::>(identifier, address); (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()), LogPuller::new(puller, allocator.index(), identifier, logging)) } @@ -89,17 +87,17 @@ impl Debug for ExchangeCore { /// Wraps a `Message` pusher to provide a `Push<(T, Content)>`. #[derive(Debug)] -pub struct LogPusher>> { +pub struct LogPusher>> { pusher: P, channel: usize, counter: usize, source: usize, target: usize, - phantom: PhantomData<(T, D)>, + phantom: PhantomData<(T, C)>, logging: Option, } -impl>> LogPusher { +impl>> LogPusher { /// Allocates a new pusher. pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option) -> Self { LogPusher { @@ -114,9 +112,9 @@ impl>> LogPusher { } } -impl>> Push> for LogPusher { +impl>> Push> for LogPusher { #[inline] - fn push(&mut self, pair: &mut Option>) { + fn push(&mut self, pair: &mut Option>) { if let Some(bundle) = pair { self.counter += 1; @@ -145,15 +143,15 @@ impl>> Push> for LogPusher` puller to provide a `Pull<(T, Content)>`. #[derive(Debug)] -pub struct LogPuller>> { +pub struct LogPuller>> { puller: P, channel: usize, index: usize, - phantom: PhantomData<(T, D)>, + phantom: PhantomData<(T, C)>, logging: Option, } -impl>> LogPuller { +impl>> LogPuller { /// Allocates a new `Puller`. pub fn new(puller: P, index: usize, channel: usize, logging: Option) -> Self { LogPuller { @@ -166,9 +164,9 @@ impl>> LogPuller { } } -impl>> Pull> for LogPuller { +impl>> Pull> for LogPuller { #[inline] - fn pull(&mut self) -> &mut Option> { + fn pull(&mut self) -> &mut Option> { let result = self.puller.pull(); if let Some(bundle) = result { let channel = self.channel; diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 848b9a8f5..8efba9318 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -9,10 +9,10 @@ use crate::communication::Pull; use crate::Container; /// A wrapper which accounts records pulled past in a shared count map. -pub struct Counter>> { +pub struct Counter>> { pullable: P, consumed: Rc>>, - phantom: ::std::marker::PhantomData, + phantom: ::std::marker::PhantomData, } /// A guard type that updates the change batch counts on drop @@ -36,15 +36,15 @@ impl Drop for ConsumedGuard { } } -impl>> Counter { +impl>> Counter { /// Retrieves the next timestamp and batch of data. #[inline] - pub fn next(&mut self) -> Option<&mut Bundle> { + pub fn next(&mut self) -> Option<&mut Bundle> { self.next_guarded().map(|(_guard, bundle)| bundle) } #[inline] - pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut Bundle)> { + pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut Bundle)> { if let Some(message) = self.pullable.pull() { let guard = ConsumedGuard { consumed: Rc::clone(&self.consumed), @@ -57,7 +57,7 @@ impl>> Counter } } -impl>> Counter { +impl>> Counter { /// Allocates a new `Counter` from a boxed puller. pub fn new(pullable: P) -> Self { Counter { diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 27e95377f..5262d36c9 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -13,11 +13,11 @@ use crate::{Container, Data}; /// The `Buffer` type should be used by calling `session` with a time, which checks whether /// data must be flushed and creates a `Session` object which allows sending at the given time. #[derive(Debug)] -pub struct Buffer>> { +pub struct Buffer>> { /// the currently open time, if it is open time: Option, /// a buffer for records, to send at self.time - buffer: D, + buffer: C, pusher: P, } diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index c40e3ddec..11b72946a 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -11,15 +11,15 @@ use crate::Container; /// A wrapper which updates shared `produced` based on the number of records pushed. #[derive(Debug)] -pub struct Counter>> { +pub struct Counter>> { pushee: P, produced: Rc>>, - phantom: PhantomData, + phantom: PhantomData, } -impl Push> for Counter where P: Push> { +impl Push> for Counter where P: Push> { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64); } @@ -31,9 +31,9 @@ impl Push> for Counter wher } } -impl>> Counter where T : Ord+Clone+'static { +impl>> Counter where T : Ord+Clone+'static { /// Allocates a new `Counter` from a pushee and shared counts. - pub fn new(pushee: P) -> Counter { + pub fn new(pushee: P) -> Counter { Counter { pushee, produced: Rc::new(RefCell::new(ChangeBatch::new())), diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index 210c9d149..f5cbfb226 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -9,17 +9,17 @@ use crate::dataflow::channels::{Bundle, Message}; use crate::communication::Push; use crate::{Container, Data}; -type PushList = Rc>>>>>; +type PushList = Rc>>>>>; /// Wraps a shared list of `Box` to forward pushes to. Owned by `Stream`. -pub struct Tee { - buffer: D, - shared: PushList, +pub struct Tee { + buffer: C, + shared: PushList, } -impl Push> for Tee { +impl Push> for Tee { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { let mut pushers = self.shared.borrow_mut(); if let Some(message) = message { for index in 1..pushers.len() { @@ -39,9 +39,9 @@ impl Push> for Tee { } } -impl Tee { +impl Tee { /// Allocates a new pair of `Tee` and `TeeHelper`. - pub fn new() -> (Tee, TeeHelper) { + pub fn new() -> (Tee, TeeHelper) { let shared = Rc::new(RefCell::new(Vec::new())); let port = Tee { buffer: Default::default(), @@ -52,7 +52,7 @@ impl Tee { } } -impl Clone for Tee { +impl Clone for Tee { fn clone(&self) -> Self { Self { buffer: Default::default(), @@ -61,9 +61,9 @@ impl Clone for Tee { } } -impl Debug for Tee +impl Debug for Tee where - D: Debug, + C: Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut debug = f.debug_struct("Tee"); @@ -80,18 +80,18 @@ where } /// A shared list of `Box` used to add `Push` implementors. -pub struct TeeHelper { - shared: PushList, +pub struct TeeHelper { + shared: PushList, } -impl TeeHelper { +impl TeeHelper { /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`. - pub fn add_pusher>+'static>(&self, pusher: P) { + pub fn add_pusher>+'static>(&self, pusher: P) { self.shared.borrow_mut().push(Box::new(pusher)); } } -impl Clone for TeeHelper { +impl Clone for TeeHelper { fn clone(&self) -> Self { TeeHelper { shared: self.shared.clone(), @@ -99,7 +99,7 @@ impl Clone for TeeHelper { } } -impl Debug for TeeHelper { +impl Debug for TeeHelper { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut debug = f.debug_struct("TeeHelper"); diff --git a/timely/src/dataflow/operators/capture/capture.rs b/timely/src/dataflow/operators/capture/capture.rs index c62b95417..9aa3827de 100644 --- a/timely/src/dataflow/operators/capture/capture.rs +++ b/timely/src/dataflow/operators/capture/capture.rs @@ -17,7 +17,7 @@ use crate::progress::Timestamp; use super::{EventCore, EventPusherCore}; /// Capture a stream of timestamped data for later replay. -pub trait Capture { +pub trait Capture { /// Captures a stream of timestamped data for later replay. /// /// # Examples @@ -103,18 +103,18 @@ pub trait Capture { /// /// assert_eq!(recv0.extract()[0].1, (0..10).collect::>()); /// ``` - fn capture_into+'static>(&self, pusher: P); + fn capture_into+'static>(&self, pusher: P); /// Captures a stream using Rust's MPSC channels. - fn capture(&self) -> ::std::sync::mpsc::Receiver> { + fn capture(&self) -> ::std::sync::mpsc::Receiver> { let (send, recv) = ::std::sync::mpsc::channel(); self.capture_into(send); recv } } -impl Capture for StreamCore { - fn capture_into+'static>(&self, mut event_pusher: P) { +impl Capture for StreamCore { + fn capture_into+'static>(&self, mut event_pusher: P) { let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); diff --git a/timely/src/dataflow/operators/capture/event.rs b/timely/src/dataflow/operators/capture/event.rs index 252b7361c..a5b5f07b8 100644 --- a/timely/src/dataflow/operators/capture/event.rs +++ b/timely/src/dataflow/operators/capture/event.rs @@ -6,25 +6,25 @@ /// Data and progress events of the captured stream. #[derive(Debug, Clone, Abomonation, Hash, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize)] -pub enum EventCore { +pub enum EventCore { /// Progress received via `push_external_progress`. Progress(Vec<(T, i64)>), /// Messages received via the data stream. - Messages(T, D), + Messages(T, C), } /// Data and progress events of the captured stream, specialized to vector-based containers. pub type Event = EventCore>; -/// Iterates over contained `EventCore`. +/// Iterates over contained `EventCore`. /// /// The `EventIterator` trait describes types that can iterate over references to events, /// and which can be used to replay a stream into a new timely dataflow computation. /// /// This method is not simply an iterator because of the lifetime in the result. -pub trait EventIteratorCore { - /// Iterates over references to `EventCore` elements. - fn next(&mut self) -> Option<&EventCore>; +pub trait EventIteratorCore { + /// Iterates over references to `EventCore` elements. + fn next(&mut self) -> Option<&EventCore>; } /// A [EventIteratorCore] specialized to vector-based containers. @@ -40,10 +40,10 @@ impl>> EventIterator for E { } -/// Receives `EventCore` events. -pub trait EventPusherCore { +/// Receives `EventCore` events. +pub trait EventPusherCore { /// Provides a new `Event` to the pusher. - fn push(&mut self, event: EventCore); + fn push(&mut self, event: EventCore); } /// A [EventPusherCore] specialized to vector-based containers. @@ -53,8 +53,8 @@ impl>> EventPusher for E {} // implementation for the linked list behind a `Handle`. -impl EventPusherCore for ::std::sync::mpsc::Sender> { - fn push(&mut self, event: EventCore) { +impl EventPusherCore for ::std::sync::mpsc::Sender> { + fn push(&mut self, event: EventCore) { // NOTE: An Err(x) result just means "data not accepted" most likely // because the receiver is gone. No need to panic. let _ = self.send(event); @@ -69,38 +69,38 @@ pub mod link { use super::{EventCore, EventPusherCore, EventIteratorCore}; - /// A linked list of EventCore. - pub struct EventLinkCore { + /// A linked list of EventCore. + pub struct EventLinkCore { /// An event, if one exists. /// /// An event might not exist, if either we want to insert a `None` and have the output iterator pause, /// or in the case of the very first linked list element, which has no event when constructed. - pub event: Option>, + pub event: Option>, /// The next event, if it exists. - pub next: RefCell>>>, + pub next: RefCell>>>, } /// A [EventLinkCore] specialized to vector-based containers. pub type EventLink = EventLinkCore>; - impl EventLinkCore { + impl EventLinkCore { /// Allocates a new `EventLink`. - pub fn new() -> EventLinkCore { + pub fn new() -> EventLinkCore { EventLinkCore { event: None, next: RefCell::new(None) } } } // implementation for the linked list behind a `Handle`. - impl EventPusherCore for Rc> { - fn push(&mut self, event: EventCore) { + impl EventPusherCore for Rc> { + fn push(&mut self, event: EventCore) { *self.next.borrow_mut() = Some(Rc::new(EventLinkCore { event: Some(event), next: RefCell::new(None) })); let next = self.next.borrow().as_ref().unwrap().clone(); *self = next; } } - impl EventIteratorCore for Rc> { - fn next(&mut self) -> Option<&EventCore> { + impl EventIteratorCore for Rc> { + fn next(&mut self) -> Option<&EventCore> { let is_some = self.next.borrow().is_some(); if is_some { let next = self.next.borrow().as_ref().unwrap().clone(); @@ -114,7 +114,7 @@ pub mod link { } // Drop implementation to prevent stack overflow through naive drop impl. - impl Drop for EventLinkCore { + impl Drop for EventLinkCore { fn drop(&mut self) { while let Some(link) = self.next.replace(None) { if let Ok(head) = Rc::try_unwrap(link) { @@ -124,7 +124,7 @@ pub mod link { } } - impl Default for EventLinkCore { + impl Default for EventLinkCore { fn default() -> Self { Self::new() } @@ -147,16 +147,16 @@ pub mod binary { use abomonation::Abomonation; use super::{EventCore, EventPusherCore, EventIteratorCore}; - /// A wrapper for `W: Write` implementing `EventPusherCore`. - pub struct EventWriterCore { + /// A wrapper for `W: Write` implementing `EventPusherCore`. + pub struct EventWriterCore { stream: W, - phant: ::std::marker::PhantomData<(T,D)>, + phant: ::std::marker::PhantomData<(T, C)>, } /// [EventWriterCore] specialized to vector-based containers. pub type EventWriter = EventWriterCore, W>; - impl EventWriterCore { + impl EventWriterCore { /// Allocates a new `EventWriter` wrapping a supplied writer. pub fn new(w: W) -> Self { Self { @@ -166,28 +166,28 @@ pub mod binary { } } - impl EventPusherCore for EventWriterCore { - fn push(&mut self, event: EventCore) { + impl EventPusherCore for EventWriterCore { + fn push(&mut self, event: EventCore) { // TODO: `push` has no mechanism to report errors, so we `unwrap`. unsafe { ::abomonation::encode(&event, &mut self.stream).expect("Event abomonation/write failed"); } } } /// A Wrapper for `R: Read` implementing `EventIterator`. - pub struct EventReaderCore { + pub struct EventReaderCore { reader: R, bytes: Vec, buff1: Vec, buff2: Vec, consumed: usize, valid: usize, - phant: ::std::marker::PhantomData<(T,D)>, + phant: ::std::marker::PhantomData<(T, C)>, } /// [EventReaderCore] specialized to vector-based containers. pub type EventReader = EventReaderCore, R>; - impl EventReaderCore { + impl EventReaderCore { /// Allocates a new `EventReader` wrapping a supplied reader. pub fn new(r: R) -> Self { Self { @@ -202,12 +202,12 @@ pub mod binary { } } - impl EventIteratorCore for EventReaderCore { - fn next(&mut self) -> Option<&EventCore> { + impl EventIteratorCore for EventReaderCore { + fn next(&mut self) -> Option<&EventCore> { // if we can decode something, we should just return it! :D - if unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.is_some() { - let (item, rest) = unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.unwrap(); + if unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.is_some() { + let (item, rest) = unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.unwrap(); self.consumed = self.valid - rest.len(); return Some(item); } diff --git a/timely/src/dataflow/operators/concat.rs b/timely/src/dataflow/operators/concat.rs index 449609f5b..f69fd98a7 100644 --- a/timely/src/dataflow/operators/concat.rs +++ b/timely/src/dataflow/operators/concat.rs @@ -6,7 +6,7 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{StreamCore, Scope}; /// Merge the contents of two streams. -pub trait Concat { +pub trait Concat { /// Merge the contents of two streams. /// /// # Examples @@ -20,17 +20,17 @@ pub trait Concat { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn concat(&self, _: &StreamCore) -> StreamCore; + fn concat(&self, _: &StreamCore) -> StreamCore; } -impl Concat for StreamCore { - fn concat(&self, other: &StreamCore) -> StreamCore { +impl Concat for StreamCore { + fn concat(&self, other: &StreamCore) -> StreamCore { self.scope().concatenate([self.clone(), other.clone()]) } } /// Merge the contents of multiple streams. -pub trait Concatenate { +pub trait Concatenate { /// Merge the contents of multiple streams. /// /// # Examples @@ -47,25 +47,25 @@ pub trait Concatenate { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn concatenate(&self, sources: I) -> StreamCore + fn concatenate(&self, sources: I) -> StreamCore where - I: IntoIterator>; + I: IntoIterator>; } -impl Concatenate for StreamCore { - fn concatenate(&self, sources: I) -> StreamCore +impl Concatenate for StreamCore { + fn concatenate(&self, sources: I) -> StreamCore where - I: IntoIterator> + I: IntoIterator> { let clone = self.clone(); self.scope().concatenate(Some(clone).into_iter().chain(sources)) } } -impl Concatenate for G { - fn concatenate(&self, sources: I) -> StreamCore +impl Concatenate for G { + fn concatenate(&self, sources: I) -> StreamCore where - I: IntoIterator> + I: IntoIterator> { // create an operator builder. diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index af2eaf00f..97ada7ae2 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -115,7 +115,7 @@ impl, C: Data+Container> Enter { +pub trait Leave { /// Moves a `Stream` to the parent of its current `Scope`. /// /// # Examples @@ -130,17 +130,17 @@ pub trait Leave { /// }); /// }); /// ``` - fn leave(&self) -> StreamCore; + fn leave(&self) -> StreamCore; } -impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave for StreamCore, D> { - fn leave(&self) -> StreamCore { +impl<'a, G: Scope, C: Clone+Container, T: Timestamp+Refines> Leave for StreamCore, C> { + fn leave(&self) -> StreamCore { let scope = self.scope(); let output = scope.subgraph.borrow_mut().new_output(); let target = Target::new(0, output.port); - let (targets, registrar) = Tee::::new(); + let (targets, registrar) = Tee::::new(); let egress = EgressNub { targets, phantom: PhantomData }; let channel_id = scope.clone().new_identifier(); @@ -160,15 +160,15 @@ impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave } -struct IngressNub, TData: Container> { - targets: Counter>, +struct IngressNub, TContainer: Container> { + targets: Counter>, phantom: ::std::marker::PhantomData, activator: crate::scheduling::Activator, active: bool, } -impl, TData: Container> Push> for IngressNub { - fn push(&mut self, element: &mut Option>) { +impl, TContainer: Container> Push> for IngressNub { + fn push(&mut self, element: &mut Option>) { if let Some(message) = element { let outer_message = message.as_mut(); let data = ::std::mem::take(&mut outer_message.data); @@ -192,14 +192,14 @@ impl, TData: Container> Pus } -struct EgressNub, TData: Data> { - targets: Tee, +struct EgressNub, TContainer: Data> { + targets: Tee, phantom: PhantomData, } -impl Push> for EgressNub -where TOuter: Timestamp, TInner: Timestamp+Refines, TData: Data { - fn push(&mut self, message: &mut Option>) { +impl Push> for EgressNub +where TOuter: Timestamp, TInner: Timestamp+Refines, TContainer: Data { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { let inner_message = message.as_mut(); let data = ::std::mem::take(&mut inner_message.data); @@ -241,12 +241,12 @@ impl

LogPusher

{ } } -impl Push> for LogPusher

+impl Push> for LogPusher

where - D: Container, - P: Push>, + C: Container, + P: Push>, { - fn push(&mut self, element: &mut Option>) { + fn push(&mut self, element: &mut Option>) { if let Some(bundle) = element { let send_event = MessagesEvent { is_send: true, diff --git a/timely/src/dataflow/operators/feedback.rs b/timely/src/dataflow/operators/feedback.rs index 038ca7e09..80e623df1 100644 --- a/timely/src/dataflow/operators/feedback.rs +++ b/timely/src/dataflow/operators/feedback.rs @@ -59,7 +59,7 @@ pub trait Feedback { /// .connect_loop(handle); /// }); /// ``` - fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore); + fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore); } /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. @@ -87,7 +87,7 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> { /// }); /// }); /// ``` - fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>); + fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, C>, StreamCore, C>); } impl Feedback for G { @@ -95,7 +95,7 @@ impl Feedback for G { self.feedback_core(summary) } - fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore) { + fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore) { let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); let (output, stream) = builder.new_output(); @@ -105,13 +105,13 @@ impl Feedback for G { } impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> { - fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>) { + fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, C>, StreamCore, C>) { self.feedback_core(Product::new(Default::default(), summary)) } } /// Connect a `Stream` to the input of a loop variable. -pub trait ConnectLoop { +pub trait ConnectLoop { /// Connect a `Stream` to be the input of a loop variable. /// /// # Examples @@ -129,11 +129,11 @@ pub trait ConnectLoop { /// .connect_loop(handle); /// }); /// ``` - fn connect_loop(&self, _: HandleCore); + fn connect_loop(&self, _: HandleCore); } -impl ConnectLoop for StreamCore { - fn connect_loop(&self, helper: HandleCore) { +impl ConnectLoop for StreamCore { + fn connect_loop(&self, helper: HandleCore) { let mut builder = helper.builder; let summary = helper.summary; @@ -159,10 +159,10 @@ impl ConnectLoop for StreamCore { /// A handle used to bind the source of a loop variable. #[derive(Debug)] -pub struct HandleCore { +pub struct HandleCore { builder: OperatorBuilder, summary: ::Summary, - output: OutputWrapper>, + output: OutputWrapper>, } /// A `HandleCore` specialized for using `Vec` as container diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 10899b9be..9522b6f34 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -105,17 +105,17 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> P::Puller + pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> P::Puller where - P: ParallelizationContract { + P: ParallelizationContract { let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs]; self.new_input_connection(stream, pact, connection) } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> P::Puller + pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> P::Puller where - P: ParallelizationContract { + P: ParallelizationContract { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); @@ -131,16 +131,16 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (Tee, StreamCore) { + pub fn new_output(&mut self) -> (Tee, StreamCore) { let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs]; self.new_output_connection(connection) } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (Tee, StreamCore) { + pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (Tee, StreamCore) { - let (targets, registrar) = Tee::::new(); + let (targets, registrar) = Tee::::new(); let source = Source::new(self.index, self.shape.outputs); let stream = StreamCore::new(source, registrar, self.scope.clone()); diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index f753ef09a..3d2ebbe29 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -59,9 +59,9 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> InputHandleCore + pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> InputHandleCore where - P: ParallelizationContract { + P: ParallelizationContract { let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().outputs()]; self.new_input_connection(stream, pact, connection) @@ -75,9 +75,9 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> InputHandleCore + pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> InputHandleCore where - P: ParallelizationContract { + P: ParallelizationContract { let puller = self.builder.new_input_connection(stream, pact, connection.clone()); @@ -92,7 +92,7 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { + pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()]; self.new_output_connection(connection) } @@ -105,7 +105,7 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { + pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { let (tee, stream) = self.builder.new_output_connection(connection.clone()); diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index b5157f4ce..ee4059bb4 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -22,8 +22,8 @@ use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; /// Handle to an operator's input stream. -pub struct InputHandleCore>> { - pull_counter: PullCounter, +pub struct InputHandleCore>> { + pull_counter: PullCounter, internal: Rc>>>>>, /// Timestamp summaries from this input to each output. /// @@ -37,9 +37,9 @@ pub struct InputHandleCore>> { pub type InputHandle = InputHandleCore, P>; /// Handle to an operator's input stream and frontier. -pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull>+'a> { +pub struct FrontieredInputHandleCore<'a, T: Timestamp, C: Container+'a, P: Pull>+'a> { /// The underlying input handle. - pub handle: &'a mut InputHandleCore, + pub handle: &'a mut InputHandleCore, /// The frontier as reported by timely progress tracking. pub frontier: &'a MutableAntichain, } @@ -47,13 +47,13 @@ pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull< /// Handle to an operator's input stream and frontier, specialized to vectors. pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec, P>; -impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore { +impl<'a, T: Timestamp, C: Container, P: Pull>> InputHandleCore { /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`. /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. /// Returns `None` when there's no more data available. #[inline] - pub fn next(&mut self) -> Option<(InputCapability, RefOrMut)> { + pub fn next(&mut self) -> Option<(InputCapability, RefOrMut)> { let internal = &self.internal; let summaries = &self.summaries; self.pull_counter.next_guarded().map(|(guard, bundle)| { @@ -87,7 +87,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore, RefOrMut)>(&mut self, mut logic: F) { + pub fn for_each, RefOrMut)>(&mut self, mut logic: F) { let mut logging = self.logging.take(); while let Some((cap, data)) = self.next() { logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true })); @@ -99,9 +99,9 @@ impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore>+'a> FrontieredInputHandleCore<'a, T, D, P> { +impl<'a, T: Timestamp, C: Container, P: Pull>+'a> FrontieredInputHandleCore<'a, T, C, P> { /// Allocate a new frontiered input handle. - pub fn new(handle: &'a mut InputHandleCore, frontier: &'a MutableAntichain) -> Self { + pub fn new(handle: &'a mut InputHandleCore, frontier: &'a MutableAntichain) -> Self { FrontieredInputHandleCore { handle, frontier, @@ -112,7 +112,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInputHa /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. /// Returns `None` when there's no more data available. #[inline] - pub fn next(&mut self) -> Option<(InputCapability, RefOrMut)> { + pub fn next(&mut self) -> Option<(InputCapability, RefOrMut)> { self.handle.next() } @@ -135,7 +135,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInputHa /// }); /// ``` #[inline] - pub fn for_each, RefOrMut)>(&mut self, logic: F) { + pub fn for_each, RefOrMut)>(&mut self, logic: F) { self.handle.for_each(logic) } @@ -146,18 +146,18 @@ impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInputHa } } -pub fn _access_pull_counter>>(input: &mut InputHandleCore) -> &mut PullCounter { +pub fn _access_pull_counter>>(input: &mut InputHandleCore) -> &mut PullCounter { &mut input.pull_counter } /// Constructs an input handle. /// Declared separately so that it can be kept private when `InputHandle` is re-exported. -pub fn new_input_handle>>( - pull_counter: PullCounter, +pub fn new_input_handle>>( + pull_counter: PullCounter, internal: Rc>>>>>, summaries: Rc>>>, logging: Option -) -> InputHandleCore { +) -> InputHandleCore { InputHandleCore { pull_counter, internal, @@ -172,14 +172,14 @@ pub fn new_input_handle>>( /// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the /// pusher is flushed (via the `cease` method) once it is no longer used. #[derive(Debug)] -pub struct OutputWrapper>> { - push_buffer: Buffer>, +pub struct OutputWrapper>> { + push_buffer: Buffer>, internal_buffer: Rc>>, } -impl>> OutputWrapper { +impl>> OutputWrapper { /// Creates a new output wrapper from a push buffer. - pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>) -> Self { + pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>) -> Self { OutputWrapper { push_buffer, internal_buffer, @@ -189,7 +189,7 @@ impl>> OutputWrapper { /// /// This method ensures that the only access to the push buffer is through the `OutputHandle` /// type which ensures the use of capabilities, and which calls `cease` when it is dropped. - pub fn activate(&mut self) -> OutputHandleCore { + pub fn activate(&mut self) -> OutputHandleCore { OutputHandleCore { push_buffer: &mut self.push_buffer, internal_buffer: &self.internal_buffer, diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 5ba6a211f..45571153a 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -15,7 +15,7 @@ use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNoti use crate::Container; /// Methods to construct generic streaming and blocking operators. -pub trait Operator { +pub trait Operator { /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input. @@ -55,13 +55,13 @@ pub trait Operator { /// }); /// } /// ``` - fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Container, + C2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContract; + L: FnMut(&mut FrontieredInputHandleCore, + &mut OutputHandleCore>)+'static, + P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -92,12 +92,12 @@ pub trait Operator { /// }); /// } /// ``` - fn unary_notify, - &mut OutputHandleCore>, + fn unary_notify, + &mut OutputHandleCore>, &mut Notificator)+'static, - P: ParallelizationContract> - (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; + P: ParallelizationContract> + (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -127,13 +127,13 @@ pub trait Operator { /// }); /// }); /// ``` - fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Container, + C2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContract; + L: FnMut(&mut InputHandleCore, + &mut OutputHandleCore>)+'static, + P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -185,16 +185,16 @@ pub trait Operator { /// } /// }).unwrap(); /// ``` - fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Container, - D3: Container, + C2: Container, + C3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, - &mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract; + L: FnMut(&mut FrontieredInputHandleCore, + &mut FrontieredInputHandleCore, + &mut OutputHandleCore>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -241,15 +241,15 @@ pub trait Operator { /// } /// }).unwrap(); /// ``` - fn binary_notify, - &mut InputHandleCore, - &mut OutputHandleCore>, + fn binary_notify, + &mut InputHandleCore, + &mut OutputHandleCore>, &mut Notificator)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract> - (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; + P1: ParallelizationContract, + P2: ParallelizationContract> + (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -285,16 +285,16 @@ pub trait Operator { /// }).inspect(|x| println!("{:?}", x)); /// }); /// ``` - fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Container, - D3: Container, + C2: Container, + C3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract; + L: FnMut(&mut InputHandleCore, + &mut InputHandleCore, + &mut OutputHandleCore>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream @@ -321,19 +321,19 @@ pub trait Operator { /// ``` fn sink(&self, pact: P, name: &str, logic: L) where - L: FnMut(&mut FrontieredInputHandleCore)+'static, - P: ParallelizationContract; + L: FnMut(&mut FrontieredInputHandleCore)+'static, + P: ParallelizationContract; } -impl Operator for StreamCore { +impl Operator for StreamCore { - fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Container, + C2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContract { + L: FnMut(&mut FrontieredInputHandleCore, + &mut OutputHandleCore>)+'static, + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -355,12 +355,12 @@ impl Operator for StreamCore { stream } - fn unary_notify, - &mut OutputHandleCore>, + fn unary_notify, + &mut OutputHandleCore>, &mut Notificator)+'static, - P: ParallelizationContract> - (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { + P: ParallelizationContract> + (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { self.unary_frontier(pact, name, move |capability, _info| { let mut notificator = FrontierNotificator::new(); @@ -377,13 +377,13 @@ impl Operator for StreamCore { }) } - fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Container, + C2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContract { + L: FnMut(&mut InputHandleCore, + &mut OutputHandleCore>)+'static, + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -405,16 +405,16 @@ impl Operator for StreamCore { stream } - fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Container, - D3: Container, + C2: Container, + C3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, - &mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract { + L: FnMut(&mut FrontieredInputHandleCore, + &mut FrontieredInputHandleCore, + &mut OutputHandleCore>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -438,15 +438,15 @@ impl Operator for StreamCore { stream } - fn binary_notify, - &mut InputHandleCore, - &mut OutputHandleCore>, + fn binary_notify, + &mut InputHandleCore, + &mut OutputHandleCore>, &mut Notificator)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract> - (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { + P1: ParallelizationContract, + P2: ParallelizationContract> + (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { self.binary_frontier(other, pact1, pact2, name, |capability, _info| { let mut notificator = FrontierNotificator::new(); @@ -465,16 +465,16 @@ impl Operator for StreamCore { } - fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Container, - D3: Container, + C2: Container, + C3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract { + L: FnMut(&mut InputHandleCore, + &mut InputHandleCore, + &mut OutputHandleCore>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -499,8 +499,8 @@ impl Operator for StreamCore { fn sink(&self, pact: P, name: &str, mut logic: L) where - L: FnMut(&mut FrontieredInputHandleCore)+'static, - P: ParallelizationContract { + L: FnMut(&mut FrontieredInputHandleCore)+'static, + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let mut input = builder.new_input(self, pact); @@ -555,11 +555,11 @@ impl Operator for StreamCore { /// .inspect(|x| println!("number: {:?}", x)); /// }); /// ``` -pub fn source(scope: &G, name: &str, constructor: B) -> StreamCore +pub fn source(scope: &G, name: &str, constructor: B) -> StreamCore where - D: Container, + C: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut OutputHandleCore>)+'static { + L: FnMut(&mut OutputHandleCore>)+'static { let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); let operator_info = builder.operator_info(); @@ -599,7 +599,7 @@ where /// /// }); /// ``` -pub fn empty(scope: &G) -> StreamCore { +pub fn empty(scope: &G) -> StreamCore { source(scope, "Empty", |_capability, _info| |_output| { // drop capability, do nothing }) diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 3be59db31..90eb45810 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -93,7 +93,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore); + fn new_input_core(&mut self) -> (HandleCore<::Timestamp, C>, StreamCore); /// Create a new stream from a supplied interactive handle. /// @@ -157,7 +157,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore; + fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, C>) -> StreamCore; } use crate::order::TotalOrder; @@ -170,14 +170,14 @@ impl Input for G where ::Timestamp: TotalOrder { self.input_from_core(handle) } - fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore) { + fn new_input_core(&mut self) -> (HandleCore<::Timestamp, C>, StreamCore) { let mut handle = HandleCore::new(); let stream = self.input_from_core(&mut handle); (handle, stream) } - fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore { - let (output, registrar) = Tee::<::Timestamp, D>::new(); + fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, C>) -> StreamCore { + let (output, registrar) = Tee::<::Timestamp, C>::new(); let counter = Counter::new(output); let produced = counter.produced().clone(); @@ -258,7 +258,7 @@ pub struct HandleCore { /// A handle specialized to vector-based containers. pub type Handle = HandleCore>; -impl HandleCore { +impl HandleCore { /// Allocates a new input handle, from which one can create timely streams. /// /// # Examples @@ -322,7 +322,7 @@ impl HandleCore { /// } /// }); /// ``` - pub fn to_stream(&mut self, scope: &mut G) -> StreamCore + pub fn to_stream(&mut self, scope: &mut G) -> StreamCore where T: TotalOrder, G: ScopeParent, @@ -332,7 +332,7 @@ impl HandleCore { fn register( &mut self, - pusher: Counter>, + pusher: Counter>, progress: Rc>>, ) { // flush current contents, so new registrant does not see existing data. @@ -407,7 +407,7 @@ impl HandleCore { /// } /// }); /// ``` - pub fn send_batch(&mut self, buffer: &mut D) { + pub fn send_batch(&mut self, buffer: &mut C) { if !buffer.is_empty() { // flush buffered elements to ensure local fifo. diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 982303a59..592899a22 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -16,7 +16,7 @@ use crate::dataflow::{StreamCore, Scope}; use crate::Container; /// Monitors progress at a `Stream`. -pub trait Probe { +pub trait Probe { /// Constructs a progress probe which indicates which timestamps have elapsed at the operator. /// /// # Examples @@ -76,10 +76,10 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe_with(&self, handle: &Handle) -> StreamCore; + fn probe_with(&self, handle: &Handle) -> StreamCore; } -impl Probe for StreamCore { +impl Probe for StreamCore { fn probe(&self) -> Handle { // the frontier is shared state; scope updates, handle reads. @@ -87,7 +87,7 @@ impl Probe for StreamCore { self.probe_with(&mut handle); handle } - fn probe_with(&self, handle: &Handle) -> StreamCore { + fn probe_with(&self, handle: &Handle) -> StreamCore { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index ea6084770..86c6b191c 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -142,14 +142,14 @@ pub trait UnorderedInputCore { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore); + fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore); } impl UnorderedInputCore for G { - fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore) { + fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore) { - let (output, registrar) = Tee::::new(); + let (output, registrar) = Tee::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); // let produced = Rc::new(RefCell::new(ChangeBatch::new())); let cap = Capability::new(G::Timestamp::minimum(), internal.clone()); @@ -215,19 +215,19 @@ impl Operate for UnorderedOperator { /// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. #[derive(Debug)] -pub struct UnorderedHandleCore { - buffer: PushBuffer>>, +pub struct UnorderedHandleCore { + buffer: PushBuffer>>, } -impl UnorderedHandleCore { - fn new(pusher: PushCounter>) -> UnorderedHandleCore { +impl UnorderedHandleCore { + fn new(pusher: PushCounter>) -> UnorderedHandleCore { UnorderedHandleCore { buffer: PushBuffer::new(pusher), } } /// Allocates a new automatically flushing session based on the supplied capability. - pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { + pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) } } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index e69520729..8eafd5a4b 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -15,29 +15,29 @@ use crate::Container; // use dataflow::scopes::root::loggers::CHANNELS_Q; -/// Abstraction of a stream of `D: Container` records timestamped with `S::Timestamp`. +/// Abstraction of a stream of `C: Container` records timestamped with `S::Timestamp`. /// /// Internally `Stream` maintains a list of data recipients who should be presented with data /// produced by the source of the stream. #[derive(Clone)] -pub struct StreamCore { +pub struct StreamCore { /// The progress identifier of the stream's data source. name: Source, /// The `Scope` containing the stream. scope: S, - /// Maintains a list of Push>> interested in the stream's output. - ports: TeeHelper, + /// Maintains a list of Push> interested in the stream's output. + ports: TeeHelper, } /// A stream batching data in vectors. pub type Stream = StreamCore>; -impl StreamCore { +impl StreamCore { /// Connects the stream to a destination. /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. - pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { + pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { @@ -51,7 +51,7 @@ impl StreamCore { self.ports.add_pusher(pusher); } /// Allocates a `Stream` from a supplied `Source` name and rendezvous point. - pub fn new(source: Source, output: TeeHelper, scope: S) -> Self { + pub fn new(source: Source, output: TeeHelper, scope: S) -> Self { Self { name: source, ports: output, scope } } /// The name of the stream's source operator. @@ -60,7 +60,7 @@ impl StreamCore { pub fn scope(&self) -> S { self.scope.clone() } } -impl Debug for StreamCore +impl Debug for StreamCore where S: Scope, {