diff --git a/timely/src/dataflow/mod.rs b/timely/src/dataflow/mod.rs index a6a3c33a9..043317fd8 100644 --- a/timely/src/dataflow/mod.rs +++ b/timely/src/dataflow/mod.rs @@ -16,7 +16,7 @@ pub use self::stream::{StreamCore, Stream}; pub use self::scopes::{Scope, ScopeParent}; -pub use self::operators::input::HandleCore as InputHandleCore; +pub use self::operators::core::input::Handle as InputHandleCore; pub use self::operators::input::Handle as InputHandle; pub use self::operators::probe::Handle as ProbeHandle; diff --git a/timely/src/dataflow/operators/core/filter.rs b/timely/src/dataflow/operators/core/filter.rs new file mode 100644 index 000000000..c6585162a --- /dev/null +++ b/timely/src/dataflow/operators/core/filter.rs @@ -0,0 +1,41 @@ +//! Filters a stream by a predicate. +use timely_container::{Container, PushContainer, PushInto}; + +use crate::dataflow::channels::pact::Pipeline; +use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::operators::generic::operator::Operator; + +/// Extension trait for filtering. +pub trait Filter { + /// Returns a new instance of `self` containing only records satisfying `predicate`. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::ToStream; + /// use timely::dataflow::operators::core::{Filter, Inspect}; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .filter(|x| *x % 2 == 0) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn filter)->bool+'static>(&self, predicate: P) -> Self; +} + +impl Filter for StreamCore +where + for<'a> C::Item<'a>: PushInto +{ + fn filter)->bool+'static>(&self, mut predicate: P) -> StreamCore { + let mut container = Default::default(); + self.unary(Pipeline, "Filter", move |_,_| move |input, output| { + input.for_each(|time, data| { + data.swap(&mut container); + if !container.is_empty() { + output.session(&time).give_iterator(container.drain().filter(&mut predicate)); + } + }); + }) + } +} diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs new file mode 100644 index 000000000..ec27cab9b --- /dev/null +++ b/timely/src/dataflow/operators/core/input.rs @@ -0,0 +1,436 @@ +//! Create new `Streams` connected to external inputs. + +use std::rc::Rc; +use std::cell::RefCell; + +use timely_container::{PushContainer, PushInto}; + +use crate::scheduling::{Schedule, Activator}; + +use crate::progress::frontier::Antichain; +use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch}; +use crate::progress::Source; + +use crate::Container; +use crate::communication::Push; +use crate::dataflow::{Scope, ScopeParent, StreamCore}; +use crate::dataflow::channels::pushers::{Tee, Counter}; +use crate::dataflow::channels::Message; + + +// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something +// TODO : more like a harness, with direct access to its inputs. + +// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird. +// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long. +// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a. + +/// Create a new `Stream` and `Handle` through which to supply input. +pub trait Input : Scope { + /// Create a new [StreamCore] and [Handle] through which to supply input. 
+ /// + /// The `new_input` method returns a pair `(Handle, StreamCore)` where the [StreamCore] can be used + /// immediately for timely dataflow construction, and the `Handle` is later used to introduce + /// data into the timely dataflow computation. + /// + /// The `Handle` also provides a means to indicate + /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely + /// to issue progress notifications. + /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::{Input, Inspect}; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = worker.dataflow(|scope| { + /// let (input, stream) = scope.new_input::>(); + /// stream.inspect(|x| println!("hello {:?}", x)); + /// input + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + fn new_input(&mut self) -> (Handle<::Timestamp, C>, StreamCore); + + /// Create a new stream from a supplied interactive handle. + /// + /// This method creates a new timely stream whose data are supplied interactively through the `handle` + /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate + /// if it as attached to more than one stream. + /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::{Input, Inspect}; + /// use timely::dataflow::operators::core::input::Handle; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = Handle::new(); + /// worker.dataflow(|scope| { + /// scope.input_from(&mut input) + /// .inspect(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + fn input_from(&mut self, handle: &mut Handle<::Timestamp, C>) -> StreamCore; +} + +use crate::order::TotalOrder; +impl Input for G where ::Timestamp: TotalOrder { + fn new_input(&mut self) -> (Handle<::Timestamp, C>, StreamCore) { + let mut handle = Handle::new(); + let stream = self.input_from(&mut handle); + (handle, stream) + } + + fn input_from(&mut self, handle: &mut Handle<::Timestamp, C>) -> StreamCore { + let (output, registrar) = Tee::<::Timestamp, C>::new(); + let counter = Counter::new(output); + let produced = counter.produced().clone(); + + let index = self.allocate_operator_index(); + let mut address = self.addr(); + address.push(index); + + handle.activate.push(self.activator_for(&address[..])); + + let progress = Rc::new(RefCell::new(ChangeBatch::new())); + + handle.register(counter, progress.clone()); + + let copies = self.peers(); + + self.add_operator_with_index(Box::new(Operator { + name: "Input".to_owned(), + address, + shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))), + progress, + messages: produced, + copies, + }), index); + + StreamCore::new(Source::new(index, 0), registrar, self.clone()) + } +} + +#[derive(Debug)] +struct Operator { + name: String, + address: Vec, + shared_progress: Rc>>, + progress: Rc>>, // times closed since last asked + messages: Rc>>, // messages sent since last asked + copies: usize, +} 
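+
+// The input operator itself performs no computation: the `Handle` records
+// capability changes in `progress` and the output counter records produced
+// messages in `messages`; `schedule` below merely drains both into
+// `shared_progress` for the worker to observe.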
+ +impl Schedule for Operator { + + fn name(&self) -> &str { &self.name } + + fn path(&self) -> &[usize] { &self.address[..] } + + fn schedule(&mut self) -> bool { + let shared_progress = &mut *self.shared_progress.borrow_mut(); + self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]); + self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]); + false + } +} + +impl Operate for Operator { + + fn inputs(&self) -> usize { 0 } + fn outputs(&self) -> usize { 1 } + + fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { + self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64); + (Vec::new(), self.shared_progress.clone()) + } + + fn notify_me(&self) -> bool { false } +} + + +/// A handle to an input `StreamCore`, used to introduce data to a timely dataflow computation. +#[derive(Debug)] +pub struct Handle { + activate: Vec, + progress: Vec>>>, + pushers: Vec>>, + buffer1: C, + buffer2: C, + now_at: T, +} + +impl Handle { + /// Allocates a new input handle, from which one can create timely streams. + /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::{Input, Inspect}; + /// use timely::dataflow::operators::core::input::Handle; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = Handle::new(); + /// worker.dataflow(|scope| { + /// scope.input_from(&mut input) + /// .inspect(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + pub fn new() -> Self { + Self { + activate: Vec::new(), + progress: Vec::new(), + pushers: Vec::new(), + buffer1: Default::default(), + buffer2: Default::default(), + now_at: T::minimum(), + } + } + + /// Creates an input stream from the handle in the supplied scope. + /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::{Input, Inspect}; + /// use timely::dataflow::operators::core::input::Handle; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = Handle::new(); + /// worker.dataflow(|scope| { + /// input.to_stream(scope) + /// .inspect(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + pub fn to_stream(&mut self, scope: &mut G) -> StreamCore + where + T: TotalOrder, + G: ScopeParent, + { + scope.input_from(self) + } + + fn register( + &mut self, + pusher: Counter>, + progress: Rc>>, + ) { + // flush current contents, so new registrant does not see existing data. + if !self.buffer1.is_empty() { self.flush(); } + + // we need to produce an appropriate update to the capabilities for `progress`, in case a + // user has decided to drive the handle around a bit before registering it. + progress.borrow_mut().update(T::minimum(), -1); + progress.borrow_mut().update(self.now_at.clone(), 1); + + self.progress.push(progress); + self.pushers.push(pusher); + } + + // flushes our buffer at each of the destinations. there can be more than one; clone if needed. 
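+    // All destinations except the last receive a clone staged through `buffer2`;
+    // the final destination takes `buffer1` directly, so a handle with a single
+    // destination never clones its data.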
+ #[inline(never)] + fn flush(&mut self) { + for index in 0 .. self.pushers.len() { + if index < self.pushers.len() - 1 { + self.buffer2.clone_from(&self.buffer1); + Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); + debug_assert!(self.buffer2.is_empty()); + } + else { + Message::push_at(&mut self.buffer1, self.now_at.clone(), &mut self.pushers[index]); + debug_assert!(self.buffer1.is_empty()); + } + } + self.buffer1.clear(); + } + + // closes the current epoch, flushing if needed, shutting if needed, and updating the frontier. + fn close_epoch(&mut self) { + if !self.buffer1.is_empty() { self.flush(); } + for pusher in self.pushers.iter_mut() { + pusher.done(); + } + for progress in self.progress.iter() { + progress.borrow_mut().update(self.now_at.clone(), -1); + } + // Alert worker of each active input operator. + for activate in self.activate.iter() { + activate.activate(); + } + } + + /// Sends a batch of records into the corresponding timely dataflow [StreamCore], at the current epoch. + /// + /// This method flushes single elements previously sent with `send`, to keep the insertion order. + /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::{Input, InspectCore}; + /// use timely::dataflow::operators::core::input::Handle; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = Handle::new(); + /// worker.dataflow(|scope| { + /// scope.input_from(&mut input) + /// .inspect_container(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send_batch(&mut vec![format!("{}", round)]); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + pub fn send_batch(&mut self, buffer: &mut C) { + + if !buffer.is_empty() { + // flush buffered elements to ensure local fifo. + if !self.buffer1.is_empty() { self.flush(); } + + // push buffer (or clone of buffer) at each destination. + for index in 0 .. self.pushers.len() { + if index < self.pushers.len() - 1 { + self.buffer2.clone_from(&buffer); + Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); + assert!(self.buffer2.is_empty()); + } + else { + Message::push_at(buffer, self.now_at.clone(), &mut self.pushers[index]); + assert!(buffer.is_empty()); + } + } + buffer.clear(); + } + } + + /// Advances the current epoch to `next`. + /// + /// This method allows timely dataflow to issue progress notifications as it can now determine + /// that this input can no longer produce data at earlier timestamps. + pub fn advance_to(&mut self, next: T) { + // Assert that we do not rewind time. + assert!(self.now_at.less_equal(&next)); + // Flush buffers if time has actually changed. + if !self.now_at.eq(&next) { + self.close_epoch(); + self.now_at = next; + for progress in self.progress.iter() { + progress.borrow_mut().update(self.now_at.clone(), 1); + } + } + } + + /// Closes the input. + /// + /// This method allows timely dataflow to issue all progress notifications blocked by this input + /// and to begin to shut down operators, as this input can no longer produce data. + pub fn close(self) { } + + /// Reports the current epoch. + pub fn epoch(&self) -> &T { + &self.now_at + } + + /// Reports the current timestamp. 
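+    ///
+    /// This is the same value as [`epoch`](Self::epoch): an input handle's
+    /// current timestamp is its current epoch.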
+ pub fn time(&self) -> &T { + &self.now_at + } +} + +impl Handle { + #[inline] + /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. + /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::{Input, Inspect}; + /// use timely::dataflow::operators::core::input::Handle; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = Handle::new(); + /// worker.dataflow(|scope| { + /// scope.input_from(&mut input) + /// .inspect(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + pub fn send>(&mut self, data: D) { + self.buffer1.push(data); + if self.buffer1.len() == self.buffer1.capacity() { + self.flush(); + } + } +} + +impl Default for Handle { + fn default() -> Self { + Self::new() + } +} + +impl Drop for Handle { + fn drop(&mut self) { + self.close_epoch(); + } +} diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs new file mode 100644 index 000000000..ecbba1ea4 --- /dev/null +++ b/timely/src/dataflow/operators/core/map.rs @@ -0,0 +1,73 @@ +//! Extension methods for `StreamCore` based on record-by-record transformation. + +use timely_container::{Container, PushContainer, PushInto}; + +use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::channels::pact::Pipeline; +use crate::dataflow::operators::generic::operator::Operator; + +/// Extension trait for `Stream`. +pub trait Map { + /// Consumes each element of the stream and yields a new element. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::ToStream; + /// use timely::dataflow::operators::core::{Map, Inspect}; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .map(|x| x + 1) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn map(&self, mut logic: L) -> StreamCore + where + C2: PushContainer, + D2: PushInto, + L: FnMut(C::Item<'_>)->D2 + 'static, + { + self.flat_map(move |x| std::iter::once(logic(x))) + } + /// Consumes each element of the stream and yields some number of new elements. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::ToStream; + /// use timely::dataflow::operators::core::{Map, Inspect}; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .flat_map(|x| (0..x)) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn flat_map(&self, logic: L) -> StreamCore + where + C2: PushContainer, + I: IntoIterator, + I::Item: PushInto, + L: FnMut(C::Item<'_>)->I + 'static, + ; +} + +impl Map for StreamCore { + // TODO : This would be more robust if it captured an iterator and then pulled an appropriate + // TODO : number of elements from the iterator. This would allow iterators that produce many + // TODO : records without taking arbitrarily long and arbitrarily much memory. 
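+    // For now, each incoming container is swapped into a local buffer and
+    // drained through `give_iterator`, flattening the results of `logic` into
+    // the output session for that timestamp.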
+ fn flat_map(&self, mut logic: L) -> StreamCore + where + C2: PushContainer, + I: IntoIterator, + I::Item: PushInto, + L: FnMut(C::Item<'_>)->I + 'static, + { + let mut container = Default::default(); + self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| { + input.for_each(|time, data| { + data.swap(&mut container); + output.session(&time).give_iterator(container.drain().flat_map(&mut logic)); + }); + }) + } +} diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs index d81a5c625..d2ae145d9 100644 --- a/timely/src/dataflow/operators/core/mod.rs +++ b/timely/src/dataflow/operators/core/mod.rs @@ -1,19 +1,31 @@ -//! Extension traits for `Stream` implementing various operators that +//! Extension traits for `StreamCore` implementing various operators that //! are independent of specific container types. pub mod concat; pub mod enterleave; pub mod exchange; pub mod feedback; +pub mod filter; +pub mod input; pub mod inspect; +pub mod map; +pub mod ok_err; pub mod probe; pub mod rc; pub mod reclock; +pub mod to_stream; +pub mod unordered_input; pub use concat::{Concat, Concatenate}; pub use enterleave::{Enter, Leave}; pub use exchange::Exchange; pub use feedback::{Feedback, LoopVariable, ConnectLoop}; +pub use filter::Filter; +pub use input::Input; pub use inspect::{Inspect, InspectCore}; +pub use map::Map; +pub use ok_err::OkErr; pub use probe::Probe; +pub use to_stream::ToStream; pub use reclock::Reclock; +pub use unordered_input::{UnorderedInput, UnorderedHandle}; diff --git a/timely/src/dataflow/operators/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs similarity index 67% rename from timely/src/dataflow/operators/ok_err.rs rename to timely/src/dataflow/operators/core/ok_err.rs index 36d794681..6888108b3 100644 --- a/timely/src/dataflow/operators/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -1,24 +1,23 @@ //! Operators that separate one stream into two streams based on some condition +use timely_container::{Container, PushContainer, PushInto}; + use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; -use crate::dataflow::{Scope, Stream}; -use crate::Data; +use crate::dataflow::{Scope, StreamCore}; /// Extension trait for `Stream`. -pub trait OkErr { +pub trait OkErr { /// Takes one input stream and splits it into two output streams. /// For each record, the supplied closure is called with the data. /// If it returns `Ok(x)`, then `x` will be sent /// to the first returned stream; otherwise, if it returns `Err(e)`, /// then `e` will be sent to the second. /// - /// If the result of the closure only depends on the time, not the data, - /// `branch_when` should be used instead. 
- /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, OkErr, Inspect}; + /// use timely::dataflow::operators::ToStream; + /// use timely::dataflow::operators::core::{OkErr, Inspect}; /// /// timely::example(|scope| { /// let (odd, even) = (0..10) @@ -29,28 +28,30 @@ pub trait OkErr { /// odd.inspect(|x| println!("odd numbers: {:?}", x)); /// }); /// ``` - fn ok_err( + fn ok_err( &self, logic: L, - ) -> (Stream, Stream) - + ) -> (StreamCore, StreamCore) where - D1: Data, - D2: Data, - L: FnMut(D) -> Result+'static + C1: PushContainer, + D1: PushInto, + C2: PushContainer, + D2: PushInto, + L: FnMut(C::Item<'_>) -> Result+'static ; } -impl OkErr for Stream { - fn ok_err( +impl OkErr for StreamCore { + fn ok_err( &self, mut logic: L, - ) -> (Stream, Stream) - + ) -> (StreamCore, StreamCore) where - D1: Data, - D2: Data, - L: FnMut(D) -> Result+'static + C1: PushContainer, + D1: PushInto, + C2: PushContainer, + D2: PushInto, + L: FnMut(C::Item<'_>) -> Result+'static { let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope()); @@ -59,16 +60,16 @@ impl OkErr for Stream { let (mut output2, stream2) = builder.new_output(); builder.build(move |_| { - let mut vector = Vec::new(); + let mut container = Default::default(); move |_frontiers| { let mut output1_handle = output1.activate(); let mut output2_handle = output2.activate(); input.for_each(|time, data| { - data.swap(&mut vector); + data.swap(&mut container); let mut out1 = output1_handle.session(&time); let mut out2 = output2_handle.session(&time); - for datum in vector.drain(..) { + for datum in container.drain() { match logic(datum) { Ok(datum) => out1.give(datum), Err(datum) => out2.give(datum), diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs new file mode 100644 index 000000000..3775e0267 --- /dev/null +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -0,0 +1,58 @@ +//! Conversion to the `StreamCore` type from iterators. + +use crate::container::{PushContainer, PushInto}; +use crate::Container; +use crate::dataflow::operators::generic::operator::source; +use crate::dataflow::{StreamCore, Scope}; + +/// Converts to a timely [StreamCore]. +pub trait ToStream { + /// Converts to a timely [StreamCore]. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::operators::core::ToStream; + /// use timely::dataflow::operators::Capture; + /// use timely::dataflow::operators::capture::Extract; + /// + /// let (data1, data2) = timely::example(|scope| { + /// let data1 = (0..3).to_stream(scope).capture(); + /// let data2 = vec![0,1,2].to_stream(scope).capture(); + /// (data1, data2) + /// }); + /// + /// assert_eq!(data1.extract(), data2.extract()); + /// ``` + fn to_stream(self, scope: &mut S) -> StreamCore; +} + +impl ToStream for I where I::Item: PushInto { + fn to_stream(self, scope: &mut S) -> StreamCore { + + source(scope, "ToStream", |capability, info| { + + // Acquire an activator, so that the operator can rescheduled itself. 
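+            // Each activation emits at most a bounded batch of records and then
+            // re-activates the operator; once the iterator is exhausted the
+            // capability is dropped so downstream operators can tell the stream
+            // is complete.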
+ let activator = scope.activator_for(&info.address[..]); + + let mut iterator = self.into_iter().fuse(); + let mut capability = Some(capability); + + move |output| { + + if let Some(element) = iterator.next() { + let mut session = output.session(capability.as_ref().unwrap()); + session.give(element); + let n = 256 * crate::container::buffer::default_capacity::(); + for element in iterator.by_ref().take(n - 1) { + session.give(element); + } + activator.activate(); + } + else { + capability = None; + } + } + }) + } +} diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs new file mode 100644 index 000000000..61c42daa5 --- /dev/null +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -0,0 +1,163 @@ +//! Create new `StreamCore`s connected to external inputs. + +use std::rc::Rc; +use std::cell::RefCell; +use crate::Container; + +use crate::scheduling::{Schedule, ActivateOnDrop}; + +use crate::progress::frontier::Antichain; +use crate::progress::{Operate, operate::SharedProgress, Timestamp}; +use crate::progress::Source; +use crate::progress::ChangeBatch; + +use crate::dataflow::channels::pushers::{Counter, Tee}; +use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSessionCore}; + +use crate::dataflow::operators::{ActivateCapability, Capability}; + +use crate::dataflow::{Scope, StreamCore}; + +/// Create a new `Stream` and `Handle` through which to supply input. +pub trait UnorderedInput { + /// Create a new capability-based [StreamCore] and [UnorderedHandle] through which to supply input. This + /// input supports multiple open epochs (timestamps) at the same time. + /// + /// The `new_unordered_input_core` method returns `((HandleCore, Capability), StreamCore)` where the `StreamCore` can be used + /// immediately for timely dataflow construction, `HandleCore` and `Capability` are later used to introduce + /// data into the timely dataflow computation. + /// + /// The `Capability` returned is for the default value of the timestamp type in use. The + /// capability can be dropped to inform the system that the input has advanced beyond the + /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp + /// should be obtained first, via the `delayed` function for `Capability`. + /// + /// To communicate the end-of-input drop all available capabilities. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex}; + /// + /// use timely::*; + /// use timely::dataflow::operators::{capture::Extract, Capture}; + /// use timely::dataflow::operators::core::{UnorderedInput}; + /// use timely::dataflow::Stream; + /// + /// // get send and recv endpoints, wrap send to share + /// let (send, recv) = ::std::sync::mpsc::channel(); + /// let send = Arc::new(Mutex::new(send)); + /// + /// timely::execute(Config::thread(), move |worker| { + /// + /// // this is only to validate the output. + /// let send = send.lock().unwrap().clone(); + /// + /// // create and capture the unordered input. + /// let (mut input, mut cap) = worker.dataflow::(|scope| { + /// let (input, stream) = scope.new_unordered_input(); + /// stream.capture_into(send); + /// input + /// }); + /// + /// // feed values 0..10 at times 0..10. 
+ /// for round in 0..10 { + /// input.session(cap.clone()).give(round); + /// cap = cap.delayed(&(round + 1)); + /// worker.step(); + /// } + /// }).unwrap(); + /// + /// let extract = recv.extract(); + /// for i in 0..10 { + /// assert_eq!(extract[i], (i, vec![i])); + /// } + /// ``` + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore); +} + +impl UnorderedInput for G { + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore) { + + let (output, registrar) = Tee::::new(); + let internal = Rc::new(RefCell::new(ChangeBatch::new())); + // let produced = Rc::new(RefCell::new(ChangeBatch::new())); + let cap = Capability::new(G::Timestamp::minimum(), internal.clone()); + let counter = Counter::new(output); + let produced = counter.produced().clone(); + let peers = self.peers(); + + let index = self.allocate_operator_index(); + let mut address = self.addr(); + address.push(index); + + let cap = ActivateCapability::new(cap, &address, self.activations()); + + let helper = UnorderedHandle::new(counter); + + self.add_operator_with_index(Box::new(UnorderedOperator { + name: "UnorderedInput".to_owned(), + address, + shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))), + internal, + produced, + peers, + }), index); + + ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone())) + } +} + +struct UnorderedOperator { + name: String, + address: Vec, + shared_progress: Rc>>, + internal: Rc>>, + produced: Rc>>, + peers: usize, +} + +impl Schedule for UnorderedOperator { + fn name(&self) -> &str { &self.name } + fn path(&self) -> &[usize] { &self.address[..] } + fn schedule(&mut self) -> bool { + let shared_progress = &mut *self.shared_progress.borrow_mut(); + self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]); + self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]); + false + } +} + +impl Operate for UnorderedOperator { + fn inputs(&self) -> usize { 0 } + fn outputs(&self) -> usize { 1 } + + fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { + let mut borrow = self.internal.borrow_mut(); + for (time, count) in borrow.drain() { + self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64)); + } + (Vec::new(), self.shared_progress.clone()) + } + + fn notify_me(&self) -> bool { false } +} + +/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. +#[derive(Debug)] +pub struct UnorderedHandle { + buffer: PushBuffer>>, +} + +impl UnorderedHandle { + fn new(pusher: Counter>) -> UnorderedHandle { + UnorderedHandle { + buffer: PushBuffer::new(pusher), + } + } + + /// Allocates a new automatically flushing session based on the supplied capability. + pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { + ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) + } +} diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 90eb45810..d1e30280b 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -1,20 +1,8 @@ //! Create new `Streams` connected to external inputs. 
-use std::rc::Rc; -use std::cell::RefCell; - -use crate::scheduling::{Schedule, Activator}; - -use crate::progress::frontier::Antichain; -use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch}; -use crate::progress::Source; - -use crate::{Container, Data}; -use crate::communication::Push; -use crate::dataflow::{Stream, ScopeParent, Scope, StreamCore}; -use crate::dataflow::channels::pushers::{Tee, Counter}; -use crate::dataflow::channels::Message; - +use crate::Data; +use crate::dataflow::{Stream, ScopeParent, Scope}; +use crate::dataflow::operators::core::{Input as InputCore}; // TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something // TODO : more like a harness, with direct access to its inputs. @@ -60,41 +48,6 @@ pub trait Input : Scope { /// ``` fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream); - /// Create a new [StreamCore] and [HandleCore] through which to supply input. - /// - /// The `new_input_core` method returns a pair `(HandleCore, StreamCore)` where the [StreamCore] can be used - /// immediately for timely dataflow construction, and the `HandleCore` is later used to introduce - /// data into the timely dataflow computation. - /// - /// The `HandleCore` also provides a means to indicate - /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely - /// to issue progress notifications. - /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = worker.dataflow(|scope| { - /// let (input, stream) = scope.new_input_core::>(); - /// stream.inspect(|x| println!("hello {:?}", x)); - /// input - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - fn new_input_core(&mut self) -> (HandleCore<::Timestamp, C>, StreamCore); - /// Create a new stream from a supplied interactive handle. /// /// This method creates a new timely stream whose data are supplied interactively through the `handle` @@ -126,388 +79,18 @@ pub trait Input : Scope { /// }); /// ``` fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream; - - /// Create a new stream from a supplied interactive handle. - /// - /// This method creates a new timely stream whose data are supplied interactively through the `handle` - /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate - /// if it as attached to more than one stream. 
- /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// use timely::dataflow::operators::input::Handle; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = Handle::new(); - /// worker.dataflow(|scope| { - /// scope.input_from_core(&mut input) - /// .inspect(|x| println!("hello {:?}", x)); - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, C>) -> StreamCore; } use crate::order::TotalOrder; impl Input for G where ::Timestamp: TotalOrder { fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream) { - self.new_input_core() + InputCore::new_input(self) } fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream { - self.input_from_core(handle) - } - - fn new_input_core(&mut self) -> (HandleCore<::Timestamp, C>, StreamCore) { - let mut handle = HandleCore::new(); - let stream = self.input_from_core(&mut handle); - (handle, stream) - } - - fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, C>) -> StreamCore { - let (output, registrar) = Tee::<::Timestamp, C>::new(); - let counter = Counter::new(output); - let produced = counter.produced().clone(); - - let index = self.allocate_operator_index(); - let mut address = self.addr(); - address.push(index); - - handle.activate.push(self.activator_for(&address[..])); - - let progress = Rc::new(RefCell::new(ChangeBatch::new())); - - handle.register(counter, progress.clone()); - - let copies = self.peers(); - - self.add_operator_with_index(Box::new(Operator { - name: "Input".to_owned(), - address, - shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))), - progress, - messages: produced, - copies, - }), index); - - StreamCore::new(Source::new(index, 0), registrar, self.clone()) - } -} - -#[derive(Debug)] -struct Operator { - name: String, - address: Vec, - shared_progress: Rc>>, - progress: Rc>>, // times closed since last asked - messages: Rc>>, // messages sent since last asked - copies: usize, -} - -impl Schedule for Operator { - - fn name(&self) -> &str { &self.name } - - fn path(&self) -> &[usize] { &self.address[..] } - - fn schedule(&mut self) -> bool { - let shared_progress = &mut *self.shared_progress.borrow_mut(); - self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]); - self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]); - false + InputCore::input_from(self, handle) } } -impl Operate for Operator { - - fn inputs(&self) -> usize { 0 } - fn outputs(&self) -> usize { 1 } - - fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { - self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64); - (Vec::new(), self.shared_progress.clone()) - } - - fn notify_me(&self) -> bool { false } -} - - /// A handle to an input `Stream`, used to introduce data to a timely dataflow computation. -#[derive(Debug)] -pub struct HandleCore { - activate: Vec, - progress: Vec>>>, - pushers: Vec>>, - buffer1: C, - buffer2: C, - now_at: T, -} - -/// A handle specialized to vector-based containers. -pub type Handle = HandleCore>; - -impl HandleCore { - /// Allocates a new input handle, from which one can create timely streams. 
- /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// use timely::dataflow::operators::input::Handle; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = Handle::new(); - /// worker.dataflow(|scope| { - /// scope.input_from(&mut input) - /// .inspect(|x| println!("hello {:?}", x)); - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - pub fn new() -> Self { - Self { - activate: Vec::new(), - progress: Vec::new(), - pushers: Vec::new(), - buffer1: Default::default(), - buffer2: Default::default(), - now_at: T::minimum(), - } - } - - /// Creates an input stream from the handle in the supplied scope. - /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// use timely::dataflow::operators::input::Handle; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = Handle::new(); - /// worker.dataflow(|scope| { - /// input.to_stream(scope) - /// .inspect(|x| println!("hello {:?}", x)); - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - pub fn to_stream(&mut self, scope: &mut G) -> StreamCore - where - T: TotalOrder, - G: ScopeParent, - { - scope.input_from_core(self) - } - - fn register( - &mut self, - pusher: Counter>, - progress: Rc>>, - ) { - // flush current contents, so new registrant does not see existing data. - if !self.buffer1.is_empty() { self.flush(); } - - // we need to produce an appropriate update to the capabilities for `progress`, in case a - // user has decided to drive the handle around a bit before registering it. - progress.borrow_mut().update(T::minimum(), -1); - progress.borrow_mut().update(self.now_at.clone(), 1); - - self.progress.push(progress); - self.pushers.push(pusher); - } - - // flushes our buffer at each of the destinations. there can be more than one; clone if needed. - #[inline(never)] - fn flush(&mut self) { - for index in 0 .. self.pushers.len() { - if index < self.pushers.len() - 1 { - self.buffer2.clone_from(&self.buffer1); - Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); - debug_assert!(self.buffer2.is_empty()); - } - else { - Message::push_at(&mut self.buffer1, self.now_at.clone(), &mut self.pushers[index]); - debug_assert!(self.buffer1.is_empty()); - } - } - self.buffer1.clear(); - } - - // closes the current epoch, flushing if needed, shutting if needed, and updating the frontier. - fn close_epoch(&mut self) { - if !self.buffer1.is_empty() { self.flush(); } - for pusher in self.pushers.iter_mut() { - pusher.done(); - } - for progress in self.progress.iter() { - progress.borrow_mut().update(self.now_at.clone(), -1); - } - // Alert worker of each active input operator. - for activate in self.activate.iter() { - activate.activate(); - } - } - - /// Sends a batch of records into the corresponding timely dataflow [StreamCore], at the current epoch. - /// - /// This method flushes single elements previously sent with `send`, to keep the insertion order. 
- /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, InspectCore}; - /// use timely::dataflow::operators::input::HandleCore; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = HandleCore::new(); - /// worker.dataflow(|scope| { - /// scope.input_from_core(&mut input) - /// .inspect_container(|x| println!("hello {:?}", x)); - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send_batch(&mut vec![format!("{}", round)]); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - pub fn send_batch(&mut self, buffer: &mut C) { - - if !buffer.is_empty() { - // flush buffered elements to ensure local fifo. - if !self.buffer1.is_empty() { self.flush(); } - - // push buffer (or clone of buffer) at each destination. - for index in 0 .. self.pushers.len() { - if index < self.pushers.len() - 1 { - self.buffer2.clone_from(&buffer); - Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); - assert!(self.buffer2.is_empty()); - } - else { - Message::push_at(buffer, self.now_at.clone(), &mut self.pushers[index]); - assert!(buffer.is_empty()); - } - } - buffer.clear(); - } - } - - /// Advances the current epoch to `next`. - /// - /// This method allows timely dataflow to issue progress notifications as it can now determine - /// that this input can no longer produce data at earlier timestamps. - pub fn advance_to(&mut self, next: T) { - // Assert that we do not rewind time. - assert!(self.now_at.less_equal(&next)); - // Flush buffers if time has actually changed. - if !self.now_at.eq(&next) { - self.close_epoch(); - self.now_at = next; - for progress in self.progress.iter() { - progress.borrow_mut().update(self.now_at.clone(), 1); - } - } - } - - /// Closes the input. - /// - /// This method allows timely dataflow to issue all progress notifications blocked by this input - /// and to begin to shut down operators, as this input can no longer produce data. - pub fn close(self) { } - - /// Reports the current epoch. - pub fn epoch(&self) -> &T { - &self.now_at - } - - /// Reports the current timestamp. - pub fn time(&self) -> &T { - &self.now_at - } -} - -impl Handle { - #[inline] - /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. 
- /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// use timely::dataflow::operators::input::Handle; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = Handle::new(); - /// worker.dataflow(|scope| { - /// scope.input_from(&mut input) - /// .inspect(|x| println!("hello {:?}", x)); - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - pub fn send(&mut self, data: D) { - // assert!(self.buffer1.capacity() == Message::::default_length()); - self.buffer1.push(data); - if self.buffer1.len() == self.buffer1.capacity() { - self.flush(); - } - } -} - -impl Default for Handle { - fn default() -> Self { - Self::new() - } -} - -impl Drop for HandleCore { - fn drop(&mut self) { - self.close_epoch(); - } -} +pub type Handle = crate::dataflow::operators::core::input::Handle>; diff --git a/timely/src/dataflow/operators/map.rs b/timely/src/dataflow/operators/map.rs index 77eced3f6..9786d4ec5 100644 --- a/timely/src/dataflow/operators/map.rs +++ b/timely/src/dataflow/operators/map.rs @@ -4,6 +4,7 @@ use crate::Data; use crate::dataflow::{Stream, Scope}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; +use crate::dataflow::operators::core::{Map as MapCore}; /// Extension trait for `Stream`. pub trait Map { @@ -19,7 +20,9 @@ pub trait Map { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn mapD2+'static>(&self, logic: L) -> Stream; + fn mapD2+'static>(&self, mut logic: L) -> Stream { + self.flat_map(move |x| std::iter::once(logic(x))) + } /// Updates each element of the stream and yields the element, re-using memory where possible. /// /// # Examples @@ -49,15 +52,6 @@ pub trait Map { } impl Map for Stream { - fn mapD2+'static>(&self, mut logic: L) -> Stream { - let mut vector = Vec::new(); - self.unary(Pipeline, "Map", move |_,_| move |input, output| { - input.for_each(|time, data| { - data.swap(&mut vector); - output.session(&time).give_iterator(vector.drain(..).map(|x| logic(x))); - }); - }) - } fn map_in_place(&self, mut logic: L) -> Stream { let mut vector = Vec::new(); self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| { @@ -71,13 +65,7 @@ impl Map for Stream { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. - fn flat_mapI+'static>(&self, mut logic: L) -> Stream where I::Item: Data { - let mut vector = Vec::new(); - self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| { - input.for_each(|time, data| { - data.swap(&mut vector); - output.session(&time).give_iterator(vector.drain(..).flat_map(|x| logic(x).into_iter())); - }); - }) + fn flat_mapI+'static>(&self, logic: L) -> Stream where I::Item: Data { + MapCore::flat_map(self, logic) } } diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 458566ddc..453e8838f 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -9,7 +9,7 @@ //! Most of the operators in this module are defined using these two general operators. 
pub use self::input::Input; -pub use self::unordered_input::{UnorderedInput, UnorderedInputCore}; +pub use self::unordered_input::UnorderedInput; pub use self::partition::Partition; pub use self::map::Map; pub use self::inspect::{Inspect, InspectCore}; @@ -17,10 +17,8 @@ pub use self::filter::Filter; pub use self::delay::Delay; pub use self::exchange::Exchange; pub use self::broadcast::Broadcast; -pub use self::to_stream::{ToStream, ToStreamCore}; pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; -pub use self::ok_err::OkErr; pub use self::result::ResultStream; pub use self::generic::Operator; @@ -45,10 +43,10 @@ pub mod delay; pub use self::core::exchange; pub mod broadcast; pub use self::core::probe::{self, Probe}; -pub mod to_stream; +pub use self::core::to_stream::ToStream; pub mod capture; pub mod branch; -pub mod ok_err; +pub use self::core::ok_err::{self, OkErr}; pub use self::core::rc; pub mod result; diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs deleted file mode 100644 index 555a540b0..000000000 --- a/timely/src/dataflow/operators/to_stream.rs +++ /dev/null @@ -1,109 +0,0 @@ -//! Conversion to the `Stream` type from iterators. - -use crate::Container; -use crate::progress::Timestamp; -use crate::Data; -use crate::dataflow::operators::generic::operator::source; -use crate::dataflow::{StreamCore, Stream, Scope}; - -/// Converts to a timely `Stream`. -pub trait ToStream { - /// Converts to a timely `Stream`. - /// - /// # Examples - /// - /// ``` - /// use timely::dataflow::operators::{ToStream, Capture}; - /// use timely::dataflow::operators::capture::Extract; - /// - /// let (data1, data2) = timely::example(|scope| { - /// let data1 = (0..3).to_stream(scope).capture(); - /// let data2 = vec![0,1,2].to_stream(scope).capture(); - /// (data1, data2) - /// }); - /// - /// assert_eq!(data1.extract(), data2.extract()); - /// ``` - fn to_stream>(self, scope: &mut S) -> Stream; -} - -impl ToStream for I where I::Item: Data { - fn to_stream>(self, scope: &mut S) -> Stream { - - source(scope, "ToStream", |capability, info| { - - // Acquire an activator, so that the operator can rescheduled itself. - let activator = scope.activator_for(&info.address[..]); - - let mut iterator = self.into_iter().fuse(); - let mut capability = Some(capability); - - move |output| { - - if let Some(element) = iterator.next() { - let mut session = output.session(capability.as_ref().unwrap()); - session.give(element); - let n = 256 * crate::container::buffer::default_capacity::(); - for element in iterator.by_ref().take(n - 1) { - session.give(element); - } - activator.activate(); - } - else { - capability = None; - } - } - }) - } -} - -/// Converts to a timely [StreamCore]. -pub trait ToStreamCore { - /// Converts to a timely [StreamCore]. 
- /// - /// # Examples - /// - /// ``` - /// use timely::dataflow::operators::{ToStreamCore, Capture}; - /// use timely::dataflow::operators::capture::Extract; - /// - /// let (data1, data2) = timely::example(|scope| { - /// let data1 = Some((0..3).collect::>()).to_stream_core(scope).capture(); - /// let data2 = Some(vec![0,1,2]).to_stream_core(scope).capture(); - /// (data1, data2) - /// }); - /// - /// assert_eq!(data1.extract(), data2.extract()); - /// ``` - fn to_stream_core>(self, scope: &mut S) -> StreamCore; -} - -impl ToStreamCore for I where I::Item: Container { - fn to_stream_core>(self, scope: &mut S) -> StreamCore { - - source(scope, "ToStreamCore", |capability, info| { - - // Acquire an activator, so that the operator can rescheduled itself. - let activator = scope.activator_for(&info.address[..]); - - let mut iterator = self.into_iter().fuse(); - let mut capability = Some(capability); - - move |output| { - - if let Some(mut element) = iterator.next() { - let mut session = output.session(capability.as_ref().unwrap()); - session.give_container(&mut element); - let n = 256; - for mut element in iterator.by_ref().take(n - 1) { - session.give_container(&mut element); - } - activator.activate(); - } - else { - capability = None; - } - } - }) - } -} diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index 86c6b191c..f35d6ff06 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -1,23 +1,10 @@ //! Create new `Streams` connected to external inputs. -use std::rc::Rc; -use std::cell::RefCell; -use crate::Container; - -use crate::scheduling::{Schedule, ActivateOnDrop}; - -use crate::progress::frontier::Antichain; -use crate::progress::{Operate, operate::SharedProgress, Timestamp}; -use crate::progress::Source; -use crate::progress::ChangeBatch; - use crate::Data; -use crate::dataflow::channels::pushers::{Counter as PushCounter, Tee}; -use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSessionCore}; -use crate::dataflow::operators::{ActivateCapability, Capability}; - -use crate::dataflow::{Stream, Scope, StreamCore}; +use crate::dataflow::operators::{ActivateCapability}; +use crate::dataflow::operators::core::{UnorderedInput as UnorderedInputCore, UnorderedHandle as UnorderedHandleCore}; +use crate::dataflow::{Stream, Scope}; /// Create a new `Stream` and `Handle` through which to supply input. pub trait UnorderedInput { @@ -80,154 +67,9 @@ pub trait UnorderedInput { impl UnorderedInput for G { fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream) { - self.new_unordered_input_core() + UnorderedInputCore::new_unordered_input(self) } } /// An unordered handle specialized to vectors. pub type UnorderedHandle = UnorderedHandleCore>; - -/// Create a new `Stream` and `Handle` through which to supply input. -pub trait UnorderedInputCore { - /// Create a new capability-based [StreamCore] and [UnorderedHandleCore] through which to supply input. This - /// input supports multiple open epochs (timestamps) at the same time. - /// - /// The `new_unordered_input_core` method returns `((HandleCore, Capability), StreamCore)` where the `StreamCore` can be used - /// immediately for timely dataflow construction, `HandleCore` and `Capability` are later used to introduce - /// data into the timely dataflow computation. - /// - /// The `Capability` returned is for the default value of the timestamp type in use. 
The - /// capability can be dropped to inform the system that the input has advanced beyond the - /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp - /// should be obtained first, via the `delayed` function for `Capability`. - /// - /// To communicate the end-of-input drop all available capabilities. - /// - /// # Examples - /// - /// ``` - /// use std::sync::{Arc, Mutex}; - /// - /// use timely::*; - /// use timely::dataflow::operators::*; - /// use timely::dataflow::operators::capture::Extract; - /// use timely::dataflow::Stream; - /// - /// // get send and recv endpoints, wrap send to share - /// let (send, recv) = ::std::sync::mpsc::channel(); - /// let send = Arc::new(Mutex::new(send)); - /// - /// timely::execute(Config::thread(), move |worker| { - /// - /// // this is only to validate the output. - /// let send = send.lock().unwrap().clone(); - /// - /// // create and capture the unordered input. - /// let (mut input, mut cap) = worker.dataflow::(|scope| { - /// let (input, stream) = scope.new_unordered_input_core(); - /// stream.capture_into(send); - /// input - /// }); - /// - /// // feed values 0..10 at times 0..10. - /// for round in 0..10 { - /// input.session(cap.clone()).give(round); - /// cap = cap.delayed(&(round + 1)); - /// worker.step(); - /// } - /// }).unwrap(); - /// - /// let extract = recv.extract(); - /// for i in 0..10 { - /// assert_eq!(extract[i], (i, vec![i])); - /// } - /// ``` - fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore); -} - - -impl UnorderedInputCore for G { - fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore) { - - let (output, registrar) = Tee::::new(); - let internal = Rc::new(RefCell::new(ChangeBatch::new())); - // let produced = Rc::new(RefCell::new(ChangeBatch::new())); - let cap = Capability::new(G::Timestamp::minimum(), internal.clone()); - let counter = PushCounter::new(output); - let produced = counter.produced().clone(); - let peers = self.peers(); - - let index = self.allocate_operator_index(); - let mut address = self.addr(); - address.push(index); - - let cap = ActivateCapability::new(cap, &address, self.activations()); - - let helper = UnorderedHandleCore::new(counter); - - self.add_operator_with_index(Box::new(UnorderedOperator { - name: "UnorderedInput".to_owned(), - address, - shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))), - internal, - produced, - peers, - }), index); - - ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone())) - } -} - -struct UnorderedOperator { - name: String, - address: Vec, - shared_progress: Rc>>, - internal: Rc>>, - produced: Rc>>, - peers: usize, -} - -impl Schedule for UnorderedOperator { - fn name(&self) -> &str { &self.name } - fn path(&self) -> &[usize] { &self.address[..] 
} - fn schedule(&mut self) -> bool { - let shared_progress = &mut *self.shared_progress.borrow_mut(); - self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]); - self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]); - false - } -} - -impl Operate for UnorderedOperator { - fn inputs(&self) -> usize { 0 } - fn outputs(&self) -> usize { 1 } - - fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { - let mut borrow = self.internal.borrow_mut(); - for (time, count) in borrow.drain() { - self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64)); - } - (Vec::new(), self.shared_progress.clone()) - } - - fn notify_me(&self) -> bool { false } -} - -/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. -#[derive(Debug)] -pub struct UnorderedHandleCore { - buffer: PushBuffer>>, -} - -impl UnorderedHandleCore { - fn new(pusher: PushCounter>) -> UnorderedHandleCore { - UnorderedHandleCore { - buffer: PushBuffer::new(pusher), - } - } - - /// Allocates a new automatically flushing session based on the supplied capability. - pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { - ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) - } -}
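
As a quick end-to-end check of the relocated traits, the sketch below combines the `core` re-exports introduced in `operators/core/mod.rs`. It is illustrative only and not part of the patch: the closure forms mirror the doc examples in this diff, and `Vec<u64>` stands in for any suitable container.

```rust
use timely::dataflow::operators::core::{Input, Map, Filter, Inspect};

fn main() {
    timely::execute(timely::Config::thread(), |worker| {
        // Build a dataflow over Vec<u64> containers; any suitable `Container`
        // could stand in for Vec<u64> here.
        let mut input = worker.dataflow::<u64, _, _>(|scope| {
            let (input, stream) = scope.new_input::<Vec<u64>>();
            stream
                .filter(|x| *x % 2 == 0)
                .map(|x| x + 1)
                .inspect(|x| println!("seen: {:?}", x));
            input
        });

        // Feed a few rounds of data, advancing the epoch after each round.
        for round in 0..5 {
            input.send(round);
            input.advance_to(round + 1);
            worker.step();
        }
    }).unwrap();
}
```

Existing `Vec`-based callers should largely keep working, since the old `operators::{Input, Map, OkErr, ToStream, UnorderedInput}` entry points now forward to these `core` implementations and the `Vec`-specialized aliases (`Handle`, `UnorderedHandle`) are retained.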