Skip to content

Commit bbf16a8

Browse files
committed
Introduce FlatContainer, container function for hinting
1 parent b407978 commit bbf16a8

File tree

18 files changed

+218
-8
lines changed

18 files changed

+218
-8
lines changed

container/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ license = "MIT"
77

88
[dependencies]
99
columnation = { git = "https://github.com/frankmcsherry/columnation" }
10+
flatcontainer = "0.1"
1011
serde = { version = "1.0"}

container/src/flatcontainer.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
//! Present a [`FlatStack`] as a timely container.
2+
3+
pub use flatcontainer::*;
4+
use crate::{buffer, Container, PushContainer, PushInto};
5+
6+
impl<R: Region + Clone + 'static> Container for FlatStack<R> {
7+
type ItemRef<'a> = R::ReadItem<'a> where Self: 'a;
8+
type Item<'a> = R::ReadItem<'a> where Self: 'a;
9+
10+
fn len(&self) -> usize {
11+
self.len()
12+
}
13+
14+
fn clear(&mut self) {
15+
self.clear()
16+
}
17+
18+
type Iter<'a> = <&'a Self as IntoIterator>::IntoIter;
19+
20+
fn iter<'a>(&'a self) -> Self::Iter<'a> {
21+
IntoIterator::into_iter(self)
22+
}
23+
24+
type DrainIter<'a> = Self::Iter<'a>;
25+
26+
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
27+
IntoIterator::into_iter(&*self)
28+
}
29+
}
30+
31+
impl<R: Region + Clone + 'static> PushContainer for FlatStack<R> {
32+
fn capacity(&self) -> usize {
33+
self.capacity()
34+
}
35+
36+
fn preferred_capacity() -> usize {
37+
buffer::default_capacity::<R::Index>()
38+
}
39+
40+
fn reserve(&mut self, additional: usize) {
41+
self.reserve(additional);
42+
}
43+
}
44+
45+
impl<R: Region + Clone + 'static, T: CopyOnto<R>> PushInto<FlatStack<R>> for T {
46+
#[inline]
47+
fn push_into(self, target: &mut FlatStack<R>) {
48+
target.copy(self);
49+
}
50+
}

container/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#![forbid(missing_docs)]
44

55
pub mod columnation;
6+
pub mod flatcontainer;
67

78
/// A container transferring data through dataflow edges
89
///

timely/examples/distinct.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ fn main() {
3535
}
3636
})
3737
})
38+
.container::<Vec<_>>()
3839
.inspect(move |x| println!("worker {}:\tvalue {}", index, x))
3940
.probe_with(&mut probe);
4041
});

timely/examples/flatcontainer.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
//! Wordcount based on flatcontainer.
2+
3+
#[cfg(feature = "bincode")]
4+
use {
5+
std::collections::HashMap,
6+
timely::container::flatcontainer::{Containerized, FlatStack},
7+
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
8+
timely::dataflow::operators::core::InputHandle,
9+
timely::dataflow::operators::{Inspect, Operator, Probe},
10+
timely::dataflow::ProbeHandle,
11+
};
12+
13+
#[cfg(feature = "bincode")]
14+
fn main() {
15+
// initializes and runs a timely dataflow.
16+
timely::execute_from_args(std::env::args(), |worker| {
17+
let mut input =
18+
<InputHandle<_, FlatStack<<(String, i64) as Containerized>::Region>>>::new();
19+
let mut probe = ProbeHandle::new();
20+
21+
// create a new input, exchange data, and inspect its output
22+
worker.dataflow::<usize, _, _>(|scope| {
23+
input
24+
.to_stream(scope)
25+
.unary::<FlatStack<<(String, i64) as Containerized>::Region>, _, _, _>(
26+
Pipeline,
27+
"Split",
28+
|_cap, _info| {
29+
move |input, output| {
30+
while let Some((time, data)) = input.next() {
31+
let mut session = output.session(&time);
32+
for (text, diff) in data.iter().flat_map(|(text, diff)| {
33+
text.split_whitespace().map(move |s| (s, diff))
34+
}) {
35+
session.give((text, diff));
36+
}
37+
}
38+
}
39+
},
40+
)
41+
.unary_frontier::<FlatStack<<(String, i64) as Containerized>::Region>, _, _, _>(
42+
ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64),
43+
"WordCount",
44+
|_capability, _info| {
45+
let mut queues = HashMap::new();
46+
let mut counts = HashMap::new();
47+
48+
move |input, output| {
49+
while let Some((time, data)) = input.next() {
50+
queues
51+
.entry(time.retain())
52+
.or_insert(Vec::new())
53+
.push(data.take());
54+
}
55+
56+
for (key, val) in queues.iter_mut() {
57+
if !input.frontier().less_equal(key.time()) {
58+
let mut session = output.session(key);
59+
for batch in val.drain(..) {
60+
for (word, diff) in batch.iter() {
61+
let entry =
62+
counts.entry(word.to_string()).or_insert(0i64);
63+
*entry += diff;
64+
session.give((word, *entry));
65+
}
66+
}
67+
}
68+
}
69+
70+
queues.retain(|_key, val| !val.is_empty());
71+
}
72+
},
73+
)
74+
.inspect(|x| println!("seen: {:?}", x))
75+
.probe_with(&mut probe);
76+
});
77+
78+
// introduce data and watch!
79+
for round in 0..10 {
80+
input.send(("flat container", 1));
81+
input.advance_to(round + 1);
82+
while probe.less_than(input.time()) {
83+
worker.step();
84+
}
85+
}
86+
})
87+
.unwrap();
88+
}
89+
90+
#[cfg(not(feature = "bincode"))]
91+
fn main() {
92+
eprintln!("Example requires feature bincode.");
93+
}

timely/examples/hashjoin.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ fn main() {
7676
});
7777
}
7878
})
79+
.container::<Vec<_>>()
7980
.probe_with(&mut probe);
8081
});
8182

timely/examples/simple.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use timely::dataflow::operators::*;
55
fn main() {
66
timely::example(|scope| {
77
(0..10).to_stream(scope)
8+
.container::<Vec<_>>()
89
.inspect(|x| println!("seen: {:?}", x));
910
});
1011
}

timely/examples/wordcount.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ fn main() {
2424
.map(move |word| (word.to_owned(), diff))
2525
.collect::<Vec<_>>()
2626
)
27+
.container::<Vec<_>>()
2728
.unary_frontier(exchange, "WordCount", |_capability, _info| {
2829

2930
let mut queues = HashMap::new();
@@ -51,6 +52,7 @@ fn main() {
5152

5253
queues.retain(|_key, val| !val.is_empty());
5354
}})
55+
.container::<Vec<_>>()
5456
.inspect(|x| println!("seen: {:?}", x))
5557
.probe_with(&mut probe);
5658
});

timely/src/dataflow/operators/capability.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,8 @@ impl<T: Timestamp> CapabilitySet<T> {
426426
/// output.session(a_cap).give(());
427427
/// }
428428
/// }
429-
/// });
429+
/// })
430+
/// .container::<Vec<_>>();
430431
/// });
431432
/// ```
432433
pub fn from_elem(cap: Capability<T>) -> Self {

timely/src/dataflow/operators/core/input.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ pub trait Input : Scope {
8181
/// let mut input = Handle::new();
8282
/// worker.dataflow(|scope| {
8383
/// scope.input_from(&mut input)
84+
/// .container::<Vec<_>>()
8485
/// .inspect(|x| println!("hello {:?}", x));
8586
/// });
8687
///
@@ -198,6 +199,7 @@ impl<T: Timestamp, C: Container> Handle<T, C> {
198199
/// let mut input = Handle::new();
199200
/// worker.dataflow(|scope| {
200201
/// scope.input_from(&mut input)
202+
/// .container::<Vec<_>>()
201203
/// .inspect(|x| println!("hello {:?}", x));
202204
/// });
203205
///
@@ -235,6 +237,7 @@ impl<T: Timestamp, C: Container> Handle<T, C> {
235237
/// let mut input = Handle::new();
236238
/// worker.dataflow(|scope| {
237239
/// input.to_stream(scope)
240+
/// .container::<Vec<_>>()
238241
/// .inspect(|x| println!("hello {:?}", x));
239242
/// });
240243
///
@@ -404,6 +407,7 @@ impl<T: Timestamp, C: PushContainer> Handle<T, C> {
404407
/// let mut input = Handle::new();
405408
/// worker.dataflow(|scope| {
406409
/// scope.input_from(&mut input)
410+
/// .container::<Vec<_>>()
407411
/// .inspect(|x| println!("hello {:?}", x));
408412
/// });
409413
///

0 commit comments

Comments
 (0)