From de1f20ec735c6fa3ebf4a214e2108c5a74bc260d Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 18 Jan 2023 16:11:33 -0500 Subject: [PATCH] Add cease to output handles In certain situations, a handle survives longer than we would like to wait for a flush to happen. In this cases, an explicit call to cease can help to indicate to the rest of the system that no more data follows immediately, which is equivalent to dropping the handle. Specifically, in async code the handle can be long-lived and survive await points, which makes it more important to signal momentary completion to the system. Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/operators/generic/handles.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 3055354bd..548caf5dd 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -221,6 +221,11 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability"); self.push_buffer.session(cap.time()) } + + /// Flushes all pending data and indicate that no more data immediately follows. + pub fn cease(&mut self) { + self.push_buffer.cease(); + } } impl<'a, T: Timestamp, C: Container, P: Push>> Drop for OutputHandleCore<'a, T, C, P> {