Skip to content

Commit 80bc486

Browse files
committed
De-duplicate copy-pasted code.
1 parent b86ff24 commit 80bc486

File tree

1 file changed

+14
-12
lines changed

1 file changed

+14
-12
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,17 @@ private[spark] class BlockManager(
402402
locations
403403
}
404404

405+
/**
406+
* Cleanup code run in response to a failed local read.
407+
* Must be called while holding a read lock on the block.
408+
*/
409+
private def handleLocalReadFailure(blockId: BlockId): Nothing = {
410+
releaseLock(blockId)
411+
// Remove the missing block so that its unavailability is reported to the driver
412+
removeBlock(blockId)
413+
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
414+
}
415+
405416
/**
406417
* Get block from local block manager as an iterator of Java objects.
407418
*/
@@ -441,10 +452,7 @@ private[spark] class BlockManager(
441452
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
442453
Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
443454
} else {
444-
releaseLock(blockId)
445-
// Remove the missing block so that its unavailability is reported to the driver
446-
removeBlock(blockId)
447-
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
455+
handleLocalReadFailure(blockId)
448456
}
449457
}
450458
}
@@ -491,10 +499,7 @@ private[spark] class BlockManager(
491499
// The block was not found on disk, so serialize an in-memory copy:
492500
serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
493501
} else {
494-
releaseLock(blockId)
495-
// Remove the missing block so that its unavailability is reported to the driver
496-
removeBlock(blockId)
497-
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
502+
handleLocalReadFailure(blockId)
498503
}
499504
} else { // storage level is serialized
500505
if (level.useMemory && memoryStore.contains(blockId)) {
@@ -503,10 +508,7 @@ private[spark] class BlockManager(
503508
val diskBytes = diskStore.getBytes(blockId)
504509
maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
505510
} else {
506-
releaseLock(blockId)
507-
// Remove the missing block so that its unavailability is reported to the driver
508-
removeBlock(blockId)
509-
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
511+
handleLocalReadFailure(blockId)
510512
}
511513
}
512514
}

0 commit comments

Comments
 (0)