1717
1818package org .apache .spark .streaming
1919
20- import akka .actor .Actor
21- import akka .actor .Props
22- import akka .util .ByteString
23-
2420import java .io .{File , BufferedWriter , OutputStreamWriter }
2521import java .net .{InetSocketAddress , SocketException , ServerSocket }
2622import java .nio .charset .Charset
@@ -54,6 +50,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
5450
5551 // Set up the streaming context and input streams
5652 val ssc = new StreamingContext (conf, batchDuration)
53+ val waiter = new StreamingTestWaiter (ssc)
5754 val networkStream = ssc.socketTextStream(
5855 " localhost" , testServer.port, StorageLevel .MEMORY_AND_DISK )
5956 val outputBuffer = new ArrayBuffer [Seq [String ]] with SynchronizedBuffer [Seq [String ]]
@@ -66,13 +63,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
6663 val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
6764 val input = Seq (1 , 2 , 3 , 4 , 5 )
6865 val expectedOutput = input.map(_.toString)
69- Thread .sleep(1000 )
7066 for (i <- 0 until input.size) {
7167 testServer.send(input(i).toString + " \n " )
72- Thread .sleep(500 )
68+ Thread .sleep(500 ) // This sleep is to wait for `testServer` to send the data to Spark
7369 clock.addToTime(batchDuration.milliseconds)
70+ waiter.waitForBatchToComplete()
7471 }
75- Thread .sleep(1000 )
7672 logInfo(" Stopping server" )
7773 testServer.stop()
7874 logInfo(" Stopping context" )
0 commit comments