Closed
@@ -424,11 +424,11 @@ class CheckpointSuite extends TestSuiteBase {
}
}
}
- clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
// Wait until all files have been recorded and all batches have started
assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
}
+ clock.advance(batchDuration.milliseconds)
Member Author

This line adds one batch, so I moved it here. Because of this line, the clock of the recovered StreamingContext will be 14000ms.

// Wait for a checkpoint to be written
eventually(eventuallyTimeout) {
assert(Checkpoint.getCheckpointFiles(checkpointDir).size === 6)
@@ -454,9 +454,12 @@ class CheckpointSuite extends TestSuiteBase {
// recorded before failure were saved and successfully recovered
logInfo("*********** RESTARTING ************")
withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
- // So that the restarted StreamingContext's clock has gone forward in time since failure
- ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString)
- val oldClockTime = clock.getTimeMillis()
+ // "batchDuration.milliseconds * 3" has gone before restarting StreamingContext. And because
+ // the recovery time is read from the checkpoint time but the original clock doesn't align
+ // with the batch time, we need to add the offset "batchDuration.milliseconds / 2".
+ ssc.conf.set("spark.streaming.manualClock.jump",
+ (batchDuration.milliseconds / 2 + batchDuration.milliseconds * 3).toString)
+ val oldClockTime = clock.getTimeMillis() // 15000ms
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val batchCounter = new BatchCounter(ssc)
val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
@@ -467,10 +470,10 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
Contributor

hah, I guess we just never asserted this??
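
To illustrate why the missing `assert` matters: a bare comparison inside a retry block only produces a Boolean that is thrown away, so the block can never fail. The sketch below is a minimal stand-alone model, not code from the suite. `BareComparisonSketch` and `passes` are hypothetical names, `passes` is a simplified stand-in for a block that succeeds when nothing is thrown, and it uses Scala's built-in `assert` and `==` rather than ScalaTest's.

```scala
object BareComparisonSketch {
  // Hypothetical helper: runs `check` and reports whether it finished
  // without throwing, which is how a retry block decides it has succeeded.
  def passes(check: => Any): Boolean =
    try { check; true } catch { case _: AssertionError => false }

  // Two clock readings that deliberately differ (values quoted in this review).
  val actual: Long = 14000L
  val expected: Long = 15000L

  // The comparison evaluates to false, but the result is discarded,
  // so the block still counts as a success.
  val bare: Boolean = passes { actual == expected }

  // Wrapping the comparison in assert turns the mismatch into an exception.
  val asserted: Boolean = passes { assert(actual == expected) }

  def main(args: Array[String]): Unit = {
    println(s"bare comparison passes: $bare")
    println(s"asserted comparison passes: $asserted")
  }
}
```

With the values above, the bare form reports success even though the two readings differ, while the asserted form reports failure.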

// Verify that the clock has traveled forward to the expected time
eventually(eventuallyTimeout) {
- clock.getTimeMillis() === oldClockTime
+ assert(clock.getTimeMillis() === oldClockTime)
Member Author

The previous code set the clock to 14000ms, but there are actually 5 batches here. The modification time of file 7 will be set to 14000ms. However, batch 14000ms may run before writeFile, in which case file 7 won't be picked up by batch 14000ms, and file 7 and file 8 then land in the same batch (16000ms). This is the cause of the failure. Here is the log:

15/07/08 14:23:01.441 JobGenerator INFO FileInputDStream: New files at time 16000 ms:
file:/home/jenkins/workspace/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE/hadoop2.3/label/centos/target/tmp/spark-bc9590dd-4a55-41a2-a597-83792923792e/8
file:/home/jenkins/workspace/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE/hadoop2.3/label/centos/target/tmp/spark-bc9590dd-4a55-41a2-a597-83792923792e/7

After fixing spark.streaming.manualClock.jump, this can no longer happen.
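
The clock arithmetic behind this fix can be sketched roughly as follows. This is an illustrative model only, not code from CheckpointSuite: `ClockJumpSketch` and its helpers are hypothetical names, and the 2000ms batch duration and 8000ms checkpoint time are assumptions inferred from the 14000ms and 15000ms figures quoted in this review (14000 - 3 * 2000 = 8000).

```scala
object ClockJumpSketch {
  // ASSUMPTION: inferred from the figures in this review, not read from the suite.
  val batchMs: Long = 2000L
  val checkpointTimeMs: Long = 8000L

  // The recovered clock starts at the checkpoint time plus the configured jump.
  def recoveredClock(checkpointMs: Long, jumpMs: Long): Long =
    checkpointMs + jumpMs

  // Batch times are multiples of the batch duration; list those in [fromMs, toMs].
  def batchTimes(fromMs: Long, toMs: Long, stepMs: Long): Seq[Long] = {
    val first = ((fromMs + stepMs - 1) / stepMs) * stepMs // round up to a batch boundary
    (first to toMs by stepMs).toSeq
  }

  val oldJumpMs: Long = batchMs * 3               // 6000
  val newJumpMs: Long = batchMs / 2 + batchMs * 3 // 7000

  def main(args: Array[String]): Unit = {
    // Old jump: the clock stops at 14000, one half batch short of the old clock's 15000.
    println(recoveredClock(checkpointTimeMs, oldJumpMs))
    // New jump: the clock matches the old clock's 15000.
    println(recoveredClock(checkpointTimeMs, newJumpMs))
    // Batches between 6000ms and 15000ms inclusive: 6000, 8000, 10000, 12000, 14000.
    println(batchTimes(6000L, 15000L, batchMs).mkString(", "))
  }
}
```

Under these assumptions the extra half batch moves the recovered clock from 14000ms to 15000ms, and the range from 6000ms to 15000ms contains five batch times, matching `numBatchesAfterRestart = 5`.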

}
- // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
- val numBatchesAfterRestart = 4
+ // There are 5 batches between 6000ms and 15000ms (inclusive).
+ val numBatchesAfterRestart = 5
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart)
}
@@ -483,7 +486,6 @@ class CheckpointSuite extends TestSuiteBase {
assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
Member Author

This is not necessary now.

}
}
- clock.advance(batchDuration.milliseconds)
logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()