diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 42a6cddc55f21..d5fde96b146b4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1991,23 +1991,32 @@ private[spark] class BlockManager( * lock on the block. */ private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = { - val blockStatus = if (tellMaster) { - val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId) - Some(getCurrentBlockStatus(blockId, blockInfo)) - } else None - - // Removals are idempotent in disk store and memory store. At worst, we get a warning. - val removedFromMemory = memoryStore.remove(blockId) - val removedFromDisk = diskStore.remove(blockId) - if (!removedFromMemory && !removedFromDisk) { - logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory") - } - - blockInfoManager.removeBlock(blockId) - if (tellMaster) { - // Only update storage level from the captured block status before deleting, so that - // memory size and disk size are being kept for calculating delta. - reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE)) + var hasRemoveBlock = false + try { + val blockStatus = if (tellMaster) { + val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId) + Some(getCurrentBlockStatus(blockId, blockInfo)) + } else None + + // Removals are idempotent in disk store and memory store. At worst, we get a warning. + val removedFromMemory = memoryStore.remove(blockId) + val removedFromDisk = diskStore.remove(blockId) + if (!removedFromMemory && !removedFromDisk) { + logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory") + } + + blockInfoManager.removeBlock(blockId) + hasRemoveBlock = true + if (tellMaster) { + // Only update storage level from the captured block status before deleting, so that + // memory size and disk size are being kept for calculating delta. + reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE)) + } + } finally { + if (!hasRemoveBlock) { + logWarning(s"Block $blockId was not removed normally.") + blockInfoManager.removeBlock(blockId) + } } }