Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::cell::RefCell;
use std::fmt::{self, Debug};

use crate::order::PartialOrder;
use crate::progress::Antichain;
use crate::progress::Timestamp;
use crate::progress::ChangeBatch;
use crate::scheduling::Activations;
Expand Down Expand Up @@ -223,6 +224,7 @@ impl Display for DowngradeError {

impl Error for DowngradeError {}

/// A shared list of shared output capability buffers.
type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;

/// An capability of an input port. Holding onto this capability will implicitly holds onto a
Expand All @@ -232,25 +234,32 @@ type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
/// This input capability supplies a `retain_for_output(self)` method which consumes the input
/// capability and turns it into a [Capability] for a specific output port.
pub struct InputCapability<T: Timestamp> {
/// Output capability buffers, for use in minting capabilities.
internal: CapabilityUpdates<T>,
/// Timestamp summaries for each output.
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
/// A drop guard that updates the consumed capability this InputCapability refers to on drop
consumed_guard: ConsumedGuard<T>,
}

impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
fn time(&self) -> &T { self.time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
// let borrow = ;
self.internal.borrow().iter().any(|rc| Rc::ptr_eq(rc, query_buffer))
let borrow = self.summaries.borrow();
self.internal.borrow().iter().enumerate().any(|(index, rc)| {
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default()
})
}
}

impl<T: Timestamp> InputCapability<T> {
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
/// the provided [`ChangeBatch`].
pub(crate) fn new(internal: CapabilityUpdates<T>, guard: ConsumedGuard<T>) -> Self {
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>, guard: ConsumedGuard<T>) -> Self {
InputCapability {
internal,
summaries,
consumed_guard: guard,
}
}
Expand All @@ -270,15 +279,11 @@ impl<T: Timestamp> InputCapability<T> {

/// Delays capability for a specific output port.
pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
// TODO : Test operator summary?
if !self.time().less_equal(new_time) {
panic!("Attempted to delay {:?} to {:?}, which is not beyond the capability's time.", self, new_time);
}
if output_port < self.internal.borrow().len() {
use crate::progress::timestamp::PathSummary;
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
Capability::new(new_time.clone(), self.internal.borrow()[output_port].clone())
}
else {
panic!("Attempted to acquire a capability for a non-existent output port.");
} else {
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilies time ({:?})", new_time, self.summaries.borrow()[output_port], self.time());
}
}

Expand All @@ -287,18 +292,23 @@ impl<T: Timestamp> InputCapability<T> {
/// This method produces an owned capability which must be dropped to release the
/// capability. Users should take care that these capabilities are only stored for
/// as long as they are required, as failing to drop them may result in livelock.
///
/// This method panics if the timestamp summary to output zero strictly advances the time.
pub fn retain(self) -> Capability<T> {
// mint(self.time.clone(), self.internal)
self.retain_for_output(0)
}

/// Transforms to an owned capability for a specific output port.
///
/// This method panics if the timestamp summary to `output_port` strictly advances the time.
pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
if output_port < self.internal.borrow().len() {
Capability::new(self.time().clone(), self.internal.borrow()[output_port].clone())
use crate::progress::timestamp::PathSummary;
let self_time = self.time().clone();
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
Capability::new(self_time, self.internal.borrow()[output_port].clone())
}
else {
panic!("Attempted to acquire a capability for a non-existent output port.");
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilies time ({:?})", self_time, self.summaries.borrow()[output_port], self_time);
}
}
}
Expand Down
16 changes: 13 additions & 3 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct OperatorBuilder<G: Scope> {
frontier: Vec<MutableAntichain<G::Timestamp>>,
consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
/// For each input, a shared list of summaries to each output.
summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
logging: Option<Logger>,
}
Expand All @@ -45,6 +47,7 @@ impl<G: Scope> OperatorBuilder<G> {
frontier: Vec::new(),
consumed: Vec::new(),
internal: Rc::new(RefCell::new(Vec::new())),
summaries: Vec::new(),
produced: Vec::new(),
logging,
}
Expand Down Expand Up @@ -76,13 +79,16 @@ impl<G: Scope> OperatorBuilder<G> {
where
P: ParallelizationContractCore<G::Timestamp, D> {

let puller = self.builder.new_input_connection(stream, pact, connection);
let puller = self.builder.new_input_connection(stream, pact, connection.clone());

let input = PullCounter::new(puller);
self.frontier.push(MutableAntichain::new());
self.consumed.push(input.consumed().clone());

new_input_handle(input, self.internal.clone(), self.logging.clone())
let shared_summary = Rc::new(RefCell::new(connection));
self.summaries.push(shared_summary.clone());

new_input_handle(input, self.internal.clone(), shared_summary, self.logging.clone())
}

/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
Expand All @@ -101,14 +107,18 @@ impl<G: Scope> OperatorBuilder<G> {
/// antichain indicating that there is no connection from the input to the output.
pub fn new_output_connection<D: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (OutputWrapper<G::Timestamp, D, TeeCore<G::Timestamp, D>>, StreamCore<G, D>) {

let (tee, stream) = self.builder.new_output_connection(connection);
let (tee, stream) = self.builder.new_output_connection(connection.clone());

let internal = Rc::new(RefCell::new(ChangeBatch::new()));
self.internal.borrow_mut().push(internal.clone());

let mut buffer = PushBuffer::new(PushCounter::new(tee));
self.produced.push(buffer.inner().produced().clone());

for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
summary.borrow_mut().push(connection.clone());
}

(OutputWrapper::new(buffer, internal), stream)
}

Expand Down
19 changes: 16 additions & 3 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use std::rc::Rc;
use std::cell::RefCell;

use crate::progress::Antichain;
use crate::progress::Timestamp;
use crate::progress::ChangeBatch;
use crate::progress::frontier::MutableAntichain;
Expand All @@ -24,6 +25,11 @@ use crate::dataflow::operators::capability::CapabilityTrait;
pub struct InputHandleCore<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> {
pull_counter: PullCounter<T, D, P>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
/// Timestamp summaries from this input to each output.
///
/// Each timestamp received through this input may only produce output timestamps
/// greater or equal to the input timestamp subjected to at least one of these summaries.
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
logging: Option<Logger>,
}

Expand All @@ -49,13 +55,14 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> InputHandleCore<
#[inline]
pub fn next(&mut self) -> Option<(InputCapability<T>, RefOrMut<D>)> {
let internal = &self.internal;
let summaries = &self.summaries;
self.pull_counter.next_guarded().map(|(guard, bundle)| {
match bundle.as_ref_or_mut() {
RefOrMut::Ref(bundle) => {
(InputCapability::new(internal.clone(), guard), RefOrMut::Ref(&bundle.data))
(InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Ref(&bundle.data))
},
RefOrMut::Mut(bundle) => {
(InputCapability::new(internal.clone(), guard), RefOrMut::Mut(&mut bundle.data))
(InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Mut(&mut bundle.data))
},
}
})
Expand Down Expand Up @@ -145,10 +152,16 @@ pub fn _access_pull_counter<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>

/// Constructs an input handle.
/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
pub fn new_input_handle<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>>(pull_counter: PullCounter<T, D, P>, internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>, logging: Option<Logger>) -> InputHandleCore<T, D, P> {
pub fn new_input_handle<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>>(
pull_counter: PullCounter<T, D, P>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
logging: Option<Logger>
) -> InputHandleCore<T, D, P> {
InputHandleCore {
pull_counter,
internal,
summaries,
logging,
}
}
Expand Down