Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,23 @@ pub struct Counter<T: Ord+Clone+'static, D, P: Pull<BundleCore<T, D>>> {
}

/// A guard type that updates the change batch counts on drop
pub struct ConsumedGuard<'a, T: Ord + Clone + 'static> {
consumed: &'a Rc<RefCell<ChangeBatch<T>>>,
pub struct ConsumedGuard<T: Ord + Clone + 'static> {
consumed: Rc<RefCell<ChangeBatch<T>>>,
time: Option<T>,
len: usize,
}

impl<'a, T:Ord+Clone+'static> Drop for ConsumedGuard<'a, T> {
impl<T:Ord+Clone+'static> ConsumedGuard<T> {
pub(crate) fn time(&self) -> &T {
&self.time.as_ref().unwrap()
}
}

impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
fn drop(&mut self) {
self.consumed.borrow_mut().update(self.time.take().unwrap(), self.len as i64);
// SAFETY: we're in a Drop impl, so this runs at most once
let time = self.time.take().unwrap();
self.consumed.borrow_mut().update(time, self.len as i64);
}
}

Expand All @@ -36,11 +44,11 @@ impl<T:Ord+Clone+'static, D: Container, P: Pull<BundleCore<T, D>>> Counter<T, D,
}

#[inline]
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<'_, T>, &mut BundleCore<T, D>)> {
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut BundleCore<T, D>)> {
if let Some(message) = self.pullable.pull() {
if message.data.len() > 0 {
let guard = ConsumedGuard {
consumed: &self.consumed,
consumed: Rc::clone(&self.consumed),
time: Some(message.time.clone()),
len: message.data.len(),
};
Expand Down
44 changes: 22 additions & 22 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,39 +225,39 @@ impl Error for DowngradeError {}

type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;

/// An unowned capability, which can be used but not retained.
/// An capability of an input port. Holding onto this capability will implicitly holds onto a
/// capability for all the outputs ports this input is connected to, after the connection summaries
/// have been applied.
///
/// The capability reference supplies a `retain(self)` method which consumes the reference
/// and turns it into an owned capability
pub struct CapabilityRef<'cap, T: Timestamp+'cap> {
time: &'cap T,
/// This input capability supplies a `retain_for_output(self)` method which consumes the input
/// capability and turns it into a [Capability] for a specific output port.
pub struct InputCapability<T: Timestamp> {
internal: CapabilityUpdates<T>,
/// A drop guard that updates the consumed capability this CapabilityRef refers to on drop
_consumed_guard: ConsumedGuard<'cap, T>,
/// A drop guard that updates the consumed capability this InputCapability refers to on drop
consumed_guard: ConsumedGuard<T>,
}

impl<'cap, T: Timestamp+'cap> CapabilityTrait<T> for CapabilityRef<'cap, T> {
fn time(&self) -> &T { self.time }
impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
fn time(&self) -> &T { self.time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
// let borrow = ;
self.internal.borrow().iter().any(|rc| Rc::ptr_eq(rc, query_buffer))
}
}

impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> {
impl<T: Timestamp> InputCapability<T> {
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
/// the provided [`ChangeBatch`].
pub(crate) fn new(time: &'cap T, internal: CapabilityUpdates<T>, guard: ConsumedGuard<'cap, T>) -> Self {
CapabilityRef {
time,
pub(crate) fn new(internal: CapabilityUpdates<T>, guard: ConsumedGuard<T>) -> Self {
InputCapability {
internal,
_consumed_guard: guard,
consumed_guard: guard,
}
}

/// The timestamp associated with this capability.
pub fn time(&self) -> &T {
self.time
self.consumed_guard.time()
}

/// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of
Expand All @@ -271,7 +271,7 @@ impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> {
/// Delays capability for a specific output port.
pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
// TODO : Test operator summary?
if !self.time.less_equal(new_time) {
if !self.time().less_equal(new_time) {
panic!("Attempted to delay {:?} to {:?}, which is not beyond the capability's time.", self, new_time);
}
if output_port < self.internal.borrow().len() {
Expand All @@ -295,26 +295,26 @@ impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> {
/// Transforms to an owned capability for a specific output port.
pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
if output_port < self.internal.borrow().len() {
Capability::new(self.time.clone(), self.internal.borrow()[output_port].clone())
Capability::new(self.time().clone(), self.internal.borrow()[output_port].clone())
}
else {
panic!("Attempted to acquire a capability for a non-existent output port.");
}
}
}

impl<'cap, T: Timestamp> Deref for CapabilityRef<'cap, T> {
impl<T: Timestamp> Deref for InputCapability<T> {
type Target = T;

fn deref(&self) -> &T {
self.time
self.time()
}
}

impl<'cap, T: Timestamp> Debug for CapabilityRef<'cap, T> {
impl<T: Timestamp> Debug for InputCapability<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("CapabilityRef")
.field("time", &self.time)
f.debug_struct("InputCapability")
.field("time", self.time())
.field("internal", &"...")
.finish()
}
Expand Down
14 changes: 7 additions & 7 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::communication::{Push, Pull, message::RefOrMut};
use crate::Container;
use crate::logging::TimelyLogger as Logger;

use crate::dataflow::operators::CapabilityRef;
use crate::dataflow::operators::InputCapability;
use crate::dataflow::operators::capability::CapabilityTrait;

/// Handle to an operator's input stream.
Expand Down Expand Up @@ -47,15 +47,15 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> InputHandleCore<
/// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
/// Returns `None` when there's no more data available.
#[inline]
pub fn next(&mut self) -> Option<(CapabilityRef<T>, RefOrMut<D>)> {
pub fn next(&mut self) -> Option<(InputCapability<T>, RefOrMut<D>)> {
let internal = &self.internal;
self.pull_counter.next_guarded().map(|(guard, bundle)| {
match bundle.as_ref_or_mut() {
RefOrMut::Ref(bundle) => {
(CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Ref(&bundle.data))
(InputCapability::new(internal.clone(), guard), RefOrMut::Ref(&bundle.data))
},
RefOrMut::Mut(bundle) => {
(CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Mut(&mut bundle.data))
(InputCapability::new(internal.clone(), guard), RefOrMut::Mut(&mut bundle.data))
},
}
})
Expand All @@ -80,7 +80,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> InputHandleCore<
/// });
/// ```
#[inline]
pub fn for_each<F: FnMut(CapabilityRef<T>, RefOrMut<D>)>(&mut self, mut logic: F) {
pub fn for_each<F: FnMut(InputCapability<T>, RefOrMut<D>)>(&mut self, mut logic: F) {
let mut logging = self.logging.take();
while let Some((cap, data)) = self.next() {
logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true }));
Expand All @@ -105,7 +105,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>+'a> FrontieredInp
/// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
/// Returns `None` when there's no more data available.
#[inline]
pub fn next(&mut self) -> Option<(CapabilityRef<T>, RefOrMut<D>)> {
pub fn next(&mut self) -> Option<(InputCapability<T>, RefOrMut<D>)> {
self.handle.next()
}

Expand All @@ -128,7 +128,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>+'a> FrontieredInp
/// });
/// ```
#[inline]
pub fn for_each<F: FnMut(CapabilityRef<T>, RefOrMut<D>)>(&mut self, logic: F) {
pub fn for_each<F: FnMut(InputCapability<T>, RefOrMut<D>)>(&mut self, logic: F) {
self.handle.for_each(logic)
}

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ pub mod count;

// keep "mint" module-private
mod capability;
pub use self::capability::{ActivateCapability, Capability, CapabilityRef, CapabilitySet, DowngradeError};
pub use self::capability::{ActivateCapability, Capability, InputCapability, CapabilitySet, DowngradeError};