@@ -206,28 +206,28 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
206206 val numTotalRecords = numThreads * numRecordsPerThread
207207 val testReceiver = new MultiThreadTestReceiver (numThreads, numRecordsPerThread)
208208 MultiThreadTestReceiver .haveAllThreadsFinished = false
209-
210- // set up the network stream using the test receiver
211- val ssc = new StreamingContext (conf, batchDuration)
212- val networkStream = ssc.receiverStream[Int ](testReceiver)
213- val countStream = networkStream.count
214209 val outputBuffer = new ArrayBuffer [Seq [Long ]] with SynchronizedBuffer [Seq [Long ]]
215- val outputStream = new TestOutputStream (countStream, outputBuffer)
216210 def output : ArrayBuffer [Long ] = outputBuffer.flatMap(x => x)
217- outputStream.register()
218- ssc.start()
219-
220- // Let the data from the receiver be received
221- val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
222- val startTime = System .currentTimeMillis()
223- while ((! MultiThreadTestReceiver .haveAllThreadsFinished || output.sum < numTotalRecords) &&
224- System .currentTimeMillis() - startTime < 5000 ) {
225- Thread .sleep(100 )
226- clock.advance(batchDuration.milliseconds)
211+
212+ // set up the network stream using the test receiver
213+ withStreamingContext(new StreamingContext (conf, batchDuration)) { ssc =>
214+ val networkStream = ssc.receiverStream[Int ](testReceiver)
215+ val countStream = networkStream.count
216+
217+ val outputStream = new TestOutputStream (countStream, outputBuffer)
218+ outputStream.register()
219+ ssc.start()
220+
221+ // Let the data from the receiver be received
222+ val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
223+ val startTime = System .currentTimeMillis()
224+ while ((! MultiThreadTestReceiver .haveAllThreadsFinished || output.sum < numTotalRecords) &&
225+ System .currentTimeMillis() - startTime < 5000 ) {
226+ Thread .sleep(100 )
227+ clock.advance(batchDuration.milliseconds)
228+ }
229+ Thread .sleep(1000 )
227230 }
228- Thread .sleep(1000 )
229- logInfo(" Stopping context" )
230- ssc.stop()
231231
232232 // Verify whether data received was as expected
233233 logInfo(" --------------------------------" )
@@ -239,30 +239,30 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
239239 }
240240
241241 test(" queue input stream - oneAtATime = true" ) {
242- // Set up the streaming context and input streams
243- val ssc = new StreamingContext (conf, batchDuration)
244- val queue = new SynchronizedQueue [RDD [String ]]()
245- val queueStream = ssc.queueStream(queue, oneAtATime = true )
246- val outputBuffer = new ArrayBuffer [Seq [String ]] with SynchronizedBuffer [Seq [String ]]
247- val outputStream = new TestOutputStream (queueStream, outputBuffer)
248- def output : ArrayBuffer [Seq [String ]] = outputBuffer.filter(_.size > 0 )
249- outputStream.register()
250- ssc.start()
251-
252- // Setup data queued into the stream
253- val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
254242 val input = Seq (" 1" , " 2" , " 3" , " 4" , " 5" )
255243 val expectedOutput = input.map(Seq (_))
244+ val outputBuffer = new ArrayBuffer [Seq [String ]] with SynchronizedBuffer [Seq [String ]]
245+ def output : ArrayBuffer [Seq [String ]] = outputBuffer.filter(_.size > 0 )
256246
257- val inputIterator = input.toIterator
258- for (i <- 0 until input.size) {
259- // Enqueue more than 1 item per tick but they should dequeue one at a time
260- inputIterator.take(2 ).foreach(i => queue += ssc.sparkContext.makeRDD(Seq (i)))
261- clock.advance(batchDuration.milliseconds)
247+ // Set up the streaming context and input streams
248+ withStreamingContext(new StreamingContext (conf, batchDuration)) { ssc =>
249+ val queue = new SynchronizedQueue [RDD [String ]]()
250+ val queueStream = ssc.queueStream(queue, oneAtATime = true )
251+ val outputStream = new TestOutputStream (queueStream, outputBuffer)
252+ outputStream.register()
253+ ssc.start()
254+
255+ // Setup data queued into the stream
256+ val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
257+
258+ val inputIterator = input.toIterator
259+ for (i <- 0 until input.size) {
260+ // Enqueue more than 1 item per tick but they should dequeue one at a time
261+ inputIterator.take(2 ).foreach(i => queue += ssc.sparkContext.makeRDD(Seq (i)))
262+ clock.advance(batchDuration.milliseconds)
263+ }
264+ Thread .sleep(1000 )
262265 }
263- Thread .sleep(1000 )
264- logInfo(" Stopping context" )
265- ssc.stop()
266266
267267 // Verify whether data received was as expected
268268 logInfo(" --------------------------------" )
@@ -282,33 +282,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
282282 }
283283
284284 test(" queue input stream - oneAtATime = false" ) {
285- // Set up the streaming context and input streams
286- val ssc = new StreamingContext (conf, batchDuration)
287- val queue = new SynchronizedQueue [RDD [String ]]()
288- val queueStream = ssc.queueStream(queue, oneAtATime = false )
289285 val outputBuffer = new ArrayBuffer [Seq [String ]] with SynchronizedBuffer [Seq [String ]]
290- val outputStream = new TestOutputStream (queueStream, outputBuffer)
291286 def output : ArrayBuffer [Seq [String ]] = outputBuffer.filter(_.size > 0 )
292- outputStream.register()
293- ssc.start()
294-
295- // Setup data queued into the stream
296- val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
297287 val input = Seq (" 1" , " 2" , " 3" , " 4" , " 5" )
298288 val expectedOutput = Seq (Seq (" 1" , " 2" , " 3" ), Seq (" 4" , " 5" ))
299289
300- // Enqueue the first 3 items (one by one), they should be merged in the next batch
301- val inputIterator = input.toIterator
302- inputIterator.take(3 ).foreach(i => queue += ssc.sparkContext.makeRDD(Seq (i)))
303- clock.advance(batchDuration.milliseconds)
304- Thread .sleep(1000 )
305-
306- // Enqueue the remaining items (again one by one), merged in the final batch
307- inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq (i)))
308- clock.advance(batchDuration.milliseconds)
309- Thread .sleep(1000 )
310- logInfo(" Stopping context" )
311- ssc.stop()
290+ // Set up the streaming context and input streams
291+ withStreamingContext(new StreamingContext (conf, batchDuration)) { ssc =>
292+ val queue = new SynchronizedQueue [RDD [String ]]()
293+ val queueStream = ssc.queueStream(queue, oneAtATime = false )
294+ val outputStream = new TestOutputStream (queueStream, outputBuffer)
295+ outputStream.register()
296+ ssc.start()
297+
298+ // Setup data queued into the stream
299+ val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
300+
301+ // Enqueue the first 3 items (one by one), they should be merged in the next batch
302+ val inputIterator = input.toIterator
303+ inputIterator.take(3 ).foreach(i => queue += ssc.sparkContext.makeRDD(Seq (i)))
304+ clock.advance(batchDuration.milliseconds)
305+ Thread .sleep(1000 )
306+
307+ // Enqueue the remaining items (again one by one), merged in the final batch
308+ inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq (i)))
309+ clock.advance(batchDuration.milliseconds)
310+ Thread .sleep(1000 )
311+ }
312312
313313 // Verify whether data received was as expected
314314 logInfo(" --------------------------------" )
0 commit comments