Skip to content

Commit db26c3a — "Use standard timeout in ScalaTest eventually blocks."
Parent commit: 3939432

File tree: 3 files changed (+11 additions, −14 deletions)

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -373,19 +373,19 @@ class CheckpointSuite extends TestSuiteBase {
373373
clock.addToTime(batchDuration.milliseconds)
374374
if (i != 3) {
375375
// Since we want to shut down while the 3rd batch is processing
376-
eventually(timeout(batchDuration * 5)) {
376+
eventually(eventuallyTimeout) {
377377
assert(batchCounter.getNumCompletedBatches === i)
378378
}
379379
}
380380
}
381381
clock.addToTime(batchDuration.milliseconds)
382-
eventually(timeout(batchDuration * 5)) {
382+
eventually(eventuallyTimeout) {
383383
// Wait until all files have been recorded and all batches have started
384384
assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
385385
}
386386
// Wait for a checkpoint to be written
387387
val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
388-
eventually(timeout(batchDuration * 5)) {
388+
eventually(eventuallyTimeout) {
389389
assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 5)
390390
}
391391
ssc.stop()
@@ -424,7 +424,7 @@ class CheckpointSuite extends TestSuiteBase {
424424
for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
425425
writeFile(i, clock)
426426
clock.addToTime(batchDuration.milliseconds)
427-
eventually(timeout(batchDuration * 5)) {
427+
eventually(eventuallyTimeout) {
428428
assert(batchCounter.getNumCompletedBatches === index + 1)
429429
}
430430
}

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
263263
assert(file.setLastModified(clock.currentTime()))
264264
assert(file.lastModified === clock.currentTime)
265265
logInfo("Created file " + file)
266-
eventually(timeout(batchDuration * 5)) {
266+
eventually(eventuallyTimeout) {
267267
assert(batchCounter.getNumCompletedBatches === i)
268268
}
269269
}

streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ import scala.language.implicitConversions
2525
import scala.reflect.ClassTag
2626

2727
import org.scalatest.{BeforeAndAfter, FunSuite}
28-
import org.scalatest.time.{Span, Milliseconds => ScalaTestMilliseconds}
28+
import org.scalatest.time.{Span, Seconds => ScalaTestSeconds}
29+
import org.scalatest.concurrent.Eventually.timeout
30+
import org.scalatest.concurrent.PatienceConfiguration
2931

3032
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
3133
import org.apache.spark.streaming.scheduler.{StreamingListenerBatchStarted, StreamingListenerBatchCompleted, StreamingListener}
@@ -179,6 +181,9 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
179181
.setMaster(master)
180182
.setAppName(framework)
181183

184+
// Timeout for use in ScalaTest `eventually` blocks
185+
val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds))
186+
182187
// Default before function for any streaming test suite. Override this
183188
// if you want to add your stuff to "before" (i.e., don't call before { } )
184189
def beforeFunction() {
@@ -200,14 +205,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
200205
before(beforeFunction)
201206
after(afterFunction)
202207

203-
/**
204-
* Implicit conversion which allows streaming Durations to be used with ScalaTest methods,
205-
* such as `eventually`.
206-
*/
207-
implicit def streamingDurationToScalatestSpan(duration: Duration): Span = {
208-
Span(duration.milliseconds, ScalaTestMilliseconds)
209-
}
210-
211208
/**
212209
* Run a block of code with the given StreamingContext and automatically
213210
* stop the context when the block completes or when an exception is thrown.

0 commit comments

Comments (0)