 package org.apache.spark.streaming

 import java.io.File
-import java.nio.charset.Charset

-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import scala.reflect.ClassTag

+import com.google.common.base.Charsets
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -46,8 +46,6 @@ class CheckpointSuite extends TestSuiteBase {

   override def batchDuration = Milliseconds(500)

-  override def actuallyWait = true // to allow checkpoints to be written
-
   override def beforeFunction() {
     super.beforeFunction()
     Utils.deleteRecursively(new File(checkpointDir))
@@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
   // failure, are re-processed or not.
   test("recovery with file input stream") {
     // Set up the streaming context and input streams
+    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
     val testDir = Utils.createTempDir()
-    var ssc = new StreamingContext(master, framework, Seconds(1))
-    ssc.checkpoint(checkpointDir)
-    val fileStream = ssc.textFileStream(testDir.toString)
-    // Making value 3 take large time to process, to ensure that the master
-    // shuts down in the middle of processing the 3rd batch
-    val mappedStream = fileStream.map(s => {
-      val i = s.toInt
-      if (i == 3) Thread.sleep(2000)
-      i
-    })
-
-    // Reducing over a large window to ensure that recovery from master failure
-    // requires reprocessing of all the files seen before the failure
-    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
-    val outputBuffer = new ArrayBuffer[Seq[Int]]
-    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
-    outputStream.register()
-    ssc.start()
-
-    // Create files and advance manual clock to process them
-    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    Thread.sleep(1000)
-    for (i <- Seq(1, 2, 3)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      // wait to make sure that the file is written such that it gets shown in the file listings
-      Thread.sleep(1000)
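+    // Collects the output of both runs (before and after recovery); a SynchronizedBuffer because
+    // the running streaming jobs append to it while the test thread reads it.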
+    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
+
+    def writeFile(i: Int, clock: ManualClock): Unit = {
+      val file = new File(testDir, i.toString)
+      Files.write(i + "\n", file, Charsets.UTF_8)
+      assert(file.setLastModified(clock.currentTime()))
+      // Check that the file's modification date is actually the value we wrote, since rounding or
+      // truncation will break the test:
+      assert(file.lastModified() === clock.currentTime())
     }
-    logInfo("Output = " + outputStream.output.mkString(","))
-    assert(outputStream.output.size > 0, "No files processed before restart")
-    ssc.stop()

-    // Verify whether files created have been recorded correctly or not
-    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
-    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
-
-    // Create files while the master is down
-    for (i <- Seq(4, 5, 6)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      Thread.sleep(1000)
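+    // Returns the numbers parsed from the file names that the FileInputDStream has selected
+    // across all batches, in sorted order.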
+    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
+      val fileInputDStream =
+        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
+      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
     }

-    // Recover context from checkpoint file and verify whether the files that were
-    // recorded before failure were saved and successfully recovered
-    logInfo("*********** RESTARTING ************")
-    ssc = new StreamingContext(checkpointDir)
-    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+    try {
+      // This is a var because it's re-assigned when we restart from a checkpoint:
+      var clock: ManualClock = null
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        ssc.checkpoint(checkpointDir)
+        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        val waiter = new StreamingTestWaiter(ssc)
+        val fileStream = ssc.textFileStream(testDir.toString)
+        // Make value 3 take a large time to process, to ensure that the driver
+        // shuts down in the middle of processing the 3rd batch
+        val mappedStream = fileStream.map(s => {
+          val i = s.toInt
+          if (i == 3) Thread.sleep(4000)
+          i
+        })
+
+        // Reducing over a large window to ensure that recovery from driver failure
+        // requires reprocessing of all the files seen before the failure
+        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        clock.addToTime(batchDuration.milliseconds)
+        // Create files and advance manual clock to process them
+        for (i <- Seq(1, 2, 3)) {
+          writeFile(i, clock)
+          clock.addToTime(batchDuration.milliseconds)
+          if (i != 3) {
+            // Since we want to shut down while the 3rd batch is processing
+            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
+          }
+        }
+        clock.addToTime(batchDuration.milliseconds)
+        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
+        Thread.sleep(1000)  // To wait for execution to actually begin
+        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
+        assert(outputStream.output.size > 0, "No files processed before restart")
+        ssc.stop()
+
+        // Verify whether files created have been recorded correctly or not
+        assert(recordedFiles(ssc) === Seq(1, 2, 3))
+      }

-    // Restart stream computation
-    ssc.start()
-    for (i <- Seq(7, 8, 9)) {
-      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
-      Thread.sleep(1000)
-    }
-    Thread.sleep(1000)
-    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
-    assert(outputStream.output.size > 0, "No files processed after restart")
-    ssc.stop()
+      // Create files while the streaming driver is down
+      for (i <- Seq(4, 5, 6)) {
+        writeFile(i, clock)
+        clock.addToTime(1000)
+      }

-    // Verify whether files created while the driver was down have been recorded or not
-    assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
-
-    // Verify whether new files created after recover have been recorded or not
-    assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
-    assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
-
-    // Append the new output to the old buffer
-    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
-    outputBuffer ++= outputStream.output
-
-    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
-    logInfo("--------------------------------")
-    logInfo("output, size = " + outputBuffer.size)
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
-    logInfo("expected output, size = " + expectedOutput.size)
-    expectedOutput.foreach(x => logInfo("[" + x + "]"))
-    logInfo("--------------------------------")
-
-    // Verify whether all the elements received are as expected
-    val output = outputBuffer.flatMap(x => x)
-    assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
-    output.foreach(o => // To ensure all the inputs are correctly added cumulatively
-      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
-    )
-    // To ensure that all the inputs were received correctly
-    assert(expectedOutput.last === output.last)
-    Utils.deleteRecursively(testDir)
+      // Recover context from checkpoint file and verify whether the files that were
+      // recorded before failure were saved and successfully recovered
+      logInfo("*********** RESTARTING ************")
+      withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
+        // Copy over the time from the old clock so that we don't appear to have time-traveled:
+        clock = {
+          val newClock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+          newClock.setTime(clock.currentTime())
+          newClock
+        }
+        val waiter = new StreamingTestWaiter(ssc)
+        val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
+        assert(recordedFiles(ssc) === Seq(1, 2, 3))
+
+        // Restart stream computation
+        ssc.start()
+        clock.addToTime(batchDuration.milliseconds)
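+        // Write the post-recovery files (7, 8, 9), waiting for each batch to complete
+        // before writing the next one.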
+        for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
+          writeFile(i, clock)
+          clock.addToTime(batchDuration.milliseconds)
+          waiter.waitForTotalBatchesCompleted(index + 1, batchDuration * 5)
+        }
+        clock.addToTime(batchDuration.milliseconds)
+        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
+        assert(outputStream.output.size > 0, "No files processed after restart")
+        ssc.stop()
+
+        // Verify whether files created while the driver was down (4, 5, 6) and files created after
+        // recovery (7, 8, 9) have been recorded or not
+        assert(recordedFiles(ssc) === (1 to 9))
+
+        // Append the new output to the old buffer
+        outputBuffer ++= outputStream.output
+
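+        // Running sums over the large window: 1, 1+2, 1+2+3, ..., 1+...+9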
+        val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
+        logInfo("--------------------------------")
+        logInfo(s"output, size = ${outputBuffer.size}")
+        outputBuffer.foreach(x => logInfo(s"[${x.mkString(",")}]"))
+        logInfo(s"expected output, size = ${expectedOutput.size}")
+        expectedOutput.foreach(x => logInfo(s"[$x]"))
+        logInfo("--------------------------------")
+
+        // Verify whether all the elements received are as expected
+        val output = outputBuffer.flatMap(x => x)
+        assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
+        output.foreach(o => // To ensure all the inputs are correctly added cumulatively
+          assert(expectedOutput.contains(o), s"Expected value $o not found")
+        )
+        // To ensure that all the inputs were received correctly
+        assert(expectedOutput.last === output.last)
+      }
+    } finally {
+      Utils.deleteRecursively(testDir)
+    }
   }

