@@ -33,6 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
   def blockId: StreamBlockId  // Any implementation of this trait will store a block id
+  def numRecords: Long  // Any implementation of this trait will store the number of records
 }
 
 /** Trait that represents a class that handles the storage of blocks received by receiver */
@@ -51,7 +52,7 @@ private[streaming] trait ReceivedBlockHandler {
  * that stores the metadata related to storage of blocks using
  * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
  */
-private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
+private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId, numRecords: Long)
   extends ReceivedBlockStoreResult
 
 
@@ -64,11 +65,17 @@ private[streaming] class BlockManagerBasedBlockHandler(
   extends ReceivedBlockHandler with Logging {
 
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+    var numRecords = None: Option[Long]
+    val countIterator = block match {
+      case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator)
+      case IteratorBlock(iterator) => new CountingIterator(iterator)
+      case _ => null
+    }
     val putResult: Seq[(BlockId, BlockStatus)] = block match {
       case ArrayBufferBlock(arrayBuffer) =>
-        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+        blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true)
       case IteratorBlock(iterator) =>
-        blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
+        blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true)
       case ByteBufferBlock(byteBuffer) =>
         blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
       case o =>
@@ -79,7 +86,10 @@ private[streaming] class BlockManagerBasedBlockHandler(
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
     }
-    BlockManagerBasedStoreResult(blockId)
+    if (countIterator != null) {
+      numRecords = Some(countIterator.count)
+    }
+    BlockManagerBasedStoreResult(blockId, numRecords.getOrElse(-1))
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -96,6 +106,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
  */
 private[streaming] case class WriteAheadLogBasedStoreResult(
     blockId: StreamBlockId,
+    numRecords: Long,
     walRecordHandle: WriteAheadLogRecordHandle
   ) extends ReceivedBlockStoreResult
 
@@ -151,12 +162,18 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
    */
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
 
+    var numRecords = None: Option[Long]
+    val countIterator = block match {
+      case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator)
+      case IteratorBlock(iterator) => new CountingIterator(iterator)
+      case _ => null
+    }
     // Serialize the block so that it can be inserted into both
     val serializedBlock = block match {
       case ArrayBufferBlock(arrayBuffer) =>
-        blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+        blockManager.dataSerialize(blockId, countIterator)
       case IteratorBlock(iterator) =>
-        blockManager.dataSerialize(blockId, iterator)
+        blockManager.dataSerialize(blockId, countIterator)
       case ByteBufferBlock(byteBuffer) =>
         byteBuffer
       case _ =>
@@ -181,7 +198,10 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Combine the futures, wait for both to complete, and return the write ahead log record handle
     val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
     val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
-    WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
+    if (countIterator != null) {
+      numRecords = Some(countIterator.count)
+    }
+    WriteAheadLogBasedStoreResult(blockId, numRecords.getOrElse(-1), walRecordHandle)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -199,3 +219,15 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
     new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
   }
 }
+
+/**
+ * A utility that wraps an Iterator to count the records it returns
+ */
+class CountingIterator[T: Manifest](iterator: Iterator[T]) extends Iterator[T] {
+  var count = 0
+  def hasNext(): Boolean = iterator.hasNext
+  def next() = {
+    count += 1
+    iterator.next()
+  }
+}
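
As a usage note, here is a minimal standalone sketch (not part of the patch; the object name CountingIteratorDemo is hypothetical) of how the counting wrapper above is intended to behave. The count only advances as the BlockManager drains the iterator, which is why both storeBlock implementations read countIterator.count after the store has completed, and why a ByteBufferBlock, which never passes through the wrapper, falls back to the -1 sentinel via numRecords.getOrElse(-1).

// Hypothetical demo, not part of the patch.
object CountingIteratorDemo {
  def main(args: Array[String]): Unit = {
    val counting = new CountingIterator(Seq("a", "b", "c").iterator)

    // Simulates the BlockManager draining the iterator while storing a block.
    val stored = counting.toArray

    // Only after full consumption does count reflect every record.
    assert(counting.count == 3)

    // A ByteBufferBlock never goes through the wrapper, so its store result
    // carries the -1 sentinel, mirroring numRecords.getOrElse(-1) above.
    val unknownCount: Long = (None: Option[Long]).getOrElse(-1L)
    println(s"stored=${stored.length}, counted=${counting.count}, unknown=$unknownCount")
  }
}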