diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 7d174f083..c3828a6ca 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -47,26 +47,13 @@ impl Message { } /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher - /// leaves in place, or a new `Vec`. Note that the returned vector is always initialized with - /// a capacity of [Self::default_length] elements. + /// leaves in place, or a new `Vec`. If the returned vector has a different capacity than the + /// default, it will be replaced with an empty vector. + /// + /// Clients are responsible to ensure that their buffers are allocated before reusing them. #[inline] pub fn push_at>>(buffer: &mut Vec, time: T, pusher: &mut P) { - Self::push_at_no_allocation(buffer, time, pusher); - - // Allocate a default buffer to avoid oddly sized or empty buffers - if buffer.capacity() != Self::default_length() { - *buffer = Vec::with_capacity(Self::default_length()); - } - } - - /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher - /// leaves in place, or a new empty `Vec`. If the pusher leaves a vector with a capacity larger - /// than [Self::default_length], the vector is initialized with a new vector with - /// [Self::default_length] capacity. - #[inline] - pub fn push_at_no_allocation>>(buffer: &mut Vec, time: T, pusher: &mut P) { - let data = ::std::mem::take(buffer); let message = Message::new(time, data, 0, 0); let mut bundle = Some(Bundle::from_typed(message)); @@ -80,9 +67,9 @@ impl Message { } } - // Avoid memory leaks by buffers growing out of bounds - if buffer.capacity() > Self::default_length() { - *buffer = Vec::with_capacity(Self::default_length()); + // Avoid oddly-sized buffers + if std::mem::size_of::() > 0 && buffer.capacity() != Self::default_length() { + *buffer = Default::default(); } } } diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 288d5ff88..47663c308 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -23,7 +23,7 @@ impl>> Buffer where T: Eq+Clone { pub fn new(pusher: P) -> Buffer { Buffer { time: None, - buffer: Vec::with_capacity(Message::::default_length()), + buffer: Default::default(), pusher, } } @@ -65,8 +65,12 @@ impl>> Buffer where T: Eq+Clone { // internal method for use by `Session`. fn give(&mut self, data: D) { + // Ensure default capacity + if self.buffer.capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffer.capacity(); + self.buffer.reserve(to_reserve); + } self.buffer.push(data); - // assert!(self.buffer.capacity() == Message::::default_length()); if self.buffer.len() == self.buffer.capacity() { self.flush(); } diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 21c781cff..de613d2cc 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -31,7 +31,7 @@ impl>, H: FnMut(&T, &D)->u64> Exchange Push> for Tee { let mut pushers = self.shared.borrow_mut(); if let Some(message) = message { for index in 1..pushers.len() { + // Ensure default capacity + if self.buffer.capacity() != Message::::default_length() { + self.buffer = Vec::with_capacity(Message::::default_length()); + } self.buffer.extend_from_slice(&message.data); Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1]); } @@ -44,7 +48,7 @@ impl Tee { pub fn new() -> (Tee, TeeHelper) { let shared = Rc::new(RefCell::new(Vec::new())); let port = Tee { - buffer: Vec::with_capacity(Message::::default_length()), + buffer: Default::default(), shared: shared.clone(), }; @@ -55,7 +59,7 @@ impl Tee { impl Clone for Tee { fn clone(&self) -> Tee { Tee { - buffer: Vec::with_capacity(self.buffer.capacity()), + buffer: Default::default(), shared: self.shared.clone(), } } diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 11d02896d..94367b6cf 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -212,8 +212,8 @@ impl Handle { activate: Vec::new(), progress: Vec::new(), pushers: Vec::new(), - buffer1: Vec::with_capacity(Message::::default_length()), - buffer2: Vec::with_capacity(Message::::default_length()), + buffer1: Default::default(), + buffer2: Default::default(), now_at: T::minimum(), } } @@ -258,7 +258,7 @@ impl Handle { progress: Rc>> ) { // flush current contents, so new registrant does not see existing data. - if !self.buffer1.is_empty() { self.flush(); } + self.flush(); // we need to produce an appropriate update to the capabilities for `progress`, in case a // user has decided to drive the handle around a bit before registering it. @@ -272,8 +272,15 @@ impl Handle { // flushes our buffer at each of the destinations. there can be more than one; clone if needed. #[inline(never)] fn flush(&mut self) { + if self.buffer1.is_empty() { + return; + } for index in 0 .. self.pushers.len() { if index < self.pushers.len() - 1 { + // Ensure default capacity + if self.buffer2.capacity() != Message::::default_length() { + self.buffer2 = Vec::with_capacity(Message::::default_length()); + } self.buffer2.extend_from_slice(&self.buffer1[..]); Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); debug_assert!(self.buffer2.is_empty()); @@ -288,7 +295,7 @@ impl Handle { // closes the current epoch, flushing if needed, shutting if needed, and updating the frontier. fn close_epoch(&mut self) { - if !self.buffer1.is_empty() { self.flush(); } + self.flush(); for pusher in self.pushers.iter_mut() { pusher.done(); } @@ -304,7 +311,11 @@ impl Handle { #[inline] /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. pub fn send(&mut self, data: D) { - // assert!(self.buffer1.capacity() == Message::::default_length()); + // Ensure default capacity + if self.buffer1.capacity() < Message::::default_length() { + let to_reserve = Message::::default_length() - self.buffer1.capacity(); + self.buffer1.reserve(to_reserve); + } self.buffer1.push(data); if self.buffer1.len() == self.buffer1.capacity() { self.flush(); @@ -318,21 +329,12 @@ impl Handle { if !buffer.is_empty() { // flush buffered elements to ensure local fifo. - if !self.buffer1.is_empty() { self.flush(); } - - // push buffer (or clone of buffer) at each destination. - for index in 0 .. self.pushers.len() { - if index < self.pushers.len() - 1 { - self.buffer2.extend_from_slice(&buffer[..]); - Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); - assert!(self.buffer2.is_empty()); - } - else { - Message::push_at(buffer, self.now_at.clone(), &mut self.pushers[index]); - assert!(buffer.is_empty()); - } - } - buffer.clear(); + self.flush(); + + // flush sends and clears the contents of self.buffer1, re-use by swapping buffers + ::std::mem::swap(&mut self.buffer1, buffer); + self.flush(); + ::std::mem::swap(&mut self.buffer1, buffer); } }