From be12daed08ac6e2c6ec690b5a35459e8b984047c Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 15 Nov 2021 17:27:12 -0500 Subject: [PATCH 1/3] Timely container abstraction Introduces a container abstraction to enable generic data on dataflow edges. A container describes its contents, has a length and capacity, can be cleared. Containers implementing the PushPartitioned trait are suitable for exchanging across Timely workers. Signed-off-by: Moritz Hoffmann --- Cargo.toml | 1 + container/Cargo.toml | 6 + container/src/lib.rs | 163 ++++++++++++++++++++++++++++ mdbook/src/chapter_5/chapter_5_3.md | 76 +++++++++++++ 4 files changed, 246 insertions(+) create mode 100644 container/Cargo.toml create mode 100644 container/src/lib.rs create mode 100644 mdbook/src/chapter_5/chapter_5_3.md diff --git a/Cargo.toml b/Cargo.toml index a7feac1a1..e7bacb486 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "bytes", "communication", + "container", "kafkaesque", "logging", "timely", diff --git a/container/Cargo.toml b/container/Cargo.toml new file mode 100644 index 000000000..23d530654 --- /dev/null +++ b/container/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "timely_container" +version = "0.12.0" +edition = "2018" +description = "Container abstractions for Timely" +license = "MIT" diff --git a/container/src/lib.rs b/container/src/lib.rs new file mode 100644 index 000000000..6e465ba8d --- /dev/null +++ b/container/src/lib.rs @@ -0,0 +1,163 @@ +//! Specifications for containers + +#![forbid(missing_docs)] + +/// A container transferring data through dataflow edges +/// +/// A container stores a number of elements and thus is able to describe it length (`len()`) and +/// whether it is empty (`is_empty()`). It supports removing all elements (`clear`). +/// +/// A container must implement default. The default implementation is not required to allocate +/// memory for variable-length components. 
+/// +/// We require the container to be cloneable to enable efficient copies when providing references +/// of containers to operators. Care must be taken that the type's `clone_from` implementation +/// is efficient (which is not necessarily the case when deriving `Clone`.) +/// TODO: Don't require `Container: Clone` +pub trait Container: Default + Clone + 'static { + /// The type of elements this container holds. + type Item; + + /// The number of elements in this container + /// + /// The length of a container must be consistent between sending and receiving it. + /// When exchanging a container and partitioning it into pieces, the sum of the length + /// of all pieces must be equal to the length of the original container. + fn len(&self) -> usize; + + /// Determine if the container contains any elements, corresponding to `len() == 0`. + fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// The capacity of the underlying container + fn capacity(&self) -> usize; + + /// Remove all contents from `self` while retaining allocated memory. + /// After calling `clear`, `is_empty` must return `true` and `len` 0. 
+ fn clear(&mut self); +} + +impl Container for Vec { + type Item = T; + + fn len(&self) -> usize { + Vec::len(self) + } + + fn is_empty(&self) -> bool { + Vec::is_empty(self) + } + + fn capacity(&self) -> usize { + Vec::capacity(self) + } + + fn clear(&mut self) { Vec::clear(self) } +} + +mod rc { + use std::rc::Rc; + + use crate::Container; + + impl Container for Rc { + type Item = T::Item; + + fn len(&self) -> usize { + std::ops::Deref::deref(self).len() + } + + fn is_empty(&self) -> bool { + std::ops::Deref::deref(self).is_empty() + } + + fn capacity(&self) -> usize { + std::ops::Deref::deref(self).capacity() + } + + fn clear(&mut self) { } + } +} + +mod arc { + use std::sync::Arc; + + use crate::Container; + + impl Container for Arc { + type Item = T::Item; + + fn len(&self) -> usize { + std::ops::Deref::deref(self).len() + } + + fn is_empty(&self) -> bool { + std::ops::Deref::deref(self).is_empty() + } + + fn capacity(&self) -> usize { + std::ops::Deref::deref(self).capacity() + } + + fn clear(&mut self) { } + } +} + +/// A container that can partition itself into pieces. +pub trait PushPartitioned: Container { + /// Partition and push this container. + /// + /// Drain all elements from `self`, and use the function `index` to determine which `buffer` to + /// append an element to. Call `flush` with an index and a buffer to send the data downstream. + fn push_partitioned(&mut self, buffers: &mut [Self], index: I, flush: F) + where + I: FnMut(&Self::Item) -> usize, + F: FnMut(usize, &mut Self); +} + +impl PushPartitioned for Vec { + fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) + where + I: FnMut(&Self::Item) -> usize, + F: FnMut(usize, &mut Self), + { + fn ensure_capacity(this: &mut Vec) { + let capacity = this.capacity(); + let desired_capacity = buffer::default_capacity::(); + if capacity < desired_capacity { + this.reserve(desired_capacity - capacity); + } + } + + for datum in self.drain(..) 
{ + let index = index(&datum); + ensure_capacity(&mut buffers[index]); + buffers[index].push(datum); + if buffers[index].len() == buffers[index].capacity() { + flush(index, &mut buffers[index]); + } + } + } +} + +pub mod buffer { + //! Functionality related to calculating default buffer sizes + + /// The upper limit for buffers to allocate, size in bytes. [default_capacity] converts + /// this to size in elements. + pub const BUFFER_SIZE_BYTES: usize = 1 << 13; + + /// The maximum buffer capacity in elements. Returns a number between [BUFFER_SIZE_BYTES] + /// and 1, inclusively. + pub const fn default_capacity() -> usize { + let size = ::std::mem::size_of::(); + if size == 0 { + BUFFER_SIZE_BYTES + } else if size <= BUFFER_SIZE_BYTES { + BUFFER_SIZE_BYTES / size + } else { + 1 + } + } +} diff --git a/mdbook/src/chapter_5/chapter_5_3.md b/mdbook/src/chapter_5/chapter_5_3.md new file mode 100644 index 000000000..5cd38763e --- /dev/null +++ b/mdbook/src/chapter_5/chapter_5_3.md @@ -0,0 +1,76 @@ +# Containers + +Timely's core isn't tied to a specific representation of data that flows along dataflow edges. +While some convenience operators make assumptions about the type of batches, the core infrastructure is generic in what containers are exchanged. +This section explains what containers are and what their contract with the Timely APIs is. + +Many parts of Timely assume that data is organized into `Vec`, i.e., batches store data as consecutive elements in memory. +This abstraction works well for many cases but precludes some advanced techniques, such as transferring translated or columnar data between operators. +With the container abstraction, Timely specifies a minimal interface it requires for tracking progress and providing data to operators. + +## Core operators + +In Timely, we provide a set of `Core` operators that are generic on the container type they can handle. 
+In most cases, the `Core` operators are an immediate generalization of their non-core variant, providing the semantically equivalent functionality. + +## Limitations + +A challenge when genericizing Timely operators is that all interfaces need to be declared independent of a concrete type, for example as part of a trait. +For this reason, Timely doesn't currently support operators that require knowledge of the elements of a container or how to partition a container, with the only exception being the `Vec` type. + +## A custom container + +Let's walk through an example container that resembles a `Result` type, but moves the storage to within the result. + +```rust +extern crate timely_container; + +use timely_container::Container; + +#[derive(Clone)] +enum ResultContainer { + Ok(Vec), + Err(E), +} + +impl Default for ResultContainer { + fn default() -> Self { + Self::Ok(Default::default()) + } +} + +impl Container for ResultContainer { + type Item = Result; + + fn len(&self) -> usize { + match self { + ResultContainer::Ok(data) => data.len(), + ResultContainer::Err(_) => 1, + } + } + + fn is_empty(&self) -> bool { + match self { + ResultContainer::Ok(data) => data.is_empty(), + ResultContainer::Err(_) => false, + } + } + + fn capacity(&self) -> usize { + match self { + ResultContainer::Ok(data) => data.capacity(), + ResultContainer::Err(_) => 1, + } + } + + fn clear(&mut self) { + match self { + ResultContainer::Ok(data) => data.clear(), + ResultContainer::Err(_) => {}, + } + } +} +``` + +The type can either store a vector of data, or a single error. +Its length is the length of the vector, or 1 if it represents an error. From f3884138c4ecf92dd9aa8d5b7ba42fe03a55a84a Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 15 Nov 2021 17:34:20 -0500 Subject: [PATCH 2/3] Make Timely generic over a container type Add *Core variants of core Timely infrastructure that is generic in the data passed along dataflow edges. 
Signed-off-by: Moritz Hoffmann --- kafkaesque/src/lib.rs | 32 +-- timely/Cargo.toml | 1 + timely/src/dataflow/channels/mod.rs | 52 ++--- timely/src/dataflow/channels/pact.rs | 64 +++--- .../src/dataflow/channels/pullers/counter.rs | 11 +- .../src/dataflow/channels/pushers/buffer.rs | 73 +++++-- .../src/dataflow/channels/pushers/counter.rs | 20 +- .../src/dataflow/channels/pushers/exchange.rs | 67 +++--- timely/src/dataflow/channels/pushers/mod.rs | 4 +- timely/src/dataflow/channels/pushers/tee.rs | 39 ++-- timely/src/dataflow/mod.rs | 3 +- .../src/dataflow/operators/capture/capture.rs | 26 +-- .../src/dataflow/operators/capture/event.rs | 113 ++++++---- .../src/dataflow/operators/capture/extract.rs | 78 +++++-- timely/src/dataflow/operators/capture/mod.rs | 14 +- .../src/dataflow/operators/capture/replay.rs | 34 +-- timely/src/dataflow/operators/concat.rs | 36 ++-- timely/src/dataflow/operators/enterleave.rs | 60 +++--- timely/src/dataflow/operators/exchange.rs | 4 +- timely/src/dataflow/operators/feedback.rs | 60 ++++-- .../dataflow/operators/generic/builder_raw.rs | 25 ++- .../dataflow/operators/generic/builder_rc.rs | 35 ++- .../src/dataflow/operators/generic/handles.rs | 67 +++--- timely/src/dataflow/operators/generic/mod.rs | 2 +- .../dataflow/operators/generic/operator.rs | 199 +++++++++-------- timely/src/dataflow/operators/input.rs | 202 ++++++++++++++---- timely/src/dataflow/operators/inspect.rs | 45 +++- timely/src/dataflow/operators/mod.rs | 6 +- timely/src/dataflow/operators/probe.rs | 20 +- timely/src/dataflow/operators/to_stream.rs | 58 ++++- .../src/dataflow/operators/unordered_input.rs | 95 ++++++-- timely/src/dataflow/stream.rs | 18 +- timely/src/execute.rs | 8 +- timely/src/lib.rs | 6 + 34 files changed, 1014 insertions(+), 563 deletions(-) diff --git a/kafkaesque/src/lib.rs b/kafkaesque/src/lib.rs index 39ec51bc1..ba91061e6 100644 --- a/kafkaesque/src/lib.rs +++ b/kafkaesque/src/lib.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use 
std::sync::atomic::{AtomicIsize, Ordering}; use abomonation::Abomonation; -use timely::dataflow::operators::capture::event::{Event, EventPusher, EventIterator}; +use timely::dataflow::operators::capture::event::{EventCore, EventPusherCore, EventIteratorCore}; use rdkafka::Message; use rdkafka::client::ClientContext; @@ -37,7 +37,7 @@ impl OutstandingCounterContext { } /// A wrapper for `W: Write` implementing `EventPusher`. -pub struct EventProducer { +pub struct EventProducerCore { topic: String, buffer: Vec, producer: BaseProducer, @@ -45,14 +45,17 @@ pub struct EventProducer { phant: ::std::marker::PhantomData<(T,D)>, } -impl EventProducer { +/// [EventProducerCore] specialized to vector-based containers. +pub type EventProducer = EventProducerCore>; + +impl EventProducerCore { /// Allocates a new `EventWriter` wrapping a supplied writer. pub fn new(config: ClientConfig, topic: String) -> Self { let counter = Arc::new(AtomicIsize::new(0)); let context = OutstandingCounterContext::new(&counter); let producer = BaseProducer::::from_config_and_context(&config, context).expect("Couldn't create producer"); println!("allocating producer for topic {:?}", topic); - EventProducer { + Self { topic: topic, buffer: vec![], producer: producer, @@ -62,8 +65,8 @@ impl EventProducer { } } -impl EventPusher for EventProducer { - fn push(&mut self, event: Event) { +impl EventPusherCore for EventProducerCore { + fn push(&mut self, event: EventCore) { unsafe { ::abomonation::encode(&event, &mut self.buffer).expect("Encode failure"); } // println!("sending {:?} bytes", self.buffer.len()); self.producer.send::<(),[u8]>(BaseRecord::to(self.topic.as_str()).payload(&self.buffer[..])).unwrap(); @@ -73,7 +76,7 @@ impl EventPusher for EventProducer { } } -impl Drop for EventProducer { +impl Drop for EventProducerCore { fn drop(&mut self) { while self.counter.load(Ordering::SeqCst) > 0 { self.producer.poll(std::time::Duration::from_millis(10)); @@ -82,19 +85,22 @@ impl Drop for 
EventProducer { } /// A Wrapper for `R: Read` implementing `EventIterator`. -pub struct EventConsumer { +pub struct EventConsumerCore { consumer: BaseConsumer, buffer: Vec, phant: ::std::marker::PhantomData<(T,D)>, } -impl EventConsumer { +/// [EventConsumerCore] specialized to vector-based containers. +pub type EventConsumer = EventConsumerCore>; + +impl EventConsumerCore { /// Allocates a new `EventReader` wrapping a supplied reader. pub fn new(config: ClientConfig, topic: String) -> Self { println!("allocating consumer for topic {:?}", topic); let consumer : BaseConsumer = config.create().expect("Couldn't create consumer"); consumer.subscribe(&[&topic]).expect("Failed to subscribe to topic"); - EventConsumer { + Self { consumer: consumer, buffer: Vec::new(), phant: ::std::marker::PhantomData, @@ -102,14 +108,14 @@ impl EventConsumer { } } -impl EventIterator for EventConsumer { - fn next(&mut self) -> Option<&Event> { +impl EventIteratorCore for EventConsumerCore { + fn next(&mut self) -> Option<&EventCore> { if let Some(result) = self.consumer.poll(std::time::Duration::from_millis(0)) { match result { Ok(message) => { self.buffer.clear(); self.buffer.extend_from_slice(message.payload().unwrap()); - Some(unsafe { ::abomonation::decode::>(&mut self.buffer[..]).unwrap().0 }) + Some(unsafe { ::abomonation::decode::>(&mut self.buffer[..]).unwrap().0 }) }, Err(err) => { println!("KafkaConsumer error: {:?}", err); diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 60269900a..0b550e53c 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -29,6 +29,7 @@ abomonation_derive = "0.5" timely_bytes = { path = "../bytes", version = "0.12" } timely_logging = { path = "../logging", version = "0.12" } timely_communication = { path = "../communication", version = "0.12", default-features = false } +timely_container = { path = "../container", version = "0.12" } crossbeam-channel = "0.5.0" futures-util = "0.3" diff --git a/timely/src/dataflow/channels/mod.rs 
b/timely/src/dataflow/channels/mod.rs index 7d174f083..84303a039 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -1,6 +1,7 @@ //! Structured communication between timely dataflow operators. use crate::communication::Push; +use crate::Container; /// A collection of types that may be pushed at. pub mod pushers; @@ -10,7 +11,10 @@ pub mod pullers; pub mod pact; /// The input to and output from timely dataflow communication channels. -pub type Bundle = crate::communication::Message>; +pub type BundleCore = crate::communication::Message>; + +/// The input to and output from timely dataflow communication channels specialized to vectors. +pub type Bundle = BundleCore>; /// A serializable representation of timestamped data. #[derive(Clone, Abomonation, Serialize, Deserialize)] @@ -18,7 +22,7 @@ pub struct Message { /// The timestamp associated with the message. pub time: T, /// The data in the message. - pub data: Vec, + pub data: D, /// The source worker. pub from: usize, /// A sequence number for this worker-to-worker stream. @@ -27,49 +31,26 @@ pub struct Message { impl Message { /// Default buffer size. + #[deprecated = "Use timely::buffer::default_capacity instead"] pub fn default_length() -> usize { - const MESSAGE_BUFFER_SIZE: usize = 1 << 13; - let size = std::mem::size_of::(); - if size == 0 { - // We could use usize::MAX here, but to avoid overflows we - // limit the default length for zero-byte types. - MESSAGE_BUFFER_SIZE - } else if size <= MESSAGE_BUFFER_SIZE { - MESSAGE_BUFFER_SIZE / size - } else { - 1 - } + crate::container::buffer::default_capacity::() } +} +impl Message { /// Creates a new message instance from arguments. - pub fn new(time: T, data: Vec, from: usize, seq: usize) -> Self { + pub fn new(time: T, data: D, from: usize, seq: usize) -> Self { Message { time, data, from, seq } } /// Forms a message, and pushes contents at `pusher`. 
Replaces `buffer` with what the pusher - /// leaves in place, or a new `Vec`. Note that the returned vector is always initialized with - /// a capacity of [Self::default_length] elements. - #[inline] - pub fn push_at>>(buffer: &mut Vec, time: T, pusher: &mut P) { - - Self::push_at_no_allocation(buffer, time, pusher); - - // Allocate a default buffer to avoid oddly sized or empty buffers - if buffer.capacity() != Self::default_length() { - *buffer = Vec::with_capacity(Self::default_length()); - } - } - - /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher - /// leaves in place, or a new empty `Vec`. If the pusher leaves a vector with a capacity larger - /// than [Self::default_length], the vector is initialized with a new vector with - /// [Self::default_length] capacity. + /// leaves in place, or the container's default element. #[inline] - pub fn push_at_no_allocation>>(buffer: &mut Vec, time: T, pusher: &mut P) { + pub fn push_at>>(buffer: &mut D, time: T, pusher: &mut P) { let data = ::std::mem::take(buffer); let message = Message::new(time, data, 0, 0); - let mut bundle = Some(Bundle::from_typed(message)); + let mut bundle = Some(BundleCore::from_typed(message)); pusher.push(&mut bundle); @@ -79,10 +60,5 @@ impl Message { buffer.clear(); } } - - // Avoid memory leaks by buffers growing out of bounds - if buffer.capacity() > Self::default_length() { - *buffer = Vec::with_capacity(Self::default_length()); - } } } diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 43b5e4279..6ef17ab31 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -8,33 +8,41 @@ //! The progress tracking logic assumes that this number is independent of the pact used. 
use std::{fmt::{self, Debug}, marker::PhantomData}; +use timely_container::PushPartitioned; use crate::communication::{Push, Pull, Data}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; +use crate::Container; use crate::worker::AsWorker; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; -use super::{Bundle, Message}; +use super::{BundleCore, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; +use crate::progress::Timestamp; -/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. -pub trait ParallelizationContract { +/// A `ParallelizationContractCore` allocates paired `Push` and `Pull` implementors. +pub trait ParallelizationContractCore { /// Type implementing `Push` produced by this pact. - type Pusher: Push>+'static; + type Pusher: Push>+'static; /// Type implementing `Pull` produced by this pact. - type Puller: Pull>+'static; + type Puller: Pull>+'static; /// Allocates a matched pair of push and pull endpoints implementing the pact. fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller); } +/// A `ParallelizationContractCore` specialized for `Vec` containers +/// TODO: Use trait aliases once stable. 
+pub trait ParallelizationContract: ParallelizationContractCore> { } +impl>> ParallelizationContract for P { } + /// A direct connection #[derive(Debug)] pub struct Pipeline; -impl ParallelizationContract for Pipeline { - type Pusher = LogPusher>>; - type Puller = LogPuller>>; +impl ParallelizationContractCore for Pipeline { + type Pusher = LogPusher>>; + type Puller = LogPuller>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (pusher, puller) = allocator.pipeline::>(identifier, address); // // ignore `&mut A` and use thread allocator @@ -45,12 +53,15 @@ impl ParallelizationContract for Pipeline { } /// An exchange between multiple observers by data -pub struct Exchange { hash_func: F, phantom: PhantomData } +pub struct ExchangeCore { hash_func: F, phantom: PhantomData<(C, D)> } + +/// [ExchangeCore] specialized to vector-based containers. +pub type Exchange = ExchangeCore, D, F>; -implu64+'static> Exchange { +implu64+'static> ExchangeCore { /// Allocates a new `Exchange` pact from a distribution function. - pub fn new(func: F) -> Exchange { - Exchange { + pub fn new(func: F) -> ExchangeCore { + ExchangeCore { hash_func: func, phantom: PhantomData, } @@ -58,19 +69,22 @@ implu64+'static> Exchange { } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContract for Exchange { +implu64+'static> ParallelizationContractCore for ExchangeCore +where + C: Data + Container + PushPartitioned, +{ // TODO: The closure in the type prevents us from naming it. // Could specialize `ExchangePusher` to a time-free version. 
- type Pusher = Box>>; - type Puller = Box>>; + type Pusher = Box>>; + type Puller = Box>>; fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>(identifier, address); + let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); (Box::new(ExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) } } -impl Debug for Exchange { +impl Debug for ExchangeCore { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Exchange").finish() } @@ -78,7 +92,7 @@ impl Debug for Exchange { /// Wraps a `Message` pusher to provide a `Push<(T, Content)>`. #[derive(Debug)] -pub struct LogPusher>> { +pub struct LogPusher>> { pusher: P, channel: usize, counter: usize, @@ -88,7 +102,7 @@ pub struct LogPusher>> { logging: Option, } -impl>> LogPusher { +impl>> LogPusher { /// Allocates a new pusher. pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option) -> Self { LogPusher { @@ -103,9 +117,9 @@ impl>> LogPusher { } } -impl>> Push> for LogPusher { +impl>> Push> for LogPusher { #[inline] - fn push(&mut self, pair: &mut Option>) { + fn push(&mut self, pair: &mut Option>) { if let Some(bundle) = pair { self.counter += 1; @@ -134,7 +148,7 @@ impl>> Push> for LogPusher { /// Wraps a `Message` puller to provide a `Pull<(T, Content)>`. #[derive(Debug)] -pub struct LogPuller>> { +pub struct LogPuller>> { puller: P, channel: usize, index: usize, @@ -142,7 +156,7 @@ pub struct LogPuller>> { logging: Option, } -impl>> LogPuller { +impl>> LogPuller { /// Allocates a new `Puller`. 
pub fn new(puller: P, index: usize, channel: usize, logging: Option) -> Self { LogPuller { @@ -155,9 +169,9 @@ impl>> LogPuller { } } -impl>> Pull> for LogPuller { +impl>> Pull> for LogPuller { #[inline] - fn pull(&mut self) -> &mut Option> { + fn pull(&mut self) -> &mut Option> { let result = self.puller.pull(); if let Some(bundle) = result { let channel = self.channel; diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 66a06c6ec..04a189da5 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -3,21 +3,22 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::dataflow::channels::Bundle; +use crate::dataflow::channels::BundleCore; use crate::progress::ChangeBatch; use crate::communication::Pull; +use crate::Container; /// A wrapper which accounts records pulled past in a shared count map. -pub struct Counter>> { +pub struct Counter>> { pullable: P, consumed: Rc>>, phantom: ::std::marker::PhantomData, } -impl>> Counter { +impl>> Counter { /// Retrieves the next timestamp and batch of data. #[inline] - pub fn next(&mut self) -> Option<&mut Bundle> { + pub fn next(&mut self) -> Option<&mut BundleCore> { if let Some(message) = self.pullable.pull() { if message.data.len() > 0 { self.consumed.borrow_mut().update(message.time.clone(), message.data.len() as i64); @@ -29,7 +30,7 @@ impl>> Counter { } } -impl>> Counter { +impl>> Counter { /// Allocates a new `Counter` from a boxed puller. pub fn new(pullable: P) -> Self { Counter { diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 288d5ff88..6f92dabb0 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -1,44 +1,50 @@ //! Buffering and session mechanisms to provide the appearance of record-at-a-time sending, //! with the performance of batched sends. 
-use crate::dataflow::channels::{Bundle, Message}; +use crate::dataflow::channels::{Bundle, BundleCore, Message}; use crate::progress::Timestamp; use crate::dataflow::operators::Capability; use crate::communication::Push; +use crate::{Container, Data}; /// Buffers data sent at the same time, for efficient communication. /// /// The `Buffer` type should be used by calling `session` with a time, which checks whether /// data must be flushed and creates a `Session` object which allows sending at the given time. #[derive(Debug)] -pub struct Buffer>> { - time: Option, // the currently open time, if it is open - buffer: Vec, // a buffer for records, to send at self.time +pub struct BufferCore>> { + /// the currently open time, if it is open + time: Option, + /// a buffer for records, to send at self.time + buffer: D, pusher: P, } -impl>> Buffer where T: Eq+Clone { +/// A buffer specialized to vector-based containers. +pub type Buffer = BufferCore, P>; + +impl>> BufferCore where T: Eq+Clone { /// Creates a new `Buffer`. - pub fn new(pusher: P) -> Buffer { - Buffer { + pub fn new(pusher: P) -> Self { + Self { time: None, - buffer: Vec::with_capacity(Message::::default_length()), + buffer: Default::default(), pusher, } } /// Returns a `Session`, which accepts data to send at the associated time - pub fn session(&mut self, time: &T) -> Session { + pub fn session(&mut self, time: &T) -> Session { if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); } self.time = Some(time.clone()); Session { buffer: self } } /// Allocates a new `AutoflushSession` which flushes itself on drop. 
- pub fn autoflush_session(&mut self, cap: Capability) -> AutoflushSession where T: Timestamp { + pub fn autoflush_session(&mut self, cap: Capability) -> AutoflushSessionCore where T: Timestamp { if let Some(true) = self.time.as_ref().map(|x| x != cap.time()) { self.flush(); } self.time = Some(cap.time().clone()); - AutoflushSession { + AutoflushSessionCore { buffer: self, _capability: cap, } @@ -63,8 +69,23 @@ impl>> Buffer where T: Eq+Clone { } } + // Gives an entire container at a specific time. + fn give_container(&mut self, vector: &mut C) { + // flush to ensure fifo-ness + self.flush(); + + let time = self.time.as_ref().expect("Buffer::give_container(): time is None.").clone(); + Message::push_at(vector, time, &mut self.pusher); + } +} + +impl>> Buffer where T: Eq+Clone { // internal method for use by `Session`. fn give(&mut self, data: D) { + if self.buffer.capacity() < crate::container::buffer::default_capacity::() { + let to_reserve = crate::container::buffer::default_capacity::() - self.buffer.capacity(); + self.buffer.reserve(to_reserve); + } self.buffer.push(data); // assert!(self.buffer.capacity() == Message::::default_length()); if self.buffer.len() == self.buffer.capacity() { @@ -75,9 +96,7 @@ impl>> Buffer where T: Eq+Clone { // Gives an entire message at a specific time. fn give_vec(&mut self, vector: &mut Vec) { // flush to ensure fifo-ness - if !self.buffer.is_empty() { - self.flush(); - } + self.flush(); let time = self.time.as_ref().expect("Buffer::give_vec(): time is None.").clone(); Message::push_at(vector, time, &mut self.pusher); @@ -90,11 +109,18 @@ impl>> Buffer where T: Eq+Clone { /// The `Session` struct provides the user-facing interface to an operator output, namely /// the `Buffer` type. A `Session` wraps a session of output at a specified time, and /// avoids what would otherwise be a constant cost of checking timestamp equality. 
-pub struct Session<'a, T, D, P: Push>+'a> where T: Eq+Clone+'a, D: 'a { - buffer: &'a mut Buffer, +pub struct Session<'a, T, C: Container, P: Push>+'a> where T: Eq+Clone+'a, C: 'a { + buffer: &'a mut BufferCore, } -impl<'a, T, D, P: Push>+'a> Session<'a, T, D, P> where T: Eq+Clone+'a, D: 'a { +impl<'a, T, C: Container, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { + /// Provide a container at the time specified by the [Session]. + pub fn give_container(&mut self, container: &mut C) { + self.buffer.give_container(container) + } +} + +impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { /// Provides one record at the time specified by the `Session`. #[inline] pub fn give(&mut self, data: D) { @@ -121,15 +147,18 @@ impl<'a, T, D, P: Push>+'a> Session<'a, T, D, P> where T: Eq+Clone } /// A session which will flush itself when dropped. -pub struct AutoflushSession<'a, T: Timestamp, D, P: Push>+'a> where - T: Eq+Clone+'a, D: 'a { +pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push>+'a> where + T: Eq+Clone+'a, C: 'a { /// A reference to the underlying buffer. - buffer: &'a mut Buffer, + buffer: &'a mut BufferCore, /// The capability being used to send the data. _capability: Capability, } -impl<'a, T: Timestamp, D, P: Push>+'a> AutoflushSession<'a, T, D, P> where T: Eq+Clone+'a, D: 'a { +/// Auto-flush session specialized to vector-based containers. +pub type AutoflushSession<'a, T, D, P> = AutoflushSessionCore<'a, T, Vec, P>; + +impl<'a, T: Timestamp, D: Data, P: Push>>+'a> AutoflushSessionCore<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { /// Transmits a single record. 
#[inline] pub fn give(&mut self, data: D) { @@ -151,7 +180,7 @@ impl<'a, T: Timestamp, D, P: Push>+'a> AutoflushSession<'a, T, D, P } } -impl<'a, T: Timestamp, D, P: Push>+'a> Drop for AutoflushSession<'a, T, D, P> where T: Eq+Clone+'a, D: 'a { +impl<'a, T: Timestamp, C: Container, P: Push>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { fn drop(&mut self) { self.buffer.cease(); } diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index c8d6dff72..59ccacf32 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -4,21 +4,25 @@ use std::marker::PhantomData; use std::rc::Rc; use std::cell::RefCell; -use crate::progress::ChangeBatch; -use crate::dataflow::channels::Bundle; +use crate::progress::{ChangeBatch, Timestamp}; +use crate::dataflow::channels::BundleCore; use crate::communication::Push; +use crate::Container; /// A wrapper which updates shared `produced` based on the number of records pushed. #[derive(Debug)] -pub struct Counter>> { +pub struct CounterCore>> { pushee: P, produced: Rc>>, phantom: PhantomData, } -impl Push> for Counter where T : Ord+Clone+'static, P: Push> { +/// A counter specialized to vector. +pub type Counter = CounterCore, P>; + +impl Push> for CounterCore where P: Push> { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64); } @@ -30,10 +34,10 @@ impl Push> for Counter where T : Ord+Clone+'stati } } -impl>> Counter where T : Ord+Clone+'static { +impl>> CounterCore where T : Ord+Clone+'static { /// Allocates a new `Counter` from a pushee and shared counts. 
- pub fn new(pushee: P) -> Counter { - Counter { + pub fn new(pushee: P) -> CounterCore { + CounterCore { pushee, produced: Rc::new(RefCell::new(ChangeBatch::new())), phantom: PhantomData, diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index aad867aa2..6ddca7332 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -1,64 +1,51 @@ //! The exchange pattern distributes pushed data between many target pushees. -use crate::Data; +use timely_container::PushPartitioned; +use crate::{Container, Data}; use crate::communication::Push; -use crate::dataflow::channels::{Bundle, Message}; +use crate::dataflow::channels::{BundleCore, Message}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct Exchange>, H: FnMut(&T, &D) -> u64> { +pub struct Exchange>, H: FnMut(&T, &D) -> u64> { pushers: Vec

, - buffers: Vec>, + buffers: Vec, current: Option, hash_func: H, + phantom: std::marker::PhantomData, } -impl>, H: FnMut(&T, &D)->u64> Exchange { +impl>, H: FnMut(&T, &D)->u64> Exchange { /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. - pub fn new(pushers: Vec

, key: H) -> Exchange { + pub fn new(pushers: Vec

, key: H) -> Exchange { let mut buffers = vec![]; for _ in 0..pushers.len() { - buffers.push(Vec::new()); + buffers.push(Default::default()); } Exchange { pushers, hash_func: key, buffers, current: None, + phantom: std::marker::PhantomData, } } #[inline] fn flush(&mut self, index: usize) { if !self.buffers[index].is_empty() { if let Some(ref time) = self.current { - Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); - } - } - } - - /// Push data partitioned according to an index function. - #[inline(always)] - fn push_partitioned usize>(&mut self, time: &T, data: &mut Vec, func: F) { - for datum in data.drain(..) { - let index = (func)((self.hash_func)(time, &datum)); - - // Ensure allocated buffers: If the buffer's capacity is less than its default - // capacity, increase the capacity such that it matches the default. - if self.buffers[index].capacity() < Message::::default_length() { - let to_reserve = Message::::default_length() - self.buffers[index].capacity(); - self.buffers[index].reserve(to_reserve); - } - self.buffers[index].push(datum); - if self.buffers[index].len() == self.buffers[index].capacity() { - self.flush(index); + Message::push_at(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); } } } } -impl>, H: FnMut(&T, &D)->u64> Push> for Exchange { +impl>, H: FnMut(&T, &D)->u64> Push> for Exchange +where + C: PushPartitioned +{ #[inline(never)] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { // if only one pusher, no exchange if self.pushers.len() == 1 { self.pushers[0].push(message); @@ -77,15 +64,31 @@ impl>, H: FnMut(&T, &D)->u64> Push = Rc>>>>>; +type PushList = Rc>>>>>; /// Wraps a shared list of `Box` to forward pushes to. Owned by `Stream`. -pub struct Tee { - buffer: Vec, +pub struct TeeCore { + buffer: D, shared: PushList, } -impl Push> for Tee { +/// [TeeCore] specialized to `Vec`-based container. 
+pub type Tee = TeeCore>; + +impl Push> for TeeCore { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { let mut pushers = self.shared.borrow_mut(); if let Some(message) = message { for index in 1..pushers.len() { - self.buffer.extend_from_slice(&message.data); + self.buffer.clone_from(&message.data); Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1]); } } @@ -39,12 +42,12 @@ impl Push> for Tee { } } -impl Tee { +impl TeeCore { /// Allocates a new pair of `Tee` and `TeeHelper`. - pub fn new() -> (Tee, TeeHelper) { + pub fn new() -> (TeeCore, TeeHelper) { let shared = Rc::new(RefCell::new(Vec::new())); - let port = Tee { - buffer: Vec::with_capacity(Message::::default_length()), + let port = TeeCore { + buffer: Default::default(), shared: shared.clone(), }; @@ -52,16 +55,16 @@ impl Tee { } } -impl Clone for Tee { - fn clone(&self) -> Tee { - Tee { - buffer: Vec::with_capacity(self.buffer.capacity()), +impl Clone for TeeCore { + fn clone(&self) -> Self { + Self { + buffer: Default::default(), shared: self.shared.clone(), } } } -impl Debug for Tee +impl Debug for TeeCore where D: Debug, { @@ -86,7 +89,7 @@ pub struct TeeHelper { impl TeeHelper { /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`. - pub fn add_pusher>+'static>(&self, pusher: P) { + pub fn add_pusher>+'static>(&self, pusher: P) { self.shared.borrow_mut().push(Box::new(pusher)); } } diff --git a/timely/src/dataflow/mod.rs b/timely/src/dataflow/mod.rs index 54da3cb7e..a6a3c33a9 100644 --- a/timely/src/dataflow/mod.rs +++ b/timely/src/dataflow/mod.rs @@ -13,9 +13,10 @@ //! }); //! 
``` -pub use self::stream::Stream; +pub use self::stream::{StreamCore, Stream}; pub use self::scopes::{Scope, ScopeParent}; +pub use self::operators::input::HandleCore as InputHandleCore; pub use self::operators::input::Handle as InputHandle; pub use self::operators::probe::Handle as ProbeHandle; diff --git a/timely/src/dataflow/operators/capture/capture.rs b/timely/src/dataflow/operators/capture/capture.rs index f3f2181cb..c62b95417 100644 --- a/timely/src/dataflow/operators/capture/capture.rs +++ b/timely/src/dataflow/operators/capture/capture.rs @@ -5,19 +5,19 @@ //! and there are several default implementations, including a linked-list, Rust's MPSC //! queue, and a binary serializer wrapping any `W: Write`. -use crate::Data; -use crate::dataflow::{Scope, Stream}; +use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; +use crate::Container; use crate::progress::ChangeBatch; use crate::progress::Timestamp; -use super::{Event, EventPusher}; +use super::{EventCore, EventPusherCore}; /// Capture a stream of timestamped data for later replay. -pub trait Capture { +pub trait Capture { /// Captures a stream of timestamped data for later replay. /// /// # Examples @@ -30,7 +30,7 @@ pub trait Capture { /// use std::sync::{Arc, Mutex}; /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; - /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract}; + /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, Extract}; /// /// // get send and recv endpoints, wrap send to share /// let (send, recv) = ::std::sync::mpsc::channel(); @@ -42,7 +42,7 @@ pub trait Capture { /// let send = send.lock().unwrap().clone(); /// /// // these are to capture/replay the stream. 
- /// let handle1 = Rc::new(EventLink::new()); + /// let handle1 = Rc::new(EventLinkCore::new()); /// let handle2 = Some(handle1.clone()); /// /// worker.dataflow::(|scope1| @@ -103,18 +103,18 @@ pub trait Capture { /// /// assert_eq!(recv0.extract()[0].1, (0..10).collect::>()); /// ``` - fn capture_into+'static>(&self, pusher: P); + fn capture_into+'static>(&self, pusher: P); /// Captures a stream using Rust's MPSC channels. - fn capture(&self) -> ::std::sync::mpsc::Receiver> { + fn capture(&self) -> ::std::sync::mpsc::Receiver> { let (send, recv) = ::std::sync::mpsc::channel(); self.capture_into(send); recv } } -impl Capture for Stream { - fn capture_into+'static>(&self, mut event_pusher: P) { +impl Capture for StreamCore { + fn capture_into+'static>(&self, mut event_pusher: P) { let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); @@ -131,7 +131,7 @@ impl Capture for Stream { if !progress.frontiers[0].is_empty() { // transmit any frontier progress. 
let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new()); - event_pusher.push(Event::Progress(to_send.into_inner())); + event_pusher.push(EventCore::Progress(to_send.into_inner())); } use crate::communication::message::RefOrMut; @@ -142,8 +142,8 @@ impl Capture for Stream { RefOrMut::Ref(reference) => (&reference.time, RefOrMut::Ref(&reference.data)), RefOrMut::Mut(reference) => (&reference.time, RefOrMut::Mut(&mut reference.data)), }; - let vector = data.replace(Vec::new()); - event_pusher.push(Event::Messages(time.clone(), vector)); + let vector = data.replace(Default::default()); + event_pusher.push(EventCore::Messages(time.clone(), vector)); } input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]); false diff --git a/timely/src/dataflow/operators/capture/event.rs b/timely/src/dataflow/operators/capture/event.rs index 31e3d6b0d..f7686df00 100644 --- a/timely/src/dataflow/operators/capture/event.rs +++ b/timely/src/dataflow/operators/capture/event.rs @@ -6,35 +6,55 @@ /// Data and progress events of the captured stream. #[derive(Debug, Clone, Abomonation, Hash, Ord, PartialOrd, Eq, PartialEq)] -pub enum Event { +pub enum EventCore { /// Progress received via `push_external_progress`. Progress(Vec<(T, i64)>), /// Messages received via the data stream. - Messages(T, Vec), + Messages(T, D), } -/// Iterates over contained `Event`. +/// Data and progress events of the captured stream, specialized to vector-based containers. +pub type Event = EventCore>; + +/// Iterates over contained `EventCore`. /// /// The `EventIterator` trait describes types that can iterate over references to events, /// and which can be used to replay a stream into a new timely dataflow computation. /// /// This method is not simply an iterator because of the lifetime in the result. -pub trait EventIterator { +pub trait EventIteratorCore { + /// Iterates over references to `EventCore` elements. 
+ fn next(&mut self) -> Option<&EventCore>; +} + +/// A [EventIteratorCore] specialized to vector-based containers. +// TODO: use trait aliases once stable. +pub trait EventIterator: EventIteratorCore> { /// Iterates over references to `Event` elements. fn next(&mut self) -> Option<&Event>; } +impl>> EventIterator for E { + fn next(&mut self) -> Option<&Event> { + >::next(self) + } +} -/// Receives `Event` events. -pub trait EventPusher { +/// Receives `EventCore` events. +pub trait EventPusherCore { /// Provides a new `Event` to the pusher. - fn push(&mut self, event: Event); + fn push(&mut self, event: EventCore); } +/// A [EventPusherCore] specialized to vector-based containers. +// TODO: use trait aliases once stable. +pub trait EventPusher: EventPusherCore> {} +impl>> EventPusher for E {} + // implementation for the linked list behind a `Handle`. -impl EventPusher for ::std::sync::mpsc::Sender> { - fn push(&mut self, event: Event) { +impl EventPusherCore for ::std::sync::mpsc::Sender> { + fn push(&mut self, event: EventCore) { // NOTE: An Err(x) result just means "data not accepted" most likely // because the receiver is gone. No need to panic. let _ = self.send(event); @@ -47,37 +67,40 @@ pub mod link { use std::rc::Rc; use std::cell::RefCell; - use super::{Event, EventPusher, EventIterator}; + use super::{EventCore, EventPusherCore, EventIteratorCore}; - /// A linked list of Event. - pub struct EventLink { + /// A linked list of EventCore. + pub struct EventLinkCore { /// An event, if one exists. /// /// An event might not exist, if either we want to insert a `None` and have the output iterator pause, /// or in the case of the very first linked list element, which has no event when constructed. - pub event: Option>, + pub event: Option>, /// The next event, if it exists. - pub next: RefCell>>>, + pub next: RefCell>>>, } - impl EventLink { + /// A [EventLinkCore] specialized to vector-based containers. 
+ pub type EventLink = EventLinkCore>; + + impl EventLinkCore { /// Allocates a new `EventLink`. - pub fn new() -> EventLink { - EventLink { event: None, next: RefCell::new(None) } + pub fn new() -> EventLinkCore { + EventLinkCore { event: None, next: RefCell::new(None) } } } // implementation for the linked list behind a `Handle`. - impl EventPusher for Rc> { - fn push(&mut self, event: Event) { - *self.next.borrow_mut() = Some(Rc::new(EventLink { event: Some(event), next: RefCell::new(None) })); + impl EventPusherCore for Rc> { + fn push(&mut self, event: EventCore) { + *self.next.borrow_mut() = Some(Rc::new(EventLinkCore { event: Some(event), next: RefCell::new(None) })); let next = self.next.borrow().as_ref().unwrap().clone(); *self = next; } } - impl EventIterator for Rc> { - fn next(&mut self) -> Option<&Event> { + impl EventIteratorCore for Rc> { + fn next(&mut self) -> Option<&EventCore> { let is_some = self.next.borrow().is_some(); if is_some { let next = self.next.borrow().as_ref().unwrap().clone(); @@ -91,7 +114,7 @@ pub mod link { } // Drop implementation to prevent stack overflow through naive drop impl. - impl Drop for EventLink { + impl Drop for EventLinkCore { fn drop(&mut self) { while let Some(link) = self.next.replace(None) { if let Ok(head) = Rc::try_unwrap(link) { @@ -101,7 +124,7 @@ pub mod link { } } - impl Default for EventLink { + impl Default for EventLinkCore { fn default() -> Self { Self::new() } @@ -109,10 +132,10 @@ pub mod link { #[test] fn avoid_stack_overflow_in_drop() { - let mut event1 = Rc::new(EventLink::<(),()>::new()); + let mut event1 = Rc::new(EventLinkCore::<(),()>::new()); let _event2 = event1.clone(); for _ in 0 .. 
1_000_000 { - event1.push(Event::Progress(vec![])); + event1.push(EventCore::Progress(vec![])); } } } @@ -122,33 +145,36 @@ pub mod binary { use std::io::Write; use abomonation::Abomonation; - use super::{Event, EventPusher, EventIterator}; + use super::{EventCore, EventPusherCore, EventIteratorCore}; - /// A wrapper for `W: Write` implementing `EventPusher`. - pub struct EventWriter { + /// A wrapper for `W: Write` implementing `EventPusherCore`. + pub struct EventWriterCore { stream: W, phant: ::std::marker::PhantomData<(T,D)>, } - impl EventWriter { + /// [EventWriterCore] specialized to vector-based containers. + pub type EventWriter = EventWriterCore, W>; + + impl EventWriterCore { /// Allocates a new `EventWriter` wrapping a supplied writer. - pub fn new(w: W) -> EventWriter { - EventWriter { + pub fn new(w: W) -> Self { + Self { stream: w, phant: ::std::marker::PhantomData, } } } - impl EventPusher for EventWriter { - fn push(&mut self, event: Event) { + impl EventPusherCore for EventWriterCore { + fn push(&mut self, event: EventCore) { // TODO: `push` has no mechanism to report errors, so we `unwrap`. unsafe { ::abomonation::encode(&event, &mut self.stream).expect("Event abomonation/write failed"); } } } /// A Wrapper for `R: Read` implementing `EventIterator`. - pub struct EventReader { + pub struct EventReaderCore { reader: R, bytes: Vec, buff1: Vec, @@ -158,10 +184,13 @@ pub mod binary { phant: ::std::marker::PhantomData<(T,D)>, } - impl EventReader { + /// [EventReaderCore] specialized to vector-based containers. + pub type EventReader = EventReaderCore, R>; + + impl EventReaderCore { /// Allocates a new `EventReader` wrapping a supplied reader. 
- pub fn new(r: R) -> EventReader { - EventReader { + pub fn new(r: R) -> Self { + Self { reader: r, bytes: vec![0u8; 1 << 20], buff1: vec![], @@ -173,12 +202,12 @@ pub mod binary { } } - impl EventIterator for EventReader { - fn next(&mut self) -> Option<&Event> { + impl EventIteratorCore for EventReaderCore { + fn next(&mut self) -> Option<&EventCore> { // if we can decode something, we should just return it! :D - if unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.is_some() { - let (item, rest) = unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.unwrap(); + if unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.is_some() { + let (item, rest) = unsafe { ::abomonation::decode::>(&mut self.buff1[self.consumed..]) }.unwrap(); self.consumed = self.valid - rest.len(); return Some(item); } diff --git a/timely/src/dataflow/operators/capture/extract.rs b/timely/src/dataflow/operators/capture/extract.rs index e17146394..dcf57ae3b 100644 --- a/timely/src/dataflow/operators/capture/extract.rs +++ b/timely/src/dataflow/operators/capture/extract.rs @@ -1,12 +1,14 @@ //! Traits and types for extracting captured timely dataflow streams. -use super::Event; +use super::EventCore; +use crate::Container; +use crate::Data; /// Supports extracting a sequence of timestamp and data. pub trait Extract { /// Converts `self` into a sequence of timestamped data. /// - /// Currently this is only implemented for `Receiver>`, and is used only + /// Currently this is only implemented for `Receiver>>`, and is used only /// to easily pull data out of a timely dataflow computation once it has completed. 
/// /// # Examples @@ -16,7 +18,7 @@ pub trait Extract { /// use std::sync::{Arc, Mutex}; /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; - /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract}; + /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, Extract}; /// /// // get send and recv endpoints, wrap send to share /// let (send, recv) = ::std::sync::mpsc::channel(); @@ -28,7 +30,7 @@ pub trait Extract { /// let send = send.lock().unwrap().clone(); /// /// // these are to capture/replay the stream. - /// let handle1 = Rc::new(EventLink::new()); + /// let handle1 = Rc::new(EventLinkCore::new()); /// let handle2 = Some(handle1.clone()); /// /// worker.dataflow::(|scope1| @@ -47,15 +49,9 @@ pub trait Extract { fn extract(self) -> Vec<(T, Vec)>; } -impl Extract for ::std::sync::mpsc::Receiver> { +impl Extract for ::std::sync::mpsc::Receiver>> { fn extract(self) -> Vec<(T, Vec)> { - let mut result = Vec::new(); - for event in self { - if let Event::Messages(time, data) = event { - result.push((time, data)); - } - } - result.sort_by(|x,y| x.0.cmp(&y.0)); + let mut result = self.extract_core(); let mut current = 0; for i in 1 .. result.len() { @@ -75,3 +71,61 @@ impl Extract for ::std::sync::mpsc::Receiver> { result } } + +/// Supports extracting a sequence of timestamp and data. +pub trait ExtractCore { + /// Converts `self` into a sequence of timestamped data. + /// + /// Currently this is only implemented for `Receiver>`, and is used only + /// to easily pull data out of a timely dataflow computation once it has completed. 
+ /// + /// # Examples + /// + /// ```rust + /// use std::rc::Rc; + /// use std::sync::{Arc, Mutex}; + /// use timely::dataflow::Scope; + /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; + /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, ExtractCore}; + /// + /// // get send and recv endpoints, wrap send to share + /// let (send, recv) = ::std::sync::mpsc::channel(); + /// let send = Arc::new(Mutex::new(send)); + /// + /// timely::execute(timely::Config::thread(), move |worker| { + /// + /// // this is only to validate the output. + /// let send = send.lock().unwrap().clone(); + /// + /// // these are to capture/replay the stream. + /// let handle1 = Rc::new(EventLinkCore::new()); + /// let handle2 = Some(handle1.clone()); + /// + /// worker.dataflow::(|scope1| + /// (0..10).to_stream(scope1) + /// .capture_into(handle1) + /// ); + /// + /// worker.dataflow(|scope2| { + /// handle2.replay_into(scope2) + /// .capture_into(send) + /// }); + /// }).unwrap(); + /// + /// assert_eq!(recv.extract_core().into_iter().flat_map(|x| x.1).collect::>(), (0..10).collect::>()); + /// ``` + fn extract_core(self) -> Vec<(T, C)>; +} + +impl ExtractCore for ::std::sync::mpsc::Receiver> { + fn extract_core(self) -> Vec<(T, C)> { + let mut result = Vec::new(); + for event in self { + if let EventCore::Messages(time, data) = event { + result.push((time, data)); + } + } + result.retain(|x| !x.1.is_empty()); + result + } +} diff --git a/timely/src/dataflow/operators/capture/mod.rs b/timely/src/dataflow/operators/capture/mod.rs index 713b84c96..22d332ea0 100644 --- a/timely/src/dataflow/operators/capture/mod.rs +++ b/timely/src/dataflow/operators/capture/mod.rs @@ -22,10 +22,10 @@ //! use std::rc::Rc; //! use timely::dataflow::Scope; //! use timely::dataflow::operators::{Capture, ToStream, Inspect}; -//! use timely::dataflow::operators::capture::{EventLink, Replay}; +//! use timely::dataflow::operators::capture::{EventLinkCore, Replay}; //! //! 
timely::execute(timely::Config::thread(), |worker| { -//! let handle1 = Rc::new(EventLink::new()); +//! let handle1 = Rc::new(EventLinkCore::new()); //! let handle2 = Some(handle1.clone()); //! //! worker.dataflow::(|scope1| @@ -75,11 +75,11 @@ pub use self::capture::Capture; pub use self::replay::Replay; -pub use self::extract::Extract; -pub use self::event::{Event, EventPusher}; -pub use self::event::link::EventLink; -pub use self::event::binary::EventReader; -pub use self::event::binary::EventWriter; +pub use self::extract::{Extract, ExtractCore}; +pub use self::event::{Event, EventCore, EventPusher, EventPusherCore}; +pub use self::event::link::{EventLink, EventLinkCore}; +pub use self::event::binary::{EventReader, EventReaderCore}; +pub use self::event::binary::{EventWriter, EventWriterCore}; pub mod capture; pub mod replay; diff --git a/timely/src/dataflow/operators/capture/replay.rs b/timely/src/dataflow/operators/capture/replay.rs index 2492076af..2cc325419 100644 --- a/timely/src/dataflow/operators/capture/replay.rs +++ b/timely/src/dataflow/operators/capture/replay.rs @@ -38,20 +38,20 @@ //! allowing the replay to occur in a timely dataflow computation with more or fewer workers //! than that in which the stream was captured. -use crate::Data; -use crate::dataflow::{Scope, Stream}; -use crate::dataflow::channels::pushers::Counter as PushCounter; -use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; +use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::channels::pushers::CounterCore as PushCounter; +use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::progress::Timestamp; -use super::Event; -use super::event::EventIterator; +use super::EventCore; +use super::event::EventIteratorCore; +use crate::Container; /// Replay a capture stream into a scope with the same timestamp. 
-pub trait Replay : Sized { +pub trait Replay : Sized { /// Replays `self` into the provided scope, as a `Stream`. - fn replay_into>(self, scope: &mut S) -> Stream { + fn replay_into>(self, scope: &mut S) -> StreamCore { self.replay_core(scope, Some(std::time::Duration::new(0, 0))) } /// Replays `self` into the provided scope, as a `Stream'. @@ -59,13 +59,13 @@ pub trait Replay : Sized { /// The `period` argument allows the specification of a re-activation period, where the operator /// will re-activate itself every so often. The `None` argument instructs the operator not to /// re-activate itself.us - fn replay_core>(self, scope: &mut S, period: Option) -> Stream; + fn replay_core>(self, scope: &mut S, period: Option) -> StreamCore; } -impl Replay for I +impl Replay for I where I : IntoIterator, - ::Item: EventIterator+'static { - fn replay_core>(self, scope: &mut S, period: Option) -> Stream{ + ::Item: EventIteratorCore+'static { + fn replay_core>(self, scope: &mut S, period: Option) -> StreamCore{ let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone()); @@ -77,6 +77,7 @@ where I : IntoIterator, let mut output = PushBuffer::new(PushCounter::new(targets)); let mut event_streams = self.into_iter().collect::>(); let mut started = false; + let mut allocation: C = Default::default(); builder.build( move |progress| { @@ -91,12 +92,13 @@ where I : IntoIterator, for event_stream in event_streams.iter_mut() { while let Some(event) = event_stream.next() { - match *event { - Event::Progress(ref vec) => { + match event { + EventCore::Progress(vec) => { progress.internals[0].extend(vec.iter().cloned()); }, - Event::Messages(ref time, ref data) => { - output.session(time).give_iterator(data.iter().cloned()); + EventCore::Messages(ref time, data) => { + allocation.clone_from(data); + output.session(time).give_container(&mut allocation); } } } diff --git a/timely/src/dataflow/operators/concat.rs b/timely/src/dataflow/operators/concat.rs index 
db5e0182a..449609f5b 100644 --- a/timely/src/dataflow/operators/concat.rs +++ b/timely/src/dataflow/operators/concat.rs @@ -1,12 +1,12 @@ //! Merges the contents of multiple streams. -use crate::Data; +use crate::Container; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{StreamCore, Scope}; /// Merge the contents of two streams. -pub trait Concat { +pub trait Concat { /// Merge the contents of two streams. /// /// # Examples @@ -20,17 +20,17 @@ pub trait Concat { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn concat(&self, _: &Stream) -> Stream; + fn concat(&self, _: &StreamCore) -> StreamCore; } -impl Concat for Stream { - fn concat(&self, other: &Stream) -> Stream { - self.scope().concatenate(vec![self.clone(), other.clone()]) +impl Concat for StreamCore { + fn concat(&self, other: &StreamCore) -> StreamCore { + self.scope().concatenate([self.clone(), other.clone()]) } } /// Merge the contents of multiple streams. -pub trait Concatenate { +pub trait Concatenate { /// Merge the contents of multiple streams. /// /// # Examples @@ -47,25 +47,25 @@ pub trait Concatenate { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn concatenate(&self, sources: I) -> Stream + fn concatenate(&self, sources: I) -> StreamCore where - I: IntoIterator>; + I: IntoIterator>; } -impl Concatenate for Stream { - fn concatenate(&self, sources: I) -> Stream +impl Concatenate for StreamCore { + fn concatenate(&self, sources: I) -> StreamCore where - I: IntoIterator> + I: IntoIterator> { let clone = self.clone(); self.scope().concatenate(Some(clone).into_iter().chain(sources)) } } -impl Concatenate for G { - fn concatenate(&self, sources: I) -> Stream +impl Concatenate for G { + fn concatenate(&self, sources: I) -> StreamCore where - I: IntoIterator> + I: IntoIterator> { // create an operator builder. @@ -81,13 +81,13 @@ impl Concatenate for G { // build an operator that plays out all input data. 
builder.build(move |_capability| { - let mut vector = Vec::new(); + let mut vector = Default::default(); move |_frontier| { let mut output = output.activate(); for handle in handles.iter_mut() { handle.for_each(|time, data| { data.swap(&mut vector); - output.session(&time).give_vec(&mut vector); + output.session(&time).give_container(&mut vector); }) } } diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index 6153d7f50..cc05b1b94 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -25,18 +25,18 @@ use crate::progress::Timestamp; use crate::progress::timestamp::Refines; use crate::progress::{Source, Target}; use crate::order::Product; -use crate::Data; +use crate::{Container, Data}; use crate::communication::Push; -use crate::dataflow::channels::pushers::{Counter, Tee}; -use crate::dataflow::channels::{Bundle, Message}; +use crate::dataflow::channels::pushers::{CounterCore, TeeCore}; +use crate::dataflow::channels::{BundleCore, Message}; use crate::worker::AsWorker; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{StreamCore, Scope, Stream}; use crate::dataflow::scopes::{Child, ScopeParent}; use crate::dataflow::operators::delay::Delay; /// Extension trait to move a `Stream` into a child of its current `Scope`. -pub trait Enter, D: Data> { +pub trait Enter, C: Container> { /// Moves the `Stream` argument into a child of its current `Scope`. 
/// /// # Examples @@ -51,7 +51,7 @@ pub trait Enter, D: Data> { /// }); /// }); /// ``` - fn enter<'a>(&self, _: &Child<'a, G, T>) -> Stream, D>; + fn enter<'a>(&self, _: &Child<'a, G, T>) -> StreamCore, C>; } use crate::dataflow::scopes::child::Iterative; @@ -75,21 +75,21 @@ pub trait EnterAt { fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, initial: F) -> Stream, D> ; } -impl::Timestamp, T>, D>> EnterAt for E { +impl::Timestamp, T>, Vec>> EnterAt for E { fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, mut initial: F) -> Stream, D> { self.enter(scope).delay(move |datum, time| Product::new(time.clone().to_outer(), initial(datum))) } } -impl, D: Data> Enter for Stream { - fn enter<'a>(&self, scope: &Child<'a, G, T>) -> Stream, D> { +impl, C: Data+Container> Enter for StreamCore { + fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore, C> { use crate::scheduling::Scheduler; - let (targets, registrar) = Tee::::new(); + let (targets, registrar) = TeeCore::::new(); let ingress = IngressNub { - targets: Counter::new(targets), + targets: CounterCore::new(targets), phantom: ::std::marker::PhantomData, activator: scope.activator_for(&scope.addr()), active: false, @@ -100,12 +100,12 @@ impl, D: Data> Enter for S let channel_id = scope.clone().new_identifier(); self.connect_to(input, ingress, channel_id); - Stream::new(Source::new(0, input.port), registrar, scope.clone()) + StreamCore::new(Source::new(0, input.port), registrar, scope.clone()) } } /// Extension trait to move a `Stream` to the parent of its current `Scope`. -pub trait Leave { +pub trait Leave { /// Moves a `Stream` to the parent of its current `Scope`. 
/// /// # Examples @@ -120,20 +120,20 @@ pub trait Leave { /// }); /// }); /// ``` - fn leave(&self) -> Stream; + fn leave(&self) -> StreamCore; } -impl<'a, G: Scope, D: Data, T: Timestamp+Refines> Leave for Stream, D> { - fn leave(&self) -> Stream { +impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave for StreamCore, D> { + fn leave(&self) -> StreamCore { let scope = self.scope(); let output = scope.subgraph.borrow_mut().new_output(); - let (targets, registrar) = Tee::::new(); + let (targets, registrar) = TeeCore::::new(); let channel_id = scope.clone().new_identifier(); self.connect_to(Target::new(0, output.port), EgressNub { targets, phantom: PhantomData }, channel_id); - Stream::new( + StreamCore::new( output, registrar, scope.parent, @@ -142,19 +142,19 @@ impl<'a, G: Scope, D: Data, T: Timestamp+Refines> Leave for } -struct IngressNub, TData: Data> { - targets: Counter>, +struct IngressNub, TData: Container> { + targets: CounterCore>, phantom: ::std::marker::PhantomData, activator: crate::scheduling::Activator, active: bool, } -impl, TData: Data> Push> for IngressNub { - fn push(&mut self, message: &mut Option>) { - if let Some(message) = message { +impl, TData: Container> Push> for IngressNub { + fn push(&mut self, element: &mut Option>) { + if let Some(message) = element { let outer_message = message.as_mut(); - let data = ::std::mem::replace(&mut outer_message.data, Vec::new()); - let mut inner_message = Some(Bundle::from_typed(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0))); + let data = ::std::mem::take(&mut outer_message.data); + let mut inner_message = Some(BundleCore::from_typed(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0))); self.targets.push(&mut inner_message); if let Some(inner_message) = inner_message { if let Some(inner_message) = inner_message.if_typed() { @@ -175,17 +175,17 @@ impl, TData: Data> Push, TData: Data> { - targets: Tee, + targets: TeeCore, phantom: PhantomData, } 
-impl Push> for EgressNub +impl Push> for EgressNub where TOuter: Timestamp, TInner: Timestamp+Refines, TData: Data { - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { let inner_message = message.as_mut(); - let data = ::std::mem::replace(&mut inner_message.data, Vec::new()); - let mut outer_message = Some(Bundle::from_typed(Message::new(inner_message.time.clone().to_outer(), data, 0, 0))); + let data = ::std::mem::take(&mut inner_message.data); + let mut outer_message = Some(BundleCore::from_typed(Message::new(inner_message.time.clone().to_outer(), data, 0, 0))); self.targets.push(&mut outer_message); if let Some(outer_message) = outer_message { if let Some(outer_message) = outer_message.if_typed() { diff --git a/timely/src/dataflow/operators/exchange.rs b/timely/src/dataflow/operators/exchange.rs index c106ce37f..7ca26d402 100644 --- a/timely/src/dataflow/operators/exchange.rs +++ b/timely/src/dataflow/operators/exchange.rs @@ -28,11 +28,11 @@ pub trait Exchange { // impl, D: ExchangeData> Exchange for Stream { impl Exchange for Stream { fn exchange(&self, route: impl Fn(&D)->u64+'static) -> Stream { - let mut vector = Vec::new(); + let mut vector = Default::default(); self.unary(ExchangePact::new(route), "Exchange", move |_,_| move |input, output| { input.for_each(|time, data| { data.swap(&mut vector); - output.session(&time).give_vec(&mut vector); + output.session(&time).give_container(&mut vector); }); }) } diff --git a/timely/src/dataflow/operators/feedback.rs b/timely/src/dataflow/operators/feedback.rs index e4f855bc9..a7eb90c65 100644 --- a/timely/src/dataflow/operators/feedback.rs +++ b/timely/src/dataflow/operators/feedback.rs @@ -1,14 +1,14 @@ //! Create cycles in a timely dataflow graph. 
-use crate::Data; +use crate::{Container, Data}; use crate::progress::{Timestamp, PathSummary}; use crate::progress::frontier::Antichain; use crate::order::Product; -use crate::dataflow::channels::pushers::Tee; +use crate::dataflow::channels::pushers::TeeCore; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{StreamCore, Scope, Stream}; use crate::dataflow::scopes::child::Iterative; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::operators::generic::OutputWrapper; @@ -37,6 +37,29 @@ pub trait Feedback { /// }); /// ``` fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream); + + /// Creates a [StreamCore] and a [HandleCore] to later bind the source of that `Stream`. + /// + /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with + /// its `Handle` passed as an argument. Data passed through the stream will have their + /// timestamps advanced by `summary`, and will be dropped if the result exceeds `limit`. + /// + /// # Examples + /// ``` + /// use timely::dataflow::Scope; + /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen}; + /// + /// timely::example(|scope| { + /// // circulate 0..10 for 100 iterations. + /// let (handle, cycle) = scope.feedback_core::>(1); + /// (0..10).to_stream(scope) + /// .concat(&cycle) + /// .inspect(|x| println!("seen: {:?}", x)) + /// .branch_when(|t| t < &100).1 + /// .connect_loop(handle); + /// }); + /// ``` + fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore); } /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. 
@@ -64,27 +87,31 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> { /// }); /// }); /// ``` - fn loop_variable(&mut self, summary: T::Summary) -> (Handle, D>, Stream, D>); + fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>); } impl Feedback for G { fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream) { + self.feedback_core(summary) + } + + fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore) { let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); let (output, stream) = builder.new_output(); - (Handle { builder, summary, output }, stream) + (HandleCore { builder, summary, output }, stream) } } impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> { - fn loop_variable(&mut self, summary: T::Summary) -> (Handle, D>, Stream, D>) { - self.feedback(Product::new(Default::default(), summary)) + fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>) { + self.feedback_core(Product::new(Default::default(), summary)) } } /// Connect a `Stream` to the input of a loop variable. -pub trait ConnectLoop { +pub trait ConnectLoop { /// Connect a `Stream` to be the input of a loop variable. 
/// /// # Examples @@ -102,11 +129,11 @@ pub trait ConnectLoop { /// .connect_loop(handle); /// }); /// ``` - fn connect_loop(&self, _: Handle); + fn connect_loop(&self, _: HandleCore); } -impl ConnectLoop for Stream { - fn connect_loop(&self, helper: Handle) { +impl ConnectLoop for StreamCore { + fn connect_loop(&self, helper: HandleCore) { let mut builder = helper.builder; let summary = helper.summary; @@ -114,7 +141,7 @@ impl ConnectLoop for Stream { let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]); - let mut vector = Vec::new(); + let mut vector = Default::default(); builder.build(move |_capability| move |_frontier| { let mut output = output.activate(); input.for_each(|cap, data| { @@ -123,7 +150,7 @@ impl ConnectLoop for Stream { let new_cap = cap.delayed(&new_time); output .session(&new_cap) - .give_vec(&mut vector); + .give_container(&mut vector); } }); }); @@ -132,8 +159,11 @@ impl ConnectLoop for Stream { /// A handle used to bind the source of a loop variable. 
#[derive(Debug)] -pub struct Handle { +pub struct HandleCore { builder: OperatorBuilder, summary: ::Summary, - output: OutputWrapper>, + output: OutputWrapper>, } + +/// A `HandleCore` specialized for using `Vec` as container +pub type Handle = HandleCore>; diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index a4c151ea8..8e97492af 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -8,16 +8,15 @@ use std::default::Default; use std::rc::Rc; use std::cell::RefCell; -use crate::Data; - use crate::scheduling::{Schedule, Activations}; use crate::progress::{Source, Target}; use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain}; -use crate::dataflow::{Stream, Scope}; -use crate::dataflow::channels::pushers::Tee; -use crate::dataflow::channels::pact::ParallelizationContract; +use crate::Container; +use crate::dataflow::{StreamCore, Scope}; +use crate::dataflow::channels::pushers::TeeCore; +use crate::dataflow::channels::pact::ParallelizationContractCore; use crate::dataflow::operators::generic::operator_info::OperatorInfo; /// Contains type-free information about the operator properties. @@ -106,17 +105,17 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input(&mut self, stream: &Stream, pact: P) -> P::Puller + pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> P::Puller where - P: ParallelizationContract { + P: ParallelizationContractCore { let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs]; self.new_input_connection(stream, pact, connection) } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. 
- pub fn new_input_connection(&mut self, stream: &Stream, pact: P, connection: Vec::Summary>>) -> P::Puller + pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> P::Puller where - P: ParallelizationContract { + P: ParallelizationContractCore { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); @@ -132,18 +131,18 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (Tee, Stream) { + pub fn new_output(&mut self) -> (TeeCore, StreamCore) { let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs]; self.new_output_connection(connection) } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (Tee, Stream) { + pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (TeeCore, StreamCore) { - let (targets, registrar) = Tee::::new(); + let (targets, registrar) = TeeCore::::new(); let source = Source::new(self.index, self.shape.outputs); - let stream = Stream::new(source, registrar, self.scope.clone()); + let stream = StreamCore::new(source, registrar, self.scope.clone()); self.shape.outputs += 1; assert_eq!(self.shape.inputs, connection.len()); diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 21eee25dd..cc505e7ee 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -4,20 +4,19 @@ use std::rc::Rc; use std::cell::RefCell; use std::default::Default; -use crate::Data; - use crate::progress::{ChangeBatch, Timestamp}; use crate::progress::operate::SharedProgress; use crate::progress::frontier::{Antichain, MutableAntichain}; -use crate::dataflow::{Stream, Scope}; -use 
crate::dataflow::channels::pushers::Tee; -use crate::dataflow::channels::pushers::Counter as PushCounter; -use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; -use crate::dataflow::channels::pact::ParallelizationContract; +use crate::Container; +use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::channels::pushers::TeeCore; +use crate::dataflow::channels::pushers::CounterCore as PushCounter; +use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; +use crate::dataflow::channels::pact::ParallelizationContractCore; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::capability::Capability; -use crate::dataflow::operators::generic::handles::{InputHandle, new_input_handle, OutputWrapper}; +use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper}; use crate::dataflow::operators::generic::operator_info::OperatorInfo; use crate::dataflow::operators::generic::builder_raw::OperatorShape; @@ -57,9 +56,9 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input(&mut self, stream: &Stream, pact: P) -> InputHandle + pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> InputHandleCore where - P: ParallelizationContract { + P: ParallelizationContractCore { let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().outputs()]; self.new_input_connection(stream, pact, connection) @@ -73,9 +72,9 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. 
- pub fn new_input_connection(&mut self, stream: &Stream, pact: P, connection: Vec::Summary>>) -> InputHandle + pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> InputHandleCore where - P: ParallelizationContract { + P: ParallelizationContractCore { let puller = self.builder.new_input_connection(stream, pact, connection); @@ -87,7 +86,7 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (OutputWrapper>, Stream) { + pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()]; self.new_output_connection(connection) } @@ -100,7 +99,7 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. 
- pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, Stream) { + pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { let (tee, stream) = self.builder.new_output_connection(connection); @@ -224,8 +223,8 @@ mod tests { let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone()); // let mut input = builder.new_input(stream, Pipeline); - let (mut output1, _stream1) = builder.new_output::<()>(); - let (mut output2, _stream2) = builder.new_output::<()>(); + let (mut output1, _stream1) = builder.new_output::>(); + let (mut output2, _stream2) = builder.new_output::>(); builder.build(move |capabilities| { move |_frontiers| { @@ -254,8 +253,8 @@ mod tests { let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone()); // let mut input = builder.new_input(stream, Pipeline); - let (mut output1, _stream1) = builder.new_output::<()>(); - let (mut output2, _stream2) = builder.new_output::<()>(); + let (mut output1, _stream1) = builder.new_output::>(); + let (mut output2, _stream2) = builder.new_output::>(); builder.build(move |mut capabilities| { move |_frontiers| { diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index b9d69f634..f78d75c6e 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -6,42 +6,48 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::Data; use crate::progress::Timestamp; use crate::progress::ChangeBatch; use crate::progress::frontier::MutableAntichain; use crate::dataflow::channels::pullers::Counter as PullCounter; -use crate::dataflow::channels::pushers::Counter as PushCounter; -use crate::dataflow::channels::pushers::buffer::{Buffer, Session}; -use crate::dataflow::channels::Bundle; +use crate::dataflow::channels::pushers::CounterCore as PushCounter; +use 
crate::dataflow::channels::pushers::buffer::{BufferCore, Session}; +use crate::dataflow::channels::BundleCore; use crate::communication::{Push, Pull, message::RefOrMut}; +use crate::Container; use crate::logging::TimelyLogger as Logger; use crate::dataflow::operators::CapabilityRef; use crate::dataflow::operators::capability::CapabilityTrait; /// Handle to an operator's input stream. -pub struct InputHandle>> { +pub struct InputHandleCore>> { pull_counter: PullCounter, internal: Rc>>>>>, logging: Option, } +/// Handle to an operator's input stream, specialized to vectors. +pub type InputHandle = InputHandleCore, P>; + /// Handle to an operator's input stream and frontier. -pub struct FrontieredInputHandle<'a, T: Timestamp, D: 'a, P: Pull>+'a> { +pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull>+'a> { /// The underlying input handle. - pub handle: &'a mut InputHandle, + pub handle: &'a mut InputHandleCore, /// The frontier as reported by timely progress tracking. pub frontier: &'a MutableAntichain, } -impl<'a, T: Timestamp, D: Data, P: Pull>> InputHandle { +/// Handle to an operator's input stream and frontier, specialized to vectors. +pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec, P>; + +impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore { /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`. /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. /// Returns `None` when there's no more data available. 
#[inline] - pub fn next(&mut self) -> Option<(CapabilityRef, RefOrMut>)> { + pub fn next(&mut self) -> Option<(CapabilityRef, RefOrMut)> { let internal = &self.internal; self.pull_counter.next().map(|bundle| { match bundle.as_ref_or_mut() { @@ -74,7 +80,7 @@ impl<'a, T: Timestamp, D: Data, P: Pull>> InputHandle { /// }); /// ``` #[inline] - pub fn for_each, RefOrMut>)>(&mut self, mut logic: F) { + pub fn for_each, RefOrMut)>(&mut self, mut logic: F) { // We inline `next()` so that we can use `self.logging` without cloning (and dropping) the logger. let internal = &self.internal; while let Some((cap, data)) = self.pull_counter.next().map(|bundle| { @@ -95,10 +101,10 @@ impl<'a, T: Timestamp, D: Data, P: Pull>> InputHandle { } -impl<'a, T: Timestamp, D: Data, P: Pull>+'a> FrontieredInputHandle<'a, T, D, P> { +impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInputHandleCore<'a, T, D, P> { /// Allocate a new frontiered input handle. - pub fn new(handle: &'a mut InputHandle, frontier: &'a MutableAntichain) -> Self { - FrontieredInputHandle { + pub fn new(handle: &'a mut InputHandleCore, frontier: &'a MutableAntichain) -> Self { + FrontieredInputHandleCore { handle, frontier, } @@ -108,7 +114,7 @@ impl<'a, T: Timestamp, D: Data, P: Pull>+'a> FrontieredInputHandle< /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. /// Returns `None` when there's no more data available. 
#[inline] - pub fn next(&mut self) -> Option<(CapabilityRef, RefOrMut>)> { + pub fn next(&mut self) -> Option<(CapabilityRef, RefOrMut)> { self.handle.next() } @@ -131,7 +137,7 @@ impl<'a, T: Timestamp, D: Data, P: Pull>+'a> FrontieredInputHandle< /// }); /// ``` #[inline] - pub fn for_each, RefOrMut>)>(&mut self, logic: F) { + pub fn for_each, RefOrMut)>(&mut self, logic: F) { self.handle.for_each(logic) } @@ -142,14 +148,14 @@ impl<'a, T: Timestamp, D: Data, P: Pull>+'a> FrontieredInputHandle< } } -pub fn _access_pull_counter>>(input: &mut InputHandle) -> &mut PullCounter { +pub fn _access_pull_counter>>(input: &mut InputHandleCore) -> &mut PullCounter { &mut input.pull_counter } /// Constructs an input handle. /// Declared separately so that it can be kept private when `InputHandle` is re-exported. -pub fn new_input_handle>>(pull_counter: PullCounter, internal: Rc>>>>>, logging: Option) -> InputHandle { - InputHandle { +pub fn new_input_handle>>(pull_counter: PullCounter, internal: Rc>>>>>, logging: Option) -> InputHandleCore { + InputHandleCore { pull_counter, internal, logging, @@ -162,14 +168,14 @@ pub fn new_input_handle>>(pull_counter: Pu /// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the /// pusher is flushed (via the `cease` method) once it is no longer used. #[derive(Debug)] -pub struct OutputWrapper>> { - push_buffer: Buffer>, +pub struct OutputWrapper>> { + push_buffer: BufferCore>, internal_buffer: Rc>>, } -impl>> OutputWrapper { +impl>> OutputWrapper { /// Creates a new output wrapper from a push buffer. 
- pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>) -> Self { + pub fn new(push_buffer: BufferCore>, internal_buffer: Rc>>) -> Self { OutputWrapper { push_buffer, internal_buffer, @@ -179,8 +185,8 @@ impl>> OutputWrapper { /// /// This method ensures that the only access to the push buffer is through the `OutputHandle` /// type which ensures the use of capabilities, and which calls `cease` when it is dropped. - pub fn activate(&mut self) -> OutputHandle { - OutputHandle { + pub fn activate(&mut self) -> OutputHandleCore { + OutputHandleCore { push_buffer: &mut self.push_buffer, internal_buffer: &self.internal_buffer, } @@ -189,12 +195,15 @@ impl>> OutputWrapper { /// Handle to an operator's output stream. -pub struct OutputHandle<'a, T: Timestamp, D: 'a, P: Push>+'a> { - push_buffer: &'a mut Buffer>, +pub struct OutputHandleCore<'a, T: Timestamp, C: Container+'a, P: Push>+'a> { + push_buffer: &'a mut BufferCore>, internal_buffer: &'a Rc>>, } -impl<'a, T: Timestamp, D, P: Push>> OutputHandle<'a, T, D, P> { +/// Handle specialized to `Vec`-based container. +pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, Vec, P>; + +impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, C, P> { /// Obtains a session that can send data at the timestamp associated with capability `cap`. 
/// /// In order to send data at a future timestamp, obtain a capability for the new timestamp @@ -217,13 +226,13 @@ impl<'a, T: Timestamp, D, P: Push>> OutputHandle<'a, T, D, P> { /// }); /// }); /// ``` - pub fn session<'b, C: CapabilityTrait>(&'b mut self, cap: &'b C) -> Session<'b, T, D, PushCounter> where 'a: 'b { + pub fn session<'b, CT: CapabilityTrait>(&'b mut self, cap: &'b CT) -> Session<'b, T, C, PushCounter> where 'a: 'b { assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability"); self.push_buffer.session(cap.time()) } } -impl<'a, T: Timestamp, D, P: Push>> Drop for OutputHandle<'a, T, D, P> { +impl<'a, T: Timestamp, C: Container, P: Push>> Drop for OutputHandleCore<'a, T, C, P> { fn drop(&mut self) { self.push_buffer.cease(); } diff --git a/timely/src/dataflow/operators/generic/mod.rs b/timely/src/dataflow/operators/generic/mod.rs index e22f0b548..e54f5e423 100644 --- a/timely/src/dataflow/operators/generic/mod.rs +++ b/timely/src/dataflow/operators/generic/mod.rs @@ -8,7 +8,7 @@ mod handles; mod notificator; mod operator_info; -pub use self::handles::{InputHandle, FrontieredInputHandle, OutputHandle, OutputWrapper}; +pub use self::handles::{InputHandle, InputHandleCore, FrontieredInputHandle, FrontieredInputHandleCore, OutputHandle, OutputHandleCore, OutputWrapper}; pub use self::notificator::{Notificator, FrontierNotificator}; pub use self::operator::{Operator, source}; diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 53a0d247a..268ea3f7c 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -1,22 +1,21 @@ //! Methods to construct generic streaming and blocking unary operators. 
-use crate::dataflow::channels::pushers::Tee; -use crate::dataflow::channels::pact::ParallelizationContract; +use crate::dataflow::channels::pushers::TeeCore; +use crate::dataflow::channels::pact::ParallelizationContractCore; -use crate::dataflow::operators::generic::handles::{InputHandle, FrontieredInputHandle, OutputHandle}; +use crate::dataflow::operators::generic::handles::{InputHandleCore, FrontieredInputHandleCore, OutputHandleCore}; use crate::dataflow::operators::capability::Capability; -use crate::Data; - -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, StreamCore}; use super::builder_rc::OperatorBuilder; use crate::dataflow::operators::generic::OperatorInfo; use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator}; +use crate::Container; /// Methods to construct generic streaming and blocking operators. -pub trait Operator { +pub trait Operator { /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input. @@ -56,13 +55,13 @@ pub trait Operator { /// }); /// } /// ``` - fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> Stream + fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Data, + D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandle, - &mut OutputHandle>)+'static, - P: ParallelizationContract; + L: FnMut(&mut FrontieredInputHandleCore, + &mut OutputHandleCore>)+'static, + P: ParallelizationContractCore; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. 
@@ -93,12 +92,12 @@ pub trait Operator { /// }); /// } /// ``` - fn unary_notify, - &mut OutputHandle>, + fn unary_notify, + &mut OutputHandleCore>, &mut Notificator)+'static, - P: ParallelizationContract> - (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> Stream; + P: ParallelizationContractCore> + (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -128,13 +127,13 @@ pub trait Operator { /// }); /// }); /// ``` - fn unary(&self, pact: P, name: &str, constructor: B) -> Stream + fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Data, + D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandle, - &mut OutputHandle>)+'static, - P: ParallelizationContract; + L: FnMut(&mut InputHandleCore, + &mut OutputHandleCore>)+'static, + P: ParallelizationContractCore; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. 
@@ -186,16 +185,16 @@ pub trait Operator { /// } /// }).unwrap(); /// ``` - fn binary_frontier(&self, other: &Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream + fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Data, - D3: Data, + D2: Container, + D3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandle, - &mut FrontieredInputHandle, - &mut OutputHandle>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract; + L: FnMut(&mut FrontieredInputHandleCore, + &mut FrontieredInputHandleCore, + &mut OutputHandleCore>)+'static, + P1: ParallelizationContractCore, + P2: ParallelizationContractCore; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -242,15 +241,15 @@ pub trait Operator { /// } /// }).unwrap(); /// ``` - fn binary_notify, - &mut InputHandle, - &mut OutputHandle>, + fn binary_notify, + &mut InputHandleCore, + &mut OutputHandleCore>, &mut Notificator)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract> - (&self, other: &Stream, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> Stream; + P1: ParallelizationContractCore, + P2: ParallelizationContractCore> + (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. 
@@ -286,16 +285,16 @@ pub trait Operator { /// }).inspect(|x| println!("{:?}", x)); /// }); /// ``` - fn binary(&self, other: &Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream + fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Data, - D3: Data, + D2: Container, + D3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandle, - &mut InputHandle, - &mut OutputHandle>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract; + L: FnMut(&mut InputHandleCore, + &mut InputHandleCore, + &mut OutputHandleCore>)+'static, + P1: ParallelizationContractCore, + P2: ParallelizationContractCore; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream @@ -322,19 +321,19 @@ pub trait Operator { /// ``` fn sink(&self, pact: P, name: &str, logic: L) where - L: FnMut(&mut FrontieredInputHandle)+'static, - P: ParallelizationContract; + L: FnMut(&mut FrontieredInputHandleCore)+'static, + P: ParallelizationContractCore; } -impl Operator for Stream { +impl Operator for StreamCore { - fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> Stream + fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Data, + D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandle, - &mut OutputHandle>)+'static, - P: ParallelizationContract { + L: FnMut(&mut FrontieredInputHandleCore, + &mut OutputHandleCore>)+'static, + P: ParallelizationContractCore { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -347,7 +346,7 @@ impl Operator for Stream { let capability = capabilities.pop().unwrap(); let mut logic = constructor(capability, operator_info); move |frontiers| { - let mut input_handle = 
FrontieredInputHandle::new(&mut input, &frontiers[0]); + let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]); let mut output_handle = output.activate(); logic(&mut input_handle, &mut output_handle); } @@ -356,12 +355,12 @@ impl Operator for Stream { stream } - fn unary_notify, - &mut OutputHandle>, + fn unary_notify, + &mut OutputHandleCore>, &mut Notificator)+'static, - P: ParallelizationContract> - (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> Stream { + P: ParallelizationContractCore> + (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { self.unary_frontier(pact, name, move |capability, _info| { let mut notificator = FrontierNotificator::new(); @@ -378,13 +377,13 @@ impl Operator for Stream { }) } - fn unary(&self, pact: P, name: &str, constructor: B) -> Stream + fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore where - D2: Data, + D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandle, - &mut OutputHandle>)+'static, - P: ParallelizationContract { + L: FnMut(&mut InputHandleCore, + &mut OutputHandleCore>)+'static, + P: ParallelizationContractCore { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -406,16 +405,16 @@ impl Operator for Stream { stream } - fn binary_frontier(&self, other: &Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream + fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Data, - D3: Data, + D2: Container, + D3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandle, - &mut FrontieredInputHandle, - &mut OutputHandle>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract { + L: FnMut(&mut FrontieredInputHandleCore, + &mut FrontieredInputHandleCore, + &mut OutputHandleCore>)+'static, + P1: 
ParallelizationContractCore, + P2: ParallelizationContractCore { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -429,8 +428,8 @@ impl Operator for Stream { let capability = capabilities.pop().unwrap(); let mut logic = constructor(capability, operator_info); move |frontiers| { - let mut input1_handle = FrontieredInputHandle::new(&mut input1, &frontiers[0]); - let mut input2_handle = FrontieredInputHandle::new(&mut input2, &frontiers[1]); + let mut input1_handle = FrontieredInputHandleCore::new(&mut input1, &frontiers[0]); + let mut input2_handle = FrontieredInputHandleCore::new(&mut input2, &frontiers[1]); let mut output_handle = output.activate(); logic(&mut input1_handle, &mut input2_handle, &mut output_handle); } @@ -439,15 +438,15 @@ impl Operator for Stream { stream } - fn binary_notify, - &mut InputHandle, - &mut OutputHandle>, + fn binary_notify, + &mut InputHandleCore, + &mut OutputHandleCore>, &mut Notificator)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract> - (&self, other: &Stream, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> Stream { + P1: ParallelizationContractCore, + P2: ParallelizationContractCore> + (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { self.binary_frontier(other, pact1, pact2, name, |capability, _info| { let mut notificator = FrontierNotificator::new(); @@ -466,16 +465,16 @@ impl Operator for Stream { } - fn binary(&self, other: &Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream + fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - D2: Data, - D3: Data, + D2: Container, + D3: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandle, - &mut InputHandle, - &mut OutputHandle>)+'static, - P1: ParallelizationContract, - P2: ParallelizationContract { + L: 
FnMut(&mut InputHandleCore, + &mut InputHandleCore, + &mut OutputHandleCore>)+'static, + P1: ParallelizationContractCore, + P2: ParallelizationContractCore { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -500,15 +499,15 @@ impl Operator for Stream { fn sink(&self, pact: P, name: &str, mut logic: L) where - L: FnMut(&mut FrontieredInputHandle)+'static, - P: ParallelizationContract { + L: FnMut(&mut FrontieredInputHandleCore)+'static, + P: ParallelizationContractCore { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let mut input = builder.new_input(self, pact); builder.build(|_capabilities| { move |frontiers| { - let mut input_handle = FrontieredInputHandle::new(&mut input, &frontiers[0]); + let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]); logic(&mut input_handle); } }); @@ -556,11 +555,11 @@ impl Operator for Stream { /// .inspect(|x| println!("number: {:?}", x)); /// }); /// ``` -pub fn source(scope: &G, name: &str, constructor: B) -> Stream +pub fn source(scope: &G, name: &str, constructor: B) -> StreamCore where - D: Data, + D: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut OutputHandle>)+'static { + L: FnMut(&mut OutputHandleCore>)+'static { let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); let operator_info = builder.operator_info(); @@ -595,12 +594,12 @@ where /// timely::example(|scope| { /// /// -/// empty(scope) //-- type required in this example -/// .inspect(|_: &()| panic!("never called")); +/// empty(scope) // type required in this example +/// .inspect(|()| panic!("never called")); /// /// }); /// ``` -pub fn empty(scope: &G) -> Stream { +pub fn empty(scope: &G) -> StreamCore { source(scope, "Empty", |_capability, _info| |_output| { // drop capability, do nothing }) diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 
11d02896d..72719c5f8 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -9,10 +9,12 @@ use crate::progress::frontier::Antichain; use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch}; use crate::progress::Source; -use crate::Data; +use crate::{Container, Data}; use crate::communication::Push; -use crate::dataflow::{Stream, ScopeParent, Scope}; -use crate::dataflow::channels::{Message, pushers::{Tee, Counter}}; +use crate::dataflow::{Stream, ScopeParent, Scope, StreamCore}; +use crate::dataflow::channels::pushers::{TeeCore, CounterCore}; +use crate::dataflow::channels::Message; + // TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something // TODO : more like a harness, with direct access to its inputs. @@ -58,6 +60,41 @@ pub trait Input : Scope { /// ``` fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream); + /// Create a new [StreamCore] and [HandleCore] through which to supply input. + /// + /// The `new_input_core` method returns a pair `(HandleCore, StreamCore)` where the [StreamCore] can be used + /// immediately for timely dataflow construction, and the `HandleCore` is later used to introduce + /// data into the timely dataflow computation. + /// + /// The `HandleCore` also provides a means to indicate + /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely + /// to issue progress notifications. 
+ /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::{Input, Inspect}; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = worker.dataflow(|scope| { + /// let (input, stream) = scope.new_input_core::>(); + /// stream.inspect(|x| println!("hello {:?}", x)); + /// input + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore); + /// Create a new stream from a supplied interactive handle. /// /// This method creates a new timely stream whose data are supplied interactively through the `handle` @@ -89,20 +126,59 @@ pub trait Input : Scope { /// }); /// ``` fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream; + + /// Create a new stream from a supplied interactive handle. + /// + /// This method creates a new timely stream whose data are supplied interactively through the `handle` + /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate + /// if it is attached to more than one stream. 
+ /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::{Input, Inspect}; + /// use timely::dataflow::operators::input::Handle; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = Handle::new(); + /// worker.dataflow(|scope| { + /// scope.input_from_core(&mut input) + /// .inspect(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore; } use crate::order::TotalOrder; impl Input for G where ::Timestamp: TotalOrder { fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream) { - let mut handle = Handle::new(); - let stream = self.input_from(&mut handle); - (handle, stream) + self.new_input_core() } fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream { + self.input_from_core(handle) + } + + fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore) { + let mut handle = HandleCore::new(); + let stream = self.input_from_core(&mut handle); + (handle, stream) + } - let (output, registrar) = Tee::<::Timestamp, D>::new(); - let counter = Counter::new(output); + fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore { + let (output, registrar) = TeeCore::<::Timestamp, D>::new(); + let counter = CounterCore::new(output); let produced = counter.produced().clone(); let index = self.allocate_operator_index(); @@ -126,7 +202,7 @@ impl Input for G where ::Timestamp: TotalOrder { copies, }), index); - Stream::new(Source::new(index, 0), registrar, self.clone()) + StreamCore::new(Source::new(index, 0), registrar, self.clone()) } } @@ -170,17 +246,19 @@ impl Operate for Operator { /// A handle 
to an input `Stream`, used to introduce data to a timely dataflow computation. #[derive(Debug)] -pub struct Handle { +pub struct HandleCore { activate: Vec, progress: Vec>>>, - pushers: Vec>>, - buffer1: Vec, - buffer2: Vec, + pushers: Vec>>, + buffer1: C, + buffer2: C, now_at: T, } -impl Handle { +/// A handle specialized to vector-based containers. +pub type Handle = HandleCore>; +impl HandleCore { /// Allocates a new input handle, from which one can create timely streams. /// /// # Examples @@ -208,12 +286,12 @@ impl Handle { /// }); /// ``` pub fn new() -> Self { - Handle { + Self { activate: Vec::new(), progress: Vec::new(), pushers: Vec::new(), - buffer1: Vec::with_capacity(Message::::default_length()), - buffer2: Vec::with_capacity(Message::::default_length()), + buffer1: Default::default(), + buffer2: Default::default(), now_at: T::minimum(), } } @@ -244,18 +322,18 @@ impl Handle { /// } /// }); /// ``` - pub fn to_stream(&mut self, scope: &mut G) -> Stream + pub fn to_stream(&mut self, scope: &mut G) -> StreamCore where T: TotalOrder, G: ScopeParent, { - scope.input_from(self) + scope.input_from_core(self) } fn register( &mut self, - pusher: Counter>, - progress: Rc>> + pusher: CounterCore>, + progress: Rc>>, ) { // flush current contents, so new registrant does not see existing data. if !self.buffer1.is_empty() { self.flush(); } @@ -274,7 +352,7 @@ impl Handle { fn flush(&mut self) { for index in 0 .. self.pushers.len() { if index < self.pushers.len() - 1 { - self.buffer2.extend_from_slice(&self.buffer1[..]); + self.buffer2.clone_from(&self.buffer1); Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); debug_assert!(self.buffer2.is_empty()); } @@ -301,20 +379,35 @@ impl Handle { } } - #[inline] - /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. 
- pub fn send(&mut self, data: D) { - // assert!(self.buffer1.capacity() == Message::::default_length()); - self.buffer1.push(data); - if self.buffer1.len() == self.buffer1.capacity() { - self.flush(); - } - } - - /// Sends a batch of records into the corresponding timely dataflow `Stream`, at the current epoch. + /// Sends a batch of records into the corresponding timely dataflow [StreamCore], at the current epoch. /// /// This method flushes single elements previously sent with `send`, to keep the insertion order. - pub fn send_batch(&mut self, buffer: &mut Vec) { + /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::{Input, InspectCore}; + /// use timely::dataflow::operators::input::HandleCore; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = HandleCore::new(); + /// worker.dataflow(|scope| { + /// scope.input_from_core(&mut input) + /// .inspect_container(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send_batch(&mut vec![format!("{}", round)]); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + pub fn send_batch(&mut self, buffer: &mut D) { if !buffer.is_empty() { // flush buffered elements to ensure local fifo. @@ -323,7 +416,7 @@ impl Handle { // push buffer (or clone of buffer) at each destination. for index in 0 .. self.pushers.len() { if index < self.pushers.len() - 1 { - self.buffer2.extend_from_slice(&buffer[..]); + self.buffer2.clone_from(&buffer); Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); assert!(self.buffer2.is_empty()); } @@ -370,13 +463,50 @@ impl Handle { } } +impl Handle { + #[inline] + /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. 
+ /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::{Input, Inspect}; + /// use timely::dataflow::operators::input::Handle; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = Handle::new(); + /// worker.dataflow(|scope| { + /// scope.input_from(&mut input) + /// .inspect(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + pub fn send(&mut self, data: D) { + // assert!(self.buffer1.capacity() == Message::::default_length()); + self.buffer1.push(data); + if self.buffer1.len() == self.buffer1.capacity() { + self.flush(); + } + } +} + impl Default for Handle { fn default() -> Self { Self::new() } } -impl Drop for Handle { +impl Drop for HandleCore { fn drop(&mut self) { self.close_epoch(); } diff --git a/timely/src/dataflow/operators/inspect.rs b/timely/src/dataflow/operators/inspect.rs index a8f09f787..53e0d0ca2 100644 --- a/timely/src/dataflow/operators/inspect.rs +++ b/timely/src/dataflow/operators/inspect.rs @@ -1,12 +1,13 @@ //! Extension trait and implementation for observing and action on streamed data. +use crate::Container; use crate::Data; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Stream, Scope, StreamCore}; use crate::dataflow::operators::generic::Operator; /// Methods to inspect records and batches of records on a stream. -pub trait Inspect { +pub trait Inspect: InspectCore> { /// Runs a supplied closure on each observed data element. 
/// /// # Examples @@ -85,13 +86,43 @@ pub trait Inspect { } impl Inspect for Stream { + fn inspect_core(&self, mut func: F) -> Stream where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static { + self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..])))) + } +} + +/// Inspect containers +pub trait InspectCore { + /// Runs a supplied closure on each observed container, and each frontier advancement. + /// + /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data, + /// and `Err` for frontiers. Frontiers are only presented when they change. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{ToStream, Map, InspectCore}; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .inspect_container(|event| { + /// match event { + /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()), + /// Err(frontier) => println!("frontier advanced to {:?}", frontier), + /// } + /// }); + /// }); + /// ``` + fn inspect_container(&self, func: F) -> StreamCore where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; +} + +impl InspectCore for StreamCore { - fn inspect_core(&self, mut func: F) -> Stream - where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>)+'static + fn inspect_container(&self, mut func: F) -> StreamCore + where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static { use crate::progress::timestamp::Timestamp; - let mut vector = Vec::new(); let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum()); + let mut vector = Default::default(); self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| { if input.frontier.frontier() != frontier.borrow() { frontier.clear(); @@ -100,8 +131,8 @@ impl Inspect for Stream { } input.for_each(|time, data| { data.swap(&mut vector); - func(Ok((&time, &vector[..]))); - output.session(&time).give_vec(&mut vector); + func(Ok((&time, &vector))); + 
output.session(&time).give_container(&mut vector); }); }) } diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 7b418df46..637a9a219 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -11,18 +11,18 @@ pub use self::enterleave::{Enter, EnterAt, Leave}; // pub use self::queue::*; pub use self::input::Input; -pub use self::unordered_input::UnorderedInput; +pub use self::unordered_input::{UnorderedInput, UnorderedInputCore}; pub use self::feedback::{Feedback, LoopVariable, ConnectLoop}; pub use self::concat::{Concat, Concatenate}; pub use self::partition::Partition; pub use self::map::Map; -pub use self::inspect::Inspect; +pub use self::inspect::{Inspect, InspectCore}; pub use self::filter::Filter; pub use self::delay::Delay; pub use self::exchange::Exchange; pub use self::broadcast::Broadcast; pub use self::probe::Probe; -pub use self::to_stream::{ToStream, ToStreamAsync, Event}; +pub use self::to_stream::{ToStream, ToStreamCore, ToStreamAsync, Event}; pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; pub use self::ok_err::OkErr; diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 124be5972..7c5a8567e 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -5,18 +5,18 @@ use std::cell::RefCell; use crate::progress::Timestamp; use crate::progress::frontier::{AntichainRef, MutableAntichain}; -use crate::dataflow::channels::pushers::Counter as PushCounter; -use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; +use crate::dataflow::channels::pushers::CounterCore as PushCounter; +use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; -use crate::Data; -use 
crate::dataflow::{Stream, Scope}; +use crate::dataflow::{StreamCore, Scope}; +use crate::Container; /// Monitors progress at a `Stream`. -pub trait Probe { +pub trait Probe { /// Constructs a progress probe which indicates which timestamps have elapsed at the operator. /// /// # Examples @@ -76,10 +76,10 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe_with(&self, handle: &mut Handle) -> Stream; + fn probe_with(&self, handle: &mut Handle) -> StreamCore; } -impl Probe for Stream { +impl Probe for StreamCore { fn probe(&self) -> Handle { // the frontier is shared state; scope updates, handle reads. @@ -87,7 +87,7 @@ impl Probe for Stream { self.probe_with(&mut handle); handle } - fn probe_with(&self, handle: &mut Handle) -> Stream { + fn probe_with(&self, handle: &mut Handle) -> StreamCore { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); @@ -97,7 +97,7 @@ impl Probe for Stream { let shared_frontier = handle.frontier.clone(); let mut started = false; - let mut vector = Vec::new(); + let mut vector = Default::default(); builder.build( move |progress| { @@ -120,7 +120,7 @@ impl Probe for Stream { RefOrMut::Mut(reference) => (&reference.time, RefOrMut::Mut(&mut reference.data)), }; data.swap(&mut vector); - output.session(time).give_vec(&mut vector); + output.session(time).give_container(&mut vector); } output.cease(); diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs index 5a1709620..597ec9163 100644 --- a/timely/src/dataflow/operators/to_stream.rs +++ b/timely/src/dataflow/operators/to_stream.rs @@ -3,11 +3,11 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use crate::Container; -use crate::dataflow::channels::Message; use crate::dataflow::operators::generic::operator::source; use crate::dataflow::operators::CapabilitySet; -use crate::dataflow::{Scope, Stream}; +use 
crate::dataflow::{StreamCore, Scope, Stream}; use crate::progress::Timestamp; use crate::Data; @@ -48,7 +48,8 @@ impl ToStream for I where I:: if let Some(element) = iterator.next() { let mut session = output.session(capability.as_ref().unwrap()); session.give(element); - for element in iterator.by_ref().take((256 * Message::::default_length()) - 1) { + let n = 256 * crate::container::buffer::default_capacity::(); + for element in iterator.by_ref().take(n - 1) { session.give(element); } activator.activate(); @@ -61,6 +62,57 @@ impl ToStream for I where I:: } } +/// Converts to a timely [StreamCore]. +pub trait ToStreamCore { + /// Converts to a timely [StreamCore]. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::operators::{ToStreamCore, Capture}; + /// use timely::dataflow::operators::capture::Extract; + /// + /// let (data1, data2) = timely::example(|scope| { + /// let data1 = Some((0..3).collect::>()).to_stream_core(scope).capture(); + /// let data2 = Some(vec![0,1,2]).to_stream_core(scope).capture(); + /// (data1, data2) + /// }); + /// + /// assert_eq!(data1.extract(), data2.extract()); + /// ``` + fn to_stream_core>(self, scope: &mut S) -> StreamCore; +} + +impl ToStreamCore for I where I::Item: Container { + fn to_stream_core>(self, scope: &mut S) -> StreamCore { + + source(scope, "ToStreamCore", |capability, info| { + + // Acquire an activator, so that the operator can reschedule itself. 
+ let activator = scope.activator_for(&info.address[..]); + + let mut iterator = self.into_iter().fuse(); + let mut capability = Some(capability); + + move |output| { + + if let Some(mut element) = iterator.next() { + let mut session = output.session(capability.as_ref().unwrap()); + session.give_container(&mut element); + let n = 256; + for mut element in iterator.by_ref().take(n - 1) { + session.give_container(&mut element); + } + activator.activate(); + } + else { + capability = None; + } + } + }) + } +} + /// Data and progress events of the native stream. pub enum Event { /// Indicates that timestamps have advanced to frontier F diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index 6dc163bbf..c7e600234 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -2,6 +2,7 @@ use std::rc::Rc; use std::cell::RefCell; +use crate::Container; use crate::scheduling::{Schedule, ActivateOnDrop}; @@ -11,12 +12,12 @@ use crate::progress::Source; use crate::progress::ChangeBatch; use crate::Data; -use crate::dataflow::channels::pushers::{Tee, Counter as PushCounter}; -use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession}; +use crate::dataflow::channels::pushers::{CounterCore as PushCounter, TeeCore}; +use crate::dataflow::channels::pushers::buffer::{BufferCore as PushBuffer, AutoflushSessionCore}; use crate::dataflow::operators::{ActivateCapability, Capability}; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Stream, Scope, StreamCore}; /// Create a new `Stream` and `Handle` through which to supply input. pub trait UnorderedInput { @@ -79,8 +80,76 @@ pub trait UnorderedInput { impl UnorderedInput for G { fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream) { + self.new_unordered_input_core() + } +} + +/// An unordered handle specialized to vectors. 
+pub type UnorderedHandle = UnorderedHandleCore>; + +/// Create a new `Stream` and `Handle` through which to supply input. +pub trait UnorderedInputCore { + /// Create a new capability-based [StreamCore] and [UnorderedHandleCore] through which to supply input. This + /// input supports multiple open epochs (timestamps) at the same time. + /// + /// The `new_unordered_input_core` method returns `((HandleCore, Capability), StreamCore)` where the `StreamCore` can be used + /// immediately for timely dataflow construction, `HandleCore` and `Capability` are later used to introduce + /// data into the timely dataflow computation. + /// + /// The `Capability` returned is for the default value of the timestamp type in use. The + /// capability can be dropped to inform the system that the input has advanced beyond the + /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp + /// should be obtained first, via the `delayed` function for `Capability`. + /// + /// To communicate the end-of-input drop all available capabilities. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex}; + /// + /// use timely::*; + /// use timely::dataflow::operators::*; + /// use timely::dataflow::operators::capture::Extract; + /// use timely::dataflow::Stream; + /// + /// // get send and recv endpoints, wrap send to share + /// let (send, recv) = ::std::sync::mpsc::channel(); + /// let send = Arc::new(Mutex::new(send)); + /// + /// timely::execute(Config::thread(), move |worker| { + /// + /// // this is only to validate the output. + /// let send = send.lock().unwrap().clone(); + /// + /// // create and capture the unordered input. + /// let (mut input, mut cap) = worker.dataflow::(|scope| { + /// let (input, stream) = scope.new_unordered_input_core(); + /// stream.capture_into(send); + /// input + /// }); + /// + /// // feed values 0..10 at times 0..10. 
+ /// for round in 0..10 { + /// input.session(cap.clone()).give(round); + /// cap = cap.delayed(&(round + 1)); + /// worker.step(); + /// } + /// }).unwrap(); + /// + /// let extract = recv.extract(); + /// for i in 0..10 { + /// assert_eq!(extract[i], (i, vec![i])); + /// } + /// ``` + fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore); +} + + +impl UnorderedInputCore for G { + fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore) { - let (output, registrar) = Tee::::new(); + let (output, registrar) = TeeCore::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); // let produced = Rc::new(RefCell::new(ChangeBatch::new())); let cap = Capability::new(G::Timestamp::minimum(), internal.clone()); @@ -94,7 +163,7 @@ impl UnorderedInput for G { let cap = ActivateCapability::new(cap, &address, self.activations()); - let helper = UnorderedHandle::new(counter); + let helper = UnorderedHandleCore::new(counter); self.add_operator_with_index(Box::new(UnorderedOperator { name: "UnorderedInput".to_owned(), @@ -105,7 +174,7 @@ impl UnorderedInput for G { peers, }), index); - ((helper, cap), Stream::new(Source::new(index, 0), registrar, self.clone())) + ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone())) } } @@ -144,21 +213,21 @@ impl Operate for UnorderedOperator { fn notify_me(&self) -> bool { false } } -/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation. +/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. 
#[derive(Debug)] -pub struct UnorderedHandle { - buffer: PushBuffer>>, +pub struct UnorderedHandleCore { + buffer: PushBuffer>>, } -impl UnorderedHandle { - fn new(pusher: PushCounter>) -> UnorderedHandle { - UnorderedHandle { +impl UnorderedHandleCore { + fn new(pusher: PushCounter>) -> UnorderedHandleCore { + UnorderedHandleCore { buffer: PushBuffer::new(pusher), } } /// Allocates a new automatically flushing session based on the supplied capability. - pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { + pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) } } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index 8a3bccfbd..aa5e48601 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -9,17 +9,18 @@ use crate::progress::{Source, Target}; use crate::communication::Push; use crate::dataflow::Scope; use crate::dataflow::channels::pushers::tee::TeeHelper; -use crate::dataflow::channels::Bundle; +use crate::dataflow::channels::BundleCore; use std::fmt::{self, Debug}; +use crate::Container; // use dataflow::scopes::root::loggers::CHANNELS_Q; -/// Abstraction of a stream of `D: Data` records timestamped with `S::Timestamp`. +/// Abstraction of a stream of `D: Container` records timestamped with `S::Timestamp`. /// /// Internally `Stream` maintains a list of data recipients who should be presented with data /// produced by the source of the stream. #[derive(Clone)] -pub struct Stream { +pub struct StreamCore { /// The progress identifier of the stream's data source. name: Source, /// The `Scope` containing the stream. @@ -28,12 +29,15 @@ pub struct Stream { ports: TeeHelper, } -impl Stream { +/// A stream batching data in vectors. +pub type Stream = StreamCore>; + +impl StreamCore { /// Connects the stream to a destination. 
/// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. - pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { + pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { @@ -48,7 +52,7 @@ impl Stream { } /// Allocates a `Stream` from a supplied `Source` name and rendezvous point. pub fn new(source: Source, output: TeeHelper, scope: S) -> Self { - Stream { name: source, ports: output, scope } + Self { name: source, ports: output, scope } } /// The name of the stream's source operator. pub fn name(&self) -> &Source { &self.name } @@ -56,7 +60,7 @@ impl Stream { pub fn scope(&self) -> S { self.scope.clone() } } -impl Debug for Stream +impl Debug for StreamCore where S: Scope, { diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 96714e737..a665e2f34 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -233,12 +233,12 @@ where use ::std::net::TcpStream; use crate::logging::BatchLogger; - use crate::dataflow::operators::capture::EventWriter; + use crate::dataflow::operators::capture::EventWriterCore; eprintln!("enabled COMM logging to {}", addr); if let Ok(stream) = TcpStream::connect(&addr) { - let writer = EventWriter::new(stream); + let writer = EventWriterCore::new(stream); let mut logger = BatchLogger::new(writer); result = Some(crate::logging_core::Logger::new( ::std::time::Instant::now(), @@ -267,10 +267,10 @@ where use ::std::net::TcpStream; use crate::logging::{BatchLogger, TimelyEvent}; - use crate::dataflow::operators::capture::EventWriter; + use crate::dataflow::operators::capture::EventWriterCore; if let Ok(stream) = TcpStream::connect(&addr) { - let writer = EventWriter::new(stream); + let writer = 
EventWriterCore::new(stream); let mut logger = BatchLogger::new(writer); worker.log_register() .insert::("timely", move |time, data| diff --git a/timely/src/lib.rs b/timely/src/lib.rs index 2241ae39f..96d7c7d99 100644 --- a/timely/src/lib.rs +++ b/timely/src/lib.rs @@ -76,6 +76,12 @@ pub use timely_communication::Config as CommunicationConfig; pub use worker::Config as WorkerConfig; pub use execute::Config as Config; +pub use timely_container::Container; +/// Re-export of the `timely_container` crate. +pub mod container { + pub use timely_container::*; +} + /// Re-export of the `timely_communication` crate. pub mod communication { pub use timely_communication::*; From 6015eff84b24bfb145fa65d32aba47812ca43643 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 9 Dec 2021 12:10:50 +0100 Subject: [PATCH 3/3] Introduce TimelyStack This is mostly a copy of ColumnStack plus a few implementations. We'd like to keep the columnar library mostly untouched and hence have our own variant within Timely. Signed-off-by: Moritz Hoffmann --- container/Cargo.toml | 5 + container/src/columnation.rs | 294 +++++++++++++++++++++++++++++++++++ container/src/lib.rs | 2 + 3 files changed, 301 insertions(+) create mode 100644 container/src/columnation.rs diff --git a/container/Cargo.toml b/container/Cargo.toml index 23d530654..2372746bd 100644 --- a/container/Cargo.toml +++ b/container/Cargo.toml @@ -4,3 +4,8 @@ version = "0.12.0" edition = "2018" description = "Container abstractions for Timely" license = "MIT" + +[dependencies] +columnation = { git = "https://github.com/antiguru/columnation", branch = "container" } +#columnation = { path = "../../columnation/" } +serde = { version = "1.0"} diff --git a/container/src/columnation.rs b/container/src/columnation.rs new file mode 100644 index 000000000..c86cde89c --- /dev/null +++ b/container/src/columnation.rs @@ -0,0 +1,294 @@ +//! A columnar container based on the columnation library. 
+ +use std::iter::FromIterator; + +pub use columnation::*; + +/// An append-only vector that stores records as columns. +/// +/// This container maintains elements that might conventionally own +/// memory allocations, but instead the pointers to those allocations +/// reference larger regions of memory shared with multiple instances +/// of the type. Elements can be retrieved as references, and care is +/// taken when this type is dropped to ensure that the correct memory +/// is returned (rather than the incorrect memory, from running the +/// elements' `Drop` implementations). +pub struct TimelyStack { + local: Vec, + inner: T::InnerRegion, +} + +impl TimelyStack { + /// Construct a [TimelyStack], reserving space for `capacity` elements + /// + /// Note that the associated region is not initialized to a specific capacity + /// because we can't generally know how much space would be required. + pub fn with_capacity(capacity: usize) -> Self { + Self { + local: Vec::with_capacity(capacity), + inner: T::InnerRegion::default(), + } + } + + /// Copies an element into the region. + /// + /// The element can be read by indexing + pub fn copy(&mut self, item: &T) { + // TODO: Some types `T` should just be cloned. + // E.g. types that are `Copy` or vecs of ZSTs. + unsafe { + self.local.push(self.inner.copy(item)); + } + } + /// Empties the collection. + pub fn clear(&mut self) { + unsafe { + // Unsafety justified in that setting the length to zero exposes + // no invalid data. + self.local.set_len(0); + self.inner.clear(); + } + } + /// Retain elements that pass a predicate, from a specified offset. + /// + /// This method may or may not reclaim memory in the inner region. + pub fn retain_from bool>(&mut self, index: usize, mut predicate: P) { + let mut write_position = index; + for position in index..self.local.len() { + if predicate(&self[position]) { + // TODO: compact the inner region and update pointers. 
+ self.local.swap(position, write_position); + write_position += 1; + } + } + unsafe { + // Unsafety justified in that `write_position` is no greater than + // `self.local.len()` and so this exposes no invalid data. + self.local.set_len(write_position); + } + } + + /// Unsafe access to `local` data. The slice stores data that is backed by a region + /// allocation. Therefore, it is undefined behavior to mutate elements of the `local` slice. + /// + /// Safety: Elements within `local` can be reordered, but not mutated, removed and/or dropped. + pub unsafe fn local(&mut self) -> &mut [T] { + &mut self.local[..] + } +} + +impl TimelyStack<(A, B)> { + /// Copies a destructured tuple `(A, B)` into this column stack. + /// + /// This serves situations where a tuple should be constructed from its constituents but + /// not all elements are available as owned data. + /// + /// The element can be read by indexing + pub fn copy_destructured(&mut self, t1: &A, t2: &B) { + unsafe { + self.local.push(self.inner.copy_destructured(t1, t2)); + } + } +} + +impl TimelyStack<(A, B, C)> { + /// Copies a destructured tuple `(A, B, C)` into this column stack. + /// + /// This serves situations where a tuple should be constructed from its constituents but + /// not all elements are available as owned data. + /// + /// The element can be read by indexing + pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) { + unsafe { + self.local.push(self.inner.copy_destructured(r0, r1, r2)); + } + } +} + +impl std::ops::Deref for TimelyStack { + type Target = [T]; + #[inline(always)] + fn deref(&self) -> &Self::Target { + &self.local[..] 
+ } +} + +impl Drop for TimelyStack { + fn drop(&mut self) { + self.clear(); + } +} + +impl Default for TimelyStack { + fn default() -> Self { + Self { + local: Vec::new(), + inner: T::InnerRegion::default(), + } + } +} + +impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack { + fn from_iter>(iter: T) -> Self { + let mut iter = iter.into_iter(); + let mut c = TimelyStack::::with_capacity(iter.size_hint().0); + while let Some(element) = iter.next() { + c.copy(element); + } + + c + } +} + +impl PartialEq for TimelyStack { + fn eq(&self, other: &Self) -> bool { + PartialEq::eq(&self[..], &other[..]) + } +} + +impl Eq for TimelyStack {} + +impl std::fmt::Debug for TimelyStack { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + (&self[..]).fmt(f) + } +} + +impl Clone for TimelyStack { + fn clone(&self) -> Self { + let mut new: Self = Default::default(); + for item in &self[..] { + new.copy(item); + } + new + } + + fn clone_from(&mut self, source: &Self) { + self.clear(); + for item in &source[..] { + self.copy(item); + } + } +} + +mod serde { + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + use crate::columnation::{Columnation, TimelyStack}; + + impl Serialize for TimelyStack { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + use serde::ser::SerializeSeq; + let mut seq = serializer.serialize_seq(Some(self.local.len()))?; + for element in &self[..] 
{ + seq.serialize_element(element)?; + } + seq.end() + } + } + + impl<'a, T: Columnation + Deserialize<'a>> Deserialize<'a> for TimelyStack { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'a>, + { + use serde::de::{SeqAccess, Visitor}; + use std::fmt; + use std::marker::PhantomData; + struct TimelyStackVisitor { + marker: PhantomData, + } + + impl<'de, T: Columnation> Visitor<'de> for TimelyStackVisitor + where + T: Deserialize<'de>, + { + type Value = TimelyStack; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a sequence") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + let local = Vec::with_capacity( + seq.size_hint() + .unwrap_or(crate::buffer::default_capacity::()), + ); + let mut stack = TimelyStack { + local, + inner: T::InnerRegion::default(), + }; + + while let Some(value) = seq.next_element()? { + stack.copy(&value); + } + + Ok(stack) + } + } + + let visitor = TimelyStackVisitor { + marker: PhantomData, + }; + deserializer.deserialize_seq(visitor) + } + } +} + +mod container { + use crate::{Container, PushPartitioned}; + + use crate::columnation::{Columnation, TimelyStack}; + + impl Container for TimelyStack { + type Item = T; + + fn len(&self) -> usize { + self.local.len() + } + + fn is_empty(&self) -> bool { + self.local.is_empty() + } + + fn capacity(&self) -> usize { + self.local.capacity() + } + + fn clear(&mut self) { + TimelyStack::clear(self) + } + } + + impl PushPartitioned for TimelyStack { + fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) + where + I: FnMut(&Self::Item) -> usize, + F: FnMut(usize, &mut Self), + { + fn ensure_capacity(this: &mut TimelyStack) { + let capacity = this.local.capacity(); + let desired_capacity = crate::buffer::default_capacity::(); + if capacity < desired_capacity { + this.local.reserve(desired_capacity - capacity); + } + } + + for datum in &self[..] 
{ + let index = index(&datum); + ensure_capacity(&mut buffers[index]); + buffers[index].copy(datum); + if buffers[index].len() == buffers[index].local.capacity() { + flush(index, &mut buffers[index]); + } + } + self.clear(); + } + } +} diff --git a/container/src/lib.rs b/container/src/lib.rs index 6e465ba8d..a536faf55 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -2,6 +2,8 @@ #![forbid(missing_docs)] +pub mod columnation; + /// A container transferring data through dataflow edges /// /// A container stores a number of elements and thus is able to describe it length (`len()`) and