@@ -69,16 +69,20 @@ private[streaming] class BlockManagerBasedBlockHandler(
6969
7070 def storeBlock (blockId : StreamBlockId , block : ReceivedBlock ): ReceivedBlockStoreResult = {
7171 var numRecords = None : Option [Long ]
72- val countIterator = block match {
73- case ArrayBufferBlock (arrayBuffer) => new CountingIterator (arrayBuffer.iterator)
74- case IteratorBlock (iterator) => new CountingIterator (iterator)
75- case _ => null
76- }
72+
7773 val putResult : Seq [(BlockId , BlockStatus )] = block match {
7874 case ArrayBufferBlock (arrayBuffer) =>
79- blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true )
75+ val countIterator = new CountingIterator (arrayBuffer.iterator)
76+ val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
77+ tellMaster = true )
78+ numRecords = Some (countIterator.count)
79+ putResult
8080 case IteratorBlock (iterator) =>
81- blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true )
81+ val countIterator = new CountingIterator (iterator)
82+ val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
83+ tellMaster = true )
84+ numRecords = Some (countIterator.count)
85+ putResult
8286 case ByteBufferBlock (byteBuffer) =>
8387 blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true )
8488 case o =>
@@ -89,9 +93,6 @@ private[streaming] class BlockManagerBasedBlockHandler(
8993 throw new SparkException (
9094 s " Could not store $blockId to block manager with storage level $storageLevel" )
9195 }
92- if (countIterator != null ) {
93- numRecords = Some (countIterator.count)
94- }
9596 BlockManagerBasedStoreResult (blockId, numRecords)
9697 }
9798
@@ -166,17 +167,18 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
166167 def storeBlock (blockId : StreamBlockId , block : ReceivedBlock ): ReceivedBlockStoreResult = {
167168
168169 var numRecords = None : Option [Long ]
169- val countIterator = block match {
170- case ArrayBufferBlock (arrayBuffer) => new CountingIterator (arrayBuffer.iterator)
171- case IteratorBlock (iterator) => new CountingIterator (iterator)
172- case _ => null
173- }
174170 // Serialize the block so that it can be inserted into both
175171 val serializedBlock = block match {
176172 case ArrayBufferBlock (arrayBuffer) =>
177- blockManager.dataSerialize(blockId, countIterator)
173+ val countIterator = new CountingIterator (arrayBuffer.iterator)
174+ val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
175+ numRecords = Some (countIterator.count)
176+ serializedBlock
178177 case IteratorBlock (iterator) =>
179- blockManager.dataSerialize(blockId, countIterator)
178+ val countIterator = new CountingIterator (iterator)
179+ val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
180+ numRecords = Some (countIterator.count)
181+ serializedBlock
180182 case ByteBufferBlock (byteBuffer) =>
181183 byteBuffer
182184 case _ =>
@@ -201,9 +203,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
201203 // Combine the futures, wait for both to complete, and return the write ahead log record handle
202204 val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
203205 val walRecordHandle = Await .result(combinedFuture, blockStoreTimeout)
204- if (countIterator != null ) {
205- numRecords = Some (countIterator.count)
206- }
207206 WriteAheadLogBasedStoreResult (blockId, numRecords, walRecordHandle)
208207 }
209208
@@ -226,7 +225,7 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
226225/**
227226 * A utility that will wrap the Iterator to get the count
228227 */
229- private class CountingIterator [T : Manifest ](iterator : Iterator [T ]) extends Iterator [T ] {
228+ private class CountingIterator [T ](iterator : Iterator [T ]) extends Iterator [T ] {
230229 var count = 0
231230 def hasNext (): Boolean = iterator.hasNext
232231 def isFullyConsumed : Boolean = ! iterator.hasNext
0 commit comments