1717
1818package org .apache .spark .streaming
1919
20+ import akka .actor .Actor
21+ import akka .actor .Props
22+ import akka .util .ByteString
23+
2024import java .io .{File , BufferedWriter , OutputStreamWriter }
2125import java .net .{InetSocketAddress , SocketException , ServerSocket }
2226import java .nio .charset .Charset
@@ -50,7 +54,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
5054
5155 // Set up the streaming context and input streams
5256 val ssc = new StreamingContext (conf, batchDuration)
53- val waiter = new StreamingTestWaiter (ssc)
5457 val networkStream = ssc.socketTextStream(
5558 " localhost" , testServer.port, StorageLevel .MEMORY_AND_DISK )
5659 val outputBuffer = new ArrayBuffer [Seq [String ]] with SynchronizedBuffer [Seq [String ]]
@@ -62,12 +65,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
6265 val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
6366 val input = Seq (1 , 2 , 3 , 4 , 5 )
6467 val expectedOutput : Seq [Seq [String ]] = input.map(i => Seq (i.toString))
68+ Thread .sleep(1000 )
6569 for (i <- 0 until input.size) {
6670 testServer.send(input(i).toString + " \n " )
67- Thread .sleep(500 ) // This sleep is to wait for `testServer` to send the data to Spark
71+ Thread .sleep(500 )
6872 clock.addToTime(batchDuration.milliseconds)
69- waiter.waitForBatchToComplete()
7073 }
74+ Thread .sleep(1000 )
7175 logInfo(" Stopping server" )
7276 testServer.stop()
7377 logInfo(" Stopping context" )
0 commit comments