Skip to content

Commit a431b84

Browse files
committed
Fix streaming test failures
1 parent 8f5ae53 commit a431b84

File tree

3 files changed

+11
-4
lines changed

3 files changed

+11
-4
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
3737
import org.apache.spark.streaming.dstream._
3838
import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
3939
import org.apache.spark.streaming.scheduler._
40-
import org.apache.spark.streaming.ui.StreamingTab
40+
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
4141
import org.apache.spark.util.MetadataCleaner
4242

4343
/**
@@ -158,7 +158,14 @@ class StreamingContext private[streaming] (
158158

159159
private[streaming] val waiter = new ContextWaiter
160160

161-
private[streaming] val uiTab = new StreamingTab(this)
161+
private[streaming] val progressListener = new StreamingJobProgressListener(this)
162+
163+
private[streaming] val uiTab: Option[StreamingTab] =
164+
if (conf.getBoolean("spark.ui.enabled", true)) {
165+
Some(new StreamingTab(this))
166+
} else {
167+
None
168+
}
162169

163170
/** Register streaming source to metrics system */
164171
private val streamingSource = new StreamingSource(this)

streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
2626
override val metricRegistry = new MetricRegistry
2727
override val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)
2828

29-
private val streamingListener = ssc.uiTab.listener
29+
private val streamingListener = ssc.progressListener
3030

3131
private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
3232
defaultValue: T) {

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ private[spark] class StreamingTab(ssc: StreamingContext)
3131
extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
3232

3333
val parent = getSparkUI(ssc)
34-
val listener = new StreamingJobProgressListener(ssc)
34+
val listener = ssc.progressListener
3535

3636
ssc.addStreamingListener(listener)
3737
attachPage(new StreamingPage(this))

0 commit comments

Comments
 (0)