Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,54 +32,54 @@ impl Generic {
/// The index of the worker out of `(0..self.peers())`.
pub fn index(&self) -> usize {
match self {
&Generic::Thread(ref t) => t.index(),
&Generic::Process(ref p) => p.index(),
&Generic::ProcessBinary(ref pb) => pb.index(),
&Generic::ZeroCopy(ref z) => z.index(),
Generic::Thread(t) => t.index(),
Generic::Process(p) => p.index(),
Generic::ProcessBinary(pb) => pb.index(),
Generic::ZeroCopy(z) => z.index(),
}
}
/// The number of workers.
pub fn peers(&self) -> usize {
match self {
&Generic::Thread(ref t) => t.peers(),
&Generic::Process(ref p) => p.peers(),
&Generic::ProcessBinary(ref pb) => pb.peers(),
&Generic::ZeroCopy(ref z) => z.peers(),
Generic::Thread(t) => t.peers(),
Generic::Process(p) => p.peers(),
Generic::ProcessBinary(pb) => pb.peers(),
Generic::ZeroCopy(z) => z.peers(),
}
}
/// Constructs several send endpoints and one receive endpoint.
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
match self {
&mut Generic::Thread(ref mut t) => t.allocate(identifier),
&mut Generic::Process(ref mut p) => p.allocate(identifier),
&mut Generic::ProcessBinary(ref mut pb) => pb.allocate(identifier),
&mut Generic::ZeroCopy(ref mut z) => z.allocate(identifier),
Generic::Thread(t) => t.allocate(identifier),
Generic::Process(p) => p.allocate(identifier),
Generic::ProcessBinary(pb) => pb.allocate(identifier),
Generic::ZeroCopy(z) => z.allocate(identifier),
}
}
/// Perform work before scheduling operators.
fn receive(&mut self) {
match self {
&mut Generic::Thread(ref mut t) => t.receive(),
&mut Generic::Process(ref mut p) => p.receive(),
&mut Generic::ProcessBinary(ref mut pb) => pb.receive(),
&mut Generic::ZeroCopy(ref mut z) => z.receive(),
Generic::Thread(t) => t.receive(),
Generic::Process(p) => p.receive(),
Generic::ProcessBinary(pb) => pb.receive(),
Generic::ZeroCopy(z) => z.receive(),
}
}
/// Perform work after scheduling operators.
pub fn release(&mut self) {
match self {
&mut Generic::Thread(ref mut t) => t.release(),
&mut Generic::Process(ref mut p) => p.release(),
&mut Generic::ProcessBinary(ref mut pb) => pb.release(),
&mut Generic::ZeroCopy(ref mut z) => z.release(),
Generic::Thread(t) => t.release(),
Generic::Process(p) => p.release(),
Generic::ProcessBinary(pb) => pb.release(),
Generic::ZeroCopy(z) => z.release(),
}
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
match self {
&Generic::Thread(ref t) => t.events(),
&Generic::Process(ref p) => p.events(),
&Generic::ProcessBinary(ref pb) => pb.events(),
&Generic::ZeroCopy(ref z) => z.events(),
Generic::Thread(ref t) => t.events(),
Generic::Process(ref p) => p.events(),
Generic::ProcessBinary(ref pb) => pb.events(),
Generic::ZeroCopy(ref z) => z.events(),
}
}
}
Expand All @@ -96,10 +96,10 @@ impl Allocate for Generic {
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
fn await_events(&self, _duration: Option<std::time::Duration>) {
match self {
&Generic::Thread(ref t) => t.await_events(_duration),
&Generic::Process(ref p) => p.await_events(_duration),
&Generic::ProcessBinary(ref pb) => pb.await_events(_duration),
&Generic::ZeroCopy(ref z) => z.await_events(_duration),
Generic::Thread(t) => t.await_events(_duration),
Generic::Process(p) => p.await_events(_duration),
Generic::ProcessBinary(pb) => pb.await_events(_duration),
Generic::ZeroCopy(z) => z.await_events(_duration),
}
}
}
Expand Down
26 changes: 13 additions & 13 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl AllocateBuilder for ProcessBuilder {
let buzzer = Buzzer::new();
worker.send(buzzer).expect("Failed to send buzzer");
}
let mut buzzers = Vec::new();
let mut buzzers = Vec::with_capacity(self.buzzers_recv.len());
for worker in self.buzzers_recv.iter() {
buzzers.push(worker.recv().expect("Failed to recv buzzer"));
}
Expand Down Expand Up @@ -69,19 +69,19 @@ pub struct Process {

impl Process {
/// Access the wrapped inner allocator.
pub fn inner<'a>(&'a mut self) -> &'a mut Thread { &mut self.inner }
pub fn inner(&mut self) -> &mut Thread { &mut self.inner }
/// Allocate a list of connected intra-process allocators.
pub fn new_vector(peers: usize) -> Vec<ProcessBuilder> {

let mut counters_send = Vec::new();
let mut counters_recv = Vec::new();
let mut counters_send = Vec::with_capacity(peers);
let mut counters_recv = Vec::with_capacity(peers);
for _ in 0 .. peers {
let (send, recv) = crossbeam_channel::unbounded();
counters_send.push(send);
counters_recv.push(recv);
}

let channels = Arc::new(Mutex::new(HashMap::new()));
let channels = Arc::new(Mutex::new(HashMap::with_capacity(peers)));

// Allocate matrix of buzzer send and recv endpoints.
let (buzzers_send, buzzers_recv) = crate::promise_futures(peers, peers);
Expand Down Expand Up @@ -116,23 +116,23 @@ impl Allocate for Process {
// first worker that enters this critical section

// ensure exclusive access to shared list of channels
let mut channels = self.channels.lock().ok().expect("mutex error?");
let mut channels = self.channels.lock().expect("mutex error?");

let (sends, recv, empty) = {

// we may need to alloc a new channel ...
let entry = channels.entry(identifier).or_insert_with(|| {

let mut pushers = Vec::new();
let mut pullers = Vec::new();
for index in 0 .. self.peers {
let mut pushers = Vec::with_capacity(self.peers);
let mut pullers = Vec::with_capacity(self.peers);
for buzzer in self.buzzers.iter() {
let (s, r): (Sender<Message<T>>, Receiver<Message<T>>) = crossbeam_channel::unbounded();
// TODO: the buzzer in the pusher may be redundant, because we need to buzz post-counter.
pushers.push((Pusher { target: s }, self.buzzers[index].clone()));
pushers.push((Pusher { target: s }, buzzer.clone()));
pullers.push(Puller { source: r, current: None });
}

let mut to_box = Vec::new();
let mut to_box = Vec::with_capacity(pullers.len());
for recv in pullers.into_iter() {
to_box.push(Some((pushers.clone(), recv)));
}
Expand Down Expand Up @@ -164,8 +164,8 @@ impl Allocate for Process {

let sends =
sends.into_iter()
.enumerate()
.map(|(i,(s,b))| CountPusher::new(s, identifier, self.counters_send[i].clone(), b))
.zip(self.counters_send.iter())
.map(|((s,b), sender)| CountPusher::new(s, identifier, sender.clone(), b))
.map(|s| Box::new(s) as Box<dyn Push<super::Message<T>>>)
.collect::<Vec<_>>();

Expand Down
2 changes: 1 addition & 1 deletion communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Thread {
let pusher = Pusher { target: shared.clone() };
let pusher = CountPusher::new(pusher, identifier, events.clone());
let puller = Puller { source: shared, current: None };
let puller = CountPuller::new(puller, identifier, events.clone());
let puller = CountPuller::new(puller, identifier, events);
(pusher, puller)
}
}
Expand Down
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl Allocate for ProcessAllocator {
}
self.channel_id_bound = Some(identifier);

let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::new();
let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::with_capacity(self.peers());

for target_index in 0 .. self.peers() {

Expand Down
92 changes: 43 additions & 49 deletions communication/src/allocator/zero_copy/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,58 +75,52 @@ pub fn initialize_networking_from_sockets(
let mut promises_iter = promises.into_iter();
let mut futures_iter = futures.into_iter();

let mut send_guards = Vec::new();
let mut recv_guards = Vec::new();
let mut send_guards = Vec::with_capacity(sockets.len());
let mut recv_guards = Vec::with_capacity(sockets.len());

// for each process, if a stream exists (i.e. not local) ...
for index in 0..sockets.len() {

if let Some(stream) = sockets[index].take() {
// remote process

let remote_recv = promises_iter.next().unwrap();

{
let log_sender = log_sender.clone();
let stream = stream.try_clone()?;
let join_guard =
::std::thread::Builder::new()
.name(format!("timely:send-{}", index))
.spawn(move || {

let logger = log_sender(CommunicationSetup {
process: my_index,
sender: true,
remote: Some(index),
});

send_loop(stream, remote_recv, my_index, index, logger);
})?;

send_guards.push(join_guard);
}

let remote_send = futures_iter.next().unwrap();

{
// let remote_sends = remote_sends.clone();
let log_sender = log_sender.clone();
let stream = stream.try_clone()?;
let join_guard =
::std::thread::Builder::new()
.name(format!("timely:recv-{}", index))
.spawn(move || {
let logger = log_sender(CommunicationSetup {
process: my_index,
sender: false,
remote: Some(index),
});
recv_loop(stream, remote_send, threads * my_index, my_index, index, logger);
})?;

recv_guards.push(join_guard);
}
for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
let remote_recv = promises_iter.next().unwrap();

{
let log_sender = log_sender.clone();
let stream = stream.try_clone()?;
let join_guard =
::std::thread::Builder::new()
.name(format!("timely:send-{}", index))
.spawn(move || {

let logger = log_sender(CommunicationSetup {
process: my_index,
sender: true,
remote: Some(index),
});

send_loop(stream, remote_recv, my_index, index, logger);
})?;

send_guards.push(join_guard);
}

let remote_send = futures_iter.next().unwrap();

{
// let remote_sends = remote_sends.clone();
let log_sender = log_sender.clone();
let stream = stream.try_clone()?;
let join_guard =
::std::thread::Builder::new()
.name(format!("timely:recv-{}", index))
.spawn(move || {
let logger = log_sender(CommunicationSetup {
process: my_index,
sender: false,
remote: Some(index),
});
recv_loop(stream, remote_send, threads * my_index, my_index, index, logger);
})?;

recv_guards.push(join_guard);
}
}

Expand Down
4 changes: 2 additions & 2 deletions communication/src/allocator/zero_copy/push_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ impl<T, P: BytesPush> Pusher<T, P> {
/// Creates a new `Pusher` from a header and shared byte buffer.
pub fn new(header: MessageHeader, sender: Rc<RefCell<SendEndpoint<P>>>) -> Pusher<T, P> {
Pusher {
header: header,
sender: sender,
header,
sender,
phantom: ::std::marker::PhantomData,
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl<G: Scope> OperatorBuilder<G> {
L: FnMut(&[MutableAntichain<G::Timestamp>])->bool+'static
{
// create capabilities, discard references to their creation.
let mut capabilities = Vec::new();
let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
for output_index in 0 .. self.internal.borrow().len() {
let borrow = &self.internal.borrow()[output_index];
capabilities.push(mint_capability(G::Timestamp::minimum(), borrow.clone()));
Expand Down
10 changes: 5 additions & 5 deletions timely/src/dataflow/operators/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::dataflow::{Scope, Stream};
use crate::Data;

/// Partition a stream of records into multiple streams.
pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)> {
pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {
/// Produces `parts` output streams, containing records produced and assigned by `route`.
///
/// # Examples
Expand All @@ -27,12 +27,11 @@ pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)> {

impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D2, F> for Stream<G, D> {
fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>> {

let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());

let mut input = builder.new_input(self, Pipeline);
let mut outputs = Vec::new();
let mut streams = Vec::new();
let mut outputs = Vec::with_capacity(parts as usize);
let mut streams = Vec::with_capacity(parts as usize);

for _ in 0 .. parts {
let (output, stream) = builder.new_output();
Expand All @@ -47,6 +46,7 @@ impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D
input.for_each(|time, data| {
data.swap(&mut vector);
let mut sessions = handles.iter_mut().map(|h| h.session(&time)).collect::<Vec<_>>();

for datum in vector.drain(..) {
let (part, datum2) = route(datum);
sessions[part as usize].give(datum2);
Expand All @@ -57,4 +57,4 @@ impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D

streams
}
}
}
2 changes: 1 addition & 1 deletion timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl<T: Timestamp> Builder<T> {
// Initially this list contains observed locations with no incoming
// edges, but as the algorithm develops we add to it any locations
// that can only be reached by nodes that have been on this list.
let mut worklist = Vec::new();
let mut worklist = Vec::with_capacity(in_degree.len());
for (key, val) in in_degree.iter() {
if *val == 0 {
worklist.push(*key);
Expand Down
2 changes: 2 additions & 0 deletions timely/src/synchronization/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ impl<T: ExchangeData> Sequencer<T> {
// grab each command and queue it up
input.for_each(|time, data| {
data.swap(&mut vector);

recvd.reserve(vector.len());
for (worker, counter, element) in vector.drain(..) {
recvd.push(((time.time().clone(), worker, counter), element));
}
Expand Down