From 6eba6a6d3eefe53a47508bf5fa7d44dfb630f376 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 28 Nov 2022 09:45:18 -0500 Subject: [PATCH] Activate operators that may want to shut down --- timely/src/progress/subgraph.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 309d9077c..f41fee190 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -200,6 +200,7 @@ where incomplete_count, activations, temp_active: BinaryHeap::new(), + maybe_shutdown: Vec::new(), children: self.children, input_messages: self.input_messages, output_capabilities: self.output_capabilities, @@ -242,6 +243,7 @@ where // shared activations (including children). activations: Rc>, temp_active: BinaryHeap>, + maybe_shutdown: Vec, // shared state written to by the datapath, counting records entering this subgraph instance. input_messages: Vec>>>, @@ -461,6 +463,7 @@ where // Drain propagated information into shared progress structure. for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() { + self.maybe_shutdown.push(location.node); // Targets are actionable, sources are not. if let crate::progress::Port::Target(port) = location.port { if self.children[location.node].notify { @@ -477,6 +480,18 @@ where } } + // Consider scheduling each recipient of progress information to shut down. + self.maybe_shutdown.sort(); + self.maybe_shutdown.dedup(); + for child_index in self.maybe_shutdown.drain(..) { + let child_state = self.pointstamp_tracker.node_state(child_index); + let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty()); + let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty()); + if frontiers_empty && no_capabilities { + self.temp_active.push(Reverse(child_index)); + } + } + // Extract child zero frontier changes and report as internal capability changes. for (output, internal) in self.shared_progress.borrow_mut().internals.iter_mut().enumerate() { self.pointstamp_tracker