From 46036bf683632e03f970de20f7bcd17b5369d5dc Mon Sep 17 00:00:00 2001 From: uncleGen Date: Mon, 16 Jan 2017 17:14:32 +0800 Subject: [PATCH 1/3] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs --- .../org/apache/spark/streaming/DStreamGraph.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 54d736ee5101..69415ad8b207 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -112,12 +112,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def generateJobs(time: Time): Seq[Job] = { logDebug("Generating jobs for time " + time) - val jobs = this.synchronized { - outputStreams.flatMap { outputStream => - val jobOption = outputStream.generateJob(time) - jobOption.foreach(_.setCallSite(outputStream.creationSite)) - jobOption - } + val jobs = getOutputStreams().flatMap { outputStream => + val jobOption = outputStream.generateJob(time) + jobOption.foreach(_.setCallSite(outputStream.creationSite)) + jobOption } logDebug("Generated " + jobs.length + " jobs for time " + time) jobs From eaa7b15f19711b27e628cfe366fa819a46d0e450 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Tue, 17 Jan 2017 16:24:07 +0800 Subject: [PATCH 2/3] update --- .../apache/spark/streaming/DStreamGraph.scala | 23 ++++++++++++------- .../ui/StreamingJobProgressListener.scala | 8 +++---- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 69415ad8b207..319986ccc10d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]() + val inputStreamNameAndID = new ArrayBuffer[(String, Int)]() + var rememberDuration: Duration = null var checkpointInProgress = false var zeroTime: Time = null var startTime: Time = null var batchDuration: Duration = null + var numReceivers: Int = 0 def start(time: Time) { this.synchronized { @@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { startTime = time outputStreams.foreach(_.initialize(zeroTime)) outputStreams.foreach(_.remember(rememberDuration)) - outputStreams.foreach(_.validateAtStart) + outputStreams.foreach(_.validateAtStart()) + numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]]) + inputStreams.foreach(is => inputStreamNameAndID.+=((is.name, is.id))) inputStreams.par.foreach(_.start()) } } @@ -106,16 +111,18 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { .toArray } - def getInputStreamName(streamId: Int): Option[String] = synchronized { - inputStreams.find(_.id == streamId).map(_.name) - } + def getReceiverNumber: Int = numReceivers + + def getInputStreamNameAndID: ArrayBuffer[(String, Int)] = inputStreamNameAndID def generateJobs(time: Time): Seq[Job] = { logDebug("Generating jobs for time " + time) - val jobs = getOutputStreams().flatMap { outputStream => - val jobOption = outputStream.generateJob(time) - jobOption.foreach(_.setCallSite(outputStream.creationSite)) - jobOption + val jobs = this.synchronized { + outputStreams.flatMap { outputStream => + val jobOption = outputStream.generateJob(time) + jobOption.foreach(_.setCallSite(outputStream.creationSite)) + jobOption + } } logDebug("Generated " + jobs.length + " jobs for time " + time) jobs diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 95f582106c71..0589fe02aee6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) } def numInactiveReceivers: Int = { - ssc.graph.getReceiverInputStreams().length - numActiveReceivers + ssc.graph.getReceiverNumber - numActiveReceivers } def numTotalCompletedBatches: Long = synchronized { @@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) } def retainedCompletedBatches: Seq[BatchUIData] = synchronized { - completedBatchUIData.toSeq + completedBatchUIData } def streamName(streamId: Int): Option[String] = { - ssc.graph.getInputStreamName(streamId) + ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1) } /** * Return all InputDStream Ids */ - def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id) + def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2) /** * Return all of the record rates for each InputDStream in each batch. The key of the return value From e51623c007b9faf2ba4fe7c92ad138b0c9c2a8c1 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 18 Jan 2017 10:11:47 +0800 Subject: [PATCH 3/3] update --- .../org/apache/spark/streaming/DStreamGraph.scala | 10 +++++----- .../streaming/ui/StreamingJobProgressListener.scala | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 319986ccc10d..dce2028b4887 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -31,7 +31,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]() - val inputStreamNameAndID = new ArrayBuffer[(String, Int)]() + @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil var rememberDuration: Duration = null var checkpointInProgress = false @@ -39,7 +39,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { var zeroTime: Time = null var startTime: Time = null var batchDuration: Duration = null - var numReceivers: Int = 0 + @volatile private var numReceivers: Int = 0 def start(time: Time) { this.synchronized { @@ -50,7 +50,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { outputStreams.foreach(_.remember(rememberDuration)) outputStreams.foreach(_.validateAtStart()) numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]]) - inputStreams.foreach(is => inputStreamNameAndID.+=((is.name, is.id))) + inputStreamNameAndID = inputStreams.map(is => (is.name, is.id)) inputStreams.par.foreach(_.start()) } } @@ -111,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { .toArray } - def getReceiverNumber: Int = numReceivers + def getNumReceivers: Int = numReceivers - def getInputStreamNameAndID: ArrayBuffer[(String, Int)] = inputStreamNameAndID + def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID def generateJobs(time: Time): Seq[Job] = { logDebug("Generating jobs for time " + time) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 0589fe02aee6..ed4c1e484efd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) } def numInactiveReceivers: Int = { - ssc.graph.getReceiverNumber - numActiveReceivers + ssc.graph.getNumReceivers - numActiveReceivers } def numTotalCompletedBatches: Long = synchronized { @@ -197,7 +197,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) } def retainedCompletedBatches: Seq[BatchUIData] = synchronized { - completedBatchUIData + completedBatchUIData.toIndexedSeq } def streamName(streamId: Int): Option[String] = {