diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index 839fc0b8f..4d4289dbf 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -80,7 +80,7 @@ fn main() { // Acquire a re-activator for this operator. use timely::scheduling::Scheduler; - let activator = scope.activator_for(&info.address[..]); + let activator = scope.activator_for(info.address); let mut cap = Some(capability); move |output| { diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index abf6fef3a..d32d0a32c 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -26,7 +26,7 @@ pub trait ParallelizationContract { /// Type implementing `Pull` produced by this pact. type Puller: Pull>+'static; /// Allocates a matched pair of push and pull endpoints implementing the pact. - fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller); + fn connect(self, allocator: &mut A, identifier: usize, address: Vec, logging: Option) -> (Self::Pusher, Self::Puller); } /// A direct connection @@ -36,7 +36,7 @@ pub struct Pipeline; impl ParallelizationContract for Pipeline { type Pusher = LogPusher>>; type Puller = LogPuller>>; - fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { + fn connect(self, allocator: &mut A, identifier: usize, address: Vec, logging: Option) -> (Self::Pusher, Self::Puller) { let (pusher, puller) = allocator.pipeline::>(identifier, address); (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()), LogPuller::new(puller, allocator.index(), identifier, logging)) @@ -72,7 +72,7 @@ where type Pusher = ExchangePusher>>>, H>; type Puller = LogPuller>>>; - fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { + fn connect(self, allocator: &mut A, identifier: usize, address: Vec, logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 233acd18e..550b10fb1 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -347,10 +347,10 @@ impl CapabilityTrait for ActivateCapability { impl ActivateCapability { /// Creates a new activating capability. - pub fn new(capability: Capability, address: &[usize], activations: Rc>) -> Self { + pub fn new(capability: Capability, address: Vec, activations: Rc>) -> Self { Self { capability, - address: Rc::new(address.to_vec()), + address: Rc::new(address), activations, } } diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index 2ab843f86..e30b64065 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -72,7 +72,7 @@ where let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone()); let address = builder.operator_info().address; - let activator = scope.activator_for(&address[..]); + let activator = scope.activator_for(address); let (targets, stream) = builder.new_output(); diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index 23da0e201..cd005f181 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -62,7 +62,7 @@ impl, C: Data+Container> Enter Input for G where ::Timestamp: TotalOrder { let produced = counter.produced().clone(); let index = self.allocate_operator_index(); - let mut address = self.addr(); - address.push(index); + let address = self.addr_for_child(index); - handle.activate.push(self.activator_for(&address[..])); + handle.activate.push(self.activator_for(address.clone())); let progress = Rc::new(RefCell::new(ChangeBatch::new())); diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index da40f59e6..69b270f59 100644 --- a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -37,7 +37,7 @@ impl ToStreamBuilder for I wh source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| { // Acquire an activator, so that the operator can rescheduled itself. - let activator = scope.activator_for(&info.address[..]); + let activator = scope.activator_for(info.address); let mut iterator = self.into_iter().fuse(); let mut capability = Some(capability); diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index c513cc922..bcad2d8d8 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -91,10 +91,9 @@ impl UnorderedInput for G { let peers = self.peers(); let index = self.allocate_operator_index(); - let mut address = self.addr(); - address.push(index); + let address = self.addr_for_child(index); - let cap = ActivateCapability::new(cap, &address, self.activations()); + let cap = ActivateCapability::new(cap, address.clone(), self.activations()); let helper = UnorderedHandle::new(counter); diff --git a/timely/src/dataflow/operators/flow_controlled.rs b/timely/src/dataflow/operators/flow_controlled.rs index 4f01bd802..029506a59 100644 --- a/timely/src/dataflow/operators/flow_controlled.rs +++ b/timely/src/dataflow/operators/flow_controlled.rs @@ -87,7 +87,7 @@ pub fn iterator_source< let mut target = G::Timestamp::minimum(); source(scope, name, |cap, info| { let mut cap = Some(cap); - let activator = scope.activator_for(&info.address[..]); + let activator = scope.activator_for(info.address); move |output| { cap = cap.take().and_then(|mut cap| { loop { diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 9522b6f34..bf0a379eb 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -70,8 +70,7 @@ impl OperatorBuilder { let global = scope.new_identifier(); let index = scope.allocate_operator_index(); - let mut address = scope.addr(); - address.push(index); + let address = scope.addr_for_child(index); let peers = scope.peers(); OperatorBuilder { @@ -119,7 +118,7 @@ impl OperatorBuilder { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); - let (sender, receiver) = pact.connect(&mut self.scope, channel_id, &self.address[..], logging); + let (sender, receiver) = pact.connect(&mut self.scope, channel_id, self.address.clone(), logging); let target = Target::new(self.index, self.shape.inputs); stream.connect_to(target, sender, channel_id); @@ -175,7 +174,7 @@ impl OperatorBuilder { /// Information describing the operator. pub fn operator_info(&self) -> OperatorInfo { - OperatorInfo::new(self.index, self.global, &self.address[..]) + OperatorInfo::new(self.index, self.global, self.address.clone()) } } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index c266b0051..752b3be2e 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -64,7 +64,7 @@ impl OperatorBuilder { where P: ParallelizationContract { - let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().outputs()]; + let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect(); self.new_input_connection(stream, pact, connection) } @@ -94,7 +94,7 @@ impl OperatorBuilder { /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { - let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()]; + let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect(); self.new_output_connection(connection) } diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index d6dd92c69..08075a758 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -535,7 +535,7 @@ impl Operator for StreamCore { /// /// source(scope, "Source", |capability, info| { /// -/// let activator = scope.activator_for(&info.address[..]); +/// let activator = scope.activator_for(info.address); /// /// let mut cap = Some(capability); /// move |output| { diff --git a/timely/src/dataflow/operators/generic/operator_info.rs b/timely/src/dataflow/operators/generic/operator_info.rs index caa9cf297..f2b5313f6 100644 --- a/timely/src/dataflow/operators/generic/operator_info.rs +++ b/timely/src/dataflow/operators/generic/operator_info.rs @@ -12,11 +12,11 @@ pub struct OperatorInfo { impl OperatorInfo { /// Construct a new `OperatorInfo`. - pub fn new(local_id: usize, global_id: usize, address: &[usize]) -> OperatorInfo { + pub fn new(local_id: usize, global_id: usize, address: Vec) -> OperatorInfo { OperatorInfo { local_id, global_id, - address: address.to_vec(), + address, } } } diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 07789304c..dae65550e 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -58,10 +58,10 @@ where fn config(&self) -> &Config { self.parent.config() } fn index(&self) -> usize { self.parent.index() } fn peers(&self) -> usize { self.parent.peers() } - fn allocate(&mut self, identifier: usize, address: &[usize]) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize, address: Vec) -> (Vec>>>, Box>>) { self.parent.allocate(identifier, address) } - fn pipeline(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher>, ThreadPuller>) { + fn pipeline(&mut self, identifier: usize, address: Vec) -> (ThreadPusher>, ThreadPuller>) { self.parent.pipeline(identifier, address) } fn new_identifier(&mut self) -> usize { @@ -97,6 +97,15 @@ where { fn name(&self) -> String { self.subgraph.borrow().name.clone() } fn addr(&self) -> Vec { self.subgraph.borrow().path.clone() } + + fn addr_for_child(&self, index: usize) -> Vec { + let path = &self.subgraph.borrow().path[..]; + let mut addr = Vec::with_capacity(path.len() + 1); + addr.extend_from_slice(path); + addr.push(index); + addr + } + fn add_edge(&self, source: Source, target: Target) { self.subgraph.borrow_mut().connect(source, target); } diff --git a/timely/src/dataflow/scopes/mod.rs b/timely/src/dataflow/scopes/mod.rs index 50ee7b8fa..ba6566684 100644 --- a/timely/src/dataflow/scopes/mod.rs +++ b/timely/src/dataflow/scopes/mod.rs @@ -33,6 +33,10 @@ pub trait Scope: ScopeParent { /// A sequence of scope identifiers describing the path from the worker root to this scope. fn addr(&self) -> Vec; + /// A sequence of scope identifiers describing the path from the worker root to the child + /// indicated by `index`. + fn addr_for_child(&self, index: usize) -> Vec; + /// Connects a source of data with a target of the data. This only links the two for /// the purposes of tracking progress, rather than effect any data movement itself. fn add_edge(&self, source: Source, target: Target); diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 3bb640adc..27a484dfc 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -31,16 +31,15 @@ pub struct Progcaster { impl Progcaster { /// Creates a new `Progcaster` using a channel from the supplied worker. - pub fn new(worker: &mut A, path: &Vec, mut logging: Option, progress_logging: Option) -> Progcaster { + pub fn new(worker: &mut A, addr: Vec, mut logging: Option, progress_logging: Option) -> Progcaster { let channel_identifier = worker.new_identifier(); - let (pushers, puller) = worker.allocate(channel_identifier, &path[..]); + let (pushers, puller) = worker.allocate(channel_identifier, addr.clone()); logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent { identifier: channel_identifier, kind: crate::logging::CommChannelKind::Progress, })); let worker_index = worker.index(); - let addr = path.clone(); Progcaster { to_push: None, pushers, diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index f41fee190..17a66e3c0 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -131,14 +131,16 @@ where /// Adds a new child to the subgraph. pub fn add_child(&mut self, child: Box>, index: usize, identifier: usize) { - { - let mut child_path = self.path.clone(); + if let Some(l) = &mut self.logging { + let mut child_path = Vec::with_capacity(self.path.len() + 1); + child_path.extend_from_slice(&self.path[..]); child_path.push(index); - self.logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent { + + l.log(crate::logging::OperatesEvent { id: identifier, addr: child_path, name: child.name().to_owned(), - })); + }); } self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone())) } @@ -163,7 +165,8 @@ where let mut builder = reachability::Builder::new(); // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected. - builder.add_node(0, outputs, inputs, vec![vec![Antichain::new(); inputs]; outputs]); + let summary = (0..outputs).map(|_| (0..inputs).map(|_| Antichain::new()).collect()).collect(); + builder.add_node(0, outputs, inputs, summary); for (index, child) in self.children.iter().enumerate().skip(1) { builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone()); } @@ -181,7 +184,7 @@ where .map(|logger| reachability::logging::TrackerLogger::new(path, logger)); let (tracker, scope_summary) = builder.build(reachability_logging); - let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone()); + let progcaster = Progcaster::new(worker, self.path.clone(), self.logging.clone(), self.progress_logging.clone()); let mut incomplete = vec![true; self.children.len()]; incomplete[0] = false; diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index e86f44355..04c7091a1 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -224,9 +224,9 @@ pub struct Activator { impl Activator { /// Creates a new activation handle - pub fn new(path: &[usize], queue: Rc>) -> Self { + pub fn new(path: Vec, queue: Rc>) -> Self { Self { - path: path.to_vec(), + path, queue, } } @@ -259,9 +259,9 @@ pub struct SyncActivator { impl SyncActivator { /// Creates a new thread-safe activation handle. - pub fn new(path: &[usize], queue: SyncActivations) -> Self { + pub fn new(path: Vec, queue: SyncActivations) -> Self { Self { - path: path.to_vec(), + path, queue, } } diff --git a/timely/src/scheduling/mod.rs b/timely/src/scheduling/mod.rs index dc2ea35a0..6e36d34f7 100644 --- a/timely/src/scheduling/mod.rs +++ b/timely/src/scheduling/mod.rs @@ -26,12 +26,12 @@ pub trait Scheduler { fn activations(&self) -> Rc>; /// Constructs an `Activator` tied to the specified operator address. - fn activator_for(&self, path: &[usize]) -> Activator { + fn activator_for(&self, path: Vec) -> Activator { Activator::new(path, self.activations()) } /// Constructs a `SyncActivator` tied to the specified operator address. - fn sync_activator_for(&self, path: &[usize]) -> SyncActivator { + fn sync_activator_for(&self, path: Vec) -> SyncActivator { let sync_activations = self.activations().borrow().sync(); SyncActivator::new(path, sync_activations) } diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index 320a3dcb8..a08920191 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -128,7 +128,7 @@ impl Sequencer { activator_source .borrow_mut() .replace(CatchupActivator { - activator: scope.activator_for(&info.address[..]), + activator: scope.activator_for(info.address), catchup_until: None, }); diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 3db8d9733..6809e080e 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -191,12 +191,12 @@ pub trait AsWorker : Scheduler { /// scheduled in response to the receipt of records on the channel. /// Most commonly, this would be the address of the *target* of the /// channel. - fn allocate(&mut self, identifier: usize, address: &[usize]) -> (Vec>>>, Box>>); + fn allocate(&mut self, identifier: usize, address: Vec) -> (Vec>>>, Box>>); /// Constructs a pipeline channel from the worker to itself. /// /// By default this method uses the native channel allocation mechanism, but the expectation is /// that this behavior will be overriden to be more efficient. - fn pipeline(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher>, ThreadPuller>); + fn pipeline(&mut self, identifier: usize, address: Vec) -> (ThreadPusher>, ThreadPuller>); /// Allocates a new worker-unique identifier. fn new_identifier(&mut self) -> usize; @@ -231,17 +231,17 @@ impl AsWorker for Worker { fn config(&self) -> &Config { &self.config } fn index(&self) -> usize { self.allocator.borrow().index() } fn peers(&self) -> usize { self.allocator.borrow().peers() } - fn allocate(&mut self, identifier: usize, address: &[usize]) -> (Vec>>>, Box>>) { + fn allocate(&mut self, identifier: usize, address: Vec) -> (Vec>>>, Box>>) { if address.is_empty() { panic!("Unacceptable address: Length zero"); } let mut paths = self.paths.borrow_mut(); - paths.insert(identifier, address.to_vec()); + paths.insert(identifier, address); self.temp_channel_ids.borrow_mut().push(identifier); self.allocator.borrow_mut().allocate(identifier) } - fn pipeline(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher>, ThreadPuller>) { + fn pipeline(&mut self, identifier: usize, address: Vec) -> (ThreadPusher>, ThreadPuller>) { if address.is_empty() { panic!("Unacceptable address: Length zero"); } let mut paths = self.paths.borrow_mut(); - paths.insert(identifier, address.to_vec()); + paths.insert(identifier, address); self.temp_channel_ids.borrow_mut().push(identifier); self.allocator.borrow_mut().pipeline(identifier) }