Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

// Register the input blocks information into InputInfoTracker
val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
val inputInfo = InputInfo(id, blockInfos.flatMap(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

if (blockInfos.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ private[streaming] class ReceiverSupervisorImpl(
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
case _ => -1
case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong)
case _ => None
}

val time = System.currentTimeMillis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import org.apache.spark.Logging
import org.apache.spark.streaming.{Time, StreamingContext}

/** To track the information of input stream at specified batch time. */
private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long)
private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) {
require(numRecords >= 0, "numRecords must not be negative")
}

/**
* This class manages all the input streams as well as their input data statistics. The information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import org.apache.spark.streaming.util.WriteAheadLogRecordHandle
/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
numRecords: Long,
numRecords: Option[Long],
metadataOption: Option[Any],
blockStoreResult: ReceivedBlockStoreResult
) {

require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative")

@volatile private var _isBlockIdValid = true

def blockId: StreamBlockId = blockStoreResult.blockId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class ReceivedBlockTrackerSuite

/** Generate blocks infos using random ids */
def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
List.fill(5)(ReceivedBlockInfo(streamId, 0, None,
List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
}

Expand Down