Skip to content

Commit e4494f4

Browse files
committed
Address a potential race when setting file modification times
1 parent 8340bd0 commit e4494f4

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,10 @@ class CheckpointSuite extends TestSuiteBase {
320320
val testDir = Utils.createTempDir()
321321
val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
322322

323+
/**
324+
* Writes a file named `i` (which contains the number `i`) to the test directory and sets its
325+
* modification time to `clock`'s current time.
326+
*/
323327
def writeFile(i: Int, clock: ManualClock): Unit = {
324328
val file = new File(testDir, i.toString)
325329
Files.write(i + "\n", file, Charsets.UTF_8)
@@ -329,6 +333,9 @@ class CheckpointSuite extends TestSuiteBase {
329333
assert(file.lastModified() === clock.currentTime())
330334
}
331335

336+
/**
337+
* Returns ids that identify which files have been recorded by the file input stream.
338+
*/
332339
def recordedFiles(ssc: StreamingContext): Seq[Int] = {
333340
val fileInputDStream =
334341
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
@@ -369,6 +376,8 @@ class CheckpointSuite extends TestSuiteBase {
369376
// Create files and advance manual clock to process them
370377
for (i <- Seq(1, 2, 3)) {
371378
writeFile(i, clock)
379+
// Advance the clock after creating the file to avoid a race when
380+
// setting its modification time
372381
clock.addToTime(batchDuration.milliseconds)
373382
if (i != 3) {
374383
// Since we want to shut down while the 3rd batch is processing
@@ -399,6 +408,8 @@ class CheckpointSuite extends TestSuiteBase {
399408
// Create files while the streaming driver is down
400409
for (i <- Seq(4, 5, 6)) {
401410
writeFile(i, clock)
411+
// Advance the clock after creating the file to avoid a race when
412+
// setting its modification time
402413
clock.addToTime(batchDuration.milliseconds)
403414
}
404415

@@ -428,6 +439,8 @@ class CheckpointSuite extends TestSuiteBase {
428439
}
429440
for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
430441
writeFile(i, clock)
442+
// Advance the clock after creating the file to avoid a race when
443+
// setting its modification time
431444
clock.addToTime(batchDuration.milliseconds)
432445
eventually(eventuallyTimeout) {
433446
assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
235235
def testFileStream(newFilesOnly: Boolean) {
236236
val testDir: File = null
237237
try {
238+
val batchDuration = Seconds(2)
238239
val testDir = Utils.createTempDir()
239240
// Create a file that exists before the StreamingContext is created:
240241
val existingFile = new File(testDir, "0")
@@ -245,7 +246,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
245246
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
246247
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
247248
// This `setTime` call ensures that the clock is past the creation time of `existingFile`
248-
clock.setTime(existingFile.lastModified + 1000)
249+
clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
249250
val batchCounter = new BatchCounter(ssc)
250251
val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
251252
testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
@@ -254,15 +255,21 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
254255
outputStream.register()
255256
ssc.start()
256257

258+
// Advance the clock so that the files are created after StreamingContext starts, but
259+
// not enough to trigger a batch
260+
clock.addToTime(batchDuration.milliseconds / 2)
261+
257262
// Over time, create files in the directory
258263
val input = Seq(1, 2, 3, 4, 5)
259264
input.foreach { i =>
260-
clock.addToTime(batchDuration.milliseconds)
261265
val file = new File(testDir, i.toString)
262266
Files.write(i + "\n", file, Charset.forName("UTF-8"))
263267
assert(file.setLastModified(clock.currentTime()))
264268
assert(file.lastModified === clock.currentTime)
265269
logInfo("Created file " + file)
270+
// Advance the clock after creating the file to avoid a race when
271+
// setting its modification time
272+
clock.addToTime(batchDuration.milliseconds)
266273
eventually(eventuallyTimeout) {
267274
assert(batchCounter.getNumCompletedBatches === i)
268275
}

0 commit comments

Comments
 (0)