From 1292e9e403a5b8ae3d2ef0198252b5dfc0ae09dc Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 23 Jun 2020 07:16:25 -0400 Subject: [PATCH] relax debug test --- kafkaesque/Cargo.toml | 2 +- timely/src/progress/subgraph.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kafkaesque/Cargo.toml b/kafkaesque/Cargo.toml index ccd81d486..27d1d3f4b 100644 --- a/kafkaesque/Cargo.toml +++ b/kafkaesque/Cargo.toml @@ -10,4 +10,4 @@ abomonation="0.7" timely = { path = "../timely" } [dependencies.rdkafka] -version = "0.20.0" +version = "0.23.0" diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index e41136a5a..fa9535459 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -720,8 +720,9 @@ impl PerOperatorState { for (time, diff) in internal.iter() { if *diff > 0 { let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time))); - let internal = child_state.sources[output].pointstamps.less_equal(time); + let internal = child_state.sources[output].implications.less_equal(time); if !consumed && !internal { + println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications); panic!("Progress error; internal {:?}", self.name); } } @@ -731,8 +732,9 @@ impl PerOperatorState { for (time, diff) in produced.iter() { if *diff > 0 { let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time))); - let internal = child_state.sources[output].pointstamps.less_equal(time); + let internal = child_state.sources[output].implications.less_equal(time); if !consumed && !internal { + println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications); panic!("Progress error; produced {:?}", self.name); } }