1818package org .apache .spark .streaming .mqtt
1919
2020import java .net .{ServerSocket , URI }
21- import java .util .concurrent .{CountDownLatch , TimeUnit }
2221
2322import scala .language .postfixOps
2423
@@ -28,10 +27,6 @@ import org.apache.commons.lang3.RandomUtils
2827import org .eclipse .paho .client .mqttv3 ._
2928import org .eclipse .paho .client .mqttv3 .persist .MqttDefaultFilePersistence
3029
31- import org .apache .spark .streaming .StreamingContext
32- import org .apache .spark .streaming .api .java .JavaStreamingContext
33- import org .apache .spark .streaming .scheduler .StreamingListener
34- import org .apache .spark .streaming .scheduler .StreamingListenerReceiverStarted
3530import org .apache .spark .util .Utils
3631import org .apache .spark .{Logging , SparkConf }
3732
@@ -47,8 +42,6 @@ private class MQTTTestUtils extends Logging {
4742 private var broker : BrokerService = _
4843 private var connector : TransportConnector = _
4944
50- private var receiverStartedLatch = new CountDownLatch (1 )
51-
5245 def brokerUri : String = {
5346 s " $brokerHost: $brokerPort"
5447 }
@@ -73,7 +66,6 @@ private class MQTTTestUtils extends Logging {
7366 connector = null
7467 }
7568 Utils .deleteRecursively(persistenceDir)
76- receiverStartedLatch = null
7769 }
7870
7971 private def findFreePort (): Int = {
@@ -114,38 +106,4 @@ private class MQTTTestUtils extends Logging {
114106 }
115107 }
116108
117- /**
118- * Call this one before starting StreamingContext so that we won't miss the
119- * StreamingListenerReceiverStarted event.
120- */
121- def registerStreamingListener (jssc : JavaStreamingContext ): Unit = {
122- registerStreamingListener(jssc.ssc)
123- }
124-
125- /**
126- * Call this one before starting StreamingContext so that we won't miss the
127- * StreamingListenerReceiverStarted event.
128- */
129- def registerStreamingListener (ssc : StreamingContext ): Unit = {
130- ssc.addStreamingListener(new StreamingListener {
131- override def onReceiverStarted (receiverStarted : StreamingListenerReceiverStarted ) {
132- receiverStartedLatch.countDown()
133- }
134- })
135- }
136-
137- /**
138- * Block until at least one receiver has started or timeout occurs.
139- */
140- def waitForReceiverToStart (jssc : JavaStreamingContext ): Unit = {
141- waitForReceiverToStart(jssc.ssc)
142- }
143-
144- /**
145- * Block until at least one receiver has started or timeout occurs.
146- */
147- def waitForReceiverToStart (ssc : StreamingContext ): Unit = {
148- assert(
149- receiverStartedLatch.await(10 , TimeUnit .SECONDS ), " Timeout waiting for receiver to start." )
150- }
151109}
0 commit comments