Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion timely/examples/logging-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use timely::dataflow::operators::{Input, Exchange, Probe};

// use timely::dataflow::operators::capture::EventWriter;
// use timely::dataflow::ScopeParent;
use timely::logging::TimelyEvent;
use timely::logging::{TimelyEvent, TimelyProgressEvent};

fn main() {
// initializes and runs a timely dataflow.
Expand All @@ -21,6 +21,26 @@ fn main() {
data.iter().for_each(|x| println!("LOG1: {:?}", x))
);

// Register timely progress logging.
// Less generally useful: intended for debugging advanced custom operators or timely
// internals.
worker.log_register().insert::<TimelyProgressEvent,_>("timely/progress", |_time, data|
data.iter().for_each(|x| {
println!("PROGRESS: {:?}", x);
let (_, _, ev) = x;
print!("PROGRESS: TYPED MESSAGES: ");
for (n, p, t, d) in ev.messages.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
}
println!();
print!("PROGRESS: TYPED INTERNAL: ");
for (n, p, t, d) in ev.internal.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
}
println!();
})
);

// create a new input, exchange data, and inspect its output
worker.dataflow(|scope| {
scope
Expand Down
9 changes: 7 additions & 2 deletions timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::progress::{Source, Target};
use crate::progress::timestamp::Refines;
use crate::order::Product;
use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelyProgressLogger as ProgressLogger;
use crate::worker::{AsWorker, Config};

use super::{ScopeParent, Scope};
Expand All @@ -32,6 +33,8 @@ where
pub parent: G,
/// The log writer for this scope.
pub logging: Option<Logger>,
/// The progress log writer for this scope.
pub progress_logging: Option<ProgressLogger>,
}

impl<'a, G, T> Child<'a, G, T>
Expand Down Expand Up @@ -115,12 +118,13 @@ where
let index = self.subgraph.borrow_mut().allocate_child_id();
let path = self.subgraph.borrow().path.clone();

let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), name));
let subscope = RefCell::new(SubgraphBuilder::new_from(index, path, self.logging().clone(), self.progress_logging.clone(), name));
let result = {
let mut builder = Child {
subgraph: &subscope,
parent: self.clone(),
logging: self.logging.clone(),
progress_logging: self.progress_logging.clone(),
};
func(&mut builder)
};
Expand All @@ -143,7 +147,8 @@ where
Child {
subgraph: self.subgraph,
parent: self.parent.clone(),
logging: self.logging.clone()
logging: self.logging.clone(),
progress_logging: self.progress_logging.clone(),
}
}
}
70 changes: 60 additions & 10 deletions timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub type WorkerIdentifier = usize;
pub type Logger<Event> = crate::logging_core::Logger<Event, WorkerIdentifier>;
/// Logger for timely dataflow system events.
pub type TimelyLogger = Logger<TimelyEvent>;
/// Logger for timely dataflow progress events (the "timely/progress" log stream).
pub type TimelyProgressLogger = Logger<TimelyProgressEvent>;

use std::time::Duration;
use crate::dataflow::operators::capture::{Event, EventPusher};
Expand Down Expand Up @@ -70,9 +72,63 @@ pub struct ChannelsEvent {
pub target: (usize, usize),
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// Encapsulates Any and Debug for dynamically typed timestamps in logs
pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random comment, not a change but: I bet this trait eventually becomes useful beyond progress events. At least, assuming there is any other reason to report information about timestamps (e.g., maybe other timestamp things, I dunno) they'll probably make use of this too. Again: nothing to change, just thinking out loud.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I thought the same when I wrote it, but it seemed unnecessary to push it to a separate module at this stage.

/// Upcasts this `ProgressEventTimestamp` to `Any`.
///
/// NOTE: This is required until https://github.com/rust-lang/rfcs/issues/2765 is fixed
///
/// # Example
/// ```rust
/// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)];
/// let ts: &timely::logging::ProgressEventTimestampVec = &ts;
/// for (n, p, t, d) in ts.iter() {
/// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d));
/// }
/// println!();
/// ```
fn as_any(&self) -> &dyn std::any::Any;

/// Returns the name of the concrete type of this object.
///
/// # Note
///
/// This is intended for diagnostic use. The exact contents and format of the
/// string returned are not specified, other than being a best-effort
/// description of the type. For example, amongst the strings
/// that `type_name::<Option<String>>()` might return are `"Option<String>"` and
/// `"std::option::Option<std::string::String>"`.
fn type_name(&self) -> &'static str;
}
impl<T: crate::Data + std::fmt::Debug + std::any::Any> ProgressEventTimestamp for T {
fn as_any(&self) -> &dyn std::any::Any { self }

fn type_name(&self) -> &'static str { std::any::type_name::<T>() }
}

/// A vector of progress updates in logs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, but this would be a great place to explain the need for a trait (vs a type or a struct that matches the same description).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to the doc in the latest commit.

///
/// This exists to support upcasting of the concrecte progress update vectors to
/// `dyn ProgressEventTimestamp`. Doing so at the vector granularity allows us to
/// use a single allocation for the entire vector (as opposed to a `Box` allocation
/// for each dynamically typed element).
pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any {
/// Iterate over the contents of the vector
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a>;
}

impl<T: ProgressEventTimestamp> ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> {
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a> {
Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| {
let t: &dyn ProgressEventTimestamp = t;
(n, p, t, d)
}))
}
}

#[derive(Debug)]
/// Send or receive of progress information.
pub struct ProgressEvent {
pub struct TimelyProgressEvent {
/// `true` if the event is a send, and `false` if it is a receive.
pub is_send: bool,
/// Source worker index.
Expand All @@ -84,9 +140,9 @@ pub struct ProgressEvent {
/// Sequence of nested scope identifiers indicating the path from the root to this instance.
pub addr: Vec<usize>,
/// List of message updates, containing Target descriptor, timestamp as string, and delta.
pub messages: Vec<(usize, usize, String, i64)>,
pub messages: Box<dyn ProgressEventTimestampVec>,
/// List of capability updates, containing Source descriptor, timestamp as string, and delta.
pub internal: Vec<(usize, usize, String, i64)>,
pub internal: Box<dyn ProgressEventTimestampVec>,
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
Expand Down Expand Up @@ -225,8 +281,6 @@ pub enum TimelyEvent {
Operates(OperatesEvent),
/// Channel creation.
Channels(ChannelsEvent),
/// Progress message send or receive.
Progress(ProgressEvent),
/// Progress propagation (reasoning).
PushProgress(PushProgressEvent),
/// Message send or receive.
Expand Down Expand Up @@ -259,10 +313,6 @@ impl From<ChannelsEvent> for TimelyEvent {
fn from(v: ChannelsEvent) -> TimelyEvent { TimelyEvent::Channels(v) }
}

impl From<ProgressEvent> for TimelyEvent {
fn from(v: ProgressEvent) -> TimelyEvent { TimelyEvent::Progress(v) }
}

impl From<PushProgressEvent> for TimelyEvent {
fn from(v: PushProgressEvent) -> TimelyEvent { TimelyEvent::PushProgress(v) }
}
Expand Down
44 changes: 18 additions & 26 deletions timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::progress::{ChangeBatch, Timestamp};
use crate::progress::{Location, Port};
use crate::communication::{Message, Push, Pull};
use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelyProgressLogger as ProgressLogger;

/// A list of progress updates corresponding to `((child_scope, [in/out]_port, timestamp), delta)`
pub type ProgressVec<T> = Vec<((Location, T), i64)>;
Expand All @@ -25,12 +26,12 @@ pub struct Progcaster<T:Timestamp> {
/// Communication channel identifier
channel_identifier: usize,

logging: Option<Logger>,
progress_logging: Option<ProgressLogger>,
}

impl<T:Timestamp+Send> Progcaster<T> {
/// Creates a new `Progcaster` using a channel from the supplied worker.
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, path: &Vec<usize>, mut logging: Option<Logger>) -> Progcaster<T> {
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, path: &Vec<usize>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {

let channel_identifier = worker.new_identifier();
let (pushers, puller) = worker.allocate(channel_identifier, &path[..]);
Expand All @@ -48,7 +49,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
counter: 0,
addr,
channel_identifier,
logging,
progress_logging,
}
}

Expand All @@ -58,34 +59,26 @@ impl<T:Timestamp+Send> Progcaster<T> {
changes.compact();
if !changes.is_empty() {

// This logging is relatively more expensive than other logging, as we
// have formatting and string allocations on the main path. We do have
// local type information about the timestamp, and we could log *that*
// explicitly, but the consumer would have to know what to look for and
// interpret appropriately. That's a big ask, so let's start with this,
// and as folks need more performant logging think about allowing users
// to select the more efficient variant.
self.logging.as_ref().map(|l| {
self.progress_logging.as_ref().map(|l| {

// Pre-allocate enough space; we transfer ownership, so there is not
// an apportunity to re-use allocations (w/o changing the logging
// interface to accept references).
let mut messages = Vec::with_capacity(changes.len());
let mut internal = Vec::with_capacity(changes.len());
let mut messages = Box::new(Vec::with_capacity(changes.len()));
let mut internal = Box::new(Vec::with_capacity(changes.len()));
Comment on lines +67 to +68
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm super confused that this works. I would have thought that as soon as this is boxed up you lose access to the push methods. Or is it because there is an auto-conversion from Box<T> to Box<dyn Trait> for any T: Trait? No change needed, just surprising to me!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe push works because of https://doc.rust-lang.org/std/boxed/struct.Box.html#impl-Deref (Deref impl for Box). And yes, you can upcast a Box<T> to a Box<dyn Trait> for the same reason you can upcast a &T to a &dyn Trait for T: Trait.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, the type of messages and internal is still the concrete, non-dyn vector type. The upcast happens on assignment to the target struct below.


// TODO: Reconsider `String` type or perhaps re-use allocation.
for ((location, time), diff) in changes.iter() {
match location.port {
Port::Target(port) => {
messages.push((location.node, port, format!("{:?}", time), *diff))
messages.push((location.node, port, time.clone(), *diff))
},
Port::Source(port) => {
internal.push((location.node, port, format!("{:?}", time), *diff))
internal.push((location.node, port, time.clone(), *diff))
}
}
}

l.log(crate::logging::ProgressEvent {
l.log(crate::logging::TimelyProgressEvent {
is_send: true,
source: self.source,
channel: self.channel_identifier,
Expand Down Expand Up @@ -138,32 +131,31 @@ impl<T:Timestamp+Send> Progcaster<T> {

// See comments above about the relatively high cost of this logging, and our
// options for improving it if performance limits users who want other logging.
self.logging.as_ref().map(|l| {
self.progress_logging.as_ref().map(|l| {

let mut messages = Vec::with_capacity(changes.len());
let mut internal = Vec::with_capacity(changes.len());
let mut messages = Box::new(Vec::with_capacity(changes.len()));
let mut internal = Box::new(Vec::with_capacity(changes.len()));

// TODO: Reconsider `String` type or perhaps re-use allocation.
for ((location, time), diff) in recv_changes.iter() {

match location.port {
Port::Target(port) => {
messages.push((location.node, port, format!("{:?}", time), *diff))
messages.push((location.node, port, time.clone(), *diff))
},
Port::Source(port) => {
internal.push((location.node, port, format!("{:?}", time), *diff))
internal.push((location.node, port, time.clone(), *diff))
}
}
}

l.log(crate::logging::ProgressEvent {
l.log(crate::logging::TimelyProgressEvent {
is_send: false,
source: source,
seq_no: counter,
channel,
addr: addr.clone(),
messages,
internal,
messages: messages,
internal: internal,
});
});

Expand Down
8 changes: 7 additions & 1 deletion timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::collections::BinaryHeap;
use std::cmp::Reverse;

use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelyProgressLogger as ProgressLogger;

use crate::scheduling::Schedule;
use crate::scheduling::activate::Activations;
Expand Down Expand Up @@ -63,6 +64,9 @@ where

/// Logging handle
logging: Option<Logger>,

/// Progress logging handle
progress_logging: Option<ProgressLogger>,
}

impl<TOuter, TInner> SubgraphBuilder<TOuter, TInner>
Expand Down Expand Up @@ -95,6 +99,7 @@ where
index: usize,
mut path: Vec<usize>,
logging: Option<Logger>,
progress_logging: Option<ProgressLogger>,
name: &str,
)
-> SubgraphBuilder<TOuter, TInner>
Expand All @@ -114,6 +119,7 @@ where
input_messages: Vec::new(),
output_capabilities: Vec::new(),
logging,
progress_logging,
}
}

Expand Down Expand Up @@ -169,7 +175,7 @@ where

let (tracker, scope_summary) = builder.build();

let progcaster = Progcaster::new(worker, &self.path, self.logging.clone());
let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone());

let mut incomplete = vec![true; self.children.len()];
incomplete[0] = false;
Expand Down
4 changes: 3 additions & 1 deletion timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,14 +570,16 @@ impl<A: Allocate> Worker<A> {
let dataflow_index = self.allocate_dataflow_index();
let identifier = self.new_identifier();

let subscope = SubgraphBuilder::new_from(dataflow_index, addr, logging.clone(), name);
let progress_logging = self.logging.borrow_mut().get("timely/progress");
let subscope = SubgraphBuilder::new_from(dataflow_index, addr, logging.clone(), progress_logging.clone(), name);
let subscope = RefCell::new(subscope);

let result = {
let mut builder = Child {
subgraph: &subscope,
parent: self.clone(),
logging: logging.clone(),
progress_logging: progress_logging.clone(),
};
func(&mut resources, &mut builder)
};
Expand Down