Skip to content

Commit b245217

Browse files
committed
Remove several (but not all) sleep() calls in InputStreamSuite.
1 parent 3db335f commit b245217

File tree

2 files changed

+12
-14
lines changed

2 files changed

+12
-14
lines changed

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717

1818
package org.apache.spark.streaming
1919

20-
import akka.actor.Actor
21-
import akka.actor.Props
22-
import akka.util.ByteString
23-
2420
import java.io.{File, BufferedWriter, OutputStreamWriter}
2521
import java.net.{InetSocketAddress, SocketException, ServerSocket}
2622
import java.nio.charset.Charset
@@ -54,6 +50,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
5450

5551
// Set up the streaming context and input streams
5652
val ssc = new StreamingContext(conf, batchDuration)
53+
val waiter = new StreamingTestWaiter(ssc)
5754
val networkStream = ssc.socketTextStream(
5855
"localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
5956
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -65,13 +62,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
6562
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
6663
val input = Seq(1, 2, 3, 4, 5)
6764
val expectedOutput: Seq[Seq[String]] = input.map(i => Seq(i.toString))
68-
Thread.sleep(1000)
6965
for (i <- 0 until input.size) {
7066
testServer.send(input(i).toString + "\n")
71-
Thread.sleep(500)
67+
Thread.sleep(500) // This call is to allow time for the testServer to send the data to Spark
7268
clock.addToTime(batchDuration.milliseconds)
7369
}
74-
Thread.sleep(1000)
70+
waiter.waitForTotalBatchesCompleted(input.size, timeout = 10 seconds)
7571
logInfo("Stopping server")
7672
testServer.stop()
7773
logInfo("Stopping context")
@@ -131,6 +127,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
131127
test("queue input stream - oneAtATime = true") {
132128
// Set up the streaming context and input streams
133129
val ssc = new StreamingContext(conf, batchDuration)
130+
val waiter = new StreamingTestWaiter(ssc)
134131
val queue = new SynchronizedQueue[RDD[String]]()
135132
val queueStream = ssc.queueStream(queue, oneAtATime = true)
136133
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -143,14 +140,14 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
143140
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
144141
val input = Seq("1", "2", "3", "4", "5")
145142
val expectedOutput = input.map(Seq(_))
146-
//Thread.sleep(1000)
147143
val inputIterator = input.toIterator
148144
for (i <- 0 until input.size) {
149145
// Enqueue more than 1 item per tick but they should dequeue one at a time
150146
inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
151147
clock.addToTime(batchDuration.milliseconds)
152148
}
153-
Thread.sleep(1000)
149+
waiter.waitForTotalBatchesCompleted(input.size, timeout = 10 seconds)
150+
154151
logInfo("Stopping context")
155152
ssc.stop()
156153

@@ -160,6 +157,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
160157
test("queue input stream - oneAtATime = false") {
161158
// Set up the streaming context and input streams
162159
val ssc = new StreamingContext(conf, batchDuration)
160+
val waiter = new StreamingTestWaiter(ssc)
163161
val queue = new SynchronizedQueue[RDD[String]]()
164162
val queueStream = ssc.queueStream(queue, oneAtATime = false)
165163
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -177,12 +175,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
177175
val inputIterator = input.toIterator
178176
inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
179177
clock.addToTime(batchDuration.milliseconds)
180-
Thread.sleep(1000)
178+
waiter.waitForTotalBatchesCompleted(1, timeout = 10 seconds)
181179

182180
// Enqueue the remaining items (again one by one), merged in the final batch
183181
inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
184182
clock.addToTime(batchDuration.milliseconds)
185-
Thread.sleep(1000)
183+
waiter.waitForTotalBatchesCompleted(2, timeout = 10 seconds)
186184
logInfo("Stopping context")
187185
ssc.stop()
188186

streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeoutException
2222

2323
import scala.collection.mutable.ArrayBuffer
2424
import scala.collection.mutable.SynchronizedBuffer
25-
import scala.concurrent.duration.{Duration => SDuration}
25+
import scala.concurrent.duration.FiniteDuration
2626
import scala.reflect.ClassTag
2727

2828
import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -135,10 +135,10 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
135135
*/
136136
def waitForTotalBatchesCompleted(
137137
targetNumBatches: Int,
138-
timeout: SDuration = SDuration.Inf): Unit = this.synchronized {
138+
timeout: FiniteDuration): Unit = this.synchronized {
139139
val startTime = System.nanoTime
140-
def timedOut = timeout < SDuration.Inf && (System.nanoTime - startTime) >= timeout.toNanos
141140
def successful = getNumCompletedBatches >= targetNumBatches
141+
def timedOut = (System.nanoTime - startTime) >= timeout.toNanos
142142
while (!timedOut && !successful) {
143143
this.wait(timeout.toMillis)
144144
}

0 commit comments

Comments
 (0)