Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion container/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ license = "MIT"

[dependencies]
columnation = { git = "https://github.com/frankmcsherry/columnation" }
flatcontainer = "0.1"
flatcontainer = "0.3"
serde = { version = "1.0"}
99 changes: 87 additions & 12 deletions container/src/columnation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ impl<T: Columnation> TimelyStack<T> {
/// The argument `items` may be cloned and iterated multiple times.
/// Please be careful if it contains side effects.
#[inline(always)]
pub fn reserve_items<'a, I>(&'a mut self, items: I)
pub fn reserve_items<'a, I>(&mut self, items: I)
where
I: Iterator<Item= &'a T>+Clone,
T: 'a,
{
self.local.reserve(items.clone().count());
self.inner.reserve_items(items);
Expand Down Expand Up @@ -240,24 +241,25 @@ impl<T: Columnation> Clone for TimelyStack<T> {
}
}

impl<T: Columnation> PushInto<TimelyStack<T>> for T {
impl<T: Columnation> PushInto<T> for TimelyStack<T> {
#[inline]
fn push_into(self, target: &mut TimelyStack<T>) {
target.copy(&self);
fn push_into(&mut self, item: T) {
self.copy(&item);
}
}

impl<T: Columnation> PushInto<TimelyStack<T>> for &T {
impl<T: Columnation> PushInto<&T> for TimelyStack<T> {
#[inline]
fn push_into(self, target: &mut TimelyStack<T>) {
target.copy(self);
fn push_into(&mut self, item: &T) {
self.copy(item);
}
}

impl<T: Columnation> PushInto<TimelyStack<T>> for &&T {

impl<T: Columnation> PushInto<&&T> for TimelyStack<T> {
#[inline]
fn push_into(self, target: &mut TimelyStack<T>) {
target.copy(self);
fn push_into(&mut self, item: &&T) {
self.copy(*item);
}
}

Expand Down Expand Up @@ -333,7 +335,7 @@ mod serde {

mod container {
use std::ops::Deref;
use crate::{Container, PushContainer};
use crate::{Container, SizableContainer};

use crate::columnation::{Columnation, TimelyStack};

Expand Down Expand Up @@ -366,7 +368,7 @@ mod container {
}
}

impl<T: Columnation + 'static> PushContainer for TimelyStack<T> {
impl<T: Columnation + 'static> SizableContainer for TimelyStack<T> {
fn capacity(&self) -> usize {
self.capacity()
}
Expand All @@ -380,3 +382,76 @@ mod container {
}
}
}

mod flatcontainer {
//! A bare-bones flatcontainer region implementation for [`TimelyStack`].

use columnation::Columnation;
use flatcontainer::{Push, Region, ReserveItems};
use crate::columnation::TimelyStack;

#[derive(Debug, Clone)]
struct ColumnationRegion<T: Columnation> {
inner: TimelyStack<T>,
}

impl<T: Columnation> Default for ColumnationRegion<T> {
fn default() -> Self {
Self { inner: Default::default() }
}
}

impl<T: Columnation> Region for ColumnationRegion<T> {
type ReadItem<'a> = &'a T where Self: 'a;
type Index = usize;

fn merge_regions<'a>(regions: impl Iterator<Item=&'a Self> + Clone) -> Self where Self: 'a {
let mut inner = TimelyStack::default();
inner.reserve_regions(regions.map(|r| &r.inner));
Self { inner}
}

fn index(&self, index: Self::Index) -> Self::ReadItem<'_> {
&self.inner[index]
}

fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator<Item=&'a Self> + Clone {
self.inner.reserve_regions(regions.map(|r| &r.inner));
}

fn clear(&mut self) {
self.inner.clear();
}

fn heap_size<F: FnMut(usize, usize)>(&self, callback: F) {
self.inner.heap_size(callback);
}
}

impl<T: Columnation> Push<T> for ColumnationRegion<T> {
fn push(&mut self, item: T) -> Self::Index {
self.inner.copy(&item);
self.inner.len() - 1
}
}

impl<T: Columnation> Push<&T> for ColumnationRegion<T> {
fn push(&mut self, item: &T) -> Self::Index {
self.inner.copy(item);
self.inner.len() - 1
}
}

impl<T: Columnation> Push<&&T> for ColumnationRegion<T> {
fn push(&mut self, item: &&T) -> Self::Index {
self.inner.copy(*item);
self.inner.len() - 1
}
}

impl<'a, T: Columnation + 'a> ReserveItems<&'a T> for ColumnationRegion<T> {
fn reserve_items<I>(&mut self, items: I) where I: Iterator<Item=&'a T> + Clone {
self.inner.reserve_items(items);
}
}
}
10 changes: 5 additions & 5 deletions container/src/flatcontainer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Present a [`FlatStack`] as a timely container.

pub use flatcontainer::*;
use crate::{buffer, Container, PushContainer, PushInto};
use crate::{buffer, Container, SizableContainer, PushInto};

impl<R: Region + Clone + 'static> Container for FlatStack<R> {
type ItemRef<'a> = R::ReadItem<'a> where Self: 'a;
Expand All @@ -28,7 +28,7 @@ impl<R: Region + Clone + 'static> Container for FlatStack<R> {
}
}

impl<R: Region + Clone + 'static> PushContainer for FlatStack<R> {
impl<R: Region + Clone + 'static> SizableContainer for FlatStack<R> {
fn capacity(&self) -> usize {
self.capacity()
}
Expand All @@ -42,9 +42,9 @@ impl<R: Region + Clone + 'static> PushContainer for FlatStack<R> {
}
}

impl<R: Region + Clone + 'static, T: CopyOnto<R>> PushInto<FlatStack<R>> for T {
impl<R: Region + Push<T> + Clone + 'static, T> PushInto<T> for FlatStack<R> {
#[inline]
fn push_into(self, target: &mut FlatStack<R>) {
target.copy(self);
fn push_into(&mut self, item: T) {
self.copy(item);
}
}
74 changes: 40 additions & 34 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ pub trait Container: Default + Clone + 'static {
/// The type of elements when draining the continer.
type Item<'a> where Self: 'a;

/// Push `item` into self
#[inline]
fn push<T>(&mut self, item: T) where Self: PushInto<T> {
self.push_into(item)
}

/// The number of elements in this container
///
/// The length of a container must be consistent between sending and receiving it.
Expand Down Expand Up @@ -57,26 +63,10 @@ pub trait Container: Default + Clone + 'static {
fn drain(&mut self) -> Self::DrainIter<'_>;
}

/// A type that can push itself into a container.
pub trait PushInto<C> {
/// Push self into the target container.
fn push_into(self, target: &mut C);
}

/// A type that has the necessary infrastructure to push elements, without specifying how pushing
/// itself works. For this, pushable types should implement [`PushInto`].
pub trait PushContainer: Container {
/// Push `item` into self
#[inline]
fn push<T: PushInto<Self>>(&mut self, item: T) {
item.push_into(self)
}
/// Return the capacity of the container.
fn capacity(&self) -> usize;
/// Return the preferred capacity of the container.
fn preferred_capacity() -> usize;
/// Reserve space for `additional` elements, possibly increasing the capacity of the container.
fn reserve(&mut self, additional: usize);
/// A container that can absorb items of a specific type.
pub trait PushInto<T> {
/// Push item into self.
fn push_into(&mut self, item: T);
}

/// A type that can build containers from items.
Expand All @@ -99,7 +89,12 @@ pub trait ContainerBuilder: Default + 'static {
/// The container type we're building.
type Container: Container;
/// Add an item to a container.
fn push<T: PushInto<Self::Container>>(&mut self, item: T) where Self::Container: PushContainer;
///
/// The restriction to [`SizeableContainer`] only exists so that types
/// relying on [`CapacityContainerBuilder`] only need to constrain their container
/// to [`Container`] instead of [`SizableContainer`], which otherwise would be a pervasive
/// requirement.
fn push<T>(&mut self, item: T) where Self::Container: SizableContainer + PushInto<T>;
/// Push a pre-built container.
fn push_container(&mut self, container: &mut Self::Container);
/// Extract assembled containers, potentially leaving unfinished data behind.
Expand All @@ -121,11 +116,21 @@ pub struct CapacityContainerBuilder<C>{
pending: VecDeque<C>,
}

/// A container that can be sized and reveals its capacity.
pub trait SizableContainer: Container {
/// Return the capacity of the container.
fn capacity(&self) -> usize;
/// Return the preferred capacity of the container.
fn preferred_capacity() -> usize;
/// Reserve space for `additional` elements, possibly increasing the capacity of the container.
fn reserve(&mut self, additional: usize);
}

impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {
type Container = C;

#[inline]
fn push<T: PushInto<Self::Container>>(&mut self, item: T) where C: PushContainer {
fn push<T>(&mut self, item: T) where C: SizableContainer + PushInto<T> {
if self.current.capacity() == 0 {
self.current = self.empty.take().unwrap_or_default();
// Discard any non-uniform capacity container.
Expand Down Expand Up @@ -212,7 +217,7 @@ impl<T: Clone + 'static> Container for Vec<T> {
}
}

impl<T: Clone + 'static> PushContainer for Vec<T> {
impl<T: Clone + 'static> SizableContainer for Vec<T> {
fn capacity(&self) -> usize {
self.capacity()
}
Expand All @@ -226,24 +231,25 @@ impl<T: Clone + 'static> PushContainer for Vec<T> {
}
}

impl<T> PushInto<Vec<T>> for T {
impl<T> PushInto<T> for Vec<T> {
#[inline]
fn push_into(self, target: &mut Vec<T>) {
target.push(self)
fn push_into(&mut self, item: T) {
self.push(item)
}
}

impl<T: Clone> PushInto<Vec<T>> for &T {

impl<T: Clone> PushInto<&T> for Vec<T> {
#[inline]
fn push_into(self, target: &mut Vec<T>) {
target.push(self.clone())
fn push_into(&mut self, item: &T) {
self.push(item.clone())
}
}

impl<T: Clone> PushInto<Vec<T>> for &&T {
impl<T: Clone> PushInto<&&T> for Vec<T> {
#[inline]
fn push_into(self, target: &mut Vec<T>) {
(*self).push_into(target);
fn push_into(&mut self, item: &&T) {
self.push_into(*item)
}
}

Expand Down Expand Up @@ -330,7 +336,7 @@ mod arc {
}

/// A container that can partition itself into pieces.
pub trait PushPartitioned: PushContainer {
pub trait PushPartitioned: SizableContainer {
/// Partition and push this container.
///
/// Drain all elements from `self`, and use the function `index` to determine which `buffer` to
Expand All @@ -341,7 +347,7 @@ pub trait PushPartitioned: PushContainer {
F: FnMut(usize, &mut Self);
}

impl<T: PushContainer + 'static> PushPartitioned for T where for<'a> T::Item<'a>: PushInto<T> {
impl<T: SizableContainer> PushPartitioned for T where for<'a> T: PushInto<T::Item<'a>> {
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
where
for<'a> I: FnMut(&Self::Item<'a>) -> usize,
Expand Down
25 changes: 12 additions & 13 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! with the performance of batched sends.

use crate::communication::Push;
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushContainer, PushInto};
use crate::container::{ContainerBuilder, CapacityContainerBuilder, SizableContainer, PushInto};
use crate::dataflow::channels::{Bundle, Message};
use crate::dataflow::operators::Capability;
use crate::progress::Timestamp;
Expand Down Expand Up @@ -114,11 +114,11 @@ impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P
impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P>
where
T: Eq+Clone,
CB::Container: PushContainer,
CB::Container: SizableContainer,
{
// Push a single item into the builder. Internal method for use by `Session`.
#[inline]
fn give<D: PushInto<CB::Container>>(&mut self, data: D) {
fn give<D>(&mut self, data: D) where CB::Container: PushInto<D> {
self.builder.push(data);
self.extract();
}
Expand Down Expand Up @@ -155,20 +155,20 @@ impl<'a, T, CB, P: Push<Bundle<T, CB::Container>>+'a> Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + 'a,
CB::Container: PushContainer,
CB::Container: SizableContainer,
{
/// Provides one record at the time specified by the `Session`.
#[inline]
pub fn give<D: PushInto<CB::Container>>(&mut self, data: D) {
pub fn give<D>(&mut self, data: D) where CB::Container: PushInto<D> {
self.buffer.give(data);
}

/// Provides an iterator of records at the time specified by the `Session`.
#[inline]
pub fn give_iterator<I, D>(&mut self, iter: I)
pub fn give_iterator<I>(&mut self, iter: I)
where
I: Iterator<Item=D>,
D: PushInto<CB::Container>,
I: Iterator,
CB::Container: PushInto<I::Item>,
{
for item in iter {
self.give(item);
Expand Down Expand Up @@ -197,16 +197,15 @@ where
{
/// Transmits a single record.
#[inline]
pub fn give<D: PushInto<CB::Container>>(&mut self, data: D) where CB::Container: PushContainer {
pub fn give<D>(&mut self, data: D) where CB::Container: SizableContainer + PushInto<D> {
self.buffer.give(data);
}
/// Transmits records produced by an iterator.
#[inline]
pub fn give_iterator<I, D>(&mut self, iter: I)
where
I: Iterator<Item=D>,
D: PushInto<CB::Container>,
CB::Container: PushContainer,
where
I: Iterator<Item=D>,
CB::Container: SizableContainer + PushInto<D>,
{
for item in iter {
self.give(item);
Expand Down
Loading