1818package org .apache .spark .streaming
1919
2020import java .io .File
21- import java .nio .charset .Charset
2221
23- import scala .collection .mutable .ArrayBuffer
22+ import scala .collection .mutable .{ SynchronizedBuffer , ArrayBuffer }
2423import scala .reflect .ClassTag
2524
25+ import com .google .common .base .Charsets
2626import com .google .common .io .Files
2727import org .apache .hadoop .conf .Configuration
2828import org .apache .hadoop .fs .{FileSystem , Path }
2929import org .apache .hadoop .io .{IntWritable , Text }
3030import org .apache .hadoop .mapred .TextOutputFormat
3131import org .apache .hadoop .mapreduce .lib .output .{TextOutputFormat => NewTextOutputFormat }
32+ import org .scalatest .concurrent .Eventually ._
3233
3334import org .apache .spark .SparkContext ._
3435import org .apache .spark .streaming .StreamingContext ._
@@ -47,8 +48,6 @@ class CheckpointSuite extends TestSuiteBase {
4748
4849 override def batchDuration = Milliseconds (500 )
4950
50- override def actuallyWait = true // to allow checkpoints to be written
51-
5251 override def beforeFunction () {
5352 super .beforeFunction()
5453 Utils .deleteRecursively(new File (checkpointDir))
@@ -145,7 +144,6 @@ class CheckpointSuite extends TestSuiteBase {
145144 ssc.start()
146145 advanceTimeWithRealDelay(ssc, 4 )
147146 ssc.stop()
148- System .clearProperty(" spark.streaming.manualClock.jump" )
149147 ssc = null
150148 }
151149
@@ -314,109 +312,161 @@ class CheckpointSuite extends TestSuiteBase {
314312 testCheckpointedOperation(input, operation, output, 7 )
315313 }
316314
317-
318315 // This tests whether file input stream remembers what files were seen before
319316 // the master failure and uses them again to process a large window operation.
320317 // It also tests whether batches, whose processing was incomplete due to the
321318 // failure, are re-processed or not.
322319 test(" recovery with file input stream" ) {
323320 // Set up the streaming context and input streams
321+ val batchDuration = Seconds (2 ) // Due to 1-second resolution of setLastModified() on some OS's.
324322 val testDir = Utils .createTempDir()
325- var ssc = new StreamingContext (master, framework, Seconds (1 ))
326- ssc.checkpoint(checkpointDir)
327- val fileStream = ssc.textFileStream(testDir.toString)
328- // Making value 3 take large time to process, to ensure that the master
329- // shuts down in the middle of processing the 3rd batch
330- val mappedStream = fileStream.map(s => {
331- val i = s.toInt
332- if (i == 3 ) Thread .sleep(2000 )
333- i
334- })
335-
336- // Reducing over a large window to ensure that recovery from master failure
337- // requires reprocessing of all the files seen before the failure
338- val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds (30 ), Seconds (1 ))
339- val outputBuffer = new ArrayBuffer [Seq [Int ]]
340- var outputStream = new TestOutputStream (reducedStream, outputBuffer)
341- outputStream.register()
342- ssc.start()
343-
344- // Create files and advance manual clock to process them
345- // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
346- Thread .sleep(1000 )
347- for (i <- Seq (1 , 2 , 3 )) {
348- Files .write(i + " \n " , new File (testDir, i.toString), Charset .forName(" UTF-8" ))
349- // wait to make sure that the file is written such that it gets shown in the file listings
350- Thread .sleep(1000 )
323+ val outputBuffer = new ArrayBuffer [Seq [Int ]] with SynchronizedBuffer [Seq [Int ]]
324+
325+ /**
326+ * Writes a file named `i` (which contains the number `i`) to the test directory and sets its
327+ * modification time to `clock`'s current time.
328+ */
329+ def writeFile (i : Int , clock : ManualClock ): Unit = {
330+ val file = new File (testDir, i.toString)
331+ Files .write(i + " \n " , file, Charsets .UTF_8 )
332+ assert(file.setLastModified(clock.currentTime()))
333+ // Check that the file's modification date is actually the value we wrote, since rounding or
334+ // truncation will break the test:
335+ assert(file.lastModified() === clock.currentTime())
351336 }
352- logInfo(" Output = " + outputStream.output.mkString(" ," ))
353- assert(outputStream.output.size > 0 , " No files processed before restart" )
354- ssc.stop()
355337
356- // Verify whether files created have been recorded correctly or not
357- var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf [FileInputDStream [_, _, _]]
358- def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
359- assert(! recordedFiles.filter(_.endsWith(" 1" )).isEmpty)
360- assert(! recordedFiles.filter(_.endsWith(" 2" )).isEmpty)
361- assert(! recordedFiles.filter(_.endsWith(" 3" )).isEmpty)
362-
363- // Create files while the master is down
364- for (i <- Seq (4 , 5 , 6 )) {
365- Files .write(i + " \n " , new File (testDir, i.toString), Charset .forName(" UTF-8" ))
366- Thread .sleep(1000 )
338+ /**
339+ * Returns ids that identify which files which have been recorded by the file input stream.
340+ */
341+ def recordedFiles (ssc : StreamingContext ): Seq [Int ] = {
342+ val fileInputDStream =
343+ ssc.graph.getInputStreams().head.asInstanceOf [FileInputDStream [_, _, _]]
344+ val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
345+ filenames.map(_.split(File .separator).last.toInt).toSeq.sorted
367346 }
368347
369- // Recover context from checkpoint file and verify whether the files that were
370- // recorded before failure were saved and successfully recovered
371- logInfo(" *********** RESTARTING ************" )
372- ssc = new StreamingContext (checkpointDir)
373- fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf [FileInputDStream [_, _, _]]
374- assert(! recordedFiles.filter(_.endsWith(" 1" )).isEmpty)
375- assert(! recordedFiles.filter(_.endsWith(" 2" )).isEmpty)
376- assert(! recordedFiles.filter(_.endsWith(" 3" )).isEmpty)
348+ try {
349+ // This is a var because it's re-assigned when we restart from a checkpoint
350+ var clock : ManualClock = null
351+ withStreamingContext(new StreamingContext (conf, batchDuration)) { ssc =>
352+ ssc.checkpoint(checkpointDir)
353+ clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
354+ val batchCounter = new BatchCounter (ssc)
355+ val fileStream = ssc.textFileStream(testDir.toString)
356+ // Make value 3 take a large time to process, to ensure that the driver
357+ // shuts down in the middle of processing the 3rd batch
358+ CheckpointSuite .batchThreeShouldBlockIndefinitely = true
359+ val mappedStream = fileStream.map(s => {
360+ val i = s.toInt
361+ if (i == 3 ) {
362+ while (CheckpointSuite .batchThreeShouldBlockIndefinitely) {
363+ Thread .sleep(Long .MaxValue )
364+ }
365+ }
366+ i
367+ })
368+
369+ // Reducing over a large window to ensure that recovery from driver failure
370+ // requires reprocessing of all the files seen before the failure
371+ val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30 , batchDuration)
372+ val outputStream = new TestOutputStream (reducedStream, outputBuffer)
373+ outputStream.register()
374+ ssc.start()
375+
376+ // Advance half a batch so that the first file is created after the StreamingContext starts
377+ clock.addToTime(batchDuration.milliseconds / 2 )
378+ // Create files and advance manual clock to process them
379+ for (i <- Seq (1 , 2 , 3 )) {
380+ writeFile(i, clock)
381+ // Advance the clock after creating the file to avoid a race when
382+ // setting its modification time
383+ clock.addToTime(batchDuration.milliseconds)
384+ if (i != 3 ) {
385+ // Since we want to shut down while the 3rd batch is processing
386+ eventually(eventuallyTimeout) {
387+ assert(batchCounter.getNumCompletedBatches === i)
388+ }
389+ }
390+ }
391+ clock.addToTime(batchDuration.milliseconds)
392+ eventually(eventuallyTimeout) {
393+ // Wait until all files have been recorded and all batches have started
394+ assert(recordedFiles(ssc) === Seq (1 , 2 , 3 ) && batchCounter.getNumStartedBatches === 3 )
395+ }
396+ // Wait for a checkpoint to be written
397+ val fs = new Path (checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
398+ eventually(eventuallyTimeout) {
399+ assert(Checkpoint .getCheckpointFiles(checkpointDir, fs).size === 6 )
400+ }
401+ ssc.stop()
402+ // Check that we shut down while the third batch was being processed
403+ assert(batchCounter.getNumCompletedBatches === 2 )
404+ assert(outputStream.output.flatten === Seq (1 , 3 ))
405+ }
377406
378- // Restart stream computation
379- ssc.start()
380- for (i <- Seq (7 , 8 , 9 )) {
381- Files .write(i + " \n " , new File (testDir, i.toString), Charset .forName(" UTF-8" ))
382- Thread .sleep(1000 )
383- }
384- Thread .sleep(1000 )
385- logInfo(" Output = " + outputStream.output.mkString(" [" , " , " , " ]" ))
386- assert(outputStream.output.size > 0 , " No files processed after restart" )
387- ssc.stop()
407+ // The original StreamingContext has now been stopped.
408+ CheckpointSuite .batchThreeShouldBlockIndefinitely = false
388409
389- // Verify whether files created while the driver was down have been recorded or not
390- assert(! recordedFiles.filter(_.endsWith(" 4" )).isEmpty)
391- assert(! recordedFiles.filter(_.endsWith(" 5" )).isEmpty)
392- assert(! recordedFiles.filter(_.endsWith(" 6" )).isEmpty)
393-
394- // Verify whether new files created after recover have been recorded or not
395- assert(! recordedFiles.filter(_.endsWith(" 7" )).isEmpty)
396- assert(! recordedFiles.filter(_.endsWith(" 8" )).isEmpty)
397- assert(! recordedFiles.filter(_.endsWith(" 9" )).isEmpty)
398-
399- // Append the new output to the old buffer
400- outputStream = ssc.graph.getOutputStreams().head.asInstanceOf [TestOutputStream [Int ]]
401- outputBuffer ++= outputStream.output
402-
403- val expectedOutput = Seq (1 , 3 , 6 , 10 , 15 , 21 , 28 , 36 , 45 )
404- logInfo(" --------------------------------" )
405- logInfo(" output, size = " + outputBuffer.size)
406- outputBuffer.foreach(x => logInfo(" [" + x.mkString(" ," ) + " ]" ))
407- logInfo(" expected output, size = " + expectedOutput.size)
408- expectedOutput.foreach(x => logInfo(" [" + x + " ]" ))
409- logInfo(" --------------------------------" )
410-
411- // Verify whether all the elements received are as expected
412- val output = outputBuffer.flatMap(x => x)
413- assert(output.contains(6 )) // To ensure that the 3rd input (i.e., 3) was processed
414- output.foreach(o => // To ensure all the inputs are correctly added cumulatively
415- assert(expectedOutput.contains(o), " Expected value " + o + " not found" )
416- )
417- // To ensure that all the inputs were received correctly
418- assert(expectedOutput.last === output.last)
419- Utils .deleteRecursively(testDir)
410+ // Create files while the streaming driver is down
411+ for (i <- Seq (4 , 5 , 6 )) {
412+ writeFile(i, clock)
413+ // Advance the clock after creating the file to avoid a race when
414+ // setting its modification time
415+ clock.addToTime(batchDuration.milliseconds)
416+ }
417+
418+ // Recover context from checkpoint file and verify whether the files that were
419+ // recorded before failure were saved and successfully recovered
420+ logInfo(" *********** RESTARTING ************" )
421+ withStreamingContext(new StreamingContext (checkpointDir)) { ssc =>
422+ // So that the restarted StreamingContext's clock has gone forward in time since failure
423+ ssc.conf.set(" spark.streaming.manualClock.jump" , (batchDuration * 3 ).milliseconds.toString)
424+ val oldClockTime = clock.currentTime()
425+ clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
426+ val batchCounter = new BatchCounter (ssc)
427+ val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf [TestOutputStream [Int ]]
428+ // Check that we remember files that were recorded before the restart
429+ assert(recordedFiles(ssc) === Seq (1 , 2 , 3 ))
430+
431+ // Restart stream computation
432+ ssc.start()
433+ // Verify that the clock has traveled forward to the expected time
434+ eventually(eventuallyTimeout) {
435+ clock.currentTime() === oldClockTime
436+ }
437+ // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch)
438+ val numBatchesAfterRestart = 4
439+ eventually(eventuallyTimeout) {
440+ assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart)
441+ }
442+ for ((i, index) <- Seq (7 , 8 , 9 ).zipWithIndex) {
443+ writeFile(i, clock)
444+ // Advance the clock after creating the file to avoid a race when
445+ // setting its modification time
446+ clock.addToTime(batchDuration.milliseconds)
447+ eventually(eventuallyTimeout) {
448+ assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1 )
449+ }
450+ }
451+ clock.addToTime(batchDuration.milliseconds)
452+ logInfo(" Output after restart = " + outputStream.output.mkString(" [" , " , " , " ]" ))
453+ assert(outputStream.output.size > 0 , " No files processed after restart" )
454+ ssc.stop()
455+
456+ // Verify whether files created while the driver was down (4, 5, 6) and files created after
457+ // recovery (7, 8, 9) have been recorded
458+ assert(recordedFiles(ssc) === (1 to 9 ))
459+
460+ // Append the new output to the old buffer
461+ outputBuffer ++= outputStream.output
462+
463+ // Verify whether all the elements received are as expected
464+ val expectedOutput = Seq (1 , 3 , 6 , 10 , 15 , 21 , 28 , 36 , 45 )
465+ assert(outputBuffer.flatten.toSet === expectedOutput.toSet)
466+ }
467+ } finally {
468+ Utils .deleteRecursively(testDir)
469+ }
420470 }
421471
422472
@@ -473,12 +523,12 @@ class CheckpointSuite extends TestSuiteBase {
473523 */
474524 def advanceTimeWithRealDelay [V : ClassTag ](ssc : StreamingContext , numBatches : Long ): Seq [Seq [V ]] = {
475525 val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
476- logInfo(" Manual clock before advancing = " + clock.time )
526+ logInfo(" Manual clock before advancing = " + clock.currentTime() )
477527 for (i <- 1 to numBatches.toInt) {
478528 clock.addToTime(batchDuration.milliseconds)
479529 Thread .sleep(batchDuration.milliseconds)
480530 }
481- logInfo(" Manual clock after advancing = " + clock.time )
531+ logInfo(" Manual clock after advancing = " + clock.currentTime() )
482532 Thread .sleep(batchDuration.milliseconds)
483533
484534 val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
@@ -487,3 +537,7 @@ class CheckpointSuite extends TestSuiteBase {
487537 outputStream.output.map(_.flatten)
488538 }
489539}
540+
541+ private object CheckpointSuite extends Serializable {
542+ var batchThreeShouldBlockIndefinitely : Boolean = true
543+ }
0 commit comments