From 97caf19cb1aa7fb4447d3c739b6d99b7cce6cf68 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 29 Jan 2023 13:07:06 -0500 Subject: [PATCH 1/2] Validate timestamp summary before forming capability --- timely/src/dataflow/operators/capability.rs | 34 +++++++++++-------- .../dataflow/operators/generic/builder_rc.rs | 16 +++++++-- .../src/dataflow/operators/generic/handles.rs | 19 +++++++++-- 3 files changed, 49 insertions(+), 20 deletions(-) diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 8a7b9f28b..3630bb4c2 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -27,6 +27,7 @@ use std::cell::RefCell; use std::fmt::{self, Debug}; use crate::order::PartialOrder; +use crate::progress::Antichain; use crate::progress::Timestamp; use crate::progress::ChangeBatch; use crate::scheduling::Activations; @@ -223,6 +224,7 @@ impl Display for DowngradeError { impl Error for DowngradeError {} +/// A shared list of shared output capability buffers. type CapabilityUpdates = Rc>>>>>; /// An capability of an input port. Holding onto this capability will implicitly holds onto a @@ -232,7 +234,10 @@ type CapabilityUpdates = Rc>>>>>; /// This input capability supplies a `retain_for_output(self)` method which consumes the input /// capability and turns it into a [Capability] for a specific output port. pub struct InputCapability { + /// Output capability buffers, for use in minting capabilities. internal: CapabilityUpdates, + /// Timestamp summaries for each output. + summaries: Rc>>>, /// A drop guard that updates the consumed capability this InputCapability refers to on drop consumed_guard: ConsumedGuard, } @@ -240,7 +245,6 @@ pub struct InputCapability { impl CapabilityTrait for InputCapability { fn time(&self) -> &T { self.time() } fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { - // let borrow = ; self.internal.borrow().iter().any(|rc| Rc::ptr_eq(rc, query_buffer)) } } @@ -248,9 +252,10 @@ impl CapabilityTrait for InputCapability { impl InputCapability { /// Creates a new capability reference at `time` while incrementing (and keeping a reference to) /// the provided [`ChangeBatch`]. - pub(crate) fn new(internal: CapabilityUpdates, guard: ConsumedGuard) -> Self { + pub(crate) fn new(internal: CapabilityUpdates, summaries: Rc>>>, guard: ConsumedGuard) -> Self { InputCapability { internal, + summaries, consumed_guard: guard, } } @@ -270,15 +275,11 @@ impl InputCapability { /// Delays capability for a specific output port. pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability { - // TODO : Test operator summary? - if !self.time().less_equal(new_time) { - panic!("Attempted to delay {:?} to {:?}, which is not beyond the capability's time.", self, new_time); - } - if output_port < self.internal.borrow().len() { + use crate::progress::timestamp::PathSummary; + if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) { Capability::new(new_time.clone(), self.internal.borrow()[output_port].clone()) - } - else { - panic!("Attempted to acquire a capability for a non-existent output port."); + } else { + panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilies time ({:?})", new_time, self.summaries.borrow()[output_port], self.time()); } } @@ -287,18 +288,23 @@ impl InputCapability { /// This method produces an owned capability which must be dropped to release the /// capability. Users should take care that these capabilities are only stored for /// as long as they are required, as failing to drop them may result in livelock. + /// + /// This method panics if the timestamp summary to output zero strictly advances the time. pub fn retain(self) -> Capability { - // mint(self.time.clone(), self.internal) self.retain_for_output(0) } /// Transforms to an owned capability for a specific output port. + /// + /// This method panics if the timestamp summary to `output_port` strictly advances the time. pub fn retain_for_output(self, output_port: usize) -> Capability { - if output_port < self.internal.borrow().len() { - Capability::new(self.time().clone(), self.internal.borrow()[output_port].clone()) + use crate::progress::timestamp::PathSummary; + let self_time = self.time().clone(); + if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) { + Capability::new(self_time, self.internal.borrow()[output_port].clone()) } else { - panic!("Attempted to acquire a capability for a non-existent output port."); + panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilies time ({:?})", self_time, self.summaries.borrow()[output_port], self_time); } } } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index cc505e7ee..ca80e3182 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -31,6 +31,8 @@ pub struct OperatorBuilder { frontier: Vec>, consumed: Vec>>>, internal: Rc>>>>>, + /// For each input, a shared list of summaries to each output. + summaries: Vec::Summary>>>>>, produced: Vec>>>, logging: Option, } @@ -45,6 +47,7 @@ impl OperatorBuilder { frontier: Vec::new(), consumed: Vec::new(), internal: Rc::new(RefCell::new(Vec::new())), + summaries: Vec::new(), produced: Vec::new(), logging, } @@ -76,13 +79,16 @@ impl OperatorBuilder { where P: ParallelizationContractCore { - let puller = self.builder.new_input_connection(stream, pact, connection); + let puller = self.builder.new_input_connection(stream, pact, connection.clone()); let input = PullCounter::new(puller); self.frontier.push(MutableAntichain::new()); self.consumed.push(input.consumed().clone()); - new_input_handle(input, self.internal.clone(), self.logging.clone()) + let shared_summary = Rc::new(RefCell::new(connection)); + self.summaries.push(shared_summary.clone()); + + new_input_handle(input, self.internal.clone(), shared_summary, self.logging.clone()) } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. @@ -101,7 +107,7 @@ impl OperatorBuilder { /// antichain indicating that there is no connection from the input to the output. pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { - let (tee, stream) = self.builder.new_output_connection(connection); + let (tee, stream) = self.builder.new_output_connection(connection.clone()); let internal = Rc::new(RefCell::new(ChangeBatch::new())); self.internal.borrow_mut().push(internal.clone()); @@ -109,6 +115,10 @@ impl OperatorBuilder { let mut buffer = PushBuffer::new(PushCounter::new(tee)); self.produced.push(buffer.inner().produced().clone()); + for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) { + summary.borrow_mut().push(connection.clone()); + } + (OutputWrapper::new(buffer, internal), stream) } diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index c73c928bc..eeddf5295 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -6,6 +6,7 @@ use std::rc::Rc; use std::cell::RefCell; +use crate::progress::Antichain; use crate::progress::Timestamp; use crate::progress::ChangeBatch; use crate::progress::frontier::MutableAntichain; @@ -24,6 +25,11 @@ use crate::dataflow::operators::capability::CapabilityTrait; pub struct InputHandleCore>> { pull_counter: PullCounter, internal: Rc>>>>>, + /// Timestamp summaries from this input to each output. + /// + /// Each timestamp received through this input may only produce output timestamps + /// greater or equal to the input timestamp subjected to at least one of these summaries. + summaries: Rc>>>, logging: Option, } @@ -49,13 +55,14 @@ impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore< #[inline] pub fn next(&mut self) -> Option<(InputCapability, RefOrMut)> { let internal = &self.internal; + let summaries = &self.summaries; self.pull_counter.next_guarded().map(|(guard, bundle)| { match bundle.as_ref_or_mut() { RefOrMut::Ref(bundle) => { - (InputCapability::new(internal.clone(), guard), RefOrMut::Ref(&bundle.data)) + (InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Ref(&bundle.data)) }, RefOrMut::Mut(bundle) => { - (InputCapability::new(internal.clone(), guard), RefOrMut::Mut(&mut bundle.data)) + (InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Mut(&mut bundle.data)) }, } }) @@ -145,10 +152,16 @@ pub fn _access_pull_counter /// Constructs an input handle. /// Declared separately so that it can be kept private when `InputHandle` is re-exported. -pub fn new_input_handle>>(pull_counter: PullCounter, internal: Rc>>>>>, logging: Option) -> InputHandleCore { +pub fn new_input_handle>>( + pull_counter: PullCounter, + internal: Rc>>>>>, + summaries: Rc>>>, + logging: Option +) -> InputHandleCore { InputHandleCore { pull_counter, internal, + summaries, logging, } } From 16766de8e9c404bb6f048a3f93288f8d80896b51 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 10 Feb 2023 09:50:05 -0500 Subject: [PATCH 2/2] Validate summary for CapabilityTrait --- timely/src/dataflow/operators/capability.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 3630bb4c2..8afb7f509 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -245,7 +245,11 @@ pub struct InputCapability { impl CapabilityTrait for InputCapability { fn time(&self) -> &T { self.time() } fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { - self.internal.borrow().iter().any(|rc| Rc::ptr_eq(rc, query_buffer)) + let borrow = self.summaries.borrow(); + self.internal.borrow().iter().enumerate().any(|(index, rc)| { + // To be valid, the output buffer must match and the timestamp summary needs to be the default. + Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default() + }) } }