Skip to content

Commit 7e313ee

Browse files
authored
exchange: remove one layer of boxing (#456)
* exchange: remove one layer of boxing We were previously forced to use a boxed trait object because we were unable to name the type of the constructed closure. This patch fixes it by introducing a custom `ParallelizationHasher` trait that the `ParallelizationContractCore` expects with a blanket implementation for all closures taking two arguments. This is to maintain backwards compatibility with any code that passes a closure directly to `ExchangeCore::new`. Then a wrapper type `DataHasher<F>` is introduced that allows us to both specialize the `ParallelizationHasher` implementation for single argument closures and at the same time name the type which is what we need to remove one layer of boxed trait objects. Signed-off-by: Petros Angelatos <[email protected]> * exchange: disallow exchanging based on time Signed-off-by: Petros Angelatos <[email protected]>
1 parent dd6187c commit 7e313ee

File tree

2 files changed

+10
-11
lines changed

2 files changed

+10
-11
lines changed

timely/src/dataflow/channels/pact.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,13 @@ impl<T: Timestamp, C, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationC
7373
where
7474
C: Data + Container + PushPartitioned<Item=D>,
7575
{
76-
// TODO: The closure in the type prevents us from naming it.
77-
// Could specialize `ExchangePusher` to a time-free version.
78-
type Pusher = Box<dyn Push<BundleCore<T, C>>>;
79-
type Puller = Box<dyn Pull<BundleCore<T, C>>>;
80-
fn connect<A: AsWorker>(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
76+
type Pusher = ExchangePusher<T, C, D, LogPusher<T, C, Box<dyn Push<BundleCore<T, C>>>>, F>;
77+
type Puller = LogPuller<T, C, Box<dyn Pull<BundleCore<T, C>>>>;
78+
79+
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
8180
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
8281
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
83-
(Box::new(ExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone())))
82+
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
8483
}
8584
}
8685

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ use crate::dataflow::channels::{BundleCore, Message};
77

88
// TODO : Software write combining
99
/// Distributes records among target pushees according to a distribution function.
10-
pub struct Exchange<T, C: Container, D, P: Push<BundleCore<T, C>>, H: FnMut(&T, &D) -> u64> {
10+
pub struct Exchange<T, C: Container, D, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> {
1111
pushers: Vec<P>,
1212
buffers: Vec<C>,
1313
current: Option<T>,
1414
hash_func: H,
1515
phantom: std::marker::PhantomData<D>,
1616
}
1717

18-
impl<T: Clone, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&T, &D)->u64> Exchange<T, C, D, P, H> {
18+
impl<T: Clone, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> Exchange<T, C, D, P, H> {
1919
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
2020
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, C, D, P, H> {
2121
let mut buffers = vec![];
@@ -40,7 +40,7 @@ impl<T: Clone, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&T, &D
4040
}
4141
}
4242

43-
impl<T: Eq+Data, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&T, &D)->u64> Push<BundleCore<T, C>> for Exchange<T, C, D, P, H>
43+
impl<T: Eq+Data, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> Push<BundleCore<T, C>> for Exchange<T, C, D, P, H>
4444
where
4545
C: PushPartitioned<Item=D>
4646
{
@@ -72,7 +72,7 @@ where
7272
let pushers = &mut self.pushers;
7373
data.push_partitioned(
7474
&mut self.buffers,
75-
move |datum| ((hash_func)(time, datum) & mask) as usize,
75+
move |datum| ((hash_func)(datum) & mask) as usize,
7676
|index, buffer| {
7777
Message::push_at(buffer, time.clone(), &mut pushers[index]);
7878
}
@@ -84,7 +84,7 @@ where
8484
let pushers = &mut self.pushers;
8585
data.push_partitioned(
8686
&mut self.buffers,
87-
move |datum| ((hash_func)(time, datum) % num_pushers) as usize,
87+
move |datum| ((hash_func)(datum) % num_pushers) as usize,
8888
|index, buffer| {
8989
Message::push_at(buffer, time.clone(), &mut pushers[index]);
9090
}

0 commit comments

Comments
 (0)