Skip to content

Commit a95ddc4

Browse files
committed
Modify FileInputDStream to use Clock class.
1 parent 29fabb1 commit a95ddc4

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
7474
newFilesOnly: Boolean = true)
7575
extends InputDStream[(K, V)](ssc_) {
7676

77+
// This is a def so that it works during checkpoint recovery:
78+
private def clock = ssc.scheduler.clock
79+
7780
// Data to be saved as part of the streaming checkpoints
7881
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
7982

8083
// Initial ignore threshold based on which old, existing files in the directory (at the time of
8184
// starting the streaming application) will be ignored or considered
82-
private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
85+
private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L
8386

8487
/*
8588
* Make sure that the information of files selected in the last few batches are remembered.
@@ -151,7 +154,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
151154
*/
152155
private def findNewFiles(currentTime: Long): Array[String] = {
153156
try {
154-
lastNewFileFindingTime = System.currentTimeMillis
157+
lastNewFileFindingTime = clock.currentTime()
155158

156159
// Calculate ignore threshold
157160
val modTimeIgnoreThreshold = math.max(
@@ -164,7 +167,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
164167
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
165168
}
166169
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
167-
val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
170+
val timeTaken = clock.currentTime() - lastNewFileFindingTime
168171
logInfo("Finding new files took " + timeTaken + " ms")
169172
logDebug("# cached file times = " + fileToModTime.size)
170173
if (timeTaken > slideDuration.milliseconds) {

0 commit comments

Comments
 (0)