From be8c26f29ad1f30e3f706926320abf630452fb70 Mon Sep 17 00:00:00 2001 From: Warren Zhu Date: Fri, 22 Sep 2023 16:53:09 -0700 Subject: [PATCH] Avoid acquiring read lock when keepReadLock is false --- .../apache/spark/storage/BlockInfoManager.scala | 11 +++++++---- .../org/apache/spark/storage/BlockManager.scala | 6 +----- .../spark/storage/BlockInfoManagerSuite.scala | 14 ++++++++++++++ 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 8dccfbc5e2d80..f80190c96e85a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -415,13 +415,14 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false * then just go ahead and acquire the write lock. Otherwise, if another thread is already * writing the block, then we wait for the write to finish before acquiring the read lock. * - * @return true if the block did not already exist, false otherwise. If this returns false, then - * a read lock on the existing block will be held. If this returns true, a write lock on - * the new block will be held. + * @return true if the block did not already exist, false otherwise. + * If this returns true, a write lock on the new block will be held. + * If this returns false then a read lock will be held iff keepReadLock == true. */ def lockNewBlockForWriting( blockId: BlockId, - newBlockInfo: BlockInfo): Boolean = { + newBlockInfo: BlockInfo, + keepReadLock: Boolean = true): Boolean = { logTrace(s"Task $currentTaskAttemptId trying to put $blockId") // Get the lock that will be associated with the to-be written block and lock it for the entire // duration of this operation. 
This way we prevent race conditions when two threads try to write @@ -449,6 +450,8 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false val result = lockForWriting(blockId, blocking = false) assert(result.isDefined) return true + } else if (!keepReadLock) { + return false } else { // Block already exists. This could happen if another thread races with us to compute // the same block. In this case we try to acquire a read lock, if the locking succeeds diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 81933744472ee..cccee78aee132 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1510,14 +1510,10 @@ private[spark] class BlockManager( val putBlockInfo = { val newInfo = new BlockInfo(level, classTag, tellMaster) - if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) { + if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, keepReadLock)) { newInfo } else { logWarning(s"Block $blockId already exists on this machine; not re-adding it") - if (!keepReadLock) { - // lockNewBlockForWriting returned a read lock on the existing block, so we must free it: - releaseLock(blockId) - } return None } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala index 3708f0aa67223..f133a38269d71 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala @@ -166,6 +166,20 @@ class BlockInfoManagerSuite extends SparkFunSuite { assert(blockInfoManager.get("block").get.readerCount === 1) } + test("lockNewBlockForWriting should not block when keepReadLock is false") { + withTaskId(0) { + 
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())) + } + val lock1Future = Future { + withTaskId(1) { + blockInfoManager.lockNewBlockForWriting("block", newBlockInfo(), false) + } + } + + assert(!ThreadUtils.awaitResult(lock1Future, 1.seconds)) + assert(blockInfoManager.get("block").get.readerCount === 0) + } + test("read locks are reentrant") { withTaskId(1) { assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))