Skip to content

Commit 1372766

Browse files
author
Andrew Or
committed
Fix flaky AccumulatorSuite#verifyPeakExecutionMemorySet
Events are posted asynchronously so we may not have called onStageCompleted or onJobEnd etc. before asserting things. We should wait until all events have been processed before proceeding.
1 parent 6322d3a commit 1372766

File tree

1 file changed

+2
-0
lines changed

1 file changed

+2
-0
lines changed

core/src/test/scala/org/apache/spark/AccumulatorSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,8 @@ private[spark] object AccumulatorSuite {
307307
val listener = new SaveInfoListener
308308
sc.addSparkListener(listener)
309309
testBody
310+
// wait until all events have been processed before proceeding to assert things
311+
sc.listenerBus.waitUntilEmpty(10 * 1000)
310312
val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
311313
val isSet = accums.exists { a =>
312314
a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)

0 commit comments

Comments
 (0)