From 088d26d5981e9bbc74bcc577d9cff089dabf74b8 Mon Sep 17 00:00:00 2001 From: Andrea Lattuada Date: Tue, 12 May 2020 12:39:31 +0200 Subject: [PATCH] Fix subgraph validate_progress It's unsafe to create capabilities based on the input frontier of the operator: a negative point-stamp change may be in flight, which would cause a race condition downstream --- timely/src/progress/subgraph.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index b4b2ffeed..e41136a5a 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -721,8 +721,7 @@ impl PerOperatorState { if *diff > 0 { let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time))); let internal = child_state.sources[output].pointstamps.less_equal(time); - let external = child_state.targets.iter().any(|x| x.implications.less_equal(time)); - if !consumed && !internal && !external { + if !consumed && !internal { panic!("Progress error; internal {:?}", self.name); } } @@ -733,8 +732,7 @@ impl PerOperatorState { if *diff > 0 { let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time))); let internal = child_state.sources[output].pointstamps.less_equal(time); - let external = child_state.targets.iter().any(|x| x.implications.less_equal(time)); - if !consumed && !internal && !external { + if !consumed && !internal { panic!("Progress error; produced {:?}", self.name); } }