From a95ddc41f2b10b57fa18e75c865d7ef4507cd771 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 16 Dec 2014 00:50:01 -0800 Subject: [PATCH 01/19] Modify FileInputDStream to use Clock class. --- .../spark/streaming/dstream/FileInputDStream.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 5f13fdc5579ed..f44e561c7bf86 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -74,12 +74,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas newFilesOnly: Boolean = true) extends InputDStream[(K, V)](ssc_) { + // This is a def so that it works during checkpoint recovery: + private def clock = ssc.scheduler.clock + // Data to be saved as part of the streaming checkpoints protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData // Initial ignore threshold based on which old, existing files in the directory (at the time of // starting the streaming application) will be ignored or considered - private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L + private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L /* * Make sure that the information of files selected in the last few batches are remembered. 
@@ -151,7 +154,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas */ private def findNewFiles(currentTime: Long): Array[String] = { try { - lastNewFileFindingTime = System.currentTimeMillis + lastNewFileFindingTime = clock.currentTime() // Calculate ignore threshold val modTimeIgnoreThreshold = math.max( @@ -164,7 +167,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) } val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) - val timeTaken = System.currentTimeMillis - lastNewFileFindingTime + val timeTaken = clock.currentTime() - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") logDebug("# cached file times = " + fileToModTime.size) if (timeTaken > slideDuration.milliseconds) { From 3c3efc3f75521020f482d56b41465a6373448cf5 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 16 Dec 2014 17:40:35 -0800 Subject: [PATCH 02/19] Synchronize `currentTime` in ManualClock --- .../main/scala/org/apache/spark/streaming/util/Clock.scala | 6 ++++-- .../org/apache/spark/streaming/BasicOperationsSuite.scala | 2 +- .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 4 ++-- .../scala/org/apache/spark/streaming/TestSuiteBase.scala | 4 ++-- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala index 7cd867ce34b87..d6d96d7ba00fd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala @@ -59,9 +59,11 @@ class SystemClock() extends Clock { private[streaming] class ManualClock() extends Clock { - var time = 0L + private var time = 0L - def currentTime() = time + def currentTime() = this.synchronized { + time + } def 
setTime(timeToSet: Long) = { this.synchronized { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 86b96785d7b87..349630de840a2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -639,7 +639,7 @@ class BasicOperationsSuite extends TestSuiteBase { if (rememberDuration != null) ssc.remember(rememberDuration) val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput) val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - assert(clock.time === Seconds(10).milliseconds) + assert(clock.currentTime() === Seconds(10).milliseconds) assert(output.size === numExpectedOutput) operatedStream } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index c97998add8ffa..f08ab4a7dc150 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -433,12 +433,12 @@ class CheckpointSuite extends TestSuiteBase { */ def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - logInfo("Manual clock before advancing = " + clock.time) + logInfo("Manual clock before advancing = " + clock.currentTime()) for (i <- 1 to numBatches.toInt) { clock.addToTime(batchDuration.milliseconds) Thread.sleep(batchDuration.milliseconds) } - logInfo("Manual clock after advancing = " + clock.time) + logInfo("Manual clock after advancing = " + clock.currentTime()) Thread.sleep(batchDuration.milliseconds) val outputStream = ssc.graph.getOutputStreams.filter { dstream => diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 52972f63c6c5c..d63734356258d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -291,7 +291,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Advance manual clock val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - logInfo("Manual clock before advancing = " + clock.time) + logInfo("Manual clock before advancing = " + clock.currentTime()) if (actuallyWait) { for (i <- 1 to numBatches) { logInfo("Actually waiting for " + batchDuration) @@ -301,7 +301,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { } else { clock.addToTime(numBatches * batchDuration.milliseconds) } - logInfo("Manual clock after advancing = " + clock.time) + logInfo("Manual clock after advancing = " + clock.currentTime()) // Wait until expected number of output items have been generated val startTime = System.currentTimeMillis() From dda1403f3eaabe9125b87ac45ac3e7b0d667e9de Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 25 Dec 2014 01:03:00 -0800 Subject: [PATCH 03/19] Add StreamingTestWaiter class. 
--- .../spark/streaming/TestSuiteBase.scala | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index d63734356258d..d0629950130e7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -21,11 +21,13 @@ import java.io.{ObjectInputStream, IOException} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.SynchronizedBuffer +import scala.concurrent.TimeoutException import scala.reflect.ClassTag import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream} +import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener} import org.apache.spark.streaming.util.ManualClock import org.apache.spark.{SparkConf, Logging} import org.apache.spark.rdd.RDD @@ -103,6 +105,77 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten)) } +/** + * This is an interface that can be used to block until certain events occur, such as + * the start/completion of batches. This is much less brittle than waiting on wall-clock time. + * Internally, this is implemented using a StreamingListener. Constructing a new instance of this + * class automatically registers a StreamingListener on the given StreamingContext. 
+ */ +class StreamingTestWaiter(ssc: StreamingContext) { + + // All access to this state should be guarded by `StreamingTestWaiter.this.synchronized` + private var numCompletedBatches = 0 + private var numStartedBatches = 0 + + private val listener = new StreamingListener { + override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = + StreamingTestWaiter.this.synchronized { + numStartedBatches += 1 + StreamingTestWaiter.this.notifyAll() + } + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = + StreamingTestWaiter.this.synchronized { + numCompletedBatches += 1 + StreamingTestWaiter.this.notifyAll() + } + } + ssc.addStreamingListener(listener) + + def getNumCompletedBatches: Int = this.synchronized { + numCompletedBatches + } + + def getNumStartedBatches: Int = this.synchronized { + numStartedBatches + } + + /** + * Block until the number of completed batches reaches the given threshold. + */ + def waitForTotalBatchesCompleted( + targetNumBatches: Int, + timeout: Duration): Unit = this.synchronized { + val startTime = System.currentTimeMillis() + def successful = getNumCompletedBatches >= targetNumBatches + def timedOut = (System.currentTimeMillis() - startTime) >= timeout.milliseconds + while (!timedOut && !successful) { + this.wait(timeout.milliseconds) + } + if (!successful && timedOut) { + throw new TimeoutException(s"Waited for $targetNumBatches completed batches, but only" + + s" $numCompletedBatches have completed after $timeout") + } + } + + /** + * Block until the number of started batches reaches the given threshold. 
+ */ + def waitForTotalBatchesStarted( + targetNumBatches: Int, + timeout: Duration): Unit = this.synchronized { + val startTime = System.currentTimeMillis() + def successful = getNumStartedBatches >= targetNumBatches + def timedOut = (System.currentTimeMillis() - startTime) >= timeout.milliseconds + while (!timedOut && !successful) { + this.wait(timeout.milliseconds) + } + if (!successful && timedOut) { + throw new TimeoutException(s"Waited for $targetNumBatches started batches, but only" + + s" $numStartedBatches have started after $timeout") + } + } +} + /** * This is the base trait for Spark Streaming testsuites. This provides basic functionality * to run user-defined set of input on user-defined stream operations, and verify the output. From d4f2d87729b20f1060d456a6074f2da6a4e79cb3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 25 Dec 2014 01:03:54 -0800 Subject: [PATCH 04/19] Refactor file input stream tests to not rely on SystemClock. --- .../spark/streaming/InputStreamsSuite.scala | 61 ++++++++++--------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 307052a4a9cbb..0bd196ec6d66d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -28,12 +28,10 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue} -import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.io.Files import org.scalatest.BeforeAndAfter -import org.scalatest.concurrent.Eventually._ import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel @@ -234,45 +232,48 @@ class InputStreamsSuite extends 
TestSuiteBase with BeforeAndAfter { } def testFileStream(newFilesOnly: Boolean) { - var ssc: StreamingContext = null val testDir: File = null try { val testDir = Utils.createTempDir() + // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") Files.write("0\n", existingFile, Charset.forName("UTF-8")) + assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) - Thread.sleep(1000) // Set up the streaming context and input streams - val newConf = conf.clone.set( - "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") - ssc = new StreamingContext(newConf, batchDuration) - val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( - testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - val outputStream = new TestOutputStream(fileStream, outputBuffer) - outputStream.register() - ssc.start() - - // Create files in the directory - val input = Seq(1, 2, 3, 4, 5) - input.foreach { i => - Thread.sleep(batchDuration.milliseconds) - val file = new File(testDir, i.toString) - Files.write(i + "\n", file, Charset.forName("UTF-8")) - logInfo("Created file " + file) - } + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + // This `setTime` call ensures that the clock is past the creation time of `existingFile` + clock.setTime(existingFile.lastModified + 1000) + val waiter = new StreamingTestWaiter(ssc) + val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( + testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + val outputStream = new TestOutputStream(fileStream, outputBuffer) + outputStream.register() + ssc.start() + + // Over time, create 
files in the directory + val input = Seq(1, 2, 3, 4, 5) + input.foreach { i => + clock.addToTime(batchDuration.milliseconds) + val file = new File(testDir, i.toString) + Files.write(i + "\n", file, Charset.forName("UTF-8")) + assert(file.setLastModified(clock.currentTime())) + assert(file.lastModified === clock.currentTime) + logInfo("Created file " + file) + waiter.waitForTotalBatchesCompleted(i, timeout = batchDuration * 5) + } - // Verify that all the files have been read - val expectedOutput = if (newFilesOnly) { - input.map(_.toString).toSet - } else { - (Seq(0) ++ input).map(_.toString).toSet - } - eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) { + // Verify that all the files have been read + val expectedOutput = if (newFilesOnly) { + input.map(_.toString).toSet + } else { + (Seq(0) ++ input).map(_.toString).toSet + } assert(outputBuffer.flatten.toSet === expectedOutput) } } finally { - if (ssc != null) ssc.stop() if (testDir != null) Utils.deleteRecursively(testDir) } } From c8f06b10431c555dab3be461622d5d96aa807685 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 25 Dec 2014 02:14:06 -0800 Subject: [PATCH 05/19] Remove Thread.sleep calls in FileInputStream CheckpointSuite test. Hopefully this will fix SPARK-1600. 
--- .../spark/streaming/CheckpointSuite.scala | 212 ++++++++++-------- 1 file changed, 119 insertions(+), 93 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index f08ab4a7dc150..18e80fb0a198f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -18,11 +18,11 @@ package org.apache.spark.streaming import java.io.File -import java.nio.charset.Charset -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} import scala.reflect.ClassTag +import com.google.common.base.Charsets import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -46,8 +46,6 @@ class CheckpointSuite extends TestSuiteBase { override def batchDuration = Milliseconds(500) - override def actuallyWait = true // to allow checkpoints to be written - override def beforeFunction() { super.beforeFunction() Utils.deleteRecursively(new File(checkpointDir)) @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase { // failure, are re-processed or not. test("recovery with file input stream") { // Set up the streaming context and input streams + val batchDuration = Seconds(2) // Due to 1-second resolution of setLastModified() on some OS's. 
val testDir = Utils.createTempDir() - var ssc = new StreamingContext(master, framework, Seconds(1)) - ssc.checkpoint(checkpointDir) - val fileStream = ssc.textFileStream(testDir.toString) - // Making value 3 take large time to process, to ensure that the master - // shuts down in the middle of processing the 3rd batch - val mappedStream = fileStream.map(s => { - val i = s.toInt - if (i == 3) Thread.sleep(2000) - i - }) - - // Reducing over a large window to ensure that recovery from master failure - // requires reprocessing of all the files seen before the failure - val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1)) - val outputBuffer = new ArrayBuffer[Seq[Int]] - var outputStream = new TestOutputStream(reducedStream, outputBuffer) - outputStream.register() - ssc.start() - - // Create files and advance manual clock to process them - // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - Thread.sleep(1000) - for (i <- Seq(1, 2, 3)) { - Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8")) - // wait to make sure that the file is written such that it gets shown in the file listings - Thread.sleep(1000) + val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]] + + def writeFile(i: Int, clock: ManualClock): Unit = { + val file = new File(testDir, i.toString) + Files.write(i + "\n", file, Charsets.UTF_8) + assert(file.setLastModified(clock.currentTime())) + // Check that the file's modification date is actually the value we wrote, since rounding or + // truncation will break the test: + assert(file.lastModified() === clock.currentTime()) } - logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0, "No files processed before restart") - ssc.stop() - // Verify whether files created have been recorded correctly or not - var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] - def recordedFiles = 
fileInputDStream.batchTimeToSelectedFiles.values.flatten - assert(!recordedFiles.filter(_.endsWith("1")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("2")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("3")).isEmpty) - - // Create files while the master is down - for (i <- Seq(4, 5, 6)) { - Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8")) - Thread.sleep(1000) + def recordedFiles(ssc: StreamingContext): Seq[Int] = { + val fileInputDStream = + ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] + val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten + filenames.map(_.split(File.separator).last.toInt).toSeq.sorted } - // Recover context from checkpoint file and verify whether the files that were - // recorded before failure were saved and successfully recovered - logInfo("*********** RESTARTING ************") - ssc = new StreamingContext(checkpointDir) - fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] - assert(!recordedFiles.filter(_.endsWith("1")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("2")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("3")).isEmpty) + try { + // This is a var because it's re-assigned when we restart from a checkpoint: + var clock: ManualClock = null + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => + ssc.checkpoint(checkpointDir) + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val waiter = new StreamingTestWaiter(ssc) + val fileStream = ssc.textFileStream(testDir.toString) + // MKW value 3 take a large time to process, to ensure that the driver + // shuts down in the middle of processing the 3rd batch + val mappedStream = fileStream.map(s => { + val i = s.toInt + if (i == 3) Thread.sleep(4000) + i + }) + + // Reducing over a large window to ensure that recovery from driver failure + // requires reprocessing of all the files seen before the failure + val reducedStream = 
mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration) + val outputStream = new TestOutputStream(reducedStream, outputBuffer) + outputStream.register() + ssc.start() + + clock.addToTime(batchDuration.milliseconds) + // Create files and advance manual clock to process them + for (i <- Seq(1, 2, 3)) { + writeFile(i, clock) + clock.addToTime(batchDuration.milliseconds) + if (i != 3) { + // Since we want to shut down while the 3rd batch is processing + waiter.waitForTotalBatchesCompleted(i, batchDuration * 5) + } + } + clock.addToTime(batchDuration.milliseconds) + waiter.waitForTotalBatchesStarted(3, batchDuration * 5) + Thread.sleep(1000) // To wait for execution to actually begin + logInfo("Output = " + outputStream.output.mkString("[", ", ", "]")) + assert(outputStream.output.size > 0, "No files processed after restart") + ssc.stop() + + // Verify whether files created have been recorded correctly or not + assert(recordedFiles(ssc) === Seq(1, 2, 3)) + } - // Restart stream computation - ssc.start() - for (i <- Seq(7, 8, 9)) { - Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8")) - Thread.sleep(1000) - } - Thread.sleep(1000) - logInfo("Output = " + outputStream.output.mkString("[", ", ", "]")) - assert(outputStream.output.size > 0, "No files processed after restart") - ssc.stop() + // Create files while the streaming driver is down + for (i <- Seq(4, 5, 6)) { + writeFile(i, clock) + clock.addToTime(1000) + } - // Verify whether files created while the driver was down have been recorded or not - assert(!recordedFiles.filter(_.endsWith("4")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("5")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("6")).isEmpty) - - // Verify whether new files created after recover have been recorded or not - assert(!recordedFiles.filter(_.endsWith("7")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("8")).isEmpty) - assert(!recordedFiles.filter(_.endsWith("9")).isEmpty) - - // Append the 
new output to the old buffer - outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] - outputBuffer ++= outputStream.output - - val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45) - logInfo("--------------------------------") - logInfo("output, size = " + outputBuffer.size) - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output, size = " + expectedOutput.size) - expectedOutput.foreach(x => logInfo("[" + x + "]")) - logInfo("--------------------------------") - - // Verify whether all the elements received are as expected - val output = outputBuffer.flatMap(x => x) - assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed - output.foreach(o => // To ensure all the inputs are correctly added cumulatively - assert(expectedOutput.contains(o), "Expected value " + o + " not found") - ) - // To ensure that all the inputs were received correctly - assert(expectedOutput.last === output.last) - Utils.deleteRecursively(testDir) + // Recover context from checkpoint file and verify whether the files that were + // recorded before failure were saved and successfully recovered + logInfo("*********** RESTARTING ************") + withStreamingContext(new StreamingContext(checkpointDir)) { ssc => + // Copy over the time from the old clock so that we don't appear to have time-traveled: + clock = { + val newClock = ssc.scheduler.clock.asInstanceOf[ManualClock] + newClock.setTime(clock.currentTime()) + newClock + } + val waiter = new StreamingTestWaiter(ssc) + val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] + assert(recordedFiles(ssc) === Seq(1, 2, 3)) + + // Restart stream computation + ssc.start() + clock.addToTime(batchDuration.milliseconds) + for ((i, index) <- Seq(7, 8, 9).zipWithIndex) { + writeFile(i, clock) + clock.addToTime(batchDuration.milliseconds) + waiter.waitForTotalBatchesCompleted(index + 1, batchDuration * 5) + } + 
clock.addToTime(batchDuration.milliseconds) + logInfo("Output = " + outputStream.output.mkString("[", ", ", "]")) + assert(outputStream.output.size > 0, "No files processed after restart") + ssc.stop() + + // Verify whether files created while the driver was down (4, 5, 6) and files created after + // recovery (7, 8, 9), have been recorded or not + assert(recordedFiles(ssc) === (1 to 9)) + + // Append the new output to the old buffer + outputBuffer ++= outputStream.output + + val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45) + logInfo("--------------------------------") + logInfo(s"output, size = ${outputBuffer.size}") + outputBuffer.foreach(x => logInfo(s"[${x.mkString(",")}]")) + logInfo(s"expected output, size = ${expectedOutput.size}") + expectedOutput.foreach(x => logInfo(s"[$x]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + val output = outputBuffer.flatMap(x => x) + assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed + output.foreach(o => // To ensure all the inputs are correctly added cumulatively + assert(expectedOutput.contains(o), s"Expected value $o not found") + ) + // To ensure that all the inputs were received correctly + assert(expectedOutput.last === output.last) + } + } finally { + Utils.deleteRecursively(testDir) + } } From da32f3fc16a04bbff9b8ebf7a857fc32b8f02f1b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 4 Jan 2015 22:31:06 -0800 Subject: [PATCH 06/19] Fix log message and comment typos --- .../spark/streaming/CheckpointSuite.scala | 17 ++++++++++------- .../apache/spark/streaming/TestSuiteBase.scala | 8 ++++---- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 08c91bbbfe4ef..2e9379113c2af 100644 --- 
a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -338,14 +338,14 @@ class CheckpointSuite extends TestSuiteBase { } try { - // This is a var because it's re-assigned when we restart from a checkpoint: + // This is a var because it's re-assigned when we restart from a checkpoint var clock: ManualClock = null withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => ssc.checkpoint(checkpointDir) clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val waiter = new StreamingTestWaiter(ssc) val fileStream = ssc.textFileStream(testDir.toString) - // MKW value 3 take a large time to process, to ensure that the driver + // Make value 3 take a large time to process, to ensure that the driver // shuts down in the middle of processing the 3rd batch val mappedStream = fileStream.map(s => { val i = s.toInt @@ -373,14 +373,16 @@ class CheckpointSuite extends TestSuiteBase { clock.addToTime(batchDuration.milliseconds) waiter.waitForTotalBatchesStarted(3, batchDuration * 5) Thread.sleep(1000) // To wait for execution to actually begin - logInfo("Output = " + outputStream.output.mkString("[", ", ", "]")) - assert(outputStream.output.size > 0, "No files processed after restart") + logInfo("Output after first start = " + outputStream.output.mkString("[", ", ", "]")) + assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() // Verify whether files created have been recorded correctly or not assert(recordedFiles(ssc) === Seq(1, 2, 3)) } + // The original StreamingContext has now been stopped. 
+ // Create files while the streaming driver is down for (i <- Seq(4, 5, 6)) { writeFile(i, clock) @@ -391,7 +393,7 @@ class CheckpointSuite extends TestSuiteBase { // recorded before failure were saved and successfully recovered logInfo("*********** RESTARTING ************") withStreamingContext(new StreamingContext(checkpointDir)) { ssc => - // Copy over the time from the old clock so that we don't appear to have time-traveled: + // Copy over the time from the old clock so that we don't appear to have time-traveled clock = { val newClock = ssc.scheduler.clock.asInstanceOf[ManualClock] newClock.setTime(clock.currentTime()) @@ -399,6 +401,7 @@ class CheckpointSuite extends TestSuiteBase { } val waiter = new StreamingTestWaiter(ssc) val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] + // Check that we remember files that were recorded before the restart assert(recordedFiles(ssc) === Seq(1, 2, 3)) // Restart stream computation @@ -410,12 +413,12 @@ class CheckpointSuite extends TestSuiteBase { waiter.waitForTotalBatchesCompleted(index + 1, batchDuration * 5) } clock.addToTime(batchDuration.milliseconds) - logInfo("Output = " + outputStream.output.mkString("[", ", ", "]")) + logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]")) assert(outputStream.output.size > 0, "No files processed after restart") ssc.stop() // Verify whether files created while the driver was down (4, 5, 6) and files created after - // recovery (7, 8, 9), have been recorded or not + // recovery (7, 8, 9) have been recorded assert(recordedFiles(ssc) === (1 to 9)) // Append the new output to the old buffer diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index d0629950130e7..344948c638bbe 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -106,10 +106,10 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], } /** - * This is an interface that can be used to block until certain events occur, such as - * the start/completion of batches. This is much less brittle than waiting on wall-clock time. - * Internally, this is implemented using a StreamingListener. Constructing a new instance of this - * class automatically registers a StreamingListener on the given StreamingContext. + * An object that can be used to block until certain events occur, such as batch start/completion. + * This is much less brittle than waiting on wall-clock time. Internally, this is implemented using + * a StreamingListener. Constructing a new instance automatically registers a StreamingListener on + * the given StreamingContext. */ class StreamingTestWaiter(ssc: StreamingContext) { From 566a63f39647b524b3fbdf357836274ff48bee85 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 4 Jan 2015 22:31:06 -0800 Subject: [PATCH 07/19] Fix log message and comment typos From dbb824700d9de9099d1fba8c96fc8ac6da63b0ae Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 4 Jan 2015 23:00:18 -0800 Subject: [PATCH 08/19] Remove last remaining sleep() call --- .../test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 2e9379113c2af..56746e57c0605 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -372,7 +372,7 @@ class CheckpointSuite extends TestSuiteBase { } clock.addToTime(batchDuration.milliseconds) waiter.waitForTotalBatchesStarted(3, batchDuration * 5) - Thread.sleep(1000) // To wait for execution to 
actually begin + assert(waiter.getNumCompletedBatches === 2) logInfo("Output after first start = " + outputStream.output.mkString("[", ", ", "]")) assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() From fffc51c97bc048a13b8c2cb7ecfe80d41047fb76 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 4 Jan 2015 23:54:18 -0800 Subject: [PATCH 09/19] Revert "Remove last remaining sleep() call" This reverts commit dbb824700d9de9099d1fba8c96fc8ac6da63b0ae. --- .../test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 56746e57c0605..2e9379113c2af 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -372,7 +372,7 @@ class CheckpointSuite extends TestSuiteBase { } clock.addToTime(batchDuration.milliseconds) waiter.waitForTotalBatchesStarted(3, batchDuration * 5) - assert(waiter.getNumCompletedBatches === 2) + Thread.sleep(1000) // To wait for execution to actually begin logInfo("Output after first start = " + outputStream.output.mkString("[", ", ", "]")) assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() From 15b48eee1b85c81629a2ea2005aae002d99e09b5 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 4 Jan 2015 23:59:45 -0800 Subject: [PATCH 10/19] Replace several TestWaiter methods w/ ScalaTest eventually. 
--- .../spark/streaming/CheckpointSuite.scala | 17 +++++-- .../spark/streaming/InputStreamsSuite.scala | 5 +- .../spark/streaming/TestSuiteBase.scala | 47 ++++--------------- 3 files changed, 27 insertions(+), 42 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 2e9379113c2af..fe717b066b5a7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} +import org.scalatest.concurrent.Eventually._ import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} import org.apache.spark.streaming.util.ManualClock @@ -360,19 +361,25 @@ class CheckpointSuite extends TestSuiteBase { outputStream.register() ssc.start() - clock.addToTime(batchDuration.milliseconds) + // Advance half a batch so that the first file is created after the StreamingContext starts + clock.addToTime(batchDuration.milliseconds / 2) // Create files and advance manual clock to process them for (i <- Seq(1, 2, 3)) { writeFile(i, clock) clock.addToTime(batchDuration.milliseconds) if (i != 3) { // Since we want to shut down while the 3rd batch is processing - waiter.waitForTotalBatchesCompleted(i, batchDuration * 5) + eventually(timeout(batchDuration * 5)) { + assert(waiter.getNumCompletedBatches === i) + } } } clock.addToTime(batchDuration.milliseconds) - waiter.waitForTotalBatchesStarted(3, batchDuration * 5) Thread.sleep(1000) // To wait for execution to actually begin + eventually(timeout(batchDuration * 5)) { + assert(waiter.getNumStartedBatches === 3) + } + assert(waiter.getNumCompletedBatches === 2) 
logInfo("Output after first start = " + outputStream.output.mkString("[", ", ", "]")) assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() @@ -410,7 +417,9 @@ class CheckpointSuite extends TestSuiteBase { for ((i, index) <- Seq(7, 8, 9).zipWithIndex) { writeFile(i, clock) clock.addToTime(batchDuration.milliseconds) - waiter.waitForTotalBatchesCompleted(index + 1, batchDuration * 5) + eventually(timeout(batchDuration * 5)) { + assert(waiter.getNumCompletedBatches === index + 1) + } } clock.addToTime(batchDuration.milliseconds) logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]")) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 0bd196ec6d66d..26fa4c174f772 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -32,6 +32,7 @@ import scala.language.postfixOps import com.google.common.io.Files import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Eventually._ import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel @@ -262,7 +263,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(file.setLastModified(clock.currentTime())) assert(file.lastModified === clock.currentTime) logInfo("Created file " + file) - waiter.waitForTotalBatchesCompleted(i, timeout = batchDuration * 5) + eventually(timeout(batchDuration * 5)) { + assert(waiter.getNumCompletedBatches === i) + } } // Verify that all the files have been read diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 344948c638bbe..4accf0f1e9904 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -21,10 +21,11 @@ import java.io.{ObjectInputStream, IOException} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.SynchronizedBuffer -import scala.concurrent.TimeoutException +import scala.language.implicitConversions import scala.reflect.ClassTag import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.time.{Span, Milliseconds => ScalaTestMilliseconds} import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream} import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener} @@ -138,42 +139,6 @@ class StreamingTestWaiter(ssc: StreamingContext) { def getNumStartedBatches: Int = this.synchronized { numStartedBatches } - - /** - * Block until the number of completed batches reaches the given threshold. - */ - def waitForTotalBatchesCompleted( - targetNumBatches: Int, - timeout: Duration): Unit = this.synchronized { - val startTime = System.currentTimeMillis() - def successful = getNumCompletedBatches >= targetNumBatches - def timedOut = (System.currentTimeMillis() - startTime) >= timeout.milliseconds - while (!timedOut && !successful) { - this.wait(timeout.milliseconds) - } - if (!successful && timedOut) { - throw new TimeoutException(s"Waited for $targetNumBatches completed batches, but only" + - s" $numCompletedBatches have completed after $timeout") - } - } - - /** - * Block until the number of started batches reaches the given threshold. 
- */ - def waitForTotalBatchesStarted( - targetNumBatches: Int, - timeout: Duration): Unit = this.synchronized { - val startTime = System.currentTimeMillis() - def successful = getNumStartedBatches >= targetNumBatches - def timedOut = (System.currentTimeMillis() - startTime) >= timeout.milliseconds - while (!timedOut && !successful) { - this.wait(timeout.milliseconds) - } - if (!successful && timedOut) { - throw new TimeoutException(s"Waited for $targetNumBatches started batches, but only" + - s" $numStartedBatches have started after $timeout") - } - } } /** @@ -236,6 +201,14 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { before(beforeFunction) after(afterFunction) + /** + * Implicit conversion which allows streaming Durations to be used with ScalaTest methods, + * such as `eventually`. + */ + implicit def streamingDurationToScalatestSpan(duration: Duration): Span = { + Span(duration.milliseconds, ScalaTestMilliseconds) + } + /** * Run a block of code with the given StreamingContext and automatically * stop the context when the block completes or when an exception is thrown. 
From b4442c3538ad462a0a7d39f4b2049ed230e92665 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 5 Jan 2015 00:50:08 -0800 Subject: [PATCH 11/19] batchTimeToSelectedFiles should be thread-safe --- .../apache/spark/streaming/dstream/FileInputDStream.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index f44e561c7bf86..d107210c13521 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -18,8 +18,10 @@ package org.apache.spark.streaming.dstream import java.io.{IOException, ObjectInputStream} +import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} @@ -94,8 +96,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas remember(durationToRemember) // Map of batch-time to selected file info for the remembered batches + // This is a concurrent map because it's also accessed in unit tests @transient private[streaming] var batchTimeToSelectedFiles = - new mutable.HashMap[Time, Array[String]] + new ConcurrentHashMap[Time, Array[String]].asScala // Set of files that were selected in the remembered batches @transient private var recentlySelectedFiles = new mutable.HashSet[String]() @@ -270,7 +273,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas logDebug(this.getClass().getSimpleName + ".readObject used") ois.defaultReadObject() generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] () - batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]() + batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]].asScala 
recentlySelectedFiles = new mutable.HashSet[String]() fileToModTime = new TimeStampedHashMap[String, Long](true) } From 863d71af830e74729f89559b7ebc6749c1de9fde Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 5 Jan 2015 14:04:53 -0800 Subject: [PATCH 12/19] Remove Thread.sleep that was used to make task run slowly --- .../apache/spark/streaming/CheckpointSuite.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index fe717b066b5a7..76fa7affff050 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -311,7 +311,6 @@ class CheckpointSuite extends TestSuiteBase { testCheckpointedOperation(input, operation, output, 7) } - // This tests whether file input stream remembers what files were seen before // the master failure and uses them again to process a large window operation. // It also tests whether batches, whose processing was incomplete due to the @@ -348,9 +347,14 @@ class CheckpointSuite extends TestSuiteBase { val fileStream = ssc.textFileStream(testDir.toString) // Make value 3 take a large time to process, to ensure that the driver // shuts down in the middle of processing the 3rd batch + TaskControlFlags.taskThreeShouldBlockIndefinitely = true val mappedStream = fileStream.map(s => { val i = s.toInt - if (i == 3) Thread.sleep(4000) + if (i == 3) { + while (TaskControlFlags.taskThreeShouldBlockIndefinitely) { + Thread.sleep(Long.MaxValue) + } + } i }) @@ -389,6 +393,7 @@ class CheckpointSuite extends TestSuiteBase { } // The original StreamingContext has now been stopped. 
+ TaskControlFlags.taskThreeShouldBlockIndefinitely = false // Create files while the streaming driver is down for (i <- Seq(4, 5, 6)) { @@ -523,3 +528,8 @@ class CheckpointSuite extends TestSuiteBase { outputStream.output.map(_.flatten) } } + +// Global object with flags for controlling tasks' behavior. +private object TaskControlFlags extends Serializable { + var taskThreeShouldBlockIndefinitely: Boolean = true +} \ No newline at end of file From 0b9c3a1a4c615326b0db118001da837a703a386c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 5 Jan 2015 14:14:33 -0800 Subject: [PATCH 13/19] Wait for checkpoint to complete --- .../spark/streaming/CheckpointSuite.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 76fa7affff050..a6c526e9ee917 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -379,17 +379,19 @@ class CheckpointSuite extends TestSuiteBase { } } clock.addToTime(batchDuration.milliseconds) - Thread.sleep(1000) // To wait for execution to actually begin eventually(timeout(batchDuration * 5)) { - assert(waiter.getNumStartedBatches === 3) + // Wait until all files have been recorded and all batches have started + assert(recordedFiles(ssc) === Seq(1, 2, 3) && waiter.getNumStartedBatches === 3) + } + // Wait for a checkpoint to be written + val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration) + eventually(timeout(batchDuration * 5)) { + assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 5) } - assert(waiter.getNumCompletedBatches === 2) - logInfo("Output after first start = " + outputStream.output.mkString("[", ", ", "]")) - assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() - - // 
Verify whether files created have been recorded correctly or not - assert(recordedFiles(ssc) === Seq(1, 2, 3)) + // Check that we shut down while the third batch was being processed + assert(waiter.getNumCompletedBatches === 2) + assert(outputStream.output.flatten === Seq(1, 3)) } // The original StreamingContext has now been stopped. From 3939432bfc51387dc6b6da9dc548e210b51ff55d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 5 Jan 2015 14:23:12 -0800 Subject: [PATCH 14/19] Rename StreamingTestWaiter to BatchCounter --- .../spark/streaming/CheckpointSuite.scala | 12 ++++++------ .../spark/streaming/InputStreamsSuite.scala | 4 ++-- .../apache/spark/streaming/TestSuiteBase.scala | 17 ++++++++--------- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index a6c526e9ee917..704ef1f14876d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -343,7 +343,7 @@ class CheckpointSuite extends TestSuiteBase { withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => ssc.checkpoint(checkpointDir) clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - val waiter = new StreamingTestWaiter(ssc) + val batchCounter = new BatchCounter(ssc) val fileStream = ssc.textFileStream(testDir.toString) // Make value 3 take a large time to process, to ensure that the driver // shuts down in the middle of processing the 3rd batch @@ -374,14 +374,14 @@ class CheckpointSuite extends TestSuiteBase { if (i != 3) { // Since we want to shut down while the 3rd batch is processing eventually(timeout(batchDuration * 5)) { - assert(waiter.getNumCompletedBatches === i) + assert(batchCounter.getNumCompletedBatches === i) } } } clock.addToTime(batchDuration.milliseconds) eventually(timeout(batchDuration * 5)) { 
// Wait until all files have been recorded and all batches have started - assert(recordedFiles(ssc) === Seq(1, 2, 3) && waiter.getNumStartedBatches === 3) + assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3) } // Wait for a checkpoint to be written val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration) @@ -390,7 +390,7 @@ class CheckpointSuite extends TestSuiteBase { } ssc.stop() // Check that we shut down while the third batch was being processed - assert(waiter.getNumCompletedBatches === 2) + assert(batchCounter.getNumCompletedBatches === 2) assert(outputStream.output.flatten === Seq(1, 3)) } @@ -413,7 +413,7 @@ class CheckpointSuite extends TestSuiteBase { newClock.setTime(clock.currentTime()) newClock } - val waiter = new StreamingTestWaiter(ssc) + val batchCounter = new BatchCounter(ssc) val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] // Check that we remember files that were recorded before the restart assert(recordedFiles(ssc) === Seq(1, 2, 3)) @@ -425,7 +425,7 @@ class CheckpointSuite extends TestSuiteBase { writeFile(i, clock) clock.addToTime(batchDuration.milliseconds) eventually(timeout(batchDuration * 5)) { - assert(waiter.getNumCompletedBatches === index + 1) + assert(batchCounter.getNumCompletedBatches === index + 1) } } clock.addToTime(batchDuration.milliseconds) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 26fa4c174f772..3ab913a46c9da 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -246,7 +246,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] // This `setTime` call ensures that the clock is past the creation time of 
`existingFile` clock.setTime(existingFile.lastModified + 1000) - val waiter = new StreamingTestWaiter(ssc) + val batchCounter = new BatchCounter(ssc) val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] @@ -264,7 +264,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(file.lastModified === clock.currentTime) logInfo("Created file " + file) eventually(timeout(batchDuration * 5)) { - assert(waiter.getNumCompletedBatches === i) + assert(batchCounter.getNumCompletedBatches === i) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 4accf0f1e9904..8cfd215828f63 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -107,27 +107,26 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], } /** - * An object that can be used to block until certain events occur, such as batch start/completion. - * This is much less brittle than waiting on wall-clock time. Internally, this is implemented using - * a StreamingListener. Constructing a new instance automatically registers a StreamingListener on + * An object that counts the number of started / completed batches. This is implemented using a + * StreamingListener. Constructing a new instance automatically registers a StreamingListener on * the given StreamingContext. 
*/ -class StreamingTestWaiter(ssc: StreamingContext) { +class BatchCounter(ssc: StreamingContext) { - // All access to this state should be guarded by `StreamingTestWaiter.this.synchronized` + // All access to this state should be guarded by `BatchCounter.this.synchronized` private var numCompletedBatches = 0 private var numStartedBatches = 0 private val listener = new StreamingListener { override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = - StreamingTestWaiter.this.synchronized { + BatchCounter.this.synchronized { numStartedBatches += 1 - StreamingTestWaiter.this.notifyAll() + BatchCounter.this.notifyAll() } override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = - StreamingTestWaiter.this.synchronized { + BatchCounter.this.synchronized { numCompletedBatches += 1 - StreamingTestWaiter.this.notifyAll() + BatchCounter.this.notifyAll() } } ssc.addStreamingListener(listener) From db26c3a22e1f68ba559bfc8055ba3ac442703832 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 5 Jan 2015 14:30:35 -0800 Subject: [PATCH 15/19] Use standard timeout in ScalaTest `eventually` blocks. 
--- .../apache/spark/streaming/CheckpointSuite.scala | 8 ++++---- .../spark/streaming/InputStreamsSuite.scala | 2 +- .../apache/spark/streaming/TestSuiteBase.scala | 15 ++++++--------- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 704ef1f14876d..50ade85b67c84 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -373,19 +373,19 @@ class CheckpointSuite extends TestSuiteBase { clock.addToTime(batchDuration.milliseconds) if (i != 3) { // Since we want to shut down while the 3rd batch is processing - eventually(timeout(batchDuration * 5)) { + eventually(eventuallyTimeout) { assert(batchCounter.getNumCompletedBatches === i) } } } clock.addToTime(batchDuration.milliseconds) - eventually(timeout(batchDuration * 5)) { + eventually(eventuallyTimeout) { // Wait until all files have been recorded and all batches have started assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3) } // Wait for a checkpoint to be written val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration) - eventually(timeout(batchDuration * 5)) { + eventually(eventuallyTimeout) { assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 5) } ssc.stop() @@ -424,7 +424,7 @@ class CheckpointSuite extends TestSuiteBase { for ((i, index) <- Seq(7, 8, 9).zipWithIndex) { writeFile(i, clock) clock.addToTime(batchDuration.milliseconds) - eventually(timeout(batchDuration * 5)) { + eventually(eventuallyTimeout) { assert(batchCounter.getNumCompletedBatches === index + 1) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 3ab913a46c9da..e158cbdb42dc2 
100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -263,7 +263,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(file.setLastModified(clock.currentTime())) assert(file.lastModified === clock.currentTime) logInfo("Created file " + file) - eventually(timeout(batchDuration * 5)) { + eventually(eventuallyTimeout) { assert(batchCounter.getNumCompletedBatches === i) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 8cfd215828f63..7d82c3e4aadcf 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -25,7 +25,9 @@ import scala.language.implicitConversions import scala.reflect.ClassTag import org.scalatest.{BeforeAndAfter, FunSuite} -import org.scalatest.time.{Span, Milliseconds => ScalaTestMilliseconds} +import org.scalatest.time.{Span, Seconds => ScalaTestSeconds} +import org.scalatest.concurrent.Eventually.timeout +import org.scalatest.concurrent.PatienceConfiguration import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream} import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener} @@ -179,6 +181,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { .setMaster(master) .setAppName(framework) + // Timeout for use in ScalaTest `eventually` blocks + val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds)) + // Default before function for any streaming test suite. 
Override this // if you want to add your stuff to "before" (i.e., don't call before { } ) def beforeFunction() { @@ -200,14 +205,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { before(beforeFunction) after(afterFunction) - /** - * Implicit conversion which allows streaming Durations to be used with ScalaTest methods, - * such as `eventually`. - */ - implicit def streamingDurationToScalatestSpan(duration: Duration): Span = { - Span(duration.milliseconds, ScalaTestMilliseconds) - } - /** * Run a block of code with the given StreamingContext and automatically * stop the context when the block completes or when an exception is thrown. From 1cc689fffa4551c2cade471ef64d3ad26159617e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 5 Jan 2015 14:33:10 -0800 Subject: [PATCH 16/19] ConcurrentHashMap -> SynchronizedMap --- .../apache/spark/streaming/dstream/FileInputDStream.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index d107210c13521..e7c5639a63499 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -21,7 +21,6 @@ import java.io.{IOException, ObjectInputStream} import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable -import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} @@ -98,7 +97,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas // Map of batch-time to selected file info for the remembered batches // This is a concurrent map because it's also accessed in unit tests @transient private[streaming] var batchTimeToSelectedFiles = - new ConcurrentHashMap[Time, 
Array[String]].asScala + new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]] // Set of files that were selected in the remembered batches @transient private var recentlySelectedFiles = new mutable.HashSet[String]() @@ -273,7 +272,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas logDebug(this.getClass().getSimpleName + ".readObject used") ois.defaultReadObject() generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] () - batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]].asScala + batchTimeToSelectedFiles = + new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]] recentlySelectedFiles = new mutable.HashSet[String]() fileToModTime = new TimeStampedHashMap[String, Long](true) } From 0b9c252812dce97c4f57ec08d5b948964a172941 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 5 Jan 2015 16:16:26 -0800 Subject: [PATCH 17/19] Fix some ManualClock usage problems. --- .../spark/streaming/CheckpointSuite.scala | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 50ade85b67c84..1094a9a49fd33 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -142,7 +142,6 @@ class CheckpointSuite extends TestSuiteBase { ssc.start() advanceTimeWithRealDelay(ssc, 4) ssc.stop() - System.clearProperty("spark.streaming.manualClock.jump") ssc = null } @@ -347,11 +346,11 @@ class CheckpointSuite extends TestSuiteBase { val fileStream = ssc.textFileStream(testDir.toString) // Make value 3 take a large time to process, to ensure that the driver // shuts down in the middle of processing the 3rd batch - TaskControlFlags.taskThreeShouldBlockIndefinitely = true + 
CheckpointSuite.batchThreeShouldBlockIndefinitely = true val mappedStream = fileStream.map(s => { val i = s.toInt if (i == 3) { - while (TaskControlFlags.taskThreeShouldBlockIndefinitely) { + while (CheckpointSuite.batchThreeShouldBlockIndefinitely) { Thread.sleep(Long.MaxValue) } } @@ -386,7 +385,7 @@ class CheckpointSuite extends TestSuiteBase { // Wait for a checkpoint to be written val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration) eventually(eventuallyTimeout) { - assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 5) + assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6) } ssc.stop() // Check that we shut down while the third batch was being processed @@ -395,24 +394,22 @@ class CheckpointSuite extends TestSuiteBase { } // The original StreamingContext has now been stopped. - TaskControlFlags.taskThreeShouldBlockIndefinitely = false + CheckpointSuite.batchThreeShouldBlockIndefinitely = false // Create files while the streaming driver is down for (i <- Seq(4, 5, 6)) { writeFile(i, clock) - clock.addToTime(1000) + clock.addToTime(batchDuration.milliseconds) } // Recover context from checkpoint file and verify whether the files that were // recorded before failure were saved and successfully recovered logInfo("*********** RESTARTING ************") withStreamingContext(new StreamingContext(checkpointDir)) { ssc => - // Copy over the time from the old clock so that we don't appear to have time-traveled - clock = { - val newClock = ssc.scheduler.clock.asInstanceOf[ManualClock] - newClock.setTime(clock.currentTime()) - newClock - } + // So that the restarted StreamingContext's clock has gone forward in time since failure + ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString) + val oldClockTime = clock.currentTime() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val batchCounter = new BatchCounter(ssc) val outputStream = 
ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] // Check that we remember files that were recorded before the restart @@ -420,12 +417,20 @@ class CheckpointSuite extends TestSuiteBase { // Restart stream computation ssc.start() - clock.addToTime(batchDuration.milliseconds) + // Verify that the clock has traveled forward to the expected time + eventually(eventuallyTimeout) { + clock.currentTime() === oldClockTime + } + // Wait for pre-failure batch to be recomputed (3 while SSC was down plus last batch) + val numBatchesAfterRestart = 4 + eventually(eventuallyTimeout) { + assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart) + } for ((i, index) <- Seq(7, 8, 9).zipWithIndex) { writeFile(i, clock) clock.addToTime(batchDuration.milliseconds) eventually(eventuallyTimeout) { - assert(batchCounter.getNumCompletedBatches === index + 1) + assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1) } } clock.addToTime(batchDuration.milliseconds) @@ -531,7 +536,6 @@ class CheckpointSuite extends TestSuiteBase { } } -// Global object with flags for controlling tasks' behavior. -private object TaskControlFlags extends Serializable { - var taskThreeShouldBlockIndefinitely: Boolean = true +private object CheckpointSuite extends Serializable { + var batchThreeShouldBlockIndefinitely: Boolean = true } \ No newline at end of file From 8340bd0ab50209316202237a9aee7be619a0b922 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 5 Jan 2015 16:19:25 -0800 Subject: [PATCH 18/19] Use set comparisons for output. 
--- .../spark/streaming/CheckpointSuite.scala | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 1094a9a49fd33..f2a22307659cc 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -445,22 +445,9 @@ class CheckpointSuite extends TestSuiteBase { // Append the new output to the old buffer outputBuffer ++= outputStream.output - val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45) - logInfo("--------------------------------") - logInfo(s"output, size = ${outputBuffer.size}") - outputBuffer.foreach(x => logInfo(s"[${x.mkString(",")}]")) - logInfo(s"expected output, size = ${expectedOutput.size}") - expectedOutput.foreach(x => logInfo(s"[$x]")) - logInfo("--------------------------------") - // Verify whether all the elements received are as expected - val output = outputBuffer.flatMap(x => x) - assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed - output.foreach(o => // To ensure all the inputs are correctly added cumulatively - assert(expectedOutput.contains(o), s"Expected value $o not found") - ) - // To ensure that all the inputs were received correctly - assert(expectedOutput.last === output.last) + val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45) + assert(outputBuffer.flatten.toSet === expectedOutput.toSet) } } finally { Utils.deleteRecursively(testDir) From e4494f41ed5931c01b2845418e7705dfd19ba9fb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 5 Jan 2015 19:19:36 -0800 Subject: [PATCH 19/19] Address a potential race when setting file modification times --- .../apache/spark/streaming/CheckpointSuite.scala | 13 +++++++++++++ .../apache/spark/streaming/InputStreamsSuite.scala | 11 +++++++++-- 2 files changed, 22 
insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index f2a22307659cc..8f8bc61437ba5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -320,6 +320,10 @@ class CheckpointSuite extends TestSuiteBase { val testDir = Utils.createTempDir() val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]] + /** + * Writes a file named `i` (which contains the number `i`) to the test directory and sets its + * modification time to `clock`'s current time. + */ def writeFile(i: Int, clock: ManualClock): Unit = { val file = new File(testDir, i.toString) Files.write(i + "\n", file, Charsets.UTF_8) @@ -329,6 +333,9 @@ class CheckpointSuite extends TestSuiteBase { assert(file.lastModified() === clock.currentTime()) } + /** + * Returns ids that identify which files have been recorded by the file input stream.
+ */ def recordedFiles(ssc: StreamingContext): Seq[Int] = { val fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] @@ -369,6 +376,8 @@ class CheckpointSuite extends TestSuiteBase { // Create files and advance manual clock to process them for (i <- Seq(1, 2, 3)) { writeFile(i, clock) + // Advance the clock after creating the file to avoid a race when + // setting its modification time clock.addToTime(batchDuration.milliseconds) if (i != 3) { // Since we want to shut down while the 3rd batch is processing @@ -399,6 +408,8 @@ class CheckpointSuite extends TestSuiteBase { // Create files while the streaming driver is down for (i <- Seq(4, 5, 6)) { writeFile(i, clock) + // Advance the clock after creating the file to avoid a race when + // setting its modification time clock.addToTime(batchDuration.milliseconds) } @@ -428,6 +439,8 @@ class CheckpointSuite extends TestSuiteBase { } for ((i, index) <- Seq(7, 8, 9).zipWithIndex) { writeFile(i, clock) + // Advance the clock after creating the file to avoid a race when + // setting its modification time clock.addToTime(batchDuration.milliseconds) eventually(eventuallyTimeout) { assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index e158cbdb42dc2..bddf51e130422 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -235,6 +235,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { def testFileStream(newFilesOnly: Boolean) { val testDir: File = null try { + val batchDuration = Seconds(2) val testDir = Utils.createTempDir() // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") @@ -245,7 +246,7 @@ 
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] // This `setTime` call ensures that the clock is past the creation time of `existingFile` - clock.setTime(existingFile.lastModified + 1000) + clock.setTime(existingFile.lastModified + batchDuration.milliseconds) val batchCounter = new BatchCounter(ssc) val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString) @@ -254,15 +255,21 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { outputStream.register() ssc.start() + // Advance the clock so that the files are created after StreamingContext starts, but + // not enough to trigger a batch + clock.addToTime(batchDuration.milliseconds / 2) + // Over time, create files in the directory val input = Seq(1, 2, 3, 4, 5) input.foreach { i => - clock.addToTime(batchDuration.milliseconds) val file = new File(testDir, i.toString) Files.write(i + "\n", file, Charset.forName("UTF-8")) assert(file.setLastModified(clock.currentTime())) assert(file.lastModified === clock.currentTime) logInfo("Created file " + file) + // Advance the clock after creating the file to avoid a race when + // setting its modification time + clock.addToTime(batchDuration.milliseconds) eventually(eventuallyTimeout) { assert(batchCounter.getNumCompletedBatches === i) }