Skip to content

Commit 0b9c3a1

Browse files
committed
Wait for checkpoint to complete
1 parent 863d71a commit 0b9c3a1

File tree

1 file changed

+10
-8
lines changed

1 file changed

+10
-8
lines changed

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -379,17 +379,19 @@ class CheckpointSuite extends TestSuiteBase {
379379
}
380380
}
381381
clock.addToTime(batchDuration.milliseconds)
382-
Thread.sleep(1000) // To wait for execution to actually begin
383382
eventually(timeout(batchDuration * 5)) {
384-
assert(waiter.getNumStartedBatches === 3)
383+
// Wait until all files have been recorded and all batches have started
384+
assert(recordedFiles(ssc) === Seq(1, 2, 3) && waiter.getNumStartedBatches === 3)
385+
}
386+
// Wait for a checkpoint to be written
387+
val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
388+
eventually(timeout(batchDuration * 5)) {
389+
assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 5)
385390
}
386-
assert(waiter.getNumCompletedBatches === 2)
387-
logInfo("Output after first start = " + outputStream.output.mkString("[", ", ", "]"))
388-
assert(outputStream.output.size > 0, "No files processed before restart")
389391
ssc.stop()
390-
391-
// Verify whether files created have been recorded correctly or not
392-
assert(recordedFiles(ssc) === Seq(1, 2, 3))
392+
// Check that we shut down while the third batch was being processed
393+
assert(waiter.getNumCompletedBatches === 2)
394+
assert(outputStream.output.flatten === Seq(1, 3))
393395
}
394396

395397
// The original StreamingContext has now been stopped.

0 commit comments

Comments
 (0)