From 3f2ac8d6d977da2577c56f3bfcd51e8b053d952d Mon Sep 17 00:00:00 2001
From: cenyuhai
Date: Sun, 6 Mar 2016 19:30:43 +0800
Subject: [PATCH 1/4] temp patch for SPARK-13566

---
 .../org/apache/spark/executor/Executor.scala  | 12 +++++
 .../apache/spark/storage/BlockManager.scala   | 48 +++++++++++++++++--
 .../spark/storage/BlockManagerSuite.scala     | 26 ++++++++++
 3 files changed, 81 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a7bb412e1c94e..69a34ddedb98f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -218,6 +218,7 @@ private[spark] class Executor(
         threwException = false
         res
       } finally {
+        val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
         val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
         if (freedMemory > 0) {
           val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
@@ -227,6 +228,17 @@ private[spark] class Executor(
             logError(errMsg)
           }
         }
+
+        if (releasedLocks.nonEmpty) {
+          val errMsg =
+            s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
+              releasedLocks.mkString("[", ", ", "]")
+          if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
+            throw new SparkException(errMsg)
+          } else {
+            logError(errMsg)
+          }
+        }
       }

       val taskFinish = System.currentTimeMillis()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2cc2fd9ef0712..fdcc1c2fe8b18 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,12 +19,14 @@ package org.apache.spark.storage

 import java.io._
 import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.ConcurrentHashMap

 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.Random
 import scala.util.control.NonFatal
+import scala.collection.JavaConverters._

 import sun.nio.ch.DirectBuffer
@@ -65,7 +67,7 @@ private[spark] class BlockManager(
     val master: BlockManagerMaster,
     defaultSerializer: Serializer,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    val memoryManager: MemoryManager,
     mapOutputTracker: MapOutputTracker,
     shuffleManager: ShuffleManager,
     blockTransferService: BlockTransferService,
@@ -164,6 +166,11 @@ private[spark] class BlockManager(
    * loaded yet. */
   private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

+  // Blocks that are being removed by another thread
+  private val pendingToRemove = new ConcurrentHashMap[BlockId, Long]()
+
+  private val NON_TASK_WRITER = -1024L
+
   /**
    * Initializes the BlockManager with the given appId. This is not performed in the constructor as
    * the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -1025,7 +1032,7 @@ private[spark] class BlockManager(
     val info = blockInfo.get(blockId).orNull

     // If the block has not already been dropped
-    if (info != null) {
+    if (info != null && !pendingToRemove.containsKey(blockId)) {
       info.synchronized {
         // required ? As of now, this will be invoked only for blocks which are ready
         // But in case this changes in future, adding for consistency sake.
@@ -1051,11 +1058,13 @@ private[spark] class BlockManager(
           }
           blockIsUpdated = true
         }
+        pendingToRemove.put(blockId, currentTaskAttemptId)

         // Actually drop from memory store
         val droppedMemorySize =
           if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
         val blockIsRemoved = memoryStore.remove(blockId)
+        pendingToRemove.remove(blockId)
         if (blockIsRemoved) {
           blockIsUpdated = true
         } else {
@@ -1080,6 +1089,7 @@ private[spark] class BlockManager(

   /**
    * Remove all blocks belonging to the given RDD.
+   *
    * @return The number of blocks removed.
    */
   def removeRdd(rddId: Int): Int = {
@@ -1108,11 +1118,14 @@ private[spark] class BlockManager(
   def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
     logDebug(s"Removing block $blockId")
     val info = blockInfo.get(blockId).orNull
-    if (info != null) {
+    if (info != null && !pendingToRemove.containsKey(blockId)) {
+      pendingToRemove.put(blockId, currentTaskAttemptId)
       info.synchronized {
+        val level = info.level
         // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-        val removedFromMemory = memoryStore.remove(blockId)
-        val removedFromDisk = diskStore.remove(blockId)
+        val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false
+        pendingToRemove.remove(blockId)
+        val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false
         val removedFromExternalBlockStore =
           if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
         if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
@@ -1147,9 +1160,11 @@ private[spark] class BlockManager(
       val entry = iterator.next()
       val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
       if (time < cleanupTime && shouldDrop(id)) {
+        pendingToRemove.put(id, currentTaskAttemptId)
         info.synchronized {
           val level = info.level
           if (level.useMemory) { memoryStore.remove(id) }
+          pendingToRemove.remove(id)
           if (level.useDisk) { diskStore.remove(id) }
           if (level.useOffHeap) { externalBlockStore.remove(id) }
           iterator.remove()
@@ -1161,6 +1176,28 @@ private[spark] class BlockManager(
     }
   }

+  private def currentTaskAttemptId: Long = {
+    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(NON_TASK_WRITER)
+  }
+
+  /**
+   * Release all locks held by the given task, clearing that task's pin bookkeeping
+   * structures and updating the global pin counts. This method should be called at the
+   * end of a task (either by a task completion handler or in `TaskRunner.run()`).
+   *
+   * @return the ids of blocks whose pins were released
+   */
+  def releaseAllLocksForTask(taskAttemptId: Long): ArrayBuffer[BlockId] = {
+    var selectLocks = ArrayBuffer[BlockId]()
+    pendingToRemove.entrySet().asScala.foreach { entry =>
+      if (entry.getValue == taskAttemptId) {
+        pendingToRemove.remove(taskAttemptId)
+        selectLocks += entry.getKey
+      }
+    }
+    selectLocks
+  }
+
   private def shouldCompress(blockId: BlockId): Boolean = {
     blockId match {
       case _: ShuffleBlockId => compressShuffle
@@ -1234,6 +1271,7 @@ private[spark] class BlockManager(
     rpcEnv.stop(slaveEndpoint)
     blockInfo.clear()
     memoryStore.clear()
+    pendingToRemove.clear()
     diskStore.clear()
     if (externalBlockStoreInitialized) {
       externalBlockStore.clear()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 53991d8a1aede..f876b72808087 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -424,6 +424,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
   }

+  test("deadlock between dropFromMemory and removeBlock") {
+    store = makeBlockManager(2000)
+    val a1 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    val t1 = new Thread {
+      override def run() = {
+        store.memoryManager.synchronized {
+          Thread.sleep(1000)
+          val status = store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+          assert(status == None, "this thread cannot get block a1")
+        }
+      }
+    }
+
+    val t2 = new Thread {
+      override def run() = {
+        store.removeBlock("a1", tellMaster = false)
+      }
+    }
+
+    t1.start()
+    t2.start()
+    t1.join()
+    t2.join()
+  }
+
   test("correct BlockResult returned from get() calls") {
     store = makeBlockManager(12000)
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))

From aaa6a96704d9f1435965812a99d3ac37a61ac5ff Mon Sep 17 00:00:00 2001
From: cenyuhai
Date: Mon, 7 Mar 2016 20:14:06 +0800
Subject: [PATCH 2/4] release all locks after task completes

---
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index fdcc1c2fe8b18..c1301606de7ee 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1191,7 +1191,7 @@ private[spark] class BlockManager(
     var selectLocks = ArrayBuffer[BlockId]()
     pendingToRemove.entrySet().asScala.foreach { entry =>
       if (entry.getValue == taskAttemptId) {
-        pendingToRemove.remove(taskAttemptId)
+        pendingToRemove.remove(entry.getKey)
         selectLocks += entry.getKey
       }
     }

From 449e6fcdfc148ad40507edf13c79db3258b9ddfd Mon Sep 17 00:00:00 2001
From: cenyuhai
Date: Fri, 6 May 2016 11:59:42 +0800
Subject: [PATCH 3/4] use putIfAbsent instead of containsKey

---
 .../apache/spark/storage/BlockManager.scala   | 158 ++++++++++--------
 1 file changed, 86 insertions(+), 72 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c1301606de7ee..50dc77559d7a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1032,56 +1032,58 @@ private[spark] class BlockManager(
     val info = blockInfo.get(blockId).orNull

     // If the block has not already been dropped
-    if (info != null && !pendingToRemove.containsKey(blockId)) {
-      info.synchronized {
-        // required ? As of now, this will be invoked only for blocks which are ready
-        // But in case this changes in future, adding for consistency sake.
-        if (!info.waitForReady()) {
-          // If we get here, the block write failed.
-          logWarning(s"Block $blockId was marked as failure. Nothing to drop")
-          return None
-        } else if (blockInfo.get(blockId).isEmpty) {
-          logWarning(s"Block $blockId was already dropped.")
-          return None
-        }
-        var blockIsUpdated = false
-        val level = info.level
+    if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
+      try {
+        info.synchronized {
+          // required ? As of now, this will be invoked only for blocks which are ready
+          // But in case this changes in future, adding for consistency sake.
+          if (!info.waitForReady()) {
+            // If we get here, the block write failed.
+            logWarning(s"Block $blockId was marked as failure. Nothing to drop")
+            return None
+          } else if (blockInfo.get(blockId).isEmpty) {
+            logWarning(s"Block $blockId was already dropped.")
+            return None
+          }
+          var blockIsUpdated = false
+          val level = info.level

-        // Drop to disk, if storage level requires
-        if (level.useDisk && !diskStore.contains(blockId)) {
-          logInfo(s"Writing block $blockId to disk")
-          data() match {
-            case Left(elements) =>
-              diskStore.putArray(blockId, elements, level, returnValues = false)
-            case Right(bytes) =>
-              diskStore.putBytes(blockId, bytes, level)
+          // Drop to disk, if storage level requires
+          if (level.useDisk && !diskStore.contains(blockId)) {
+            logInfo(s"Writing block $blockId to disk")
+            data() match {
+              case Left(elements) =>
+                diskStore.putArray(blockId, elements, level, returnValues = false)
+              case Right(bytes) =>
+                diskStore.putBytes(blockId, bytes, level)
+            }
+            blockIsUpdated = true
           }
-          blockIsUpdated = true
-        }
-        pendingToRemove.put(blockId, currentTaskAttemptId)

-        // Actually drop from memory store
-        val droppedMemorySize =
-          if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
-        val blockIsRemoved = memoryStore.remove(blockId)
-        pendingToRemove.remove(blockId)
-        if (blockIsRemoved) {
-          blockIsUpdated = true
-        } else {
-          logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
-        }
+          // Actually drop from memory store
+          val droppedMemorySize =
+            if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
+          val blockIsRemoved = memoryStore.remove(blockId)
+          if (blockIsRemoved) {
+            blockIsUpdated = true
+          } else {
+            logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
+          }

-        val status = getCurrentBlockStatus(blockId, info)
-        if (info.tellMaster) {
-          reportBlockStatus(blockId, info, status, droppedMemorySize)
-        }
-        if (!level.useDisk) {
-          // The block is completely gone from this node; forget it so we can put() it again later.
-          blockInfo.remove(blockId)
-        }
-        if (blockIsUpdated) {
-          return Some(status)
+          val status = getCurrentBlockStatus(blockId, info)
+          if (info.tellMaster) {
+            reportBlockStatus(blockId, info, status, droppedMemorySize)
+          }
+          if (!level.useDisk) {
+            // The block is completely gone from this node; forget it so we can put() it again later.
+            blockInfo.remove(blockId)
+          }
+          if (blockIsUpdated) {
+            return Some(status)
+          }
         }
+      } finally {
+        pendingToRemove.remove(blockId)
       }
     }
     None
@@ -1118,25 +1120,27 @@ private[spark] class BlockManager(
   def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
     logDebug(s"Removing block $blockId")
     val info = blockInfo.get(blockId).orNull
-    if (info != null && !pendingToRemove.containsKey(blockId)) {
-      pendingToRemove.put(blockId, currentTaskAttemptId)
-      info.synchronized {
-        val level = info.level
-        // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-        val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false
-        pendingToRemove.remove(blockId)
-        val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false
-        val removedFromExternalBlockStore =
-          if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
-        if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
-          logWarning(s"Block $blockId could not be removed as it was not found in either " +
-            "the disk, memory, or external block store")
-        }
-        blockInfo.remove(blockId)
-        if (tellMaster && info.tellMaster) {
-          val status = getCurrentBlockStatus(blockId, info)
-          reportBlockStatus(blockId, info, status)
+    if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
+      try {
+        info.synchronized {
+          val level = info.level
+          // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+          val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false
+          val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false
+          val removedFromExternalBlockStore =
+            if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
+          if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
+            logWarning(s"Block $blockId could not be removed as it was not found in either " +
+              "the disk, memory, or external block store")
+          }
+          blockInfo.remove(blockId)
+          if (tellMaster && info.tellMaster) {
+            val status = getCurrentBlockStatus(blockId, info)
+            reportBlockStatus(blockId, info, status)
+          }
         }
+      } finally {
+        pendingToRemove.remove(blockId)
       }
     } else {
       // The block has already been removed; do nothing.
@@ -1160,15 +1164,25 @@ private[spark] class BlockManager(
       val entry = iterator.next()
       val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
       if (time < cleanupTime && shouldDrop(id)) {
-        pendingToRemove.put(id, currentTaskAttemptId)
-        info.synchronized {
-          val level = info.level
-          if (level.useMemory) { memoryStore.remove(id) }
-          pendingToRemove.remove(id)
-          if (level.useDisk) { diskStore.remove(id) }
-          if (level.useOffHeap) { externalBlockStore.remove(id) }
-          iterator.remove()
-          logInfo(s"Dropped block $id")
+        if (pendingToRemove.putIfAbsent(id, currentTaskAttemptId) == 0L) {
+          try {
+            info.synchronized {
+              val level = info.level
+              if (level.useMemory) {
+                memoryStore.remove(id)
+              }
+              if (level.useDisk) {
+                diskStore.remove(id)
+              }
+              if (level.useOffHeap) {
+                externalBlockStore.remove(id)
+              }
+              iterator.remove()
+              logInfo(s"Dropped block $id")
+            }
+          } finally {
+            pendingToRemove.remove(id)
+          }
         }
         val status = getCurrentBlockStatus(id, info)
         reportBlockStatus(id, info, status)

From a096afa9e1f6105fe55d481130d24f1d0ad9cb26 Mon Sep 17 00:00:00 2001
From: cenyuhai
Date: Fri, 6 May 2016 17:23:32 +0800
Subject: [PATCH 4/4] use CountDownLatch instead of sleep

---
 .../apache/spark/storage/BlockManager.scala   | 41 ++++++++++---------
 .../spark/storage/BlockManagerSuite.scala     | 28 +++++++++----
 2 files changed, 42 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 50dc77559d7a9..a46cc55e29cb4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -167,7 +167,7 @@ private[spark] class BlockManager(
   private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

   // Blocks that are being removed by another thread
-  private val pendingToRemove = new ConcurrentHashMap[BlockId, Long]()
+  val pendingToRemove = new ConcurrentHashMap[BlockId, Long]()

   private val NON_TASK_WRITER = -1024L
@@ -1163,26 +1163,25 @@ private[spark] class BlockManager(
     while (iterator.hasNext) {
       val entry = iterator.next()
       val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
-      if (time < cleanupTime && shouldDrop(id)) {
-        if (pendingToRemove.putIfAbsent(id, currentTaskAttemptId) == 0L) {
-          try {
-            info.synchronized {
-              val level = info.level
-              if (level.useMemory) {
-                memoryStore.remove(id)
-              }
-              if (level.useDisk) {
-                diskStore.remove(id)
-              }
-              if (level.useOffHeap) {
-                externalBlockStore.remove(id)
-              }
-              iterator.remove()
-              logInfo(s"Dropped block $id")
+      if (time < cleanupTime && shouldDrop(id) &&
+        pendingToRemove.putIfAbsent(id, currentTaskAttemptId) == 0L) {
+        try {
+          info.synchronized {
+            val level = info.level
+            if (level.useMemory) {
+              memoryStore.remove(id)
             }
-          } finally {
-            pendingToRemove.remove(id)
+            if (level.useDisk) {
+              diskStore.remove(id)
+            }
+            if (level.useOffHeap) {
+              externalBlockStore.remove(id)
+            }
+            iterator.remove()
+            logInfo(s"Dropped block $id")
           }
+        } finally {
+          pendingToRemove.remove(id)
         }
         val status = getCurrentBlockStatus(id, info)
         reportBlockStatus(id, info, status)
@@ -1194,6 +1193,10 @@ private[spark] class BlockManager(
     Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(NON_TASK_WRITER)
   }

+  def getBlockInfo(blockId: BlockId): BlockInfo = {
+    blockInfo.get(blockId).orNull
+  }
+
   /**
    * Release all locks held by the given task, clearing that task's pin bookkeeping
    * structures and updating the global pin counts. This method should be called at the
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f876b72808087..333dcdb83b21d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage

 import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.util.Arrays
+import java.util.concurrent.CountDownLatch

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -428,26 +429,37 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
-    val t1 = new Thread {
+    val lock1 = new CountDownLatch(1)
+    val lock2 = new CountDownLatch(1)
+
+    val t2 = new Thread {
       override def run() = {
-        store.memoryManager.synchronized {
-          Thread.sleep(1000)
-          val status = store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
-          assert(status == None, "this thread cannot get block a1")
+        val info = store.getBlockInfo("a1")
+        info.synchronized {
+          store.pendingToRemove.put("a1", 1L)
+          lock1.countDown()
+          lock2.await()
+          store.pendingToRemove.remove("a1")
         }
       }
     }

-    val t2 = new Thread {
+    val t1 = new Thread {
       override def run() = {
-        store.removeBlock("a1", tellMaster = false)
+        store.memoryManager.synchronized {
+          t2.start()
+          lock1.await()
+          val status = store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+          assert(status == None, "this thread cannot get block a1")
+          lock2.countDown()
+        }
       }
     }

     t1.start()
-    t2.start()
     t1.join()
     t2.join()
+    store.removeBlock("a1", tellMaster = false)
   }
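
Note on the locking pattern. All four patches rely on the same idea: ConcurrentHashMap.putIfAbsent
acts as a per-block try-lock, so a thread that wants to remove a block either claims it atomically
or skips it, and never blocks while already holding another lock (which is what deadlocked
dropFromMemory, called under the MemoryManager lock, against removeBlock, called under the block's
info lock). The sketch below is a minimal, self-contained illustration of that pattern only, not
code from the patches; PinSketch, tryRemove, and the sample block ids are invented names. It also
stores java.lang.Long and checks == null where the patches write == 0L; the two are equivalent
because Scala unboxes a null java.lang.Long to 0L, though the explicit null check avoids treating a
real owner id of 0L as "absent".

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._
    import scala.collection.mutable.ArrayBuffer

    // Hypothetical standalone sketch of the per-block try-lock pattern.
    object PinSketch {
      type BlockId = String

      // Owner id used when no task context is available (mirrors NON_TASK_WRITER).
      val NON_TASK_WRITER = -1024L

      // blockId -> task attempt id of the thread currently removing that block.
      private val pendingToRemove = new ConcurrentHashMap[BlockId, java.lang.Long]()

      // Run `body` only if this thread wins the try-lock for `blockId`; never blocks.
      def tryRemove(blockId: BlockId, taskAttemptId: Long)(body: => Unit): Boolean = {
        if (pendingToRemove.putIfAbsent(blockId, taskAttemptId) == null) {
          try {
            body
            true
          } finally {
            pendingToRemove.remove(blockId) // always release the claim, even on error
          }
        } else {
          false // another thread is removing this block: skip instead of blocking
        }
      }

      // Clear any claims a task leaked, e.g. if it died between claiming and releasing
      // (mirrors releaseAllLocksForTask after the PATCH 2 fix: remove by entry key).
      def releaseAllLocksForTask(taskAttemptId: Long): ArrayBuffer[BlockId] = {
        val released = ArrayBuffer[BlockId]()
        pendingToRemove.entrySet().asScala.foreach { entry =>
          if (entry.getValue == taskAttemptId) {
            pendingToRemove.remove(entry.getKey)
            released += entry.getKey
          }
        }
        released
      }

      def main(args: Array[String]): Unit = {
        val first = tryRemove("rdd_0_0", 1L) { /* remove from memory/disk stores */ }
        // A sequential second call succeeds because the first claim was released.
        val second = tryRemove("rdd_0_0", NON_TASK_WRITER) {}
        println(s"first=$first second=$second") // prints: first=true second=true
      }
    }

This is also why the Executor change in PATCH 1 matters: if a task fails between claiming a block
and releasing it, releaseAllLocksForTask in the finally block of TaskRunner.run() sweeps the leaked
entries, and spark.storage.exceptionOnPinLeak decides whether the leak is merely logged or fatal.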