Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -415,13 +415,14 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
* then just go ahead and acquire the write lock. Otherwise, if another thread is already
* writing the block, then we wait for the write to finish before acquiring the read lock.
*
* @return true if the block did not already exist, false otherwise. If this returns false, then
* a read lock on the existing block will be held. If this returns true, a write lock on
* the new block will be held.
* @return true if the block did not already exist, false otherwise.
* If this returns true, a write lock on the new block will be held.
* If this returns false then a read lock will be held iff keepReadLock == true.
*/
def lockNewBlockForWriting(
blockId: BlockId,
newBlockInfo: BlockInfo): Boolean = {
newBlockInfo: BlockInfo,
keepReadLock: Boolean = true): Boolean = {
logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
// Get the lock that will be associated with the to-be written block and lock it for the entire
// duration of this operation. This way we prevent race conditions when two threads try to write
Expand Down Expand Up @@ -449,6 +450,8 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
val result = lockForWriting(blockId, blocking = false)
assert(result.isDefined)
return true
} else if (!keepReadLock) {
return false
} else {
// Block already exists. This could happen if another thread races with us to compute
// the same block. In this case we try to acquire a read lock, if the locking succeeds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1510,14 +1510,10 @@ private[spark] class BlockManager(

val putBlockInfo = {
val newInfo = new BlockInfo(level, classTag, tellMaster)
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, keepReadLock)) {
newInfo
} else {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
if (!keepReadLock) {
// lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
releaseLock(blockId)
}
return None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,20 @@ class BlockInfoManagerSuite extends SparkFunSuite {
assert(blockInfoManager.get("block").get.readerCount === 1)
}

test("lockNewBlockForWriting should not block when keepReadLock is false") {
withTaskId(0) {
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
}
val lock1Future = Future {
withTaskId(1) {
blockInfoManager.lockNewBlockForWriting("block", newBlockInfo(), false)
}
}

assert(!ThreadUtils.awaitResult(lock1Future, 1.seconds))
assert(blockInfoManager.get("block").get.readerCount === 0)
}

test("read locks are reentrant") {
withTaskId(1) {
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
Expand Down