
Commit 12635b4

Remove sleep() in runStreamsWithPartitions(); use streaming’s Duration class.
1 parent b245217 commit 12635b4
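
The imports removed below (scala.concurrent.duration._ and scala.language.postfixOps) existed only to support the `10 seconds` postfix syntax; the replacement uses Spark Streaming's own Durations factory. A small sketch of the two spellings of the same 10-second timeout, assuming only spark-streaming on the classpath (the value names are illustrative):

import scala.concurrent.duration._   // old style: brings in FiniteDuration and the "seconds" conversion
import scala.language.postfixOps     // needed only because "10 seconds" is postfix notation
import org.apache.spark.streaming.Durations

// Old: a scala.concurrent.duration.FiniteDuration written with postfix syntax
val scalaTimeout: FiniteDuration = 10 seconds

// New: Spark Streaming's Duration, built via the Durations factory
val sparkTimeout = Durations.seconds(10)

// Both describe the same span; the Spark class exposes milliseconds directly
assert(scalaTimeout.toMillis == sparkTimeout.milliseconds)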

3 files changed: 12 additions, 14 deletions

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
(2 additions, 4 deletions)

@@ -21,8 +21,6 @@ import java.io.File
 import java.nio.charset.Charset

 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.reflect.ClassTag

 import com.google.common.io.Files
@@ -423,7 +421,7 @@ class CheckpointSuite extends TestSuiteBase {
     val waiter = new StreamingTestWaiter(ssc)
     ssc.start()
     // Wait for the last batch before restart to be re-processed:
-    waiter.waitForTotalBatchesCompleted(1, timeout = 10 seconds)
+    waiter.waitForTotalBatchesCompleted(1, timeout = Durations.seconds(10))
     val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
     // the first element will be re-processed data of the last batch before restart
     verifyOutput(outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), useSet = true)
@@ -441,7 +439,7 @@ class CheckpointSuite extends TestSuiteBase {
     logInfo("Manual clock before advancing = " + clock.time)
     for (i <- 1 to numBatches) {
       clock.addToTime(batchDuration.milliseconds)
-      waiter.waitForTotalBatchesCompleted(i, timeout = 10 seconds)
+      waiter.waitForTotalBatchesCompleted(i, timeout = Durations.seconds(10))
     }
     logInfo("Manual clock after advancing = " + clock.time)

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
(4 additions, 4 deletions)

@@ -67,7 +67,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       Thread.sleep(500) // This call is to allow time for the testServer to send the data to Spark
       clock.addToTime(batchDuration.milliseconds)
     }
-    waiter.waitForTotalBatchesCompleted(input.size, timeout = 10 seconds)
+    waiter.waitForTotalBatchesCompleted(input.size, timeout = Durations.seconds(10))
     logInfo("Stopping server")
     testServer.stop()
     logInfo("Stopping context")
@@ -146,7 +146,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
       clock.addToTime(batchDuration.milliseconds)
     }
-    waiter.waitForTotalBatchesCompleted(input.size, timeout = 10 seconds)
+    waiter.waitForTotalBatchesCompleted(input.size, timeout = Durations.seconds(10))

     logInfo("Stopping context")
     ssc.stop()
@@ -175,12 +175,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val inputIterator = input.toIterator
     inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
     clock.addToTime(batchDuration.milliseconds)
-    waiter.waitForTotalBatchesCompleted(1, timeout = 10 seconds)
+    waiter.waitForTotalBatchesCompleted(1, timeout = Durations.seconds(10))

     // Enqueue the remaining items (again one by one), merged in the final batch
     inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
     clock.addToTime(batchDuration.milliseconds)
-    waiter.waitForTotalBatchesCompleted(2, timeout = 10 seconds)
+    waiter.waitForTotalBatchesCompleted(2, timeout = Durations.seconds(10))
     logInfo("Stopping context")
     ssc.stop()

streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
(6 additions, 6 deletions)

@@ -22,7 +22,6 @@ import java.util.concurrent.TimeoutException

 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.SynchronizedBuffer
-import scala.concurrent.duration.FiniteDuration
 import scala.reflect.ClassTag

 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -135,12 +134,12 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
    */
   def waitForTotalBatchesCompleted(
       targetNumBatches: Int,
-      timeout: FiniteDuration): Unit = this.synchronized {
-    val startTime = System.nanoTime
+      timeout: Duration): Unit = this.synchronized {
+    val startTime = System.currentTimeMillis()
     def successful = getNumCompletedBatches >= targetNumBatches
-    def timedOut = (System.nanoTime - startTime) >= timeout.toNanos
+    def timedOut = (System.currentTimeMillis() - startTime) >= timeout.milliseconds
     while (!timedOut && !successful) {
-      this.wait(timeout.toMillis)
+      this.wait(timeout.milliseconds)
     }
     if (!successful && timedOut) {
       throw new TimeoutException(s"Waited for $targetNumBatches completed batches, but only" +
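
Read as a whole, the hunk above moves the waiter's timeout from scala.concurrent.duration.FiniteDuration with nanosecond bookkeeping to Spark Streaming's Duration with plain milliseconds. A minimal, self-contained sketch of the resulting wait-and-notify pattern, assuming a StreamingListener feeds the waiter; the class name, the onBatchCompleted feeder, and the tail of the exception message are illustrative stand-ins, not the suite's real StreamingTestWaiter:

import java.util.concurrent.TimeoutException
import org.apache.spark.streaming.Duration

// Sketch of the waiter pattern after this change; getNumCompletedBatches stands in
// for the counter the real suite maintains from a StreamingListener (assumed, not shown).
class BatchWaiterSketch {
  private var numCompletedBatches = 0
  def getNumCompletedBatches: Int = this.synchronized { numCompletedBatches }

  // Called when a batch finishes (assumption about how the real waiter is fed).
  def onBatchCompleted(): Unit = this.synchronized {
    numCompletedBatches += 1
    this.notifyAll()
  }

  def waitForTotalBatchesCompleted(targetNumBatches: Int, timeout: Duration): Unit =
    this.synchronized {
      val startTime = System.currentTimeMillis()
      def successful = numCompletedBatches >= targetNumBatches
      def timedOut = (System.currentTimeMillis() - startTime) >= timeout.milliseconds
      while (!timedOut && !successful) {
        this.wait(timeout.milliseconds)  // woken by onBatchCompleted's notifyAll or the timeout
      }
      if (!successful && timedOut) {
        // message tail is an assumption; the real string continues past the hunk boundary
        throw new TimeoutException(
          s"Waited for $targetNumBatches completed batches, but only $numCompletedBatches arrived")
      }
    }
}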
@@ -332,6 +331,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     val output = outputStream.output

     try {
+      val waiter = new StreamingTestWaiter(ssc)
       // Start computation
       ssc.start()

@@ -342,7 +342,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
       for (i <- 1 to numBatches) {
         logInfo("Actually waiting for " + batchDuration)
         clock.addToTime(batchDuration.milliseconds)
-        Thread.sleep(batchDuration.milliseconds)
+        waiter.waitForTotalBatchesCompleted(i, timeout = batchDuration * 5)
       }
     } else {
       clock.addToTime(numBatches * batchDuration.milliseconds)
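
The final hunk replaces a fixed Thread.sleep with a wait that returns as soon as batch i completes, capped at five batch intervals. The cap works because Spark's Duration supports integer scaling; a small sketch with an illustrative 500 ms batch interval:

import org.apache.spark.streaming.{Duration, Durations}

// Illustrative batch interval; the real suites take this from TestSuiteBase.
val batchDuration: Duration = Durations.milliseconds(500)

// Duration supports integer scaling, so the per-batch timeout tracks the batch interval.
val perBatchTimeout: Duration = batchDuration * 5
assert(perBatchTimeout.milliseconds == 2500)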
