From d255b6ed393d78e5216bed528a2a5a253683f8da Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 5 Jun 2015 07:26:32 +0800 Subject: [PATCH 1/2] Fix the negative event count issue --- .../spark/streaming/dstream/ReceiverInputDStream.scala | 2 +- .../apache/spark/streaming/scheduler/InputInfoTracker.scala | 6 +++++- .../spark/streaming/scheduler/ReceivedBlockInfo.scala | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index e4ff05e12f201..406d5a14173bd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -70,7 +70,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray // Register the input blocks information into InputInfoTracker - val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum) + val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).filter(_ > 0).sum) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) if (blockInfos.nonEmpty) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala index a72efccf2f994..b3b49ac059d11 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala @@ -22,7 +22,11 @@ import scala.collection.mutable import org.apache.spark.Logging import org.apache.spark.streaming.{Time, StreamingContext} -/** To track the information of input stream at specified batch time. */ +/** + * To track the information of input stream at specified batch time. + * + * Note: `numRecords` must be >= 0. + */ private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala index dc11e84f29965..ea354194a3df5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala @@ -24,7 +24,7 @@ import org.apache.spark.streaming.util.WriteAheadLogRecordHandle /** Information about blocks received by the receiver */ private[streaming] case class ReceivedBlockInfo( streamId: Int, - numRecords: Long, + numRecords: Long, // -1 means unknown metadataOption: Option[Any], blockStoreResult: ReceivedBlockStoreResult ) { From a5d7da6f63ff00cbf4f66ac6175148dd61ffa54e Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 5 Jun 2015 12:02:50 +0800 Subject: [PATCH 2/2] Address comments --- .../spark/streaming/dstream/ReceiverInputDStream.scala | 2 +- .../streaming/receiver/ReceiverSupervisorImpl.scala | 4 ++-- .../spark/streaming/scheduler/InputInfoTracker.scala | 10 ++++------ .../spark/streaming/scheduler/ReceivedBlockInfo.scala | 4 +++- .../spark/streaming/ReceivedBlockTrackerSuite.scala | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 406d5a14173bd..e76e7eb0dea19 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -70,7 +70,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray // Register the input blocks information into InputInfoTracker - val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).filter(_ > 0).sum) + val inputInfo = InputInfo(id, blockInfos.flatMap(_.numRecords).sum) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) if (blockInfos.nonEmpty) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 92938379b9c17..8be732b64e3a3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -138,8 +138,8 @@ private[streaming] class ReceiverSupervisorImpl( ) { val blockId = blockIdOption.getOrElse(nextBlockId) val numRecords = receivedBlock match { - case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size - case _ => -1 + case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong) + case _ => None } val time = System.currentTimeMillis diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala index b3b49ac059d11..7c0db8a863c67 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala @@ -22,12 +22,10 @@ import scala.collection.mutable import org.apache.spark.Logging import org.apache.spark.streaming.{Time, StreamingContext} -/** - * To track the information of input stream at specified batch time. - * - * Note: `numRecords` must be >= 0. - */ -private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) +/** To track the information of input stream at specified batch time. */ +private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) { + require(numRecords >= 0, "numRecords must not be negative") +} /** * This class manages all the input streams as well as their input data statistics. The information diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala index ea354194a3df5..656ac80df8979 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala @@ -24,11 +24,13 @@ import org.apache.spark.streaming.util.WriteAheadLogRecordHandle /** Information about blocks received by the receiver */ private[streaming] case class ReceivedBlockInfo( streamId: Int, - numRecords: Long, // -1 means unknown + numRecords: Option[Long], metadataOption: Option[Any], blockStoreResult: ReceivedBlockStoreResult ) { + require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative") + @volatile private var _isBlockIdValid = true def blockId: StreamBlockId = blockStoreResult.blockId diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 6f0ee774cb5cf..be305b5e0dfea 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -224,7 +224,7 @@ class ReceivedBlockTrackerSuite /** Generate blocks infos using random ids */ def generateBlockInfos(): Seq[ReceivedBlockInfo] = { - List.fill(5)(ReceivedBlockInfo(streamId, 0, None, + List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None, BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt))))) }