Skip to content

Commit a6394bc

Browse files
JoshRosen authored and tdas committed
[SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage
This patch refactors Spark Streaming's FileInputStream tests to remove uses of Thread.sleep() and SystemClock, which should hopefully resolve some longstanding flakiness in these tests (see SPARK-1600). Key changes: - Modify FileInputDStream to use the scheduler's Clock instead of System.currentTimeMillis(); this allows it to be tested using ManualClock. - Fix a synchronization issue in ManualClock's `currentTime` method. - Add a StreamingTestWaiter class which allows callers to block until a certain number of batches have finished. - Change the FileInputStream tests so that files' modification times are manually set based off of ManualClock; this eliminates many Thread.sleep calls. - Update these tests to use the withStreamingContext fixture. Author: Josh Rosen <[email protected]> Closes apache#3801 from JoshRosen/SPARK-1600 and squashes the following commits: e4494f4 [Josh Rosen] Address a potential race when setting file modification times 8340bd0 [Josh Rosen] Use set comparisons for output. 0b9c252 [Josh Rosen] Fix some ManualClock usage problems. 1cc689f [Josh Rosen] ConcurrentHashMap -> SynchronizedMap db26c3a [Josh Rosen] Use standard timeout in ScalaTest `eventually` blocks. 3939432 [Josh Rosen] Rename StreamingTestWaiter to BatchCounter 0b9c3a1 [Josh Rosen] Wait for checkpoint to complete 863d71a [Josh Rosen] Remove Thread.sleep that was used to make task run slowly b4442c3 [Josh Rosen] batchTimeToSelectedFiles should be thread-safe 15b48ee [Josh Rosen] Replace several TestWaiter methods w/ ScalaTest eventually. fffc51c [Josh Rosen] Revert "Remove last remaining sleep() call" dbb8247 [Josh Rosen] Remove last remaining sleep() call 566a63f [Josh Rosen] Fix log message and comment typos da32f3f [Josh Rosen] Fix log message and comment typos 3689214 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-1600 c8f06b1 [Josh Rosen] Remove Thread.sleep calls in FileInputStream CheckpointSuite test. 
d4f2d87 [Josh Rosen] Refactor file input stream tests to not rely on SystemClock. dda1403 [Josh Rosen] Add StreamingTestWaiter class. 3c3efc3 [Josh Rosen] Synchronize `currentTime` in ManualClock a95ddc4 [Josh Rosen] Modify FileInputDStream to use Clock class.
1 parent 451546a commit a6394bc

File tree

6 files changed

+251
-136
lines changed

6 files changed

+251
-136
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.streaming.dstream
1919

2020
import java.io.{IOException, ObjectInputStream}
21+
import java.util.concurrent.ConcurrentHashMap
2122

2223
import scala.collection.mutable
2324
import scala.reflect.ClassTag
@@ -74,12 +75,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
7475
newFilesOnly: Boolean = true)
7576
extends InputDStream[(K, V)](ssc_) {
7677

78+
// This is a def so that it works during checkpoint recovery:
79+
private def clock = ssc.scheduler.clock
80+
7781
// Data to be saved as part of the streaming checkpoints
7882
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
7983

8084
// Initial ignore threshold based on which old, existing files in the directory (at the time of
8185
// starting the streaming application) will be ignored or considered
82-
private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
86+
private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
8387

8488
/*
8589
* Make sure that the information of files selected in the last few batches are remembered.
@@ -91,8 +95,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
9195
remember(durationToRemember)
9296

9397
// Map of batch-time to selected file info for the remembered batches
98+
// This is a concurrent map because it's also accessed in unit tests
9499
@transient private[streaming] var batchTimeToSelectedFiles =
95-
new mutable.HashMap[Time, Array[String]]
100+
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
96101

97102
// Set of files that were selected in the remembered batches
98103
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -151,7 +156,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
151156
*/
152157
private def findNewFiles(currentTime: Long): Array[String] = {
153158
try {
154-
lastNewFileFindingTime = System.currentTimeMillis
159+
lastNewFileFindingTime = clock.currentTime()
155160

156161
// Calculate ignore threshold
157162
val modTimeIgnoreThreshold = math.max(
@@ -164,7 +169,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
164169
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
165170
}
166171
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
167-
val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
172+
val timeTaken = clock.currentTime() - lastNewFileFindingTime
168173
logInfo("Finding new files took " + timeTaken + " ms")
169174
logDebug("# cached file times = " + fileToModTime.size)
170175
if (timeTaken > slideDuration.milliseconds) {
@@ -267,7 +272,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
267272
logDebug(this.getClass().getSimpleName + ".readObject used")
268273
ois.defaultReadObject()
269274
generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
270-
batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
275+
batchTimeToSelectedFiles =
276+
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
271277
recentlySelectedFiles = new mutable.HashSet[String]()
272278
fileToModTime = new TimeStampedHashMap[String, Long](true)
273279
}

streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,11 @@ class SystemClock() extends Clock {
5959
private[streaming]
6060
class ManualClock() extends Clock {
6161

62-
var time = 0L
62+
private var time = 0L
6363

64-
def currentTime() = time
64+
def currentTime() = this.synchronized {
65+
time
66+
}
6567

6668
def setTime(timeToSet: Long) = {
6769
this.synchronized {

streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ class BasicOperationsSuite extends TestSuiteBase {
638638
if (rememberDuration != null) ssc.remember(rememberDuration)
639639
val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
640640
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
641-
assert(clock.time === Seconds(10).milliseconds)
641+
assert(clock.currentTime() === Seconds(10).milliseconds)
642642
assert(output.size === numExpectedOutput)
643643
operatedStream
644644
}

0 commit comments

Comments
 (0)