From 23219c6afef2f2af6d2c16b7ab547a8549735c4c Mon Sep 17 00:00:00 2001
From: Dibyendu Bhattacharya
Date: Wed, 3 Jun 2015 20:09:38 +0530
Subject: [PATCH 1/9] Receiver.store with Iterator does not give correct count at Spark UI

---
 .../apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 92938379b9c17..d9e8cf1c226ee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -139,6 +139,7 @@ private[streaming] class ReceiverSupervisorImpl(
     val blockId = blockIdOption.getOrElse(nextBlockId)
     val numRecords = receivedBlock match {
       case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
+      case IteratorBlock(iterator) => iterator.length
       case _ => -1
     }

From 095492f1e7520d264ada7b6d93d06617a4920eef Mon Sep 17 00:00:00 2001
From: Dibyendu Bhattacharya
Date: Thu, 4 Jun 2015 00:31:07 +0530
Subject: [PATCH 2/9] [SPARK-8080] Receiver.store with Iterator does not give correct count at Spark UI

---
 .../streaming/receiver/ReceiverSupervisorImpl.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index d9e8cf1c226ee..349ac69b7698b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -136,15 +136,18 @@ private[streaming] class ReceiverSupervisorImpl(
       metadataOption: Option[Any],
       blockIdOption: Option[StreamBlockId]
     ) {
+    var rBlock = receivedBlock
     val blockId = blockIdOption.getOrElse(nextBlockId)
     val numRecords = receivedBlock match {
       case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
-      case IteratorBlock(iterator) => iterator.length
+      case IteratorBlock(iterator) =>
+        var arrayBuffer = ArrayBuffer(iterator.toArray : _*)
+        rBlock = new ArrayBufferBlock(arrayBuffer)
+        arrayBuffer.size
       case _ => -1
     }
-
     val time = System.currentTimeMillis
-    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
+    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, rBlock)
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
 
     val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
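Patch 1 above is subtly broken, and patch 2 trades memory for correctness: scala.collection.Iterator is single-pass, so iterator.length drains it, and the block subsequently stored from that iterator contains zero records. Patch 2 restores the data by materializing the iterator into an ArrayBuffer before counting, at the cost of buffering the entire block on the heap a second time. A minimal standalone sketch of the underlying iterator behavior (plain Scala, not project code):

object IteratorLengthDemo extends App {
  val it = Iterator(1, 2, 3)
  // length traverses the iterator to count its elements...
  println(it.length)  // 3
  // ...and exhausts it, so any consumer that runs afterwards sees nothing.
  println(it.hasNext) // false: storing this iterator now would persist 0 records
}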
From ccbdbdf086dc2720390849cf669b3aee9133ed65 Mon Sep 17 00:00:00 2001
From: Dibyendu Bhattacharya
Date: Thu, 4 Jun 2015 00:32:25 +0530
Subject: [PATCH 3/9] [SPARK-8080] Receiver.store with Iterator does not give correct count at Spark UI

---
 .../spark/streaming/receiver/ReceiverSupervisorImpl.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 349ac69b7698b..c345ba7134079 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -136,7 +136,7 @@ private[streaming] class ReceiverSupervisorImpl(
       metadataOption: Option[Any],
       blockIdOption: Option[StreamBlockId]
     ) {
-    var rBlock = receivedBlock 
+    var rBlock = receivedBlock
     val blockId = blockIdOption.getOrElse(nextBlockId)
     val numRecords = receivedBlock match {
       case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size

From 20b9374f788a172584887f3972faa3ab26fc45d3 Mon Sep 17 00:00:00 2001
From: Dibyendu Bhattacharya
Date: Thu, 4 Jun 2015 00:46:02 +0530
Subject: [PATCH 4/9] [SPARK-8080] Receiver.store with Iterator does not give correct count at Spark UI. Fixed some formatting issues

---
 .../spark/streaming/receiver/ReceiverSupervisorImpl.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index c345ba7134079..e8889ce68b917 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -140,8 +140,8 @@ private[streaming] class ReceiverSupervisorImpl(
     val blockId = blockIdOption.getOrElse(nextBlockId)
     val numRecords = receivedBlock match {
       case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
-      case IteratorBlock(iterator) => 
-        var arrayBuffer = ArrayBuffer(iterator.toArray : _*) 
+      case IteratorBlock(iterator) =>
+        var arrayBuffer = ArrayBuffer(iterator.toArray : _*)
         rBlock = new ArrayBufferBlock(arrayBuffer)
         arrayBuffer.size
       case _ => -1
From f9aaa9f7b888cd7988fd25d79b14d381758bcdff Mon Sep 17 00:00:00 2001
From: Dibyendu Bhattacharya
Date: Thu, 4 Jun 2015 21:49:35 +0530
Subject: [PATCH 5/9] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI

---
 .../receiver/ReceivedBlockHandler.scala       | 46 ++++++++++++++++---
 .../receiver/ReceiverSupervisorImpl.scala     | 13 +-----
 .../streaming/ReceivedBlockTrackerSuite.scala |  2 +-
 3 files changed, 42 insertions(+), 19 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 207d64d9414ee..3a3e1f42c4bd5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
   def blockId: StreamBlockId // Any implementation of this trait will store a block id
+  def numRecords: Long // Any implementation of this trait will store the number of records
 }
 
 /** Trait that represents a class that handles the storage of blocks received by receiver */
@@ -51,7 +52,7 @@ private[streaming] trait ReceivedBlockHandler {
  * that stores the metadata related to storage of blocks using
  * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
  */
-private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
+private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId , numRecords: Long)
   extends ReceivedBlockStoreResult
 
 
@@ -64,11 +65,17 @@ private[streaming] class BlockManagerBasedBlockHandler(
   extends ReceivedBlockHandler with Logging {
 
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+    var numRecords = None: Option[Long]
+    val countIterator = block match {
+      case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator)
+      case IteratorBlock(iterator) => new CountingIterator(iterator)
+      case _ => null
+    }
     val putResult: Seq[(BlockId, BlockStatus)] = block match {
       case ArrayBufferBlock(arrayBuffer) =>
-        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+        blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true)
       case IteratorBlock(iterator) =>
-        blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
+        blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true)
       case ByteBufferBlock(byteBuffer) =>
         blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
       case o =>
@@ -79,7 +86,10 @@ private[streaming] class BlockManagerBasedBlockHandler(
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
     }
-    BlockManagerBasedStoreResult(blockId)
+    if(countIterator !=null) {
+      numRecords = Some(countIterator.count)
+    }
+    BlockManagerBasedStoreResult(blockId,numRecords.getOrElse(-1))
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -96,6 +106,7 @@
  */
 private[streaming] case class WriteAheadLogBasedStoreResult(
     blockId: StreamBlockId,
+    numRecords: Long,
     walRecordHandle: WriteAheadLogRecordHandle
   ) extends ReceivedBlockStoreResult
 
@@ -151,12 +162,18 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
    */
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
 
+    var numRecords = None: Option[Long]
+    val countIterator = block match {
+      case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator)
+      case IteratorBlock(iterator) => new CountingIterator(iterator)
+      case _ => null
+    }
     // Serialize the block so that it can be inserted into both
     val serializedBlock = block match {
       case ArrayBufferBlock(arrayBuffer) =>
-        blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+        blockManager.dataSerialize(blockId, countIterator)
       case IteratorBlock(iterator) =>
-        blockManager.dataSerialize(blockId, iterator)
+        blockManager.dataSerialize(blockId, countIterator)
       case ByteBufferBlock(byteBuffer) =>
         byteBuffer
       case _ =>
@@ -181,7 +198,10 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Combine the futures, wait for both to complete, and return the write ahead log record handle
     val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
     val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
-    WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
+    if(countIterator !=null) {
+      numRecords = Some(countIterator.count)
+    }
+    WriteAheadLogBasedStoreResult(blockId, numRecords.getOrElse(-1), walRecordHandle)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -199,3 +219,15 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
     new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
   }
 }
+
+/**
+ * A utility that will wrap the Iterator to get the count
+ */
+class CountingIterator[T: Manifest](iterator: Iterator[T]) extends Iterator[T] {
+  var count = 0
+  def hasNext(): Boolean = iterator.hasNext
+  def next() = {
+    count+=1
+    iterator.next()
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index e8889ce68b917..6078cdf8f8790 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -136,20 +136,11 @@ private[streaming] class ReceiverSupervisorImpl(
       metadataOption: Option[Any],
       blockIdOption: Option[StreamBlockId]
     ) {
-    var rBlock = receivedBlock
     val blockId = blockIdOption.getOrElse(nextBlockId)
-    val numRecords = receivedBlock match {
-      case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
-      case IteratorBlock(iterator) =>
-        var arrayBuffer = ArrayBuffer(iterator.toArray : _*)
-        rBlock = new ArrayBufferBlock(arrayBuffer)
-        arrayBuffer.size
-      case _ => -1
-    }
     val time = System.currentTimeMillis
-    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, rBlock)
+    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
-
+    val numRecords = blockStoreResult.numRecords
     val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
     trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 6f0ee774cb5cf..9f0a1310bb182 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite
   /** Generate blocks infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
     List.fill(5)(ReceivedBlockInfo(streamId, 0, None,
-      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
+      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), 1)))
   }
 
   /** Get all the data written in the given write ahead log file. */
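The CountingIterator introduced in this patch counts lazily: count is only meaningful once the block store has fully drained the wrapped iterator, which is why both handlers read it only after putIterator/dataSerialize have completed. A standalone sketch of the idea (the class mirrors the patch but is illustrative, not the committed code):

class CountingIteratorSketch[T](underlying: Iterator[T]) extends Iterator[T] {
  private var _count = 0L
  def count: Long = _count // valid only after the iterator is fully consumed
  def hasNext: Boolean = underlying.hasNext
  def next(): T = { _count += 1; underlying.next() }
}

object CountingIteratorDemo extends App {
  val wrapped = new CountingIteratorSketch(Iterator.fill(100)("record"))
  println(wrapped.count)   // 0 -- nothing consumed yet
  wrapped.foreach(_ => ()) // stand-in for the handler draining the iterator into storage
  println(wrapped.count)   // 100, counted in the same single pass as the store
}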
From c631a26ef26a9052d4010724a1b791a8590ec3d2 Mon Sep 17 00:00:00 2001
From: Dibyendu Bhattacharya
Date: Thu, 4 Jun 2015 22:51:36 +0530
Subject: [PATCH 6/9] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI

Fixed Scala style check issue
---
 .../spark/streaming/receiver/ReceivedBlockHandler.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 3a3e1f42c4bd5..5955f08b9a0e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -52,7 +52,8 @@ private[streaming] trait ReceivedBlockHandler {
  * that stores the metadata related to storage of blocks using
  * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
  */
-private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId , numRecords: Long)
+private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId ,
+    numRecords: Long)
   extends ReceivedBlockStoreResult
 
 
@@ -89,7 +90,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
     if(countIterator !=null) {
       numRecords = Some(countIterator.count)
     }
-    BlockManagerBasedStoreResult(blockId,numRecords.getOrElse(-1))
+    BlockManagerBasedStoreResult(blockId, numRecords.getOrElse(-1))
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -223,10 +224,10 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
 /**
  * A utility that will wrap the Iterator to get the count
  */
-class CountingIterator[T: Manifest](iterator: Iterator[T]) extends Iterator[T] {
+private class CountingIterator[T: Manifest](iterator: Iterator[T]) extends Iterator[T] {
   var count = 0
   def hasNext(): Boolean = iterator.hasNext
-  def next() = {
+  def next(): T = {
     count+=1
     iterator.next()
   }
From 51de69e9ab29220e61df0cfadabff13ceea2aaf9 Mon Sep 17 00:00:00 2001
From: Dibyendu Bhattacharya
Date: Fri, 5 Jun 2015 23:34:07 +0530
Subject: [PATCH 7/9] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI

Test Cases
---
 .../receiver/ReceivedBlockHandler.scala       |  4 +-
 .../streaming/ReceivedBlockHandlerSuite.scala | 88 +++++++++++++++++++
 2 files changed, 90 insertions(+), 2 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 5955f08b9a0e0..edfb981b2df08 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -87,7 +87,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
     }
-    if(countIterator !=null) {
+    if(countIterator != null) {
       numRecords = Some(countIterator.count)
     }
     BlockManagerBasedStoreResult(blockId, numRecords.getOrElse(-1))
@@ -199,7 +199,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Combine the futures, wait for both to complete, and return the write ahead log record handle
     val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
     val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
-    if(countIterator !=null) {
+    if(countIterator != null) {
       numRecords = Some(countIterator.count)
     }
     WriteAheadLogBasedStoreResult(blockId, numRecords.getOrElse(-1), walRecordHandle)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index cca8cedb1d080..2457e2634e716 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -61,8 +61,24 @@ class ReceivedBlockHandlerSuite
   var rpcEnv: RpcEnv = null
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
+  var handler: ReceivedBlockHandler = null
   var tempDirectory: File = null
 
+  private def makeBlockManagerBasedBlockHandler(
+      manager: BlockManager,
+      storageLevel: StorageLevel): BlockManagerBasedBlockHandler = {
+    val handler = new BlockManagerBasedBlockHandler(manager, storageLevel)
+    handler
+  }
+
+  private def makeWriteAheadLogBasedBlockHandler(
+      manager: BlockManager,
+      storageLevel: StorageLevel): WriteAheadLogBasedBlockHandler = {
+    val handler = new WriteAheadLogBasedBlockHandler(manager, 1,
+      storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
+    handler
+  }
+
   before {
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
@@ -113,6 +129,78 @@ class ReceivedBlockHandlerSuite
     }
   }
 
+  test("BlockManagerBasedBlockHandler - count messages MEMORY_ONLY") {
+    handler = makeBlockManagerBasedBlockHandler(blockManager, StorageLevel.MEMORY_ONLY)
+    val block1 = List.fill(100)(new Array[Byte](100))
+    val blockId1 = generateBlockId()
+    val blockStoreResult1 = handler.storeBlock(blockId1, IteratorBlock(block1.iterator))
+    val block2 = collection.mutable.ArrayBuffer.fill(100)(0)
+    val blockId2 = generateBlockId()
+    val blockStoreResult2 = handler.storeBlock(blockId2, ArrayBufferBlock(block2))
+    assert(blockStoreResult1.numRecords === 100)
+    assert(blockStoreResult2.numRecords === 100)
+  }
+
+  test("BlockManagerBasedBlockHandler - count messages DISK_ONLY") {
+    handler = makeBlockManagerBasedBlockHandler(blockManager, StorageLevel.DISK_ONLY)
+    val block1 = List.fill(100)(new Array[Byte](100))
+    val blockId1 = generateBlockId()
+    val blockStoreResult1 = handler.storeBlock(blockId1, IteratorBlock(block1.iterator))
+    val block2 = collection.mutable.ArrayBuffer.fill(100)(0)
+    val blockId2 = generateBlockId()
+    val blockStoreResult2 = handler.storeBlock(blockId2, ArrayBufferBlock(block2))
+    assert(blockStoreResult1.numRecords === 100)
+    assert(blockStoreResult2.numRecords === 100)
+  }
+
+  test("BlockManagerBasedBlockHandler - count messages MEMORY_AND_DISK") {
+    handler = makeBlockManagerBasedBlockHandler(blockManager, StorageLevel.MEMORY_AND_DISK)
+    val block1 = List.fill(100)(new Array[Byte](100))
+    val blockId1 = generateBlockId()
+    val blockStoreResult1 = handler.storeBlock(blockId1, IteratorBlock(block1.iterator))
+    val block2 = collection.mutable.ArrayBuffer.fill(100)(0)
+    val blockId2 = generateBlockId()
+    val blockStoreResult2 = handler.storeBlock(blockId2, ArrayBufferBlock(block2))
+    assert(blockStoreResult1.numRecords === 100)
+    assert(blockStoreResult2.numRecords === 100)
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages MEMORY_ONLY") {
+    handler = makeWriteAheadLogBasedBlockHandler(blockManager, StorageLevel.MEMORY_ONLY)
+    val block1 = List.fill(100)(new Array[Byte](100))
+    val blockId1 = generateBlockId()
+    val blockStoreResult1 = handler.storeBlock(blockId1, IteratorBlock(block1.iterator))
+    val block2 = collection.mutable.ArrayBuffer.fill(100)(0)
+    val blockId2 = generateBlockId()
+    val blockStoreResult2 = handler.storeBlock(blockId2, ArrayBufferBlock(block2))
+    assert(blockStoreResult1.numRecords === 100)
+    assert(blockStoreResult2.numRecords === 100)
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages DISK_ONLY") {
+    handler = makeWriteAheadLogBasedBlockHandler(blockManager, StorageLevel.DISK_ONLY)
+    val block1 = List.fill(100)(new Array[Byte](100))
+    val blockId1 = generateBlockId()
+    val blockStoreResult1 = handler.storeBlock(blockId1, IteratorBlock(block1.iterator))
+    val block2 = collection.mutable.ArrayBuffer.fill(100)(0)
+    val blockId2 = generateBlockId()
+    val blockStoreResult2 = handler.storeBlock(blockId2, ArrayBufferBlock(block2))
+    assert(blockStoreResult1.numRecords === 100)
+    assert(blockStoreResult2.numRecords === 100)
+  }
+
+  test("WriteAheadLogBasedBlockHandler - count messages MEMORY_AND_DISK") {
+    handler = makeWriteAheadLogBasedBlockHandler(blockManager, StorageLevel.MEMORY_AND_DISK)
+    val block1 = List.fill(100)(new Array[Byte](100))
+    val blockId1 = generateBlockId()
+    val blockStoreResult1 = handler.storeBlock(blockId1, IteratorBlock(block1.iterator))
+    val block2 = collection.mutable.ArrayBuffer.fill(100)(0)
+    val blockId2 = generateBlockId()
+    val blockStoreResult2 = handler.storeBlock(blockId2, ArrayBufferBlock(block2))
+    assert(blockStoreResult1.numRecords === 100)
+    assert(blockStoreResult2.numRecords === 100)
+  }
+
   test("BlockManagerBasedBlockHandler - handle errors in storing block") {
     withBlockManagerBasedBlockHandler { handler =>
       testErrorHandling(handler)
From c250fb5fb6a08d89b0aed05b5d3eec44b9991c01 Mon Sep 17 00:00:00 2001
From: Dibyendu Bhattacharya
Date: Sun, 7 Jun 2015 16:06:46 +0530
Subject: [PATCH 8/9] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI

---
 .../receiver/ReceivedBlockHandler.scala | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index edfb981b2df08..dd0af251b46bc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -32,8 +32,10 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
-  def blockId: StreamBlockId // Any implementation of this trait will store a block id
-  def numRecords: Long // Any implementation of this trait will store the number of records
+  // Any implementation of this trait will store a block id
+  def blockId: StreamBlockId
+  // Any implementation of this trait will have to return the number of records
+  def numRecords: Option[Long]
 }
 
 /** Trait that represents a class that handles the storage of blocks received by receiver */
@@ -52,8 +54,8 @@ private[streaming] trait ReceivedBlockHandler {
  * that stores the metadata related to storage of blocks using
  * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
  */
-private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId ,
-    numRecords: Long)
+private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId,
+    numRecords: Option[Long])
   extends ReceivedBlockStoreResult
 
 
@@ -90,7 +92,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
     if(countIterator != null) {
       numRecords = Some(countIterator.count)
     }
-    BlockManagerBasedStoreResult(blockId, numRecords.getOrElse(-1))
+    BlockManagerBasedStoreResult(blockId, numRecords)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -107,7 +109,7 @@ private[streaming] case class WriteAheadLogBasedStoreResult(
     blockId: StreamBlockId,
-    numRecords: Long,
+    numRecords: Option[Long],
     walRecordHandle: WriteAheadLogRecordHandle
   ) extends ReceivedBlockStoreResult
 
@@ -202,7 +204,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     if(countIterator != null) {
       numRecords = Some(countIterator.count)
     }
-    WriteAheadLogBasedStoreResult(blockId, numRecords.getOrElse(-1), walRecordHandle)
+    WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -227,6 +229,7 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
 private class CountingIterator[T: Manifest](iterator: Iterator[T]) extends Iterator[T] {
   var count = 0
   def hasNext(): Boolean = iterator.hasNext
+  def isFullyConsumed: Boolean = !iterator.hasNext
   def next(): T = {
     count+=1
     iterator.next()
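Switching numRecords from a -1 sentinel to Option[Long] lets callers distinguish "zero records" from "count unknown" (for instance a ByteBufferBlock, whose record boundaries the handler cannot see). A small illustration of the difference (plain Scala, names assumed for the example):

object OptionCountDemo extends App {
  def describe(numRecords: Option[Long]): String =
    numRecords.map(n => s"$n records").getOrElse("count unknown")

  println(describe(Some(0L))) // "0 records" -- a real, empty block
  println(describe(None))     // "count unknown" -- e.g. a ByteBufferBlock
  // With a -1 sentinel both cases collapse into "just some Long", and
  // arithmetic such as summing counts for the UI silently goes wrong.
}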
From 28225d58846e6c214fbc6f7e1ef09cda6b7cd94d Mon Sep 17 00:00:00 2001
From: Dibyendu Bhattacharya
Date: Mon, 8 Jun 2015 08:04:56 +0530
Subject: [PATCH 9/9] Modified comments

---
 .../org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 93e034a52845d..790d8ab06d3b4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -312,6 +312,7 @@ class ReceivedBlockHandlerSuite
     storageLevel = StorageLevel.MEMORY_ONLY
     blockManager = makeBlockManager(12000)
     val block = List.fill(70)(new Array[Byte](100))
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
     // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store
     // this block With MEMORY_ONLY StorageLevel. BlockManager will not be able to unroll this block
     // and hence it will not tryToPut this block , resulting the SparkException
@@ -328,6 +329,7 @@ class ReceivedBlockHandlerSuite
     storageLevel = StorageLevel.MEMORY_AND_DISK
     blockManager = makeBlockManager(12000)
     val block = List.fill(70)(new Array[Byte](100))
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
     // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store
     // this block in MEMORY , But BlockManager will be able to sereliaze this block to DISK
     // and hence count returns correct value.
@@ -341,6 +343,7 @@ class ReceivedBlockHandlerSuite
     storageLevel = StorageLevel.MEMORY_ONLY
     blockManager = makeBlockManager(12000)
     val block = List.fill(70)(new Array[Byte](100))
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
     // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store
     // this block in MEMORY , But BlockManager will be able to sereliaze this block to WAL
     // and hence count returns correct value.
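For readers checking the arithmetic in these comments: the suite's BlockManager is created with 12000 bytes and, per the comments, spark.storage.unrollFraction is 0.4, leaving 12000 * 0.4 = 4800 bytes of unroll space, while the test block is 70 records of 100 bytes each, i.e. 7000 bytes of raw payload before object overhead, so the MEMORY_ONLY unroll fails as the tests expect. A quick standalone check (plain Scala, illustrative only):

object UnrollSpaceDemo extends App {
  val storeBytes = 12000        // BlockManager size used by the suite
  val unrollFraction = 0.4      // per the comments above
  val unrollSpace = storeBytes * unrollFraction // 4800.0 bytes available for unroll
  val blockBytes = 70 * 100                     // 7000 bytes of raw payload
  println(s"unroll space: $unrollSpace, block payload: $blockBytes")
  println(s"fits in unroll space: ${blockBytes <= unrollSpace}") // false -> MEMORY_ONLY put fails
}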