Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a95ddc4
Modify FileInputDStream to use Clock class.
JoshRosen Dec 16, 2014
3c3efc3
Synchronize `currentTime` in ManualClock
JoshRosen Dec 17, 2014
dda1403
Add StreamingTestWaiter class.
JoshRosen Dec 25, 2014
d4f2d87
Refactor file input stream tests to not rely on SystemClock.
JoshRosen Dec 25, 2014
c8f06b1
Remove Thread.sleep calls in FileInputStream CheckpointSuite test.
JoshRosen Dec 25, 2014
3689214
Merge remote-tracking branch 'origin/master' into SPARK-1600
JoshRosen Jan 5, 2015
da32f3f
Fix log message and comment typos
JoshRosen Jan 5, 2015
566a63f
Fix log message and comment typos
JoshRosen Jan 5, 2015
dbb8247
Remove last remaining sleep() call
JoshRosen Jan 5, 2015
fffc51c
Revert "Remove last remaining sleep() call"
JoshRosen Jan 5, 2015
15b48ee
Replace several TestWaiter methods w/ ScalaTest eventually.
JoshRosen Jan 5, 2015
b4442c3
batchTimeToSelectedFiles should be thread-safe
JoshRosen Jan 5, 2015
863d71a
Remove Thread.sleep that was used to make task run slowly
JoshRosen Jan 5, 2015
0b9c3a1
Wait for checkpoint to complete
JoshRosen Jan 5, 2015
3939432
Rename StreamingTestWaiter to BatchCounter
JoshRosen Jan 5, 2015
db26c3a
Use standard timeout in ScalaTest `eventually` blocks.
JoshRosen Jan 5, 2015
1cc689f
ConcurrentHashMap -> SynchronizedMap
JoshRosen Jan 5, 2015
0b9c252
Fix some ManualClock usage problems.
JoshRosen Jan 6, 2015
8340bd0
Use set comparisons for output.
JoshRosen Jan 6, 2015
e4494f4
Address a potential race when setting file modification times
JoshRosen Jan 6, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.streaming.dstream

import java.io.{IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable
import scala.reflect.ClassTag
Expand Down Expand Up @@ -74,12 +75,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {

// This is a def so that it works during checkpoint recovery:
private def clock = ssc.scheduler.clock

// Data to be saved as part of the streaming checkpoints
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData

// Initial ignore threshold based on which old, existing files in the directory (at the time of
// starting the streaming application) will be ignored or considered
private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L

/*
* Make sure that the information of files selected in the last few batches are remembered.
Expand All @@ -91,8 +95,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
remember(durationToRemember)

// Map of batch-time to selected file info for the remembered batches
// This is a concurrent map because it's also accessed in unit tests
@transient private[streaming] var batchTimeToSelectedFiles =
new mutable.HashMap[Time, Array[String]]
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]

// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()
Expand Down Expand Up @@ -151,7 +156,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
*/
private def findNewFiles(currentTime: Long): Array[String] = {
try {
lastNewFileFindingTime = System.currentTimeMillis
lastNewFileFindingTime = clock.currentTime()

// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
Expand All @@ -164,7 +169,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
val timeTaken = clock.currentTime() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
Expand Down Expand Up @@ -267,7 +272,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
batchTimeToSelectedFiles =
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ class SystemClock() extends Clock {
private[streaming]
class ManualClock() extends Clock {

var time = 0L
private var time = 0L

def currentTime() = time
def currentTime() = this.synchronized {
time
}

def setTime(timeToSet: Long) = {
this.synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ class BasicOperationsSuite extends TestSuiteBase {
if (rememberDuration != null) ssc.remember(rememberDuration)
val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
assert(clock.time === Seconds(10).milliseconds)
assert(clock.currentTime() === Seconds(10).milliseconds)
assert(output.size === numExpectedOutput)
operatedStream
}
Expand Down
Loading