
Commit d4f2d87

Refactor file input stream tests to not rely on SystemClock.
1 parent dda1403 commit d4f2d87

File tree

1 file changed: +31 −30 lines changed


streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 31 additions & 30 deletions
@@ -28,12 +28,10 @@ import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
-import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.Files
 import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
@@ -234,45 +232,48 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
   def testFileStream(newFilesOnly: Boolean) {
-    var ssc: StreamingContext = null
     val testDir: File = null
     try {
       val testDir = Utils.createTempDir()
+      // Create a file that exists before the StreamingContext is created:
       val existingFile = new File(testDir, "0")
       Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+      assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
 
-      Thread.sleep(1000)
       // Set up the streaming context and input streams
-      val newConf = conf.clone.set(
-        "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-      ssc = new StreamingContext(newConf, batchDuration)
-      val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
-        testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
-      val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-      val outputStream = new TestOutputStream(fileStream, outputBuffer)
-      outputStream.register()
-      ssc.start()
-
-      // Create files in the directory
-      val input = Seq(1, 2, 3, 4, 5)
-      input.foreach { i =>
-        Thread.sleep(batchDuration.milliseconds)
-        val file = new File(testDir, i.toString)
-        Files.write(i + "\n", file, Charset.forName("UTF-8"))
-        logInfo("Created file " + file)
-      }
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        // This `setTime` call ensures that the clock is past the creation time of `existingFile`
+        clock.setTime(existingFile.lastModified + 1000)
+        val waiter = new StreamingTestWaiter(ssc)
+        val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
+          testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
+        val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+        val outputStream = new TestOutputStream(fileStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        // Over time, create files in the directory
+        val input = Seq(1, 2, 3, 4, 5)
+        input.foreach { i =>
+          clock.addToTime(batchDuration.milliseconds)
+          val file = new File(testDir, i.toString)
+          Files.write(i + "\n", file, Charset.forName("UTF-8"))
+          assert(file.setLastModified(clock.currentTime()))
+          assert(file.lastModified === clock.currentTime)
+          logInfo("Created file " + file)
+          waiter.waitForTotalBatchesCompleted(i, timeout = batchDuration * 5)
+        }
 
-      // Verify that all the files have been read
-      val expectedOutput = if (newFilesOnly) {
-        input.map(_.toString).toSet
-      } else {
-        (Seq(0) ++ input).map(_.toString).toSet
-      }
-      eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) {
+        // Verify that all the files have been read
+        val expectedOutput = if (newFilesOnly) {
+          input.map(_.toString).toSet
+        } else {
+          (Seq(0) ++ input).map(_.toString).toSet
+        }
         assert(outputBuffer.flatten.toSet === expectedOutput)
       }
     } finally {
-      if (ssc != null) ssc.stop()
       if (testDir != null) Utils.deleteRecursively(testDir)
     }
   }
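
In short, the old test pinned spark.streaming.clock to SystemClock, slept on the wall clock between files, and polled the output with ScalaTest's eventually; the refactored test uses the suite's ManualClock, advances it by exactly one batch interval per file, and blocks on StreamingTestWaiter.waitForTotalBatchesCompleted until that batch has run. Below is a minimal standalone sketch of the manual-clock pattern; FakeManualClock and BatchRunner are hypothetical stand-ins for illustration only, not Spark's actual ManualClock or scheduler.

object ManualClockSketch {

  // Hypothetical clock: time only advances when the test says so.
  class FakeManualClock {
    private var now = 0L
    def currentTime(): Long = synchronized { now }
    def setTime(t: Long): Unit = synchronized { now = t; notifyAll() }
    def addToTime(delta: Long): Unit = synchronized { now += delta; notifyAll() }
    // Block until the clock has reached at least `target`.
    def waitUntil(target: Long): Unit = synchronized { while (now < target) wait() }
  }

  // Hypothetical batch runner: completes one "batch" per batchMs of *clock* time.
  class BatchRunner(clock: FakeManualClock, batchMs: Long)(batch: Long => Unit) {
    @volatile var batchesCompleted = 0
    private val worker = new Thread(new Runnable {
      def run(): Unit = {
        var nextTime = batchMs
        while (true) {
          clock.waitUntil(nextTime) // sleeps until the *test* advances the clock
          batch(nextTime)
          batchesCompleted += 1
          nextTime += batchMs
        }
      }
    })
    worker.setDaemon(true)
    def start(): Unit = worker.start()
  }

  def main(args: Array[String]): Unit = {
    val clock = new FakeManualClock
    val runner = new BatchRunner(clock, batchMs = 1000)(t => println(s"batch at clock time $t"))
    runner.start()

    // Mirror the refactored test: advance the clock one batch interval per "file",
    // then wait for that batch to complete instead of sleeping a wall-clock second.
    (1 to 5).foreach { i =>
      clock.addToTime(1000)
      while (runner.batchesCompleted < i) Thread.sleep(10) // crude stand-in for StreamingTestWaiter
    }
    assert(runner.batchesCompleted >= 5)
    println("all 5 batches completed deterministically")
  }
}

Because the test drives the clock, batches fire exactly when the test expects them to, which is what removes the dependence on SystemClock and wall-clock timing.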

0 commit comments
