@@ -311,7 +311,6 @@ class CheckpointSuite extends TestSuiteBase {
311311 testCheckpointedOperation(input, operation, output, 7 )
312312 }
313313
314-
315314 // This tests whether file input stream remembers what files were seen before
316315 // the master failure and uses them again to process a large window operation.
317316 // It also tests whether batches, whose processing was incomplete due to the
@@ -348,9 +347,14 @@ class CheckpointSuite extends TestSuiteBase {
348347 val fileStream = ssc.textFileStream(testDir.toString)
349348 // Make value 3 take a large time to process, to ensure that the driver
350349 // shuts down in the middle of processing the 3rd batch
350+ TaskControlFlags .taskThreeShouldBlockIndefinitely = true
351351 val mappedStream = fileStream.map(s => {
352352 val i = s.toInt
353- if (i == 3 ) Thread .sleep(4000 )
353+ if (i == 3 ) {
354+ while (TaskControlFlags .taskThreeShouldBlockIndefinitely) {
355+ Thread .sleep(Long .MaxValue )
356+ }
357+ }
354358 i
355359 })
356360
@@ -389,6 +393,7 @@ class CheckpointSuite extends TestSuiteBase {
389393 }
390394
391395 // The original StreamingContext has now been stopped.
396+ TaskControlFlags .taskThreeShouldBlockIndefinitely = false
392397
393398 // Create files while the streaming driver is down
394399 for (i <- Seq (4 , 5 , 6 )) {
@@ -523,3 +528,8 @@ class CheckpointSuite extends TestSuiteBase {
523528 outputStream.output.map(_.flatten)
524529 }
525530}
531+
532+ // Global object with flags for controlling tasks' behavior.
533+ private object TaskControlFlags extends Serializable {
534+ var taskThreeShouldBlockIndefinitely : Boolean = true
535+ }
0 commit comments