From 01e6dc8ad9ac6353ef8e073b93a96bffb6e46ca6 Mon Sep 17 00:00:00 2001 From: "U-PEROOT\\UBHATD1" Date: Mon, 8 Jun 2015 19:47:16 +0530 Subject: [PATCH 1/7] A --- .../streaming/receiver/ReceiverSupervisorImpl.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 8be732b64e3a3..38aedb5868690 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -136,14 +136,20 @@ private[streaming] class ReceiverSupervisorImpl( metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { + var rBlock = receivedBlock val blockId = blockIdOption.getOrElse(nextBlockId) val numRecords = receivedBlock match { case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong) case _ => None + case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size + case IteratorBlock(iterator) => + var arrayBuffer = ArrayBuffer(iterator.toArray : _*) + rBlock = new ArrayBufferBlock(arrayBuffer) + arrayBuffer.size + case _ => -1 } - val time = System.currentTimeMillis - val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) + val blockStoreResult = receivedBlockHandler.storeBlock(blockId, rBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) From 4c5931d660c6d0642dbb63c2340b24f5493e19d3 Mon Sep 17 00:00:00 2001 From: Dibyendu Bhattacharya Date: Mon, 8 Jun 2015 20:34:04 +0530 Subject: [PATCH 2/7] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI --- .../receiver/ReceivedBlockHandler.scala | 52 ++++- .../receiver/ReceiverSupervisorImpl.scala | 15 +- .../streaming/ReceivedBlockHandlerSuite.scala | 192 +++++++++++++++++- .../streaming/ReceivedBlockTrackerSuite.scala | 2 +- 4 files changed, 238 insertions(+), 23 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 207d64d9414ee..dd0af251b46bc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -32,7 +32,10 @@ import org.apache.spark.{Logging, SparkConf, SparkException} /** Trait that represents the metadata related to storage of blocks */ private[streaming] trait ReceivedBlockStoreResult { - def blockId: StreamBlockId // Any implementation of this trait will store a block id + // Any implementation of this trait will store a block id + def blockId: StreamBlockId + // Any implementation of this trait will have to return the number of records + def numRecords: Option[Long] } /** Trait that represents a class that handles the storage of blocks received by receiver */ @@ -51,7 +54,8 @@ private[streaming] trait ReceivedBlockHandler { * that stores the metadata related to storage of blocks using * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]] */ -private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId) +private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId, + 
numRecords: Option[Long]) extends ReceivedBlockStoreResult @@ -64,11 +68,17 @@ private[streaming] class BlockManagerBasedBlockHandler( extends ReceivedBlockHandler with Logging { def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { + var numRecords = None: Option[Long] + val countIterator = block match { + case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator) + case IteratorBlock(iterator) => new CountingIterator(iterator) + case _ => null + } val putResult: Seq[(BlockId, BlockStatus)] = block match { case ArrayBufferBlock(arrayBuffer) => - blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true) + blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true) case IteratorBlock(iterator) => - blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true) + blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true) case ByteBufferBlock(byteBuffer) => blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true) case o => @@ -79,7 +89,10 @@ private[streaming] class BlockManagerBasedBlockHandler( throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } - BlockManagerBasedStoreResult(blockId) + if(countIterator != null) { + numRecords = Some(countIterator.count) + } + BlockManagerBasedStoreResult(blockId, numRecords) } def cleanupOldBlocks(threshTime: Long) { @@ -96,6 +109,7 @@ private[streaming] class BlockManagerBasedBlockHandler( */ private[streaming] case class WriteAheadLogBasedStoreResult( blockId: StreamBlockId, + numRecords: Option[Long], walRecordHandle: WriteAheadLogRecordHandle ) extends ReceivedBlockStoreResult @@ -151,12 +165,18 @@ private[streaming] class WriteAheadLogBasedBlockHandler( */ def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { + var numRecords = None: Option[Long] + val countIterator = block match { + case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator) + case IteratorBlock(iterator) => new CountingIterator(iterator) + case _ => null + } // Serialize the block so that it can be inserted into both val serializedBlock = block match { case ArrayBufferBlock(arrayBuffer) => - blockManager.dataSerialize(blockId, arrayBuffer.iterator) + blockManager.dataSerialize(blockId, countIterator) case IteratorBlock(iterator) => - blockManager.dataSerialize(blockId, iterator) + blockManager.dataSerialize(blockId, countIterator) case ByteBufferBlock(byteBuffer) => byteBuffer case _ => @@ -181,7 +201,10 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Combine the futures, wait for both to complete, and return the write ahead log record handle val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2) val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout) - WriteAheadLogBasedStoreResult(blockId, walRecordHandle) + if(countIterator != null) { + numRecords = Some(countIterator.count) + } + WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle) } def cleanupOldBlocks(threshTime: Long) { @@ -199,3 +222,16 @@ private[streaming] object WriteAheadLogBasedBlockHandler { new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString } } + +/** + * A utility that will wrap the Iterator to get the count + */ +private class CountingIterator[T: Manifest](iterator: Iterator[T]) extends Iterator[T] { + var count = 0 + def hasNext(): 
Boolean = iterator.hasNext + def isFullyConsumed: Boolean = !iterator.hasNext + def next(): T = { + count+=1 + iterator.next() + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 38aedb5868690..6078cdf8f8790 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -136,22 +136,11 @@ private[streaming] class ReceiverSupervisorImpl( metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { - var rBlock = receivedBlock val blockId = blockIdOption.getOrElse(nextBlockId) - val numRecords = receivedBlock match { - case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong) - case _ => None - case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size - case IteratorBlock(iterator) => - var arrayBuffer = ArrayBuffer(iterator.toArray : _*) - rBlock = new ArrayBufferBlock(arrayBuffer) - arrayBuffer.size - case _ => -1 - } val time = System.currentTimeMillis - val blockStoreResult = receivedBlockHandler.storeBlock(blockId, rBlock) + val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") - + val numRecords = blockStoreResult.numRecords val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index cca8cedb1d080..790d8ab06d3b4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -49,7 +49,6 @@ class ReceivedBlockHandlerSuite val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1") val hadoopConf = new Configuration() - val storageLevel = StorageLevel.MEMORY_ONLY_SER val streamId = 1 val securityMgr = new SecurityManager(conf) val mapOutputTracker = new MapOutputTrackerMaster(conf) @@ -61,7 +60,21 @@ class ReceivedBlockHandlerSuite var rpcEnv: RpcEnv = null var blockManagerMaster: BlockManagerMaster = null var blockManager: BlockManager = null + var handler: ReceivedBlockHandler = null var tempDirectory: File = null + var storageLevel = StorageLevel.MEMORY_ONLY_SER + + private def makeBlockManager( + maxMem: Long, + name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { + conf.set("spark.storage.unrollMemoryThreshold", "512") + conf.set("spark.storage.unrollFraction", "0.4") + val transfer = new NioBlockTransferService(conf, securityMgr) + val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf, + mapOutputTracker, shuffleManager, transfer, securityMgr, 0) + manager.initialize("app-id") + manager + } before { rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) @@ -174,6 +187,172 @@ class ReceivedBlockHandlerSuite } } + test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count messages") { + storageLevel = StorageLevel.MEMORY_ONLY + // Create a non-trivial (not all zeros) byte array + var counter = 0.toByte + def incr: Byte = {counter = 
(counter + 1).toByte; counter;} + val bytes = Array.fill[Byte](100)(incr) + val byteBufferBlock = ByteBuffer.wrap(bytes) + withBlockManagerBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, ByteBufferBlock(byteBufferBlock)) + assert(blockStoreResult.numRecords === None) + } + } + + test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ArrayBufferBlock - count messages") { + storageLevel = StorageLevel.MEMORY_ONLY + val block = ArrayBuffer.fill(100)(0) + withBlockManagerBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("BlockManagerBasedBlockHandler-DISK_ONLY-ArrayBufferBlock - count messages") { + storageLevel = StorageLevel.DISK_ONLY + val block = ArrayBuffer.fill(100)(0) + withBlockManagerBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK-ArrayBufferBlock - count messages") { + storageLevel = StorageLevel.MEMORY_AND_DISK + val block = ArrayBuffer.fill(100)(0) + withBlockManagerBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("BlockManagerBasedBlockHandler-MEMORY_ONLY-IteratorBlock - count messages") { + storageLevel = StorageLevel.MEMORY_ONLY + val block = ArrayBuffer.fill(100)(0) + withBlockManagerBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("BlockManagerBasedBlockHandler-DISK_ONLY-IteratorBlock - count messages") { + storageLevel = StorageLevel.DISK_ONLY + val block = ArrayBuffer.fill(100)(0) + withBlockManagerBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK-IteratorBlock - count messages") { + storageLevel = StorageLevel.MEMORY_AND_DISK + val block = ArrayBuffer.fill(100)(0) + withBlockManagerBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-ArrayBufferBlock - count messages") { + storageLevel = StorageLevel.MEMORY_ONLY + val block = ArrayBuffer.fill(100)(0) + withWriteAheadLogBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("WriteAheadLogBasedBlockHandler-DISK_ONLY-ArrayBufferBlock - count messages") { + storageLevel = StorageLevel.DISK_ONLY + val block = ArrayBuffer.fill(100)(0) + withWriteAheadLogBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("WriteAheadLogBasedBlockHandler-MEMORY_AND_DISK-ArrayBufferBlock - count messages") { + storageLevel = StorageLevel.MEMORY_AND_DISK + val block = ArrayBuffer.fill(100)(0) + withWriteAheadLogBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-IteratorBlock - 
count messages") { + storageLevel = StorageLevel.MEMORY_ONLY + val block = ArrayBuffer.fill(100)(0) + withWriteAheadLogBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("WriteAheadLogBasedBlockHandler-DISK_ONLY-IteratorBlock - count messages ") { + storageLevel = StorageLevel.DISK_ONLY + val block = ArrayBuffer.fill(100)(0) + withWriteAheadLogBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("WriteAheadLogBasedBlockHandler-MEMORY_AND_DISK-IteratorBlock - count messages") { + storageLevel = StorageLevel.MEMORY_AND_DISK + val block = ArrayBuffer.fill(100)(0) + withWriteAheadLogBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) + assert(blockStoreResult.numRecords === Some(100)) + } + } + + test("BlockManagerBasedBlockHandler - isFullyConsumed-MEMORY_ONLY") { + storageLevel = StorageLevel.MEMORY_ONLY + blockManager = makeBlockManager(12000) + val block = List.fill(70)(new Array[Byte](100)) + // spark.storage.unrollFraction set to 0.4 for BlockManager + // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store + // this block With MEMORY_ONLY StorageLevel. BlockManager will not be able to unroll this block + // and hence it will not tryToPut this block , resulting the SparkException + withBlockManagerBasedBlockHandler { handler => + val thrown = intercept[SparkException] { + val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) + } + assert(thrown.getMessage === + "Could not store input-1-1000 to block manager with storage level " + storageLevel) + } + } + + test("BlockManagerBasedBlockHandler - isFullyConsumed-MEMORY_AND_DISK") { + storageLevel = StorageLevel.MEMORY_AND_DISK + blockManager = makeBlockManager(12000) + val block = List.fill(70)(new Array[Byte](100)) + // spark.storage.unrollFraction set to 0.4 for BlockManager + // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store + // this block in MEMORY , But BlockManager will be able to sereliaze this block to DISK + // and hence count returns correct value. + withBlockManagerBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) + assert(blockStoreResult.numRecords === Some(70)) + } + } + + test("WriteAheadLogBasedBlockHandler - isFullyConsumed-MEMORY_ONLY") { + storageLevel = StorageLevel.MEMORY_ONLY + blockManager = makeBlockManager(12000) + val block = List.fill(70)(new Array[Byte](100)) + // spark.storage.unrollFraction set to 0.4 for BlockManager + // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store + // this block in MEMORY , But BlockManager will be able to sereliaze this block to WAL + // and hence count returns correct value. 
+ withWriteAheadLogBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) + assert(blockStoreResult.numRecords === Some(70)) + } + } + /** * Test storing of data using different forms of ReceivedBlocks and verify that they succeeded * using the given verification function @@ -251,9 +430,20 @@ class ReceivedBlockHandlerSuite (blockIds, storeResults) } + /** Store block using a handler */ + private def storeBlock( + handler: ReceivedBlockHandler, + block: ReceivedBlock + ): ReceivedBlockStoreResult = { + val blockId = StreamBlockId(streamId, 1000L) + val blockStoreResult = handler.storeBlock(blockId, block) + logDebug("Done inserting") + blockStoreResult + } private def getWriteAheadLogFiles(): Seq[String] = { getLogFilesInDirectory(checkpointDirToLogDir(tempDirectory.toString, streamId)) } private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong) } + diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index be305b5e0dfea..f793a12843b2f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite /** Generate blocks infos using random ids */ def generateBlockInfos(): Seq[ReceivedBlockInfo] = { List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None, - BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt))))) + BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L)))) } /** Get all the data written in the given write ahead log file. 
*/ From 0153e7e21ead817959102c38c49b7f6567b510f4 Mon Sep 17 00:00:00 2001 From: Dibyendu Bhattacharya Date: Mon, 8 Jun 2015 23:05:06 +0530 Subject: [PATCH 3/7] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI Fixed comments given by @zsxwing --- .../receiver/ReceivedBlockHandler.scala | 41 +++++++++---------- .../streaming/ReceivedBlockHandlerSuite.scala | 5 +-- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index dd0af251b46bc..b1ade4aeefdae 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -69,16 +69,20 @@ private[streaming] class BlockManagerBasedBlockHandler( def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { var numRecords = None: Option[Long] - val countIterator = block match { - case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator) - case IteratorBlock(iterator) => new CountingIterator(iterator) - case _ => null - } + val putResult: Seq[(BlockId, BlockStatus)] = block match { case ArrayBufferBlock(arrayBuffer) => - blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true) + val countIterator = new CountingIterator(arrayBuffer.iterator) + val putResult = blockManager.putIterator(blockId, countIterator, storageLevel, + tellMaster = true) + numRecords = Some(countIterator.count) + putResult case IteratorBlock(iterator) => - blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true) + val countIterator = new CountingIterator(iterator) + val putResult = blockManager.putIterator(blockId, countIterator, storageLevel, + tellMaster = true) + numRecords = Some(countIterator.count) + putResult case ByteBufferBlock(byteBuffer) => blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true) case o => @@ -89,9 +93,6 @@ private[streaming] class BlockManagerBasedBlockHandler( throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } - if(countIterator != null) { - numRecords = Some(countIterator.count) - } BlockManagerBasedStoreResult(blockId, numRecords) } @@ -166,17 +167,18 @@ private[streaming] class WriteAheadLogBasedBlockHandler( def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { var numRecords = None: Option[Long] - val countIterator = block match { - case ArrayBufferBlock(arrayBuffer) => new CountingIterator(arrayBuffer.iterator) - case IteratorBlock(iterator) => new CountingIterator(iterator) - case _ => null - } // Serialize the block so that it can be inserted into both val serializedBlock = block match { case ArrayBufferBlock(arrayBuffer) => - blockManager.dataSerialize(blockId, countIterator) + val countIterator = new CountingIterator(arrayBuffer.iterator) + val serializedBlock = blockManager.dataSerialize(blockId, countIterator) + numRecords = Some(countIterator.count) + serializedBlock case IteratorBlock(iterator) => - blockManager.dataSerialize(blockId, countIterator) + val countIterator = new CountingIterator(iterator) + val serializedBlock = blockManager.dataSerialize(blockId, countIterator) + numRecords = Some(countIterator.count) + serializedBlock case ByteBufferBlock(byteBuffer) => 
byteBuffer case _ => @@ -201,9 +203,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Combine the futures, wait for both to complete, and return the write ahead log record handle val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2) val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout) - if(countIterator != null) { - numRecords = Some(countIterator.count) - } WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle) } @@ -226,7 +225,7 @@ private[streaming] object WriteAheadLogBasedBlockHandler { /** * A utility that will wrap the Iterator to get the count */ -private class CountingIterator[T: Manifest](iterator: Iterator[T]) extends Iterator[T] { +private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] { var count = 0 def hasNext(): Boolean = iterator.hasNext def isFullyConsumed: Boolean = !iterator.hasNext diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 790d8ab06d3b4..94dd768782919 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -60,7 +60,6 @@ class ReceivedBlockHandlerSuite var rpcEnv: RpcEnv = null var blockManagerMaster: BlockManagerMaster = null var blockManager: BlockManager = null - var handler: ReceivedBlockHandler = null var tempDirectory: File = null var storageLevel = StorageLevel.MEMORY_ONLY_SER @@ -190,9 +189,7 @@ class ReceivedBlockHandlerSuite test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count messages") { storageLevel = StorageLevel.MEMORY_ONLY // Create a non-trivial (not all zeros) byte array - var counter = 0.toByte - def incr: Byte = {counter = (counter + 1).toByte; counter;} - val bytes = Array.fill[Byte](100)(incr) + val bytes = Array.tabulate(100)(i => i.toByte) val byteBufferBlock = ByteBuffer.wrap(bytes) withBlockManagerBasedBlockHandler { handler => val blockStoreResult = storeBlock(handler, ByteBufferBlock(byteBufferBlock)) From fceac725d8e8fe37146b78e8ae5de32ec52d400f Mon Sep 17 00:00:00 2001 From: Dibyendu Bhattacharya Date: Tue, 9 Jun 2015 10:12:49 +0530 Subject: [PATCH 4/7] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI --- .../spark/streaming/receiver/ReceivedBlockHandler.scala | 2 +- .../spark/streaming/ReceivedBlockHandlerSuite.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index b1ade4aeefdae..826842a8acd8e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -230,7 +230,7 @@ private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] { def hasNext(): Boolean = iterator.hasNext def isFullyConsumed: Boolean = !iterator.hasNext def next(): T = { - count+=1 + count += 1 iterator.next() } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 94dd768782919..73cb12607a631 100644 --- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -63,7 +63,7 @@ class ReceivedBlockHandlerSuite var tempDirectory: File = null var storageLevel = StorageLevel.MEMORY_ONLY_SER - private def makeBlockManager( + private def createBlockManager( maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { conf.set("spark.storage.unrollMemoryThreshold", "512") @@ -307,7 +307,7 @@ class ReceivedBlockHandlerSuite test("BlockManagerBasedBlockHandler - isFullyConsumed-MEMORY_ONLY") { storageLevel = StorageLevel.MEMORY_ONLY - blockManager = makeBlockManager(12000) + blockManager = createBlockManager(12000) val block = List.fill(70)(new Array[Byte](100)) // spark.storage.unrollFraction set to 0.4 for BlockManager // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store @@ -324,7 +324,7 @@ class ReceivedBlockHandlerSuite test("BlockManagerBasedBlockHandler - isFullyConsumed-MEMORY_AND_DISK") { storageLevel = StorageLevel.MEMORY_AND_DISK - blockManager = makeBlockManager(12000) + blockManager = createBlockManager(12000) val block = List.fill(70)(new Array[Byte](100)) // spark.storage.unrollFraction set to 0.4 for BlockManager // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store @@ -338,7 +338,7 @@ class ReceivedBlockHandlerSuite test("WriteAheadLogBasedBlockHandler - isFullyConsumed-MEMORY_ONLY") { storageLevel = StorageLevel.MEMORY_ONLY - blockManager = makeBlockManager(12000) + blockManager = createBlockManager(12000) val block = List.fill(70)(new Array[Byte](100)) // spark.storage.unrollFraction set to 0.4 for BlockManager // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store From 5a8344ae2258ed5c2af0c1c5231a54e88b4bba62 Mon Sep 17 00:00:00 2001 From: Dibyendu Bhattacharya Date: Wed, 10 Jun 2015 06:57:12 +0530 Subject: [PATCH 5/7] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI Count ByteBufferBlock as 1 count --- .../streaming/receiver/ReceivedBlockHandler.scala | 2 ++ .../streaming/ReceivedBlockHandlerSuite.scala | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 826842a8acd8e..ade7d6cbdf5d9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -84,6 +84,7 @@ private[streaming] class BlockManagerBasedBlockHandler( numRecords = Some(countIterator.count) putResult case ByteBufferBlock(byteBuffer) => + numRecords = Some(1) blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true) case o => throw new SparkException( @@ -180,6 +181,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( numRecords = Some(countIterator.count) serializedBlock case ByteBufferBlock(byteBuffer) => + numRecords = Some(1) byteBuffer case _ => throw new Exception(s"Could not push $blockId to block manager, unexpected block type") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 73cb12607a631..e0b816ed917fa 
100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -193,7 +193,20 @@ class ReceivedBlockHandlerSuite val byteBufferBlock = ByteBuffer.wrap(bytes) withBlockManagerBasedBlockHandler { handler => val blockStoreResult = storeBlock(handler, ByteBufferBlock(byteBufferBlock)) - assert(blockStoreResult.numRecords === None) + // ByteBufferBlock is counted as a single record + assert(blockStoreResult.numRecords === Some(1)) + } + } + + test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count messages") { + storageLevel = StorageLevel.MEMORY_ONLY + // Create a non-trivial (not all zeros) byte array + val bytes = Array.tabulate(100)(i => i.toByte) + val byteBufferBlock = ByteBuffer.wrap(bytes) + withWriteAheadLogBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, ByteBufferBlock(byteBufferBlock)) + // ByteBufferBlock is counted as a single record + assert(blockStoreResult.numRecords === Some(1)) + } + } From f37cfd85db90d982daf806267a9247c74e91c81c Mon Sep 17 00:00:00 2001 From: Dibyendu Bhattacharya Date: Fri, 12 Jun 2015 18:33:28 +0530 Subject: [PATCH 6/7] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI --- .../receiver/ReceivedBlockHandler.scala | 36 +-- .../streaming/ReceivedBlockHandlerSuite.scala | 243 +++++++----------- 2 files changed, 109 insertions(+), 170 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index ade7d6cbdf5d9..495148ed309db 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -54,8 +54,8 @@ private[streaming] trait ReceivedBlockHandler { * that stores the metadata related to storage of blocks using * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]] */ -private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId, - numRecords: Option[Long]) +private[streaming] case class BlockManagerBasedStoreResult( + blockId: StreamBlockId, numRecords: Option[Long]) extends ReceivedBlockStoreResult @@ -68,23 +68,21 @@ private[streaming] class BlockManagerBasedBlockHandler( extends ReceivedBlockHandler with Logging { def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { + var numRecords = None: Option[Long] val putResult: Seq[(BlockId, BlockStatus)] = block match { case ArrayBufferBlock(arrayBuffer) => - val countIterator = new CountingIterator(arrayBuffer.iterator) - val putResult = blockManager.putIterator(blockId, countIterator, storageLevel, + numRecords = Some(arrayBuffer.size.toLong) + blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true) - numRecords = Some(countIterator.count) - putResult case IteratorBlock(iterator) => val countIterator = new CountingIterator(iterator) val putResult = blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true) - numRecords = Some(countIterator.count) + numRecords = countIterator.count putResult case ByteBufferBlock(byteBuffer) => blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true) case o => throw new SparkException( @@ -171,17 +169,14 @@ private[streaming] class
WriteAheadLogBasedBlockHandler( // Serialize the block so that it can be inserted into both val serializedBlock = block match { case ArrayBufferBlock(arrayBuffer) => - val countIterator = new CountingIterator(arrayBuffer.iterator) - val serializedBlock = blockManager.dataSerialize(blockId, countIterator) - numRecords = Some(countIterator.count) - serializedBlock + numRecords = Some(arrayBuffer.size.toLong) + blockManager.dataSerialize(blockId, arrayBuffer.iterator) case IteratorBlock(iterator) => val countIterator = new CountingIterator(iterator) val serializedBlock = blockManager.dataSerialize(blockId, countIterator) - numRecords = Some(countIterator.count) + numRecords = countIterator.count serializedBlock case ByteBufferBlock(byteBuffer) => - numRecords = Some(1) byteBuffer case _ => throw new Exception(s"Could not push $blockId to block manager, unexpected block type") @@ -228,11 +223,18 @@ private[streaming] object WriteAheadLogBasedBlockHandler { * A utility that will wrap the Iterator to get the count */ private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] { - var count = 0 + private var _count = 0 + + private def isFullyConsumed: Boolean = !iterator.hasNext + def hasNext(): Boolean = iterator.hasNext - def isFullyConsumed: Boolean = !iterator.hasNext + + def count(): Option[Long] = { + if (isFullyConsumed) Some(_count) else None + } + def next(): T = { - count += 1 + _count += 1 iterator.next() } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index e0b816ed917fa..321509a0535a7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -186,146 +186,62 @@ class ReceivedBlockHandlerSuite } } - test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count messages") { - storageLevel = StorageLevel.MEMORY_ONLY - // Create a non-trivial (not all zeros) byte array - val bytes = Array.tabulate(100)(i => i.toByte) - val byteBufferBlock = ByteBuffer.wrap(bytes) - withBlockManagerBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, ByteBufferBlock(byteBufferBlock)) - // ByteBufferBlock is counted as a single record - assert(blockStoreResult.numRecords === Some(1)) - } - } - - test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-ByteBufferBlock - count messages") { - storageLevel = StorageLevel.MEMORY_ONLY - // Create a non-trivial (not all zeros) byte array - val bytes = Array.tabulate(100)(i => i.toByte) - val byteBufferBlock = ByteBuffer.wrap(bytes) - withWriteAheadLogBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, ByteBufferBlock(byteBufferBlock)) - // ByteBufferBlock is counted as a single record - assert(blockStoreResult.numRecords === Some(1)) - } - } - - test("BlockManagerBasedBlockHandler-MEMORY_ONLY-ArrayBufferBlock - count messages") { - storageLevel = StorageLevel.MEMORY_ONLY - val block = ArrayBuffer.fill(100)(0) - withBlockManagerBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("BlockManagerBasedBlockHandler-DISK_ONLY-ArrayBufferBlock - count messages") { - storageLevel = StorageLevel.DISK_ONLY - val block = ArrayBuffer.fill(100)(0) - withBlockManagerBasedBlockHandler { handler => - val blockStoreResult = 
storeBlock(handler, ArrayBufferBlock(block)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK-ArrayBufferBlock - count messages") { - storageLevel = StorageLevel.MEMORY_AND_DISK - val block = ArrayBuffer.fill(100)(0) - withBlockManagerBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("BlockManagerBasedBlockHandler-MEMORY_ONLY-IteratorBlock - count messages") { - storageLevel = StorageLevel.MEMORY_ONLY - val block = ArrayBuffer.fill(100)(0) - withBlockManagerBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("BlockManagerBasedBlockHandler-DISK_ONLY-IteratorBlock - count messages") { - storageLevel = StorageLevel.DISK_ONLY - val block = ArrayBuffer.fill(100)(0) - withBlockManagerBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK-IteratorBlock - count messages") { - storageLevel = StorageLevel.MEMORY_AND_DISK - val block = ArrayBuffer.fill(100)(0) - withBlockManagerBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-ArrayBufferBlock - count messages") { - storageLevel = StorageLevel.MEMORY_ONLY - val block = ArrayBuffer.fill(100)(0) - withWriteAheadLogBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("WriteAheadLogBasedBlockHandler-DISK_ONLY-ArrayBufferBlock - count messages") { - storageLevel = StorageLevel.DISK_ONLY - val block = ArrayBuffer.fill(100)(0) - withWriteAheadLogBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("WriteAheadLogBasedBlockHandler-MEMORY_AND_DISK-ArrayBufferBlock - count messages") { - storageLevel = StorageLevel.MEMORY_AND_DISK - val block = ArrayBuffer.fill(100)(0) - withWriteAheadLogBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, ArrayBufferBlock(block)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY-IteratorBlock - count messages") { - storageLevel = StorageLevel.MEMORY_ONLY - val block = ArrayBuffer.fill(100)(0) - withWriteAheadLogBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("WriteAheadLogBasedBlockHandler-DISK_ONLY-IteratorBlock - count messages ") { - storageLevel = StorageLevel.DISK_ONLY - val block = ArrayBuffer.fill(100)(0) - withWriteAheadLogBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("WriteAheadLogBasedBlockHandler-MEMORY_AND_DISK-IteratorBlock - count messages") { - storageLevel = StorageLevel.MEMORY_AND_DISK - val block = ArrayBuffer.fill(100)(0) - withWriteAheadLogBasedBlockHandler { handler => - val blockStoreResult = 
storeBlock(handler, IteratorBlock(block.iterator)) - assert(blockStoreResult.numRecords === Some(100)) - } - } - - test("BlockManagerBasedBlockHandler - isFullyConsumed-MEMORY_ONLY") { + test("BlockManagerBasedBlockHandler - count messages") { + // ByteBufferBlock-MEMORY_ONLY + testRecordcount(true, StorageLevel.MEMORY_ONLY, + ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None) + // ArrayBufferBlock-MEMORY_ONLY + testRecordcount(true, StorageLevel.MEMORY_ONLY, + ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25)) + // ArrayBufferBlock-DISK_ONLY + testRecordcount(true, StorageLevel.DISK_ONLY, + ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50)) + // ArrayBufferBlock-MEMORY_AND_DISK + testRecordcount(true, StorageLevel.MEMORY_AND_DISK, + ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75)) + // IteratorBlock-MEMORY_ONLY + testRecordcount(true, StorageLevel.MEMORY_ONLY, + IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100)) + // IteratorBlock-DISK_ONLY + testRecordcount(true, StorageLevel.DISK_ONLY, + IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125)) + // IteratorBlock-MEMORY_AND_DISK + testRecordcount(true, StorageLevel.MEMORY_AND_DISK, + IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150)) + } + + test("WriteAheadLogBasedBlockHandler - count messages") { + // ByteBufferBlock-MEMORY_ONLY + testRecordcount(false, StorageLevel.MEMORY_ONLY, + ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None) + // ArrayBufferBlock-MEMORY_ONLY + testRecordcount(false, StorageLevel.MEMORY_ONLY, + ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25)) + // ArrayBufferBlock-DISK_ONLY + testRecordcount(false, StorageLevel.DISK_ONLY, + ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50)) + // ArrayBufferBlock-MEMORY_AND_DISK + testRecordcount(false, StorageLevel.MEMORY_AND_DISK, + ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75)) + // IteratorBlock-MEMORY_ONLY + testRecordcount(false, StorageLevel.MEMORY_ONLY, + IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100)) + // IteratorBlock-DISK_ONLY + testRecordcount(false, StorageLevel.DISK_ONLY, + IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125)) + // IteratorBlock-MEMORY_AND_DISK + testRecordcount(false, StorageLevel.MEMORY_AND_DISK, + IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150)) + } + + test("BlockManagerBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") { storageLevel = StorageLevel.MEMORY_ONLY blockManager = createBlockManager(12000) val block = List.fill(70)(new Array[Byte](100)) // spark.storage.unrollFraction set to 0.4 for BlockManager - // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store + // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store // this block With MEMORY_ONLY StorageLevel. 
BlockManager will not be able to unroll this block - // and hence it will not tryToPut this block , resulting in the SparkException + // and hence it will not tryToPut this block, resulting in the SparkException withBlockManagerBasedBlockHandler { handler => val thrown = intercept[SparkException] { val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) @@ -335,34 +251,55 @@ class ReceivedBlockHandlerSuite } } - test("BlockManagerBasedBlockHandler - isFullyConsumed-MEMORY_AND_DISK") { - storageLevel = StorageLevel.MEMORY_AND_DISK + test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK - isFullyConsumed") { blockManager = createBlockManager(12000) - val block = List.fill(70)(new Array[Byte](100)) // spark.storage.unrollFraction set to 0.4 for BlockManager - // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store - // this block in MEMORY , But BlockManager will be able to serialize this block to DISK + // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store + // this block in MEMORY, But BlockManager will be able to serialize this block to DISK // and hence count returns correct value. - withBlockManagerBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) - assert(blockStoreResult.numRecords === Some(70)) - } + testRecordcount(true, StorageLevel.MEMORY_AND_DISK, + IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70)) } - test("WriteAheadLogBasedBlockHandler - isFullyConsumed-MEMORY_ONLY") { - storageLevel = StorageLevel.MEMORY_ONLY + test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") { blockManager = createBlockManager(12000) - val block = List.fill(70)(new Array[Byte](100)) // spark.storage.unrollFraction set to 0.4 for BlockManager - // With 12000 * 0.4 = 4800 bytes of free space for unroll , there is not enough space to store - // this block in MEMORY , But BlockManager will be able to serialize this block to WAL + // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store + // this block in MEMORY, But BlockManager will be able to serialize this block to WAL // and hence count returns correct value.
- withWriteAheadLogBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) - assert(blockStoreResult.numRecords === Some(70)) - } + testRecordcount(false, StorageLevel.MEMORY_ONLY, + IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70)) } + /** + * Test storing of data using different types of Handler, StorageLevel and ReceivedBlocks + * and verify the correct record count + */ + private def testRecordcount(isBlockManagedBasedBlockHandler: Boolean, + sLevel: StorageLevel, + receivedBlock: ReceivedBlock, + bManager: BlockManager, + expectedNumRecords: Option[Long] + ) { + storageLevel = sLevel + blockManager = bManager + if (isBlockManagedBasedBlockHandler) { + // test received block with BlockManager based handler + withBlockManagerBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, receivedBlock) + assert(blockStoreResult.numRecords === expectedNumRecords) + } + } else { + // test received block with WAL based handler + withWriteAheadLogBasedBlockHandler { handler => + val blockStoreResult = storeBlock(handler, receivedBlock) + assert(blockStoreResult.numRecords === expectedNumRecords) + } + } + // Removing the Block Id to use the same blockManager for the next test + blockManager.removeBlock(StreamBlockId(streamId, 1000L), true) +} + /** * Test storing of data using different forms of ReceivedBlocks and verify that they succeeded * using the given verification function From f6cb6b5351b472ad8cb9c344aa58360755e5b660 Mon Sep 17 00:00:00 2001 From: Dibyendu Bhattacharya Date: Sat, 13 Jun 2015 22:47:40 +0530 Subject: [PATCH 7/7] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI --- .../receiver/ReceivedBlockHandler.scala | 4 +- .../streaming/ReceivedBlockHandlerSuite.scala | 211 +++++++++--------- 2 files changed, 108 insertions(+), 107 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 495148ed309db..c8dd6e06812dc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -169,8 +169,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Serialize the block so that it can be inserted into both val serializedBlock = block match { case ArrayBufferBlock(arrayBuffer) => - numRecords = Some(arrayBuffer.size.toLong) - blockManager.dataSerialize(blockId, arrayBuffer.iterator) + numRecords = Some(arrayBuffer.size.toLong) + blockManager.dataSerialize(blockId, arrayBuffer.iterator) case IteratorBlock(iterator) => val countIterator = new CountingIterator(iterator) val serializedBlock = blockManager.dataSerialize(blockId, countIterator) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 321509a0535a7..3b6cca7b5142f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -56,24 +56,13 @@ class ReceivedBlockHandlerSuite val serializer = new KryoSerializer(conf) val manualClock = new ManualClock val blockManagerSize = 10000000 + val blockManagerBuffer = new ArrayBuffer[BlockManager]() var rpcEnv: 
RpcEnv = null var blockManagerMaster: BlockManagerMaster = null var blockManager: BlockManager = null + var storageLevel: StorageLevel = null var tempDirectory: File = null - var storageLevel = StorageLevel.MEMORY_ONLY_SER - - private def createBlockManager( - maxMem: Long, - name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { - conf.set("spark.storage.unrollMemoryThreshold", "512") - conf.set("spark.storage.unrollFraction", "0.4") - val transfer = new NioBlockTransferService(conf, securityMgr) - val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf, - mapOutputTracker, shuffleManager, transfer, securityMgr, 0) - manager.initialize("app-id") - manager - } before { rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) @@ -82,20 +71,21 @@ class ReceivedBlockHandlerSuite blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) - blockManager = new BlockManager("bm", rpcEnv, blockManagerMaster, serializer, - blockManagerSize, conf, mapOutputTracker, shuffleManager, - new NioBlockTransferService(conf, securityMgr), securityMgr, 0) - blockManager.initialize("app-id") + storageLevel = StorageLevel.MEMORY_ONLY_SER + blockManager = createBlockManager(blockManagerSize, conf) tempDirectory = Utils.createTempDir() manualClock.setTime(0) } after { - if (blockManager != null) { - blockManager.stop() - blockManager = null + for ( blockManager <- blockManagerBuffer ) { + if (blockManager != null) { + blockManager.stop() + } } + blockManager = null + blockManagerBuffer.clear() if (blockManagerMaster != null) { blockManagerMaster.stop() blockManagerMaster = null @@ -186,89 +176,87 @@ class ReceivedBlockHandlerSuite } } - test("BlockManagerBasedBlockHandler - count messages") { - // ByteBufferBlock-MEMORY_ONLY - testRecordcount(true, StorageLevel.MEMORY_ONLY, - ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None) - // ArrayBufferBlock-MEMORY_ONLY - testRecordcount(true, StorageLevel.MEMORY_ONLY, - ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25)) - // ArrayBufferBlock-DISK_ONLY - testRecordcount(true, StorageLevel.DISK_ONLY, - ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50)) - // ArrayBufferBlock-MEMORY_AND_DISK - testRecordcount(true, StorageLevel.MEMORY_AND_DISK, - ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75)) - // IteratorBlock-MEMORY_ONLY - testRecordcount(true, StorageLevel.MEMORY_ONLY, - IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100)) - // IteratorBlock-DISK_ONLY - testRecordcount(true, StorageLevel.DISK_ONLY, - IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125)) - // IteratorBlock-MEMORY_AND_DISK + test("Test Block - count messages") { + // Test count with BlockManagerBasedBlockHandler + testCountWithBlockManagerBasedBlockHandler(true) + // Test count with WriteAheadLogBasedBlockHandler + testCountWithBlockManagerBasedBlockHandler(false) + } + + test("Test Block - isFullyConsumed") { + val sparkConf = new SparkConf() + sparkConf.set("spark.storage.unrollMemoryThreshold", "512") + // spark.storage.unrollFraction set to 0.4 for BlockManager + sparkConf.set("spark.storage.unrollFraction", "0.4") + // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll + blockManager = createBlockManager(12000, sparkConf) + + // there is not enough space to store this block in MEMORY, + // But 
BlockManager will be able to serialize this block to WAL + // and hence count returns correct value. + testRecordcount(false, StorageLevel.MEMORY_ONLY, + IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70)) + + // there is not enough space to store this block in MEMORY, + // But BlockManager will be able to serialize this block to DISK + // and hence count returns correct value. testRecordcount(true, StorageLevel.MEMORY_AND_DISK, - IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150)) + IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70)) + + // there is not enough space to store this block With MEMORY_ONLY StorageLevel. + // BlockManager will not be able to unroll this block + // and hence it will not tryToPut this block, resulting in the SparkException + storageLevel = StorageLevel.MEMORY_ONLY + withBlockManagerBasedBlockHandler { handler => + val thrown = intercept[SparkException] { + storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator)) + } + } } + private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) { // ByteBufferBlock-MEMORY_ONLY - testRecordcount(false, StorageLevel.MEMORY_ONLY, + testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY, + ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None) + // ByteBufferBlock-MEMORY_ONLY_SER + testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER, ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None) // ArrayBufferBlock-MEMORY_ONLY - testRecordcount(false, StorageLevel.MEMORY_ONLY, + testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY, + ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25)) + // ArrayBufferBlock-MEMORY_ONLY_SER + testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER, ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25)) // ArrayBufferBlock-DISK_ONLY - testRecordcount(false, StorageLevel.DISK_ONLY, + testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY, ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50)) // ArrayBufferBlock-MEMORY_AND_DISK - testRecordcount(false, StorageLevel.MEMORY_AND_DISK, + testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK, ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75)) // IteratorBlock-MEMORY_ONLY - testRecordcount(false, StorageLevel.MEMORY_ONLY, + testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY, + IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100)) + // IteratorBlock-MEMORY_ONLY_SER + testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER, IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100)) // IteratorBlock-DISK_ONLY - testRecordcount(false, StorageLevel.DISK_ONLY, + testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY, IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125)) // IteratorBlock-MEMORY_AND_DISK - testRecordcount(false, StorageLevel.MEMORY_AND_DISK, + testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK, IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150)) } - test("BlockManagerBasedBlockHandler-MEMORY_ONLY - 
isFullyConsumed") { - storageLevel = StorageLevel.MEMORY_ONLY - blockManager = createBlockManager(12000) - val block = List.fill(70)(new Array[Byte](100)) - // spark.storage.unrollFraction set to 0.4 for BlockManager - // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store - // this block With MEMORY_ONLY StorageLevel. BlockManager will not be able to unroll this block - // and hence it will not tryToPut this block, resulting the SparkException - withBlockManagerBasedBlockHandler { handler => - val thrown = intercept[SparkException] { - val blockStoreResult = storeBlock(handler, IteratorBlock(block.iterator)) - } - assert(thrown.getMessage === - "Could not store input-1-1000 to block manager with storage level " + storageLevel) - } - } - - test("BlockManagerBasedBlockHandler-MEMORY_AND_DISK - isFullyConsumed") { - blockManager = createBlockManager(12000) - // spark.storage.unrollFraction set to 0.4 for BlockManager - // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store - // this block in MEMORY, But BlockManager will be able to sereliaze this block to DISK - // and hence count returns correct value. - testRecordcount(true, StorageLevel.MEMORY_AND_DISK, - IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70)) - } - - test("WriteAheadLogBasedBlockHandler-MEMORY_ONLY - isFullyConsumed") { - blockManager = createBlockManager(12000) - // spark.storage.unrollFraction set to 0.4 for BlockManager - // With 12000 * 0.4 = 4800 bytes of free space for unroll, there is not enough space to store - // this block in MEMORY, But BlockManager will be able to sereliaze this block to WAL - // and hence count returns correct value. - testRecordcount(false, StorageLevel.MEMORY_ONLY, - IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70)) + private def createBlockManager( + maxMem: Long, + conf: SparkConf, + name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { + val transfer = new NioBlockTransferService(conf, securityMgr) + val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf, + mapOutputTracker, shuffleManager, transfer, securityMgr, 0) + manager.initialize("app-id") + blockManagerBuffer += manager + manager } /** @@ -281,24 +269,36 @@ class ReceivedBlockHandlerSuite bManager: BlockManager, expectedNumRecords: Option[Long] ) { - storageLevel = sLevel blockManager = bManager - if (isBlockManagedBasedBlockHandler) { - // test received block with BlockManager based handler - withBlockManagerBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, receivedBlock) - assert(blockStoreResult.numRecords === expectedNumRecords) - } - } else { - // test received block with WAL based handler - withWriteAheadLogBasedBlockHandler { handler => - val blockStoreResult = storeBlock(handler, receivedBlock) - assert(blockStoreResult.numRecords === expectedNumRecords) + storageLevel = sLevel + var bId: StreamBlockId = null + try { + if (isBlockManagedBasedBlockHandler) { + // test received block with BlockManager based handler + withBlockManagerBasedBlockHandler { handler => + val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock) + bId = blockId + assert(blockStoreResult.numRecords === expectedNumRecords, + "Message count not matches for a " + + receivedBlock.getClass.getName + + " being inserted using BlockManagerBasedBlockHandler with " + sLevel) + } + } else { + // test received block with WAL 
based handler + withWriteAheadLogBasedBlockHandler { handler => + val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock) + bId = blockId + assert(blockStoreResult.numRecords === expectedNumRecords, + "Message count does not match for a " + + receivedBlock.getClass.getName + + " being inserted using WriteAheadLogBasedBlockHandler with " + sLevel) + } } - } - // Removing the Block Id to use the same blockManager for the next test - blockManager.removeBlock(StreamBlockId(streamId, 1000L), true) -} + } finally { + // Removing the Block Id to use the same blockManager for the next test + blockManager.removeBlock(bId, true) + } + } /** * Test storing of data using different forms of ReceivedBlocks and verify that they succeeded @@ -377,16 +377,17 @@ class ReceivedBlockHandlerSuite (blockIds, storeResults) } - /** Store block using a handler */ - private def storeBlock( + /** Store single block using a handler */ + private def storeSingleBlock( handler: ReceivedBlockHandler, block: ReceivedBlock - ): ReceivedBlockStoreResult = { - val blockId = StreamBlockId(streamId, 1000L) + ): (StreamBlockId, ReceivedBlockStoreResult) = { + val blockId = generateBlockId val blockStoreResult = handler.storeBlock(blockId, block) logDebug("Done inserting") - blockStoreResult + (blockId, blockStoreResult) } + private def getWriteAheadLogFiles(): Seq[String] = { getLogFilesInDirectory(checkpointDirToLogDir(tempDirectory.toString, streamId)) }
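
The core of the fix across this series is easiest to see in isolation: the record count travels with the store result, and CountingIterator only exposes a count once the store layer has fully drained the wrapped iterator, so a partially stored block can never report a misleading number to the tracker or the UI. Below is a minimal, self-contained sketch of that pattern, lightly adapted from the final patch; the demo object and the sample data are illustrative only and are not part of the patch.

// Wraps an iterator and counts elements as the consumer drains it.
class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
  private var _count = 0L

  private def isFullyConsumed: Boolean = !iterator.hasNext

  def hasNext: Boolean = iterator.hasNext

  // None until the underlying iterator is exhausted, so callers cannot
  // read a count for a block that was only partially consumed.
  def count: Option[Long] = if (isFullyConsumed) Some(_count) else None

  def next(): T = {
    _count += 1
    iterator.next()
  }
}

object CountingIteratorDemo extends App {
  val it = new CountingIterator(Iterator(1, 2, 3))
  println(it.count)   // None: nothing has been consumed yet
  it.foreach(_ => ()) // stands in for the store layer draining the block
  println(it.count)   // Some(3): safe to report as the block's record count
}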