Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,13 @@ impl<T, D> Message<T, D> {
}

/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or a new `Vec`. Note that the returned vector is always initialized with
/// a capacity of [Self::default_length] elements.
/// leaves in place, or a new `Vec`. If the returned vector has a different capacity than the
/// default, it will be replaced with an empty vector.
///
/// Clients are responsible for ensuring that their buffers are allocated before reusing them.
#[inline]
pub fn push_at<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {

Self::push_at_no_allocation(buffer, time, pusher);

// Allocate a default buffer to avoid oddly sized or empty buffers
if buffer.capacity() != Self::default_length() {
*buffer = Vec::with_capacity(Self::default_length());
}
}

/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or a new empty `Vec`. If the pusher leaves a vector with a capacity larger
/// than [Self::default_length], the buffer is replaced with a new vector of
/// [Self::default_length] capacity.
#[inline]
pub fn push_at_no_allocation<P: Push<Bundle<T, D>>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {

let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
let mut bundle = Some(Bundle::from_typed(message));
Expand All @@ -80,9 +67,9 @@ impl<T, D> Message<T, D> {
}
}

// Avoid memory leaks by buffers growing out of bounds
if buffer.capacity() > Self::default_length() {
*buffer = Vec::with_capacity(Self::default_length());
// Avoid oddly-sized buffers
if std::mem::size_of::<D>() > 0 && buffer.capacity() != Self::default_length() {
*buffer = Default::default();
}
}
}
8 changes: 6 additions & 2 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl<T, D, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {
pub fn new(pusher: P) -> Buffer<T, D, P> {
Buffer {
time: None,
buffer: Vec::with_capacity(Message::<T, D>::default_length()),
buffer: Default::default(),
pusher,
}
}
Expand Down Expand Up @@ -65,8 +65,12 @@ impl<T, D, P: Push<Bundle<T, D>>> Buffer<T, D, P> where T: Eq+Clone {

// internal method for use by `Session`.
fn give(&mut self, data: D) {
// Ensure default capacity
if self.buffer.capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffer.capacity();
self.buffer.reserve(to_reserve);
}
self.buffer.push(data);
// assert!(self.buffer.capacity() == Message::<O::Data>::default_length());
if self.buffer.len() == self.buffer.capacity() {
self.flush();
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Exchange<T, D,
fn flush(&mut self, index: usize) {
if !self.buffers[index].is_empty() {
if let Some(ref time) = self.current {
Message::push_at_no_allocation(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
Message::push_at(&mut self.buffers[index], time.clone(), &mut self.pushers[index]);
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ impl<T: Data, D: Data> Push<Bundle<T, D>> for Tee<T, D> {
let mut pushers = self.shared.borrow_mut();
if let Some(message) = message {
for index in 1..pushers.len() {
// Ensure default capacity
if self.buffer.capacity() != Message::<T, D>::default_length() {
self.buffer = Vec::with_capacity(Message::<T, D>::default_length());
}
self.buffer.extend_from_slice(&message.data);
Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1]);
}
Expand All @@ -44,7 +48,7 @@ impl<T, D> Tee<T, D> {
pub fn new() -> (Tee<T, D>, TeeHelper<T, D>) {
let shared = Rc::new(RefCell::new(Vec::new()));
let port = Tee {
buffer: Vec::with_capacity(Message::<T, D>::default_length()),
buffer: Default::default(),
shared: shared.clone(),
};

Expand All @@ -55,7 +59,7 @@ impl<T, D> Tee<T, D> {
impl<T, D> Clone for Tee<T, D> {
fn clone(&self) -> Tee<T, D> {
Tee {
buffer: Vec::with_capacity(self.buffer.capacity()),
buffer: Default::default(),
shared: self.shared.clone(),
}
}
Expand Down
42 changes: 22 additions & 20 deletions timely/src/dataflow/operators/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ impl<T:Timestamp, D: Data> Handle<T, D> {
activate: Vec::new(),
progress: Vec::new(),
pushers: Vec::new(),
buffer1: Vec::with_capacity(Message::<T, D>::default_length()),
buffer2: Vec::with_capacity(Message::<T, D>::default_length()),
buffer1: Default::default(),
buffer2: Default::default(),
now_at: T::minimum(),
}
}
Expand Down Expand Up @@ -258,7 +258,7 @@ impl<T:Timestamp, D: Data> Handle<T, D> {
progress: Rc<RefCell<ChangeBatch<T>>>
) {
// flush current contents, so new registrant does not see existing data.
if !self.buffer1.is_empty() { self.flush(); }
self.flush();

// we need to produce an appropriate update to the capabilities for `progress`, in case a
// user has decided to drive the handle around a bit before registering it.
Expand All @@ -272,8 +272,15 @@ impl<T:Timestamp, D: Data> Handle<T, D> {
// flushes our buffer at each of the destinations. there can be more than one; clone if needed.
#[inline(never)]
fn flush(&mut self) {
if self.buffer1.is_empty() {
return;
}
for index in 0 .. self.pushers.len() {
if index < self.pushers.len() - 1 {
// Ensure default capacity
if self.buffer2.capacity() != Message::<T, D>::default_length() {
self.buffer2 = Vec::with_capacity(Message::<T, D>::default_length());
}
self.buffer2.extend_from_slice(&self.buffer1[..]);
Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]);
debug_assert!(self.buffer2.is_empty());
Expand All @@ -288,7 +295,7 @@ impl<T:Timestamp, D: Data> Handle<T, D> {

// closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
fn close_epoch(&mut self) {
if !self.buffer1.is_empty() { self.flush(); }
self.flush();
for pusher in self.pushers.iter_mut() {
pusher.done();
}
Expand All @@ -304,7 +311,11 @@ impl<T:Timestamp, D: Data> Handle<T, D> {
#[inline]
/// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
pub fn send(&mut self, data: D) {
// assert!(self.buffer1.capacity() == Message::<T, D>::default_length());
// Ensure default capacity
if self.buffer1.capacity() < Message::<T, D>::default_length() {
let to_reserve = Message::<T, D>::default_length() - self.buffer1.capacity();
self.buffer1.reserve(to_reserve);
}
self.buffer1.push(data);
if self.buffer1.len() == self.buffer1.capacity() {
self.flush();
Expand All @@ -318,21 +329,12 @@ impl<T:Timestamp, D: Data> Handle<T, D> {

if !buffer.is_empty() {
// flush buffered elements to ensure local fifo.
if !self.buffer1.is_empty() { self.flush(); }

// push buffer (or clone of buffer) at each destination.
for index in 0 .. self.pushers.len() {
if index < self.pushers.len() - 1 {
self.buffer2.extend_from_slice(&buffer[..]);
Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]);
assert!(self.buffer2.is_empty());
}
else {
Message::push_at(buffer, self.now_at.clone(), &mut self.pushers[index]);
assert!(buffer.is_empty());
}
}
buffer.clear();
self.flush();

// flush sends and clears the contents of self.buffer1, re-use by swapping buffers
::std::mem::swap(&mut self.buffer1, buffer);
self.flush();
::std::mem::swap(&mut self.buffer1, buffer);
}
}

Expand Down