From 1802ff0e12359240ebc40223dff351e3d4886002 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Tue, 9 May 2017 23:11:12 +0800
Subject: [PATCH 1/8] optimize rdd cartesian with caching the block in local

---
 .../org/apache/spark/rdd/CartesianRDD.scala  | 73 ++++++++++++++++++-
 .../apache/spark/storage/BlockManager.scala  | 37 ++++++++++
 2 files changed, 107 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 57108dcedcf0c..2cf950cc1c133 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -22,7 +22,8 @@ import java.io.{IOException, ObjectOutputStream}
 import scala.reflect.ClassTag
 
 import org.apache.spark._
-import org.apache.spark.util.Utils
+import org.apache.spark.storage.{RDDBlockId, StorageLevel}
+import org.apache.spark.util.{CompletionIterator, Utils}
 
 private[spark] class CartesianPartition(
@@ -71,9 +72,75 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
+    val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
+    // Whether the block was persisted by the user with a valid StorageLevel.
+    val persistedInLocal = blockManager.getStatus(blockId2) match {
+      case Some(result) =>
+        // This indicates whether the block is cached by the user. If the level is valid, the
+        // block shouldn't be removed by another task.
+        result.storageLevel.isValid
+      case None => false
+    }
+    var cachedInLocal = false
+
+    // Try to get the data locally; otherwise compute it and cache it locally.
+    def getOrElseCache(
+        rdd: RDD[U],
+        partition: Partition,
+        context: TaskContext,
+        level: StorageLevel): Iterator[U] = {
+      // getLocalValues returns a CompletionIterator that releases the read lock once the
+      // iterator has been consumed, so the flag needs to be updated here.
+      cachedInLocal = blockManager.getStatus(blockId2) match {
+        case Some(_) => true
+        case None => false
+      }
+
+      if (persistedInLocal || cachedInLocal) {
+        blockManager.getLocalValues(blockId2) match {
+          case Some(result) =>
+            val existingMetrics = context.taskMetrics().inputMetrics
+            existingMetrics.incBytesRead(result.bytes)
+            return new InterruptibleIterator[U](context, result.data.asInstanceOf[Iterator[U]]) {
+              override def next(): U = {
+                existingMetrics.incRecordsRead(1)
+                delegate.next()
+              }
+            }
+          case None =>
+            if (persistedInLocal) {
+              throw new SparkException(s"Block $blockId2 was not found even though it's persisted")
+            }
+        }
+      }
+
+      val iterator = rdd.iterator(partition, context)
+      val cachedResult = blockManager.putIterator[U](blockId2, iterator, level, false) match {
+        case true =>
+          cachedInLocal = true
+          "successful"
+        case false => "failed"
+      }
+
+      logInfo(s"Cache the block $blockId2 to local $cachedResult.")
+      iterator
+    }
+
+    def removeCachedBlock(): Unit = {
+      val blockManager = SparkEnv.get.blockManager
+      if (!persistedInLocal || cachedInLocal || blockManager.isRemovable(blockId2)) {
+        blockManager.removeOrMarkAsRemovable(blockId2, false)
+      }
+    }
+
+    val resultIter =
+      for (x <- rdd1.iterator(currSplit.s1, context);
+           y <- getOrElseCache(rdd2, currSplit.s2, context, StorageLevel.MEMORY_AND_DISK))
+        yield (x, y)
+
+    CompletionIterator[(T, U), Iterator[(T, U)]](resultIter, removeCachedBlock())
   }
 
   override def getDependencies: Seq[Dependency[_]] = List(
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 33ce30c58e1ad..475d36a5e4859 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.Channels
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.collection.mutable.HashMap
@@ -202,6 +203,9 @@ private[spark] class BlockManager(
 
   private var blockReplicationPolicy: BlockReplicationPolicy = _
 
+  // Record the removable blocks.
+  private lazy val removableBlocks = ConcurrentHashMap.newKeySet[BlockId]()
+
   /**
    * Initializes the BlockManager with the given appId. This is not performed in the constructor as
   * the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -1464,6 +1468,38 @@ private[spark] class BlockManager(
     }
   }
 
+  /**
+   * Whether the block is removable.
+   */
+  def isRemovable(blockId: BlockId): Boolean = {
+    removableBlocks.contains(blockId)
+  }
+
+  /**
+   * Try to remove the block without blocking. Mark it as removable if it is in use.
+   */
+  def removeOrMarkAsRemovable(blockId: BlockId, tellMaster: Boolean = true): Unit = {
+    // Try to lock for writing without blocking
+    blockInfoManager.lockForWriting(blockId, false) match {
+      case None =>
+        // The lock was not acquired, so the block may not exist or may be in use by another task.
+        blockInfoManager.get(blockId) match {
+          case None =>
+            logWarning(s"Asked to remove block $blockId, which does not exist")
+            removableBlocks.remove(blockId)
+          case Some(_) =>
+            // The block is in use, mark it as removable
+            logDebug(s"Marking block $blockId as removable")
+            removableBlocks.add(blockId)
+        }
+      case Some(info) =>
+        logDebug(s"Removing block $blockId")
+        removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
+        addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
+        removableBlocks.remove(blockId)
+    }
+  }
+
   private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
     Option(TaskContext.get()).foreach { c =>
       c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
@@ -1481,6 +1517,7 @@ private[spark] class BlockManager(
       // Closing should be idempotent, but maybe not for the NioBlockTransferService.
       shuffleClient.close()
     }
+    removableBlocks.clear()
     diskBlockManager.stop()
     rpcEnv.stop(slaveEndpoint)
     blockInfoManager.clear()

From 08e25c9c6ca723383fc3af20137db356a8f65262 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Tue, 9 May 2017 23:20:05 +0800
Subject: [PATCH 2/8] add test case

---
 .../spark/storage/BlockManagerSuite.scala | 51 +++++++++++++++++--
 1 file changed, 48 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 1e7bcdb6740f6..07301b6b3f1dd 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -18,20 +18,19 @@
 package org.apache.spark.storage
 
 import java.nio.ByteBuffer
+import java.util.Properties
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
 import scala.language.implicitConversions
 import scala.language.postfixOps
 import scala.reflect.ClassTag
-
 import org.mockito.{Matchers => mc}
 import org.mockito.Mockito.{mock, times, verify, when}
 import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
-
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.executor.DataReadMethod
@@ -67,6 +66,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false), bcastManager, true)
   val shuffleManager = new SortShuffleManager(new SparkConf(false))
 
+  private implicit val ec = ExecutionContext.global
+
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
 
@@ -101,6 +102,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     blockManager
   }
 
+  private def withTaskId[T](taskAttemptId: Long)(block: => T): T = {
+    try {
+      TaskContext.setTaskContext(
+        new TaskContextImpl(0, 0, taskAttemptId, 0, null, new Properties, null))
+      block
+    } finally {
+      TaskContext.unset()
+    }
+  }
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
@@ -1280,6 +1291,40 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("item").isEmpty)
   }
 
+  test("remove block without blocking") {
+    store = makeBlockManager(8000, "executor1")
+    val arr = new Array[Byte](4000)
+    store.registerTask(0)
+    store.registerTask(1)
+    withTaskId(0) {
+      store.putSingle("block", arr, StorageLevel.MEMORY_AND_DISK, true)
+      // lock the block with a read lock
+      store.get("block")
+    }
+    val future = Future {
+      withTaskId(1) {
+        // block is in use, mark it as removable
+        store.removeOrMarkAsRemovable("block")
+        store.isRemovable("block")
+      }
+    }
+    Thread.sleep(300)
+    assert(store.getStatus("block").isDefined, "block should not be removed")
+    assert(ThreadUtils.awaitResult(future, 1.seconds), "block should be marked as removable")
+    withTaskId(0) {
+      store.releaseLock("block")
+    }
+    val future1 = Future {
+      withTaskId(1) {
+        // remove it
+        store.removeOrMarkAsRemovable("block")
+        !store.isRemovable("block")
+      }
+    }
+    assert(ThreadUtils.awaitResult(future1, 1.seconds), "block should not be marked as removable")
+    assert(master.getLocations("block").isEmpty, "block should be removed")
+  }
+
   class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
     var numCalls = 0
 
From 0f812d944c1db12faa7f03059ec49c2c377dfcc9 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Wed, 10 May 2017 11:24:28 +0800
Subject: [PATCH 3/8] put block into local with read lock

---
 .../org/apache/spark/rdd/CartesianRDD.scala | 73 ++++++++++---------
 .../apache/spark/storage/BlockManager.scala |  6 +-
 .../spark/storage/BlockManagerSuite.scala   |  1 -
 3 files changed, 43 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 2cf950cc1c133..cc759f9774b71 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -75,14 +75,6 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
     val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
-    // Whether the block was persisted by the user with a valid StorageLevel.
-    val persistedInLocal = blockManager.getStatus(blockId2) match {
-      case Some(result) =>
-        // This indicates whether the block is cached by the user. If the level is valid, the
-        // block shouldn't be removed by another task.
-        result.storageLevel.isValid
-      case None => false
-    }
     var cachedInLocal = false
@@ -91,45 +83,58 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
         partition: Partition,
         context: TaskContext,
         level: StorageLevel): Iterator[U] = {
-      // getLocalValues returns a CompletionIterator that releases the read lock once the
-      // iterator has been consumed, so the flag needs to be updated here.
-      cachedInLocal = blockManager.getStatus(blockId2) match {
-        case Some(_) => true
-        case None => false
-      }
-
-      if (persistedInLocal || cachedInLocal) {
-        blockManager.getLocalValues(blockId2) match {
-          case Some(result) =>
-            val existingMetrics = context.taskMetrics().inputMetrics
-            existingMetrics.incBytesRead(result.bytes)
-            return new InterruptibleIterator[U](context, result.data.asInstanceOf[Iterator[U]]) {
-              override def next(): U = {
-                existingMetrics.incRecordsRead(1)
-                delegate.next()
-              }
-            }
-          case None =>
-            if (persistedInLocal) {
-              throw new SparkException(s"Block $blockId2 was not found even though it's persisted")
-            }
-        }
+      getLocalValues() match {
+        case Some(result) =>
+          cachedInLocal = true
+          return result
+        case None =>
+          cachedInLocal = false
       }
 
       val iterator = rdd.iterator(partition, context)
-      val cachedResult = blockManager.putIterator[U](blockId2, iterator, level, false) match {
+      // Keep read lock, because next we need read it.
+      val cachedResult = blockManager.putIterator[U](blockId2, iterator, level, false,
+        true) match {
         case true =>
           cachedInLocal = true
           "successful"
-        case false => "failed"
+        case false =>
+          cachedInLocal = false
+          "failed"
       }
 
       logInfo(s"Cache the block $blockId2 to local $cachedResult.")
-      iterator
+      getLocalValues() match {
+        // We don't need to release the read lock; it is released once the iterator completes.
+        case Some(result) => result
+        case None =>
+          throw new SparkException(s"Block $blockId2 was not found even though it's read-locked")
+      }
+    }
+
+    def getLocalValues(): Option[Iterator[U]] = {
+      blockManager.getLocalValues(blockId2) match {
+        case Some(result) =>
+          val existingMetrics = context.taskMetrics().inputMetrics
+          existingMetrics.incBytesRead(result.bytes)
+          val localIter =
+            new InterruptibleIterator[U](context, result.data.asInstanceOf[Iterator[U]]) {
+              override def next(): U = {
+                existingMetrics.incRecordsRead(1)
+                delegate.next()
+              }
+            }
+          Some(localIter)
+        case None =>
+          None
+      }
     }
 
     def removeCachedBlock(): Unit = {
       val blockManager = SparkEnv.get.blockManager
+      // Whether the block is persisted by the user.
+      val persistedInLocal =
+        blockManager.master.getLocations(blockId2).contains(blockManager.blockManagerId)
       if (!persistedInLocal || cachedInLocal || blockManager.isRemovable(blockId2)) {
         blockManager.removeOrMarkAsRemovable(blockId2, false)
       }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 475d36a5e4859..2569c5d060cdc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -791,9 +791,11 @@ private[spark] class BlockManager(
       blockId: BlockId,
       values: Iterator[T],
       level: StorageLevel,
-      tellMaster: Boolean = true): Boolean = {
+      tellMaster: Boolean = true,
+      keepReadLock: Boolean = false): Boolean = {
     require(values != null, "Values is null")
-    doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], tellMaster) match {
+    doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], tellMaster,
+      keepReadLock) match {
       case None =>
         true
       case Some(iter) =>
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 07301b6b3f1dd..0a3ba12e84c3b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1352,7 +1352,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         blockData: ManagedBuffer,
         level: StorageLevel,
         classTag: ClassTag[_]): Future[Unit] = {
-      import scala.concurrent.ExecutionContext.Implicits.global
       Future {}
     }
 
From 08c1849e7fbc3edd6d4c5c60e2fab93e9cb1ee03 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Wed, 10 May 2017 15:47:46 +0800
Subject: [PATCH 4/8] follow the code style and add some comments

---
 .../org/apache/spark/rdd/CartesianRDD.scala | 23 ++++++++-----------
 .../spark/storage/BlockManagerSuite.scala   |  4 +++-
 2 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index cc759f9774b71..8e564b4eff46e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -85,25 +85,21 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
         level: StorageLevel): Iterator[U] = {
       getLocalValues() match {
         case Some(result) =>
-          cachedInLocal = true
           return result
-        case None =>
-          cachedInLocal = false
+        case None => // do nothing
       }
 
       val iterator = rdd.iterator(partition, context)
-      // Keep read lock, because next we need read it.
-      val cachedResult = blockManager.putIterator[U](blockId2, iterator, level, false,
-        true) match {
-        case true =>
-          cachedInLocal = true
-          "successful"
+      // Keep read lock, because next we need read it. And don't tell master.
+      blockManager.putIterator[U](blockId2, iterator, level, false, true) match {
+        case true => cachedInLocal = true
         case false =>
-          cachedInLocal = false
-          "failed"
+          // There shouldn't be an error caused by the put, because we use MEMORY_AND_DISK to
+          // cache it.
+          throw new SparkException(s"Cache block $blockId2 in local failed even though it's $level")
       }
 
-      logInfo(s"Cache the block $blockId2 to local $cachedResult.")
+      logInfo(s"Cache the block $blockId2 to local successful.")
       getLocalValues() match {
         // We don't need to release the read lock; it is released once the iterator completes.
         case Some(result) => result
         case None =>
           throw new SparkException(s"Block $blockId2 was not found even though it's read-locked")
       }
     }
+    // Get the block from the local block manager and update the metrics.
     def getLocalValues(): Option[Iterator[U]] = {
       blockManager.getLocalValues(blockId2) match {
         case Some(result) =>
@@ -135,7 +132,7 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       // Whether the block is persisted by the user.
       val persistedInLocal =
         blockManager.master.getLocations(blockId2).contains(blockManager.blockManagerId)
-      if (!persistedInLocal || cachedInLocal || blockManager.isRemovable(blockId2)) {
+      if (!persistedInLocal && (cachedInLocal || blockManager.isRemovable(blockId2))) {
         blockManager.removeOrMarkAsRemovable(blockId2, false)
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0a3ba12e84c3b..10f43bf1845fe 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -21,16 +21,18 @@ import java.nio.ByteBuffer
 import java.util.Properties
 
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.language.postfixOps
 import scala.reflect.ClassTag
+
 import org.mockito.{Matchers => mc}
 import org.mockito.Mockito.{mock, times, verify, when}
 import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
+
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.executor.DataReadMethod

From 89a22ef273bf798d722c21be390d8e9c5c3c9fb4 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Fri, 12 May 2017 10:12:51 +0800
Subject: [PATCH 5/8] release the hold lock

---
 .../main/scala/org/apache/spark/rdd/CartesianRDD.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 8e564b4eff46e..38e672480d89f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -76,6 +76,7 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     val currSplit = split.asInstanceOf[CartesianPartition]
     val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
     var cachedInLocal = false
+    var holdReadLock = false
 
     // Try to get the data locally; otherwise compute it and cache it locally.
     def getOrElseCache(
         rdd: RDD[U],
@@ -92,7 +93,10 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       val iterator = rdd.iterator(partition, context)
       // Keep read lock, because next we need read it. And don't tell master.
       blockManager.putIterator[U](blockId2, iterator, level, false, true) match {
-        case true => cachedInLocal = true
+        case true =>
+          cachedInLocal = true
+          // After caching the block, we also hold its read lock until this task finishes.
+          holdReadLock = true
         case false =>
           // There shouldn't be an error caused by the put, because we use MEMORY_AND_DISK to
           // cache it.
           throw new SparkException(s"Cache block $blockId2 in local failed even though it's $level")
@@ -129,6 +133,10 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     def removeCachedBlock(): Unit = {
       val blockManager = SparkEnv.get.blockManager
+      if (holdReadLock) {
+        // If we hold the read lock, we need to release it.
+        blockManager.releaseLock(blockId2)
+      }
       // Whether the block is persisted by the user.
       val persistedInLocal =
         blockManager.master.getLocations(blockId2).contains(blockManager.blockManagerId)
       if (!persistedInLocal && (cachedInLocal || blockManager.isRemovable(blockId2))) {
         blockManager.removeOrMarkAsRemovable(blockId2, false)
       }

From 697ba33d5fe154bdba8dd2bf2a6909c8836beb10 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Thu, 18 May 2017 12:36:44 +0800
Subject: [PATCH 6/8] address comments

---
 .../org/apache/spark/rdd/CartesianRDD.scala | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 38e672480d89f..5f0ea7cac3f8d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -92,15 +92,15 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
       val iterator = rdd.iterator(partition, context)
       // Keep read lock, because next we need read it. And don't tell master.
-      blockManager.putIterator[U](blockId2, iterator, level, false, true) match {
-        case true =>
-          cachedInLocal = true
-          // After caching the block, we also hold its read lock until this task finishes.
-          holdReadLock = true
-        case false =>
-          // There shouldn't be an error caused by the put, because we use MEMORY_AND_DISK to
-          // cache it.
-          throw new SparkException(s"Cache block $blockId2 in local failed even though it's $level")
+      val putResult = blockManager.putIterator[U](blockId2, iterator, level, false, true)
+      if (putResult) {
+        cachedInLocal = true
+        // After caching the block, we also hold its read lock until this task finishes.
holdReadLock = true + logInfo(s"Cache the block $blockId2 to local successful.") + val readLocalBlock = blockManager.getLocalValues(blockId2).getOrElse { + blockManager.releaseLock(blockId2) + throw new SparkException(s"get() failed for block $blockId2 even though we held a lock") + } + + new InterruptibleIterator[U](context, readLocalBlock.data.asInstanceOf[Iterator[U]]) } else { + blockManager.releaseLock(blockId2) // There shouldn't a error caused by put in memory, because we use MEMORY_AND_DISK to // cache it. throw new SparkException(s"Cache block $blockId2 in local failed even though it's $level") } - - logInfo(s"Cache the block $blockId2 to local successful.") - getLocalValues() match { - // We don't need release the read lock, it will release after the iterator completion. - case Some(result) => result - case None => - throw new SparkException(s"Block $blockId2 was not found even though it's read-locked") - } } // Get block from local, and update the metrics. diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index 5d522189a0c29..67b80701c9a84 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -198,8 +198,12 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext // write files to disk so we can read them later. sc.parallelize(cartVector).saveAsTextFile(cartFilePath) val aRdd = sc.textFile(cartFilePath, numPartitions) + aRdd.cache() + aRdd.count() val tmpRdd = sc.textFile(tmpFilePath, numPartitions) + tmpRdd.cache() + tmpRdd.count() val firstSize = runAndReturnBytesRead { aRdd.count() From f29a9dcc8af12026c4f70e28602d3e814491bd2b Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Fri, 19 May 2017 20:15:09 +0800 Subject: [PATCH 8/8] address comments --- .../org/apache/spark/rdd/CartesianRDD.scala | 50 ++++++++++++------- .../main/scala/org/apache/spark/rdd/RDD.scala | 7 ++- .../metrics/InputOutputMetricsSuite.scala | 4 -- 3 files changed, 36 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 5c0f041a24aba..88e4cefdace31 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -50,7 +50,8 @@ private[spark] class CartesianRDD[T: ClassTag, U: ClassTag]( sc: SparkContext, var rdd1 : RDD[T], - var rdd2 : RDD[U]) + var rdd2 : RDD[U], + val cacheFetchedInLocal: Boolean = false) extends RDD[(T, U)](sc, Nil) with Serializable { @@ -78,7 +79,8 @@ class CartesianRDD[T: ClassTag, U: ClassTag]( var cachedInLocal = false var holdReadLock = false - // Try to get data from the local, otherwise it will be cached to the local. + // Try to get data from the local, otherwise it will be cached to the local if user set + // cacheFetchedInLocal as true. 
     def getOrElseCache(
         rdd: RDD[U],
         partition: Partition,
@@ -88,13 +90,16 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
         case Some(result) =>
           return result
         case None => if (holdReadLock) {
+          blockManager.releaseLock(blockId2)
           throw new SparkException(s"get() failed for block $blockId2 even though we held a lock")
         }
       }
 
       val iterator = rdd.iterator(partition, context)
-      if (rdd.getStorageLevel != StorageLevel.NONE || rdd.isCheckpointedAndMaterialized) {
-        // If the block is already cached locally, we shouldn't cache it again.
+      val status = blockManager.getStatus(blockId2)
+      if (!cacheFetchedInLocal || (status.isDefined && status.get.storageLevel.isValid)) {
+        // If the user doesn't want to cache the block fetched remotely, just return the iterator.
+        // Or, if the block is already cached locally, we shouldn't cache it again.
         return iterator
       }
@@ -138,26 +143,33 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
         }
       }
 
-    def removeCachedBlock(): Unit = {
-      val blockManager = SparkEnv.get.blockManager
-      if (holdReadLock) {
-        // If we hold the read lock, we need to release it.
-        blockManager.releaseLock(blockId2)
-      }
-      // Whether the block is persisted by the user.
-      val persistedInLocal =
-        blockManager.master.getLocations(blockId2).contains(blockManager.blockManagerId)
-      if (!persistedInLocal && (cachedInLocal || blockManager.isRemovable(blockId2))) {
-        blockManager.removeOrMarkAsRemovable(blockId2, false)
-      }
-    }
-
     val resultIter =
       for (x <- rdd1.iterator(currSplit.s1, context);
            y <- getOrElseCache(rdd2, currSplit.s2, context, StorageLevel.MEMORY_AND_DISK))
         yield (x, y)
 
-    CompletionIterator[(T, U), Iterator[(T, U)]](resultIter, removeCachedBlock())
+    CompletionIterator[(T, U), Iterator[(T, U)]](resultIter,
+      removeCachedBlock(blockId2, holdReadLock, cachedInLocal))
+  }
+
+  /**
+   * Remove the cached block. If we hold the read lock, we also need to release it.
+   */
+  def removeCachedBlock(
+      blockId: RDDBlockId,
+      holdReadLock: Boolean,
+      cachedInLocal: Boolean): Unit = {
+    val blockManager = SparkEnv.get.blockManager
+    if (holdReadLock) {
+      // If we hold the read lock, we need to release it.
+      blockManager.releaseLock(blockId)
+    }
+    // Whether the block is persisted by the user.
+    val persistedInLocal =
+      blockManager.master.getLocations(blockId).contains(blockManager.blockManagerId)
+    if (!persistedInLocal && (cachedInLocal || blockManager.isRemovable(blockId))) {
+      blockManager.removeOrMarkAsRemovable(blockId, false)
+    }
+  }
 
   override def getDependencies: Seq[Dependency[_]] = List(
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 63a87e7f09d85..3c64bf5e8b0dd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -670,9 +670,12 @@ abstract class RDD[T: ClassTag](
   /**
    * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
+   *
+   * @param cacheFetchedInLocal Whether to cache the remotely fetched blocks locally.
*/ - def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope { - new CartesianRDD(sc, this, other) + def cartesian[U: ClassTag](other: RDD[U], + cacheFetchedInLocal: Boolean = false): RDD[(T, U)] = withScope { + new CartesianRDD(sc, this, other, cacheFetchedInLocal) } /** diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index 67b80701c9a84..5d522189a0c29 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -198,12 +198,8 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext // write files to disk so we can read them later. sc.parallelize(cartVector).saveAsTextFile(cartFilePath) val aRdd = sc.textFile(cartFilePath, numPartitions) - aRdd.cache() - aRdd.count() val tmpRdd = sc.textFile(tmpFilePath, numPartitions) - tmpRdd.cache() - tmpRdd.count() val firstSize = runAndReturnBytesRead { aRdd.count()
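
For reference, below is a minimal usage sketch of the API added by PATCH 8, assuming a running SparkContext. It is not part of the patch series: the input data, partition counts and application name are illustrative only, and the only pieces taken from the patches are the extra cacheFetchedInLocal parameter of RDD.cartesian and the MEMORY_AND_DISK level that CartesianRDD.compute hard-codes when caching partitions of rdd2.

// Hypothetical usage sketch; not part of the patch series.
import org.apache.spark.{SparkConf, SparkContext}

object CartesianCacheExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cartesian-cache-example"))
    val left = sc.parallelize(1 to 1000, 8)
    val right = sc.parallelize(Seq("a", "b", "c", "d"), 2)
    // With cacheFetchedInLocal = true, each partition of `right` that a task fetches is kept
    // in the local BlockManager (MEMORY_AND_DISK), so the repeated scans of rdd2 inside the
    // cartesian product read the locally cached block instead of refetching or recomputing it.
    val pairs = left.cartesian(right, cacheFetchedInLocal = true)
    println(pairs.count())
    sc.stop()
  }
}

Whether the caching pays off depends on the job: it trades local memory/disk space for avoiding repeated remote fetches or recomputation of rdd2's partitions, which is the scenario this patch series targets.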