@@ -338,14 +338,14 @@ class CheckpointSuite extends TestSuiteBase {
338338 }
339339
340340 try {
341- // This is a var because it's re-assigned when we restart from a checkpoint:
341+ // This is a var because it's re-assigned when we restart from a checkpoint
342342 var clock : ManualClock = null
343343 withStreamingContext(new StreamingContext (conf, batchDuration)) { ssc =>
344344 ssc.checkpoint(checkpointDir)
345345 clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
346346 val waiter = new StreamingTestWaiter (ssc)
347347 val fileStream = ssc.textFileStream(testDir.toString)
348- // MKW value 3 take a large time to process, to ensure that the driver
348+ // Make value 3 take a large time to process, to ensure that the driver
349349 // shuts down in the middle of processing the 3rd batch
350350 val mappedStream = fileStream.map(s => {
351351 val i = s.toInt
@@ -373,14 +373,16 @@ class CheckpointSuite extends TestSuiteBase {
373373 clock.addToTime(batchDuration.milliseconds)
374374 waiter.waitForTotalBatchesStarted(3 , batchDuration * 5 )
375375 Thread .sleep(1000 ) // To wait for execution to actually begin
376- logInfo(" Output = " + outputStream.output.mkString(" [" , " , " , " ]" ))
377- assert(outputStream.output.size > 0 , " No files processed after restart" )
376+ logInfo(" Output after first start = " + outputStream.output.mkString(" [" , " , " , " ]" ))
377+ assert(outputStream.output.size > 0 , " No files processed before restart" )
378378 ssc.stop()
379379
380380 // Verify whether files created have been recorded correctly or not
381381 assert(recordedFiles(ssc) === Seq (1 , 2 , 3 ))
382382 }
383383
384+ // The original StreamingContext has now been stopped.
385+
384386 // Create files while the streaming driver is down
385387 for (i <- Seq (4 , 5 , 6 )) {
386388 writeFile(i, clock)
@@ -391,14 +393,15 @@ class CheckpointSuite extends TestSuiteBase {
391393 // recorded before failure were saved and successfully recovered
392394 logInfo(" *********** RESTARTING ************" )
393395 withStreamingContext(new StreamingContext (checkpointDir)) { ssc =>
394- // Copy over the time from the old clock so that we don't appear to have time-traveled:
396+ // Copy over the time from the old clock so that we don't appear to have time-traveled
395397 clock = {
396398 val newClock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
397399 newClock.setTime(clock.currentTime())
398400 newClock
399401 }
400402 val waiter = new StreamingTestWaiter (ssc)
401403 val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf [TestOutputStream [Int ]]
404+ // Check that we remember files that were recorded before the restart
402405 assert(recordedFiles(ssc) === Seq (1 , 2 , 3 ))
403406
404407 // Restart stream computation
@@ -410,12 +413,12 @@ class CheckpointSuite extends TestSuiteBase {
410413 waiter.waitForTotalBatchesCompleted(index + 1 , batchDuration * 5 )
411414 }
412415 clock.addToTime(batchDuration.milliseconds)
413- logInfo(" Output = " + outputStream.output.mkString(" [" , " , " , " ]" ))
416+ logInfo(" Output after restart = " + outputStream.output.mkString(" [" , " , " , " ]" ))
414417 assert(outputStream.output.size > 0 , " No files processed after restart" )
415418 ssc.stop()
416419
417420 // Verify whether files created while the driver was down (4, 5, 6) and files created after
418- // recovery (7, 8, 9), have been recorded or not
421+ // recovery (7, 8, 9) have been recorded
419422 assert(recordedFiles(ssc) === (1 to 9 ))
420423
421424 // Append the new output to the old buffer
0 commit comments