From da244f6c8056d66601a3cfb2f38291dc1a4c4c79 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 24 Apr 2014 14:45:08 -0700 Subject: [PATCH 1/5] Fixed socket receiver as well as made receiver state and error visible in the streaming UI. --- .../scala/org/apache/spark/ui/UIUtils.scala | 2 +- .../dstream/SocketInputDStream.scala | 49 +++++++--------- .../spark/streaming/receiver/Receiver.scala | 2 +- .../streaming/receiver/ReceiverMessage.scala | 4 +- .../receiver/ReceiverSupervisor.scala | 58 +++++++++++-------- .../receiver/ReceiverSupervisorImpl.scala | 24 +++++--- .../streaming/scheduler/ReceiverTracker.scala | 8 +-- .../scheduler/StreamingListener.scala | 2 +- .../spark/streaming/ui/ReceiverInfo.scala | 29 ++++++++++ .../ui/StreamingJobProgressListener.scala | 50 ++++++++++++++-- .../spark/streaming/ui/StreamingPage.scala | 20 +++++-- .../streaming/NetworkReceiverSuite.scala | 9 ++- .../streaming/StreamingListenerSuite.scala | 9 +-- 13 files changed, 179 insertions(+), 87 deletions(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/ReceiverInfo.scala diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index cf987a1ab02c..a3d6a1821245 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -122,7 +122,7 @@ private[spark] object UIUtils extends Logging { } } if (unit.isEmpty) { - "%d".formatLocal(Locale.US, value) + "%d".formatLocal(Locale.US, value.toInt) } else { "%.1f%s".formatLocal(Locale.US, value, unit) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index 1e32727eacfa..8b72bcf20653 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -50,49 +50,42 @@ class SocketReceiver[T: ClassTag]( storageLevel: StorageLevel ) extends Receiver[T](storageLevel) with Logging { - var socket: Socket = null - var receivingThread: Thread = null - def onStart() { - receivingThread = new Thread("Socket Receiver") { - override def run() { - connect() - receive() - } - } - receivingThread.start() + // Start the thread that receives data over a connection + new Thread("Socket Receiver") { + setDaemon(true) + override def run() { receive() } + }.start() } def onStop() { - if (socket != null) { - socket.close() - } - socket = null - if (receivingThread != null) { - receivingThread.join() - } + // There is nothing much to do as the thread calling receive() + // is designed to stop by itself isStopped() returns false } - def connect() { + /** Create a socket connection and receive data until receiver is stopped */ + def receive() { + var socket: Socket = null try { logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) - } catch { - case e: Exception => - restart("Could not connect to " + host + ":" + port, e) - } - } - - def receive() { - try { logInfo("Connected to " + host + ":" + port) val iterator = bytesToObjects(socket.getInputStream()) while(!isStopped && iterator.hasNext) { store(iterator.next) } + logInfo("Stopped receiving") + restart("Retrying connecting to " + host + ":" + port) } catch { - case e: Exception => - restart("Error receiving data from socket", e) + case e: java.net.ConnectException => + restart("Error connecting to " + host + ":" + port, e) + case t: Throwable => + restart("Error receiving data", t) + } finally { + if (socket != null) { + socket.close() + logInfo("Closed socket to " + host + ":" + port) + } } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index 
44eecf1dd256..299fc28ad2a3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -198,7 +198,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable /** Check if receiver has been marked for stopping. */ def isStopped(): Boolean = { - !executor.isReceiverStarted() + executor.isReceiverStopped() } /** Get unique identifier of this receiver. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala index 6ab3ca6ea5fa..bf39d1e891ca 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala @@ -18,6 +18,6 @@ package org.apache.spark.streaming.receiver /** Messages sent to the NetworkReceiver. */ -private[streaming] sealed trait NetworkReceiverMessage -private[streaming] object StopReceiver extends NetworkReceiverMessage +private[streaming] sealed trait ReceiverMessage +private[streaming] object StopReceiver extends ReceiverMessage diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 256b3335e49a..09be3a50d2df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -88,15 +88,29 @@ private[streaming] abstract class ReceiverSupervisor( /** Report errors. 
*/ def reportError(message: String, throwable: Throwable) - /** Start the executor */ + /** Called when supervisor is started */ + protected def onStart() { } + + /** Called when supervisor is stopped */ + protected def onStop(message: String, error: Option[Throwable]) { } + + /** Called when receiver is started */ + protected def onReceiverStart() { } + + /** Called when receiver is stopped */ + protected def onReceiverStop(message: String, error: Option[Throwable]) { } + + /** Start the supervisor */ def start() { + onStart() startReceiver() } - /** Mark the executor and the receiver for stopping */ + /** Mark the supervisor and the receiver for stopping */ def stop(message: String, error: Option[Throwable]) { stoppingError = error.orNull stopReceiver(message, error) + onStop(message, error) stopLatch.countDown() } @@ -104,6 +118,8 @@ private[streaming] abstract class ReceiverSupervisor( def startReceiver(): Unit = synchronized { try { logInfo("Starting receiver") + receiver.onStart() + logInfo("Called receiver onStart") onReceiverStart() receiverState = Started } catch { @@ -115,7 +131,10 @@ private[streaming] abstract class ReceiverSupervisor( /** Stop receiver */ def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized { try { + logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse("")) receiverState = Stopped + receiver.onStop() + logInfo("Called receiver onStop") onReceiverStop(message, error) } catch { case t: Throwable => @@ -130,41 +149,32 @@ private[streaming] abstract class ReceiverSupervisor( /** Restart receiver with delay */ def restartReceiver(message: String, error: Option[Throwable], delay: Int) { - logWarning("Restarting receiver with delay " + delay + " ms: " + message, - error.getOrElse(null)) - stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error) - future { + Future { + logWarning("Restarting receiver with delay " + delay + " ms: " + message, + error.getOrElse(null)) 
+ stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error) logDebug("Sleeping for " + delay) Thread.sleep(delay) - logDebug("Starting receiver again") + logInfo("Starting receiver again") startReceiver() logInfo("Receiver started again") } } - /** Called when the receiver needs to be started */ - protected def onReceiverStart(): Unit = synchronized { - // Call user-defined onStart() - logInfo("Calling receiver onStart") - receiver.onStart() - logInfo("Called receiver onStart") - } - - /** Called when the receiver needs to be stopped */ - protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = synchronized { - // Call user-defined onStop() - logInfo("Calling receiver onStop") - receiver.onStop() - logInfo("Called receiver onStop") - } - /** Check if receiver has been marked for stopping */ def isReceiverStarted() = { logDebug("state = " + receiverState) receiverState == Started } - /** Wait the thread until the executor is stopped */ + /** Check if receiver has been marked for stopping */ + def isReceiverStopped() = { + logDebug("state = " + receiverState) + receiverState == Stopped + } + + + /** Wait the thread until the supervisor is stopped */ def awaitTermination() { stopLatch.await() logInfo("Waiting for executor stop is over") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 2a3521bd46ae..ce8316bb1489 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -79,6 +79,8 @@ private[streaming] class ReceiverSupervisorImpl( logInfo("Received stop signal") stop("Stopped by driver", None) } + + def ref = self }), "Receiver-" + streamId + "-" + System.currentTimeMillis()) /** Unique block ids if one wants to add blocks directly 
*/ @@ -154,14 +156,23 @@ private[streaming] class ReceiverSupervisorImpl( logWarning("Reported error " + message + " - " + error) } - override def onReceiverStart() { + override protected def onStart() { blockGenerator.start() - super.onReceiverStart() } - override def onReceiverStop(message: String, error: Option[Throwable]) { - super.onReceiverStop(message, error) + override protected def onStop(message: String, error: Option[Throwable]) { blockGenerator.stop() + env.actorSystem.stop(actor) + } + + override protected def onReceiverStart() { + val msg = RegisterReceiver( + streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor) + val future = trackerActor.ask(msg)(askTimeout) + Await.result(future, askTimeout) + } + + override protected def onReceiverStop(message: String, error: Option[Throwable]) { logInfo("Deregistering receiver " + streamId) val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("") val future = trackerActor.ask( @@ -170,11 +181,6 @@ private[streaming] class ReceiverSupervisorImpl( logInfo("Stopped receiver " + streamId) } - override def stop(message: String, error: Option[Throwable]) { - super.stop(message, error) - env.actorSystem.stop(actor) - } - /** Generate new block ID */ private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 3d2537f6f23d..cec9bde5ece8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -29,12 +29,12 @@ import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, St import org.apache.spark.util.AkkaUtils /** Information about receiver */ -case class ReceiverInfo(streamId: Int, typ: String, location: String) { 
+private[scheduler] case class ReceiverInfo(streamId: Int, typ: String, location: String) { override def toString = s"$typ-$streamId" } /** Information about blocks received by the receiver */ -case class ReceivedBlockInfo( +private[streaming] case class ReceivedBlockInfo( streamId: Int, blockId: StreamBlockId, numRecords: Long, @@ -130,9 +130,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { throw new Exception("Register received for unexpected id " + streamId) } receiverInfo += ((streamId, receiverActor)) - ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted( - ReceiverInfo(streamId, typ, host) - )) + ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(streamId, typ, host)) logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 9d6ec1fa3354..9de7adab49bc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -28,7 +28,7 @@ case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends Streami case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent -case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo) +case class StreamingListenerReceiverStarted(streamId: Int, typ: String, location: String) extends StreamingListenerEvent case class StreamingListenerReceiverError(streamId: Int, message: String, error: String) extends StreamingListenerEvent diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/ReceiverInfo.scala new 
file mode 100644 index 000000000000..3f5e8ae8249c --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/ReceiverInfo.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.ui + +/** Class having information about a receiver */ +case class ReceiverInfo( + streamId: Int, + name: String, + var active: Boolean, + var location: String = "", + var lastErrorMessage: String = "", + var lastError: String = "" + ) { +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index bf637c144631..9017af054fa7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -23,12 +23,13 @@ import scala.collection.mutable.{Queue, HashMap} import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted import org.apache.spark.streaming.scheduler.BatchInfo -import 
org.apache.spark.streaming.scheduler.ReceiverInfo import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted import org.apache.spark.util.Distribution +import org.apache.spark.Logging -private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener { +private[ui] class StreamingJobProgressListener(ssc: StreamingContext) + extends StreamingListener with Logging { private val waitingBatchInfos = new HashMap[Time, BatchInfo] private val runningBatchInfos = new HashMap[Time, BatchInfo] @@ -39,9 +40,50 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St val batchDuration = ssc.graph.batchDuration.milliseconds - override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = { + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { synchronized { - receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo) + receiverInfos(receiverStarted.streamId) = ReceiverInfo(receiverStarted.streamId, + s"${receiverStarted.typ}-${receiverStarted.streamId}", true, receiverStarted.location) + } + } + + override def onReceiverError(receiverError: StreamingListenerReceiverError) { + synchronized { + val newReceiverInfo = receiverInfos.get(receiverError.streamId) match { + case Some(oldInfo) => + oldInfo.copy(lastErrorMessage = receiverError.message, lastError = receiverError.error) + case None => + logWarning("No prior receiver info") + ReceiverInfo( + receiverError.streamId, + "", true, "", + lastErrorMessage = receiverError.message, + lastError = receiverError.error + ) + } + receiverInfos(receiverError.streamId) = newReceiverInfo + } + } + + override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { + synchronized { + val newReceiverInfo = receiverInfos.get(receiverStopped.streamId) match { + case Some(oldInfo) => + oldInfo.copy( + active = false, + lastErrorMessage = receiverStopped.message, + lastError 
= receiverStopped.error + ) + case None => + logWarning("No prior receiver info") + ReceiverInfo( + receiverStopped.streamId, + "", true, "", + lastErrorMessage = receiverStopped.message, + lastError = receiverStopped.error + ) + } + receiverInfos(receiverStopped.streamId) = newReceiverInfo } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 8fe1219356cd..451b23e01c99 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -78,25 +78,33 @@ private[ui] class StreamingPage(parent: StreamingTab) val table = if (receivedRecordDistributions.size > 0) { val headerRow = Seq( "Receiver", + "Status", "Location", "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]", "Minimum rate\n[records/sec]", - "25th percentile rate\n[records/sec]", "Median rate\n[records/sec]", - "75th percentile rate\n[records/sec]", - "Maximum rate\n[records/sec]" + "Maximum rate\n[records/sec]", + "Last Error" ) val dataRows = (0 until listener.numReceivers).map { receiverId => val receiverInfo = listener.receiverInfo(receiverId) - val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId") + val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId") + val receiverActive = receiverInfo.map { info => + if (info.active) "ACTIVE" else "INACTIVE" + }.getOrElse(emptyCell) val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell) val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId)) val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => - d.getQuantiles().map(r => formatNumber(r.toLong)) + d.getQuantiles(Seq(0.0, 0.5, 1.0)).map(r => formatNumber(r.toLong)) }.getOrElse { Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell) } - 
Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats + val receiverLastError = listener.receiverInfo(receiverId).map { info => + val msg = s"${info.lastErrorMessage} - ${info.lastError}" + if (msg.size > 100) msg.take(97) + "..." else msg + }.getOrElse(emptyCell) + Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++ + receivedRecordStats ++ Seq(receiverLastError) } Some(listingTable(headerRow, dataRows)) } else { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala index 5c0415ad14eb..d9ac3c91f6e3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer +import scala.language.postfixOps import org.apache.spark.SparkConf import org.apache.spark.storage.{StorageLevel, StreamBlockId} @@ -92,9 +93,13 @@ class NetworkReceiverSuite extends FunSuite with Timeouts { // Verify restarting actually stops and starts the receiver receiver.restart("restarting", null, 100) - assert(receiver.isStopped) - assert(receiver.onStopCalled) + eventually(timeout(50 millis), interval(10 millis)) { + // receiver will be stopped async + assert(receiver.isStopped) + assert(receiver.onStopCalled) + } eventually(timeout(1000 millis), interval(100 millis)) { + // receiver will be started async assert(receiver.onStartCalled) assert(executor.isReceiverStarted) assert(receiver.isStarted) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 542c697ae312..f6209f98663d 100644 --- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.ui.ReceiverInfo import org.scalatest.matchers.ShouldMatchers import org.scalatest.concurrent.Eventually._ @@ -74,8 +75,8 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { ssc.start() try { eventually(timeout(1000 millis), interval(20 millis)) { - collector.startedReceiverInfo should have size 1 - collector.startedReceiverInfo(0).streamId should equal (0) + collector.startedReceiverStreamIds.size should be >= 1 + collector.startedReceiverStreamIds(0) should equal (0) collector.stoppedReceiverStreamIds should have size 1 collector.stoppedReceiverStreamIds(0) should equal (0) collector.receiverErrors should have size 1 @@ -107,12 +108,12 @@ class BatchInfoCollector extends StreamingListener { /** Listener that collects information on processed batches */ class ReceiverInfoCollector extends StreamingListener { - val startedReceiverInfo = new ArrayBuffer[ReceiverInfo] + val startedReceiverStreamIds = new ArrayBuffer[Int] val stoppedReceiverStreamIds = new ArrayBuffer[Int]() val receiverErrors = new ArrayBuffer[(Int, String, String)]() override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { - startedReceiverInfo += receiverStarted.receiverInfo + startedReceiverStreamIds += receiverStarted.streamId } override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { From 5c80919dc1c03bccf997f71136d734de4e0cddfa Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 24 Apr 2014 14:50:57 -0700 Subject: [PATCH 2/5] Moved BatchInfo from streaming.scheduler to streaming.ui --- 
.../scala/org/apache/spark/streaming/scheduler/JobSet.scala | 1 + .../apache/spark/streaming/scheduler/StreamingListener.scala | 1 + .../apache/spark/streaming/{scheduler => ui}/BatchInfo.scala | 3 ++- .../spark/streaming/ui/StreamingJobProgressListener.scala | 1 - .../org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 5 files changed, 5 insertions(+), 3 deletions(-) rename streaming/src/main/scala/org/apache/spark/streaming/{scheduler => ui}/BatchInfo.scala (95%) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index a69d74362173..449b4bd1f7dd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable.{ArrayBuffer, HashSet} import org.apache.spark.streaming.Time +import org.apache.spark.streaming.ui.BatchInfo /** Class representing a set of Jobs * belong to the same batch. 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 9de7adab49bc..191f92fc1e79 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable.Queue import org.apache.spark.util.Distribution +import org.apache.spark.streaming.ui.BatchInfo /** Base trait for events related to StreamingListener */ sealed trait StreamingListenerEvent diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchInfo.scala similarity index 95% rename from streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala rename to streaming/src/main/scala/org/apache/spark/streaming/ui/BatchInfo.scala index 9c69a2a4e21f..de04b8a30009 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchInfo.scala @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.spark.streaming.scheduler +package org.apache.spark.streaming.ui import org.apache.spark.streaming.Time +import org.apache.spark.streaming.scheduler.ReceivedBlockInfo /** * Class having information on completed batches. 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 9017af054fa7..00d0431f88d3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -22,7 +22,6 @@ import org.apache.spark.streaming.scheduler._ import scala.collection.mutable.{Queue, HashMap} import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted -import org.apache.spark.streaming.scheduler.BatchInfo import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted import org.apache.spark.util.Distribution import org.apache.spark.Logging diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index f6209f98663d..049d7aba9243 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.ui.ReceiverInfo +import org.apache.spark.streaming.ui.{BatchInfo, ReceiverInfo} import org.scalatest.matchers.ShouldMatchers import org.scalatest.concurrent.Eventually._ From d7f849c390297de99ea17dfa2f91a9145b3dd6f2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 24 Apr 2014 15:52:03 -0700 Subject: [PATCH 3/5] Revert "Moved BatchInfo from streaming.scheduler to streaming.ui" This reverts commit 5c80919dc1c03bccf997f71136d734de4e0cddfa. 
--- .../apache/spark/streaming/{ui => scheduler}/BatchInfo.scala | 3 +-- .../scala/org/apache/spark/streaming/scheduler/JobSet.scala | 1 - .../apache/spark/streaming/scheduler/StreamingListener.scala | 1 - .../spark/streaming/ui/StreamingJobProgressListener.scala | 1 + .../org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 5 files changed, 3 insertions(+), 5 deletions(-) rename streaming/src/main/scala/org/apache/spark/streaming/{ui => scheduler}/BatchInfo.scala (95%) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala similarity index 95% rename from streaming/src/main/scala/org/apache/spark/streaming/ui/BatchInfo.scala rename to streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index de04b8a30009..9c69a2a4e21f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -15,10 +15,9 @@ * limitations under the License. */ -package org.apache.spark.streaming.ui +package org.apache.spark.streaming.scheduler import org.apache.spark.streaming.Time -import org.apache.spark.streaming.scheduler.ReceivedBlockInfo /** * Class having information on completed batches. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index 449b4bd1f7dd..a69d74362173 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -19,7 +19,6 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable.{ArrayBuffer, HashSet} import org.apache.spark.streaming.Time -import org.apache.spark.streaming.ui.BatchInfo /** Class representing a set of Jobs * belong to the same batch. 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 191f92fc1e79..9de7adab49bc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -20,7 +20,6 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable.Queue import org.apache.spark.util.Distribution -import org.apache.spark.streaming.ui.BatchInfo /** Base trait for events related to StreamingListener */ sealed trait StreamingListenerEvent diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 00d0431f88d3..9017af054fa7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -22,6 +22,7 @@ import org.apache.spark.streaming.scheduler._ import scala.collection.mutable.{Queue, HashMap} import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted +import org.apache.spark.streaming.scheduler.BatchInfo import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted import org.apache.spark.util.Distribution import org.apache.spark.Logging diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 049d7aba9243..f6209f98663d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -25,7 +25,7 
@@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.ui.{BatchInfo, ReceiverInfo} +import org.apache.spark.streaming.ui.ReceiverInfo import org.scalatest.matchers.ShouldMatchers import org.scalatest.concurrent.Eventually._ From ad98bc9696dac03ffe37dbf86027e3737387423a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 24 Apr 2014 17:56:40 -0700 Subject: [PATCH 4/5] Refactored streaming listener to use ReceiverInfo. --- .../streaming/receiver/ActorReceiver.scala | 12 +++++-- .../spark/streaming/receiver/Receiver.scala | 3 ++ .../spark/streaming/scheduler/BatchInfo.scala | 3 ++ .../{ui => scheduler}/ReceiverInfo.scala | 20 +++++++---- .../streaming/scheduler/ReceiverTracker.scala | 35 ++++++++++++------- .../scheduler/StreamingListener.scala | 27 +++++++++++--- .../ui/StreamingJobProgressListener.scala | 35 ++----------------- .../streaming/StreamingListenerSuite.scala | 10 +++--- 8 files changed, 84 insertions(+), 61 deletions(-) rename streaming/src/main/scala/org/apache/spark/streaming/{ui => scheduler}/ReceiverInfo.scala (70%) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala index 821cf19481d4..743be58950c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala @@ -28,8 +28,13 @@ import akka.actor.SupervisorStrategy.{Escalate, Restart} import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.storage.StorageLevel import java.nio.ByteBuffer +import org.apache.spark.annotation.DeveloperApi -/** A helper with set of defaults for supervisor strategy */ +/** + * :: DeveloperApi :: + * A helper with set of defaults for supervisor 
strategy + */ +@DeveloperApi object ActorSupervisorStrategy { val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = @@ -40,6 +45,7 @@ object ActorSupervisorStrategy { } /** + * :: DeveloperApi :: * A receiver trait to be mixed in with your Actor to gain access to * the API for pushing received data into Spark Streaming for being processed. * @@ -61,6 +67,7 @@ object ActorSupervisorStrategy { * to ensure the type safety, i.e parametrized type of push block and InputDStream * should be same. */ +@DeveloperApi trait ActorHelper { self: Actor => // to ensure that this can be added to Actor classes only @@ -92,10 +99,12 @@ trait ActorHelper { } /** + * :: DeveloperApi :: * Statistics for querying the supervisor about state of workers. Used in * conjunction with `StreamingContext.actorStream` and * [[org.apache.spark.streaming.receiver.ActorHelper]]. */ +@DeveloperApi case class Statistics(numberOfMsgs: Int, numberOfWorkers: Int, numberOfHiccups: Int, @@ -188,4 +197,3 @@ private[streaming] class ActorReceiver[T: ClassTag]( supervisor ! PoisonPill } } - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index 299fc28ad2a3..524c1b8d8ce4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -23,8 +23,10 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import org.apache.spark.storage.StorageLevel +import org.apache.spark.annotation.DeveloperApi /** + * :: DeveloperApi :: * Abstract class of a receiver that can be run on worker nodes to receive external data. A * custom receiver can be defined by defining the functions onStart() and onStop(). 
onStart() * should define the setup steps necessary to start receiving data, @@ -51,6 +53,7 @@ import org.apache.spark.storage.StorageLevel * } * }}} */ +@DeveloperApi abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable { /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 9c69a2a4e21f..a68aecb88111 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -18,8 +18,10 @@ package org.apache.spark.streaming.scheduler import org.apache.spark.streaming.Time +import org.apache.spark.annotation.DeveloperApi /** + * :: DeveloperApi :: * Class having information on completed batches. * @param batchTime Time of the batch * @param submissionTime Clock time of when jobs of this batch was submitted to @@ -27,6 +29,7 @@ import org.apache.spark.streaming.Time * @param processingStartTime Clock time of when the first job of this batch started processing * @param processingEndTime Clock time of when the last job of this batch finished processing */ +@DeveloperApi case class BatchInfo( batchTime: Time, receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]], diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala similarity index 70% rename from streaming/src/main/scala/org/apache/spark/streaming/ui/ReceiverInfo.scala rename to streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala index 3f5e8ae8249c..d7e39c528c51 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/ReceiverInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala @@ -15,15 +15,23 @@ * limitations under the License. 
*/ -package org.apache.spark.streaming.ui +package org.apache.spark.streaming.scheduler -/** Class having information about a receiver */ +import akka.actor.ActorRef +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + * Class having information about a receiver + */ +@DeveloperApi case class ReceiverInfo( streamId: Int, name: String, - var active: Boolean, - var location: String = "", - var lastErrorMessage: String = "", - var lastError: String = "" + private[streaming] val actor: ActorRef, + active: Boolean, + location: String, + lastErrorMessage: String = "", + lastError: String = "" ) { } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index cec9bde5ece8..6452cb6b99c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -28,11 +28,6 @@ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver} import org.apache.spark.util.AkkaUtils -/** Information about receiver */ -private[scheduler] case class ReceiverInfo(streamId: Int, typ: String, location: String) { - override def toString = s"$typ-$streamId" -} - /** Information about blocks received by the receiver */ private[streaming] case class ReceivedBlockInfo( streamId: Int, @@ -69,7 +64,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { val receiverInputStreams = ssc.graph.getReceiverInputStreams() val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*) val receiverExecutor = new ReceiverLauncher() - val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef] + val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo] val 
receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]] with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]] val timeout = AkkaUtils.askTimeout(ssc.conf) @@ -129,15 +124,22 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { if (!receiverInputStreamMap.contains(streamId)) { throw new Exception("Register received for unexpected id " + streamId) } - receiverInfo += ((streamId, receiverActor)) - ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(streamId, typ, host)) + receiverInfo(streamId) = ReceiverInfo(streamId, s"${typ}-${streamId}", receiverActor, true, host) + ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId))) logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address) } /** Deregister a receiver */ def deregisterReceiver(streamId: Int, message: String, error: String) { - receiverInfo -= streamId - ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(streamId, message, error)) + val newReceiverInfo = receiverInfo.get(streamId) match { + case Some(oldInfo) => + oldInfo.copy(actor = null, active = false, lastErrorMessage = message, lastError = error) + case None => + logWarning("No prior receiver info") + ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error) + } + receiverInfo(streamId) = newReceiverInfo + ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId))) val messageWithError = if (error != null && !error.isEmpty) { s"$message - $error" } else { @@ -155,7 +157,15 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { /** Report error sent by a receiver */ def reportError(streamId: Int, message: String, error: String) { - ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(streamId, message, error)) + val newReceiverInfo = receiverInfo.get(streamId) match { + case Some(oldInfo) => + oldInfo.copy(lastErrorMessage = message, lastError = 
error) + case None => + logWarning("No prior receiver info") + ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error) + } + receiverInfo(streamId) = newReceiverInfo + ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId))) val messageWithError = if (error != null && !error.isEmpty) { s"$message - $error" } else { @@ -269,7 +279,8 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { /** Stops the receivers. */ private def stopReceivers() { // Signal the receivers to stop - receiverInfo.values.foreach(_ ! StopReceiver) + receiverInfo.values.flatMap { info => Option(info.actor)} + .foreach { _ ! StopReceiver } logInfo("Sent stop signal to all " + receiverInfo.size + " receivers") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 9de7adab49bc..ed1aa114e19d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -20,28 +20,45 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable.Queue import org.apache.spark.util.Distribution +import org.apache.spark.annotation.DeveloperApi -/** Base trait for events related to StreamingListener */ +/** + * :: DeveloperApi :: + * Base trait for events related to StreamingListener + */ +@DeveloperApi sealed trait StreamingListenerEvent +@DeveloperApi case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent + +@DeveloperApi case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent + +@DeveloperApi case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent -case class StreamingListenerReceiverStarted(streamId: Int, typ: String, location: String) 
+@DeveloperApi +case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo) extends StreamingListenerEvent -case class StreamingListenerReceiverError(streamId: Int, message: String, error: String) + +@DeveloperApi +case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo) extends StreamingListenerEvent -case class StreamingListenerReceiverStopped(streamId: Int, message: String, error: String) + +@DeveloperApi +case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo) extends StreamingListenerEvent /** An event used in the listener to shutdown the listener daemon thread. */ private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent /** + * :: DeveloperApi :: * A listener interface for receiving information about an ongoing streaming * computation. */ +@DeveloperApi trait StreamingListener { /** Called when a receiver has been started */ @@ -65,9 +82,11 @@ trait StreamingListener { /** + * :: DeveloperApi :: * A simple StreamingListener that logs summary statistics across Spark Streaming batches * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10) */ +@DeveloperApi class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener { // Queue containing latest completed batches val batchInfos = new Queue[BatchInfo]() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 9017af054fa7..16abe29dea7b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -42,48 +42,19 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { synchronized { - 
receiverInfos(receiverStarted.streamId) = ReceiverInfo(receiverStarted.streamId, - s"${receiverStarted.typ}-${receiverStarted.streamId}", true, receiverStarted.location) + receiverInfos(receiverStarted.receiverInfo.streamId) = receiverStarted.receiverInfo } } override def onReceiverError(receiverError: StreamingListenerReceiverError) { synchronized { - val newReceiverInfo = receiverInfos.get(receiverError.streamId) match { - case Some(oldInfo) => - oldInfo.copy(lastErrorMessage = receiverError.message, lastError = receiverError.error) - case None => - logWarning("No prior receiver info") - ReceiverInfo( - receiverError.streamId, - "", true, "", - lastErrorMessage = receiverError.message, - lastError = receiverError.error - ) - } - receiverInfos(receiverError.streamId) = newReceiverInfo + receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo } } override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { synchronized { - val newReceiverInfo = receiverInfos.get(receiverStopped.streamId) match { - case Some(oldInfo) => - oldInfo.copy( - active = false, - lastErrorMessage = receiverStopped.message, - lastError = receiverStopped.error - ) - case None => - logWarning("No prior receiver info") - ReceiverInfo( - receiverStopped.streamId, - "", true, "", - lastErrorMessage = receiverStopped.message, - lastError = receiverStopped.error - ) - } - receiverInfos(receiverStopped.streamId) = newReceiverInfo + receiverInfos(receiverStopped.receiverInfo.streamId) = receiverStopped.receiverInfo } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index f6209f98663d..28438abf099f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -25,7 +25,6 @@ import 
org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.ui.ReceiverInfo import org.scalatest.matchers.ShouldMatchers import org.scalatest.concurrent.Eventually._ @@ -66,7 +65,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { test("receiver info reporting") { val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000)) - val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver) + val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) inputStream.foreachRDD(_.count) val collector = new ReceiverInfoCollector @@ -113,15 +112,16 @@ class ReceiverInfoCollector extends StreamingListener { val receiverErrors = new ArrayBuffer[(Int, String, String)]() override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { - startedReceiverStreamIds += receiverStarted.streamId + startedReceiverStreamIds += receiverStarted.receiverInfo.streamId } override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { - stoppedReceiverStreamIds += receiverStopped.streamId + stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId } override def onReceiverError(receiverError: StreamingListenerReceiverError) { - receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error)) + receiverErrors += ((receiverError.receiverInfo.streamId, + receiverError.receiverInfo.lastErrorMessage, receiverError.receiverInfo.lastError)) } } From dbddf75eb9c3f82f9db20a9e97c6aababe032478 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 24 Apr 2014 18:25:02 -0700 Subject: [PATCH 5/5] Style fix. 
--- .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 1970417eb31e..5307fe189d71 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -124,7 +124,8 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { if (!receiverInputStreamMap.contains(streamId)) { throw new Exception("Register received for unexpected id " + streamId) } - receiverInfo(streamId) = ReceiverInfo(streamId, s"${typ}-${streamId}", receiverActor, true, host) + receiverInfo(streamId) = ReceiverInfo( + streamId, s"${typ}-${streamId}", receiverActor, true, host) ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId))) logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address) }