diff --git a/container/Cargo.toml b/container/Cargo.toml index 06abc9123..0897b19da 100644 --- a/container/Cargo.toml +++ b/container/Cargo.toml @@ -7,5 +7,5 @@ license = "MIT" [dependencies] columnation = { git = "https://github.com/frankmcsherry/columnation" } -flatcontainer = "0.1" +flatcontainer = "0.3" serde = { version = "1.0"} diff --git a/container/src/columnation.rs b/container/src/columnation.rs index 116a17ad4..7fe1ffe83 100644 --- a/container/src/columnation.rs +++ b/container/src/columnation.rs @@ -37,9 +37,10 @@ impl TimelyStack { /// The argument `items` may be cloned and iterated multiple times. /// Please be careful if it contains side effects. #[inline(always)] - pub fn reserve_items<'a, I>(&'a mut self, items: I) + pub fn reserve_items<'a, I>(&mut self, items: I) where I: Iterator+Clone, + T: 'a, { self.local.reserve(items.clone().count()); self.inner.reserve_items(items); @@ -240,24 +241,25 @@ impl Clone for TimelyStack { } } -impl PushInto> for T { +impl PushInto for TimelyStack { #[inline] - fn push_into(self, target: &mut TimelyStack) { - target.copy(&self); + fn push_into(&mut self, item: T) { + self.copy(&item); } } -impl PushInto> for &T { +impl PushInto<&T> for TimelyStack { #[inline] - fn push_into(self, target: &mut TimelyStack) { - target.copy(self); + fn push_into(&mut self, item: &T) { + self.copy(item); } } -impl PushInto> for &&T { + +impl PushInto<&&T> for TimelyStack { #[inline] - fn push_into(self, target: &mut TimelyStack) { - target.copy(self); + fn push_into(&mut self, item: &&T) { + self.copy(*item); } } @@ -333,7 +335,7 @@ mod serde { mod container { use std::ops::Deref; - use crate::{Container, PushContainer}; + use crate::{Container, SizableContainer}; use crate::columnation::{Columnation, TimelyStack}; @@ -366,7 +368,7 @@ mod container { } } - impl PushContainer for TimelyStack { + impl SizableContainer for TimelyStack { fn capacity(&self) -> usize { self.capacity() } @@ -380,3 +382,76 @@ mod container { } } } + +mod flatcontainer { + //! A bare-bones flatcontainer region implementation for [`TimelyStack`]. + + use columnation::Columnation; + use flatcontainer::{Push, Region, ReserveItems}; + use crate::columnation::TimelyStack; + + #[derive(Debug, Clone)] + struct ColumnationRegion { + inner: TimelyStack, + } + + impl Default for ColumnationRegion { + fn default() -> Self { + Self { inner: Default::default() } + } + } + + impl Region for ColumnationRegion { + type ReadItem<'a> = &'a T where Self: 'a; + type Index = usize; + + fn merge_regions<'a>(regions: impl Iterator + Clone) -> Self where Self: 'a { + let mut inner = TimelyStack::default(); + inner.reserve_regions(regions.map(|r| &r.inner)); + Self { inner} + } + + fn index(&self, index: Self::Index) -> Self::ReadItem<'_> { + &self.inner[index] + } + + fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator + Clone { + self.inner.reserve_regions(regions.map(|r| &r.inner)); + } + + fn clear(&mut self) { + self.inner.clear(); + } + + fn heap_size(&self, callback: F) { + self.inner.heap_size(callback); + } + } + + impl Push for ColumnationRegion { + fn push(&mut self, item: T) -> Self::Index { + self.inner.copy(&item); + self.inner.len() - 1 + } + } + + impl Push<&T> for ColumnationRegion { + fn push(&mut self, item: &T) -> Self::Index { + self.inner.copy(item); + self.inner.len() - 1 + } + } + + impl Push<&&T> for ColumnationRegion { + fn push(&mut self, item: &&T) -> Self::Index { + self.inner.copy(*item); + self.inner.len() - 1 + } + } + + impl<'a, T: Columnation + 'a> ReserveItems<&'a T> for ColumnationRegion { + fn reserve_items(&mut self, items: I) where I: Iterator + Clone { + self.inner.reserve_items(items); + } + } +} diff --git a/container/src/flatcontainer.rs b/container/src/flatcontainer.rs index 026eef5b9..d2c3a6039 100644 --- a/container/src/flatcontainer.rs +++ b/container/src/flatcontainer.rs @@ -1,7 +1,7 @@ //! Present a [`FlatStack`] as a timely container. pub use flatcontainer::*; -use crate::{buffer, Container, PushContainer, PushInto}; +use crate::{buffer, Container, SizableContainer, PushInto}; impl Container for FlatStack { type ItemRef<'a> = R::ReadItem<'a> where Self: 'a; @@ -28,7 +28,7 @@ impl Container for FlatStack { } } -impl PushContainer for FlatStack { +impl SizableContainer for FlatStack { fn capacity(&self) -> usize { self.capacity() } @@ -42,9 +42,9 @@ impl PushContainer for FlatStack { } } -impl> PushInto> for T { +impl + Clone + 'static, T> PushInto for FlatStack { #[inline] - fn push_into(self, target: &mut FlatStack) { - target.copy(self); + fn push_into(&mut self, item: T) { + self.copy(item); } } diff --git a/container/src/lib.rs b/container/src/lib.rs index ec97e1d8f..d1cd022ad 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -26,6 +26,12 @@ pub trait Container: Default + Clone + 'static { /// The type of elements when draining the continer. type Item<'a> where Self: 'a; + /// Push `item` into self + #[inline] + fn push(&mut self, item: T) where Self: PushInto { + self.push_into(item) + } + /// The number of elements in this container /// /// The length of a container must be consistent between sending and receiving it. @@ -57,26 +63,10 @@ pub trait Container: Default + Clone + 'static { fn drain(&mut self) -> Self::DrainIter<'_>; } -/// A type that can push itself into a container. -pub trait PushInto { - /// Push self into the target container. - fn push_into(self, target: &mut C); -} - -/// A type that has the necessary infrastructure to push elements, without specifying how pushing -/// itself works. For this, pushable types should implement [`PushInto`]. -pub trait PushContainer: Container { - /// Push `item` into self - #[inline] - fn push>(&mut self, item: T) { - item.push_into(self) - } - /// Return the capacity of the container. - fn capacity(&self) -> usize; - /// Return the preferred capacity of the container. - fn preferred_capacity() -> usize; - /// Reserve space for `additional` elements, possibly increasing the capacity of the container. - fn reserve(&mut self, additional: usize); +/// A container that can absorb items of a specific type. +pub trait PushInto { + /// Push item into self. + fn push_into(&mut self, item: T); } /// A type that can build containers from items. @@ -99,7 +89,12 @@ pub trait ContainerBuilder: Default + 'static { /// The container type we're building. type Container: Container; /// Add an item to a container. - fn push>(&mut self, item: T) where Self::Container: PushContainer; + /// + /// The restriction to [`SizeableContainer`] only exists so that types + /// relying on [`CapacityContainerBuilder`] only need to constrain their container + /// to [`Container`] instead of [`SizableContainer`], which otherwise would be a pervasive + /// requirement. + fn push(&mut self, item: T) where Self::Container: SizableContainer + PushInto; /// Push a pre-built container. fn push_container(&mut self, container: &mut Self::Container); /// Extract assembled containers, potentially leaving unfinished data behind. @@ -121,11 +116,21 @@ pub struct CapacityContainerBuilder{ pending: VecDeque, } +/// A container that can be sized and reveals its capacity. +pub trait SizableContainer: Container { + /// Return the capacity of the container. + fn capacity(&self) -> usize; + /// Return the preferred capacity of the container. + fn preferred_capacity() -> usize; + /// Reserve space for `additional` elements, possibly increasing the capacity of the container. + fn reserve(&mut self, additional: usize); +} + impl ContainerBuilder for CapacityContainerBuilder { type Container = C; #[inline] - fn push>(&mut self, item: T) where C: PushContainer { + fn push(&mut self, item: T) where C: SizableContainer + PushInto { if self.current.capacity() == 0 { self.current = self.empty.take().unwrap_or_default(); // Discard any non-uniform capacity container. @@ -212,7 +217,7 @@ impl Container for Vec { } } -impl PushContainer for Vec { +impl SizableContainer for Vec { fn capacity(&self) -> usize { self.capacity() } @@ -226,24 +231,25 @@ impl PushContainer for Vec { } } -impl PushInto> for T { +impl PushInto for Vec { #[inline] - fn push_into(self, target: &mut Vec) { - target.push(self) + fn push_into(&mut self, item: T) { + self.push(item) } } -impl PushInto> for &T { + +impl PushInto<&T> for Vec { #[inline] - fn push_into(self, target: &mut Vec) { - target.push(self.clone()) + fn push_into(&mut self, item: &T) { + self.push(item.clone()) } } -impl PushInto> for &&T { +impl PushInto<&&T> for Vec { #[inline] - fn push_into(self, target: &mut Vec) { - (*self).push_into(target); + fn push_into(&mut self, item: &&T) { + self.push_into(*item) } } @@ -330,7 +336,7 @@ mod arc { } /// A container that can partition itself into pieces. -pub trait PushPartitioned: PushContainer { +pub trait PushPartitioned: SizableContainer { /// Partition and push this container. /// /// Drain all elements from `self`, and use the function `index` to determine which `buffer` to @@ -341,7 +347,7 @@ pub trait PushPartitioned: PushContainer { F: FnMut(usize, &mut Self); } -impl PushPartitioned for T where for<'a> T::Item<'a>: PushInto { +impl PushPartitioned for T where for<'a> T: PushInto> { fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) where for<'a> I: FnMut(&Self::Item<'a>) -> usize, diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 507b18d23..ae878358f 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -2,7 +2,7 @@ //! with the performance of batched sends. use crate::communication::Push; -use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushContainer, PushInto}; +use crate::container::{ContainerBuilder, CapacityContainerBuilder, SizableContainer, PushInto}; use crate::dataflow::channels::{Bundle, Message}; use crate::dataflow::operators::Capability; use crate::progress::Timestamp; @@ -114,11 +114,11 @@ impl>> Buffer>> Buffer where T: Eq+Clone, - CB::Container: PushContainer, + CB::Container: SizableContainer, { // Push a single item into the builder. Internal method for use by `Session`. #[inline] - fn give>(&mut self, data: D) { + fn give(&mut self, data: D) where CB::Container: PushInto { self.builder.push(data); self.extract(); } @@ -155,20 +155,20 @@ impl<'a, T, CB, P: Push>+'a> Session<'a, T, CB, P> where T: Eq + Clone + 'a, CB: ContainerBuilder + 'a, - CB::Container: PushContainer, + CB::Container: SizableContainer, { /// Provides one record at the time specified by the `Session`. #[inline] - pub fn give>(&mut self, data: D) { + pub fn give(&mut self, data: D) where CB::Container: PushInto { self.buffer.give(data); } /// Provides an iterator of records at the time specified by the `Session`. #[inline] - pub fn give_iterator(&mut self, iter: I) + pub fn give_iterator(&mut self, iter: I) where - I: Iterator, - D: PushInto, + I: Iterator, + CB::Container: PushInto, { for item in iter { self.give(item); @@ -197,16 +197,15 @@ where { /// Transmits a single record. #[inline] - pub fn give>(&mut self, data: D) where CB::Container: PushContainer { + pub fn give(&mut self, data: D) where CB::Container: SizableContainer + PushInto { self.buffer.give(data); } /// Transmits records produced by an iterator. #[inline] pub fn give_iterator(&mut self, iter: I) - where - I: Iterator, - D: PushInto, - CB::Container: PushContainer, + where + I: Iterator, + CB::Container: SizableContainer + PushInto, { for item in iter { self.give(item); diff --git a/timely/src/dataflow/operators/core/capture/extract.rs b/timely/src/dataflow/operators/core/capture/extract.rs index a49620285..fcf024f31 100644 --- a/timely/src/dataflow/operators/core/capture/extract.rs +++ b/timely/src/dataflow/operators/core/capture/extract.rs @@ -1,7 +1,7 @@ //! Traits and types for extracting captured timely dataflow streams. use super::Event; -use crate::{container::{PushContainer, PushInto}}; +use crate::{container::{SizableContainer, PushInto}}; /// Supports extracting a sequence of timestamp and data. pub trait Extract { @@ -48,9 +48,10 @@ pub trait Extract { fn extract(self) -> Vec<(T, C)>; } -impl Extract for ::std::sync::mpsc::Receiver> +impl Extract for ::std::sync::mpsc::Receiver> where - for<'a> C::Item<'a>: PushInto + Ord, + for<'a> C: PushInto>, + for<'a> C::Item<'a>: Ord, { fn extract(self) -> Vec<(T, C)> { let mut staged = std::collections::BTreeMap::new(); diff --git a/timely/src/dataflow/operators/core/filter.rs b/timely/src/dataflow/operators/core/filter.rs index ca1ff3385..f14b65320 100644 --- a/timely/src/dataflow/operators/core/filter.rs +++ b/timely/src/dataflow/operators/core/filter.rs @@ -1,5 +1,5 @@ //! Filters a stream by a predicate. -use crate::container::{Container, PushContainer, PushInto}; +use crate::container::{Container, SizableContainer, PushInto}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::operators::generic::operator::Operator; @@ -22,9 +22,9 @@ pub trait Filter { fn filter)->bool+'static>(&self, predicate: P) -> Self; } -impl Filter for StreamCore +impl Filter for StreamCore where - for<'a> C::Item<'a>: PushInto + for<'a> C: PushInto> { fn filter)->bool+'static>(&self, mut predicate: P) -> StreamCore { let mut container = Default::default(); diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index f929d045e..40d42cf19 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::container::{PushContainer, PushInto}; +use crate::container::{SizableContainer, PushInto}; use crate::scheduling::{Schedule, Activator}; @@ -390,7 +390,7 @@ impl Handle { } } -impl Handle { +impl Handle { #[inline] /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. /// @@ -419,7 +419,7 @@ impl Handle { /// } /// }); /// ``` - pub fn send>(&mut self, data: D) { + pub fn send(&mut self, data: D) where C: PushInto { self.buffer1.push(data); if self.buffer1.len() == self.buffer1.capacity() { self.flush(); diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 86ac39ec9..865114e73 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -1,6 +1,6 @@ //! Extension methods for `StreamCore` based on record-by-record transformation. -use crate::container::{Container, PushContainer, PushInto}; +use crate::container::{Container, SizableContainer, PushInto}; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; @@ -22,9 +22,8 @@ pub trait Map { /// }); /// ``` fn map(&self, mut logic: L) -> StreamCore - where - C2: PushContainer, - D2: PushInto, + where + C2: SizableContainer + PushInto, L: FnMut(C::Item<'_>)->D2 + 'static, { self.flat_map(move |x| std::iter::once(logic(x))) @@ -43,11 +42,10 @@ pub trait Map { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn flat_map(&self, logic: L) -> StreamCore - where - C2: PushContainer, + fn flat_map(&self, logic: L) -> StreamCore + where I: IntoIterator, - I::Item: PushInto, + C2: SizableContainer + PushInto, L: FnMut(C::Item<'_>)->I + 'static, ; } @@ -56,11 +54,10 @@ impl Map for StreamCore { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. - fn flat_map(&self, mut logic: L) -> StreamCore - where - C2: PushContainer, + fn flat_map(&self, mut logic: L) -> StreamCore + where I: IntoIterator, - I::Item: PushInto, + C2: SizableContainer + PushInto, L: FnMut(C::Item<'_>)->I + 'static, { let mut container = Default::default(); diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index 2a046f4af..97d481901 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -1,6 +1,6 @@ //! Operators that separate one stream into two streams based on some condition -use crate::container::{Container, PushContainer, PushInto}; +use crate::container::{Container, SizableContainer, PushInto}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::{Scope, StreamCore}; @@ -32,10 +32,8 @@ pub trait OkErr { logic: L, ) -> (StreamCore, StreamCore) where - C1: PushContainer, - D1: PushInto, - C2: PushContainer, - D2: PushInto, + C1: SizableContainer + PushInto, + C2: SizableContainer + PushInto, L: FnMut(C::Item<'_>) -> Result+'static ; } @@ -46,10 +44,8 @@ impl OkErr for StreamCore { mut logic: L, ) -> (StreamCore, StreamCore) where - C1: PushContainer, - D1: PushInto, - C2: PushContainer, - D2: PushInto, + C1: SizableContainer + PushInto, + C2: SizableContainer + PushInto, L: FnMut(C::Item<'_>) -> Result+'static { let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope()); diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index a7d874be4..dcb9aa684 100644 --- a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -1,6 +1,6 @@ //! Conversion to the `StreamCore` type from iterators. -use crate::container::{PushContainer, PushInto}; +use crate::container::{SizableContainer, PushInto}; use crate::Container; use crate::dataflow::operators::generic::operator::source; use crate::dataflow::{StreamCore, Scope}; @@ -26,7 +26,7 @@ pub trait ToStream { fn to_stream(self, scope: &mut S) -> StreamCore; } -impl ToStream for I where I::Item: PushInto { +impl ToStream for I where C: PushInto { fn to_stream(self, scope: &mut S) -> StreamCore { source(scope, "ToStream", |capability, info| {