diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 3055354bd..548caf5dd 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -221,6 +221,11 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability"); self.push_buffer.session(cap.time()) } + + /// Flushes all pending data and indicate that no more data immediately follows. + pub fn cease(&mut self) { + self.push_buffer.cease(); + } } impl<'a, T: Timestamp, C: Container, P: Push>> Drop for OutputHandleCore<'a, T, C, P> {