@@ -861,37 +861,36 @@ private[spark] class BlockManager(
861861 }
862862
863863 val startTimeMs = System .currentTimeMillis
864- var blockWasSuccessfullyStored : Boolean = false
865864 var exceptionWasThrown : Boolean = true
866865 val result : Option [T ] = try {
867866 val res = putBody(putBlockInfo)
868- blockWasSuccessfullyStored = res.isEmpty
869867 exceptionWasThrown = false
870- res
871- } finally {
872- if (blockWasSuccessfullyStored) {
868+ if (res.isEmpty) {
869+ // the block was successfully stored
873870 if (keepReadLock) {
874871 blockInfoManager.downgradeLock(blockId)
875872 } else {
876873 blockInfoManager.unlock(blockId)
877874 }
878875 } else {
879- if (exceptionWasThrown) {
880- // If an exception was thrown then it's possible that the code in `putBody` has already
881- // notified the master about the availability of this block, so we need to send an update
882- // to remove this block location.
883- removeBlockInternal(blockId, tellMaster = tellMaster)
884- // The `putBody` code may have also added a new block status to TaskMetrics, so we need
885- // to cancel that out by overwriting it with an empty block status. We only do this if
886- // the finally block was entered via an exception because doing this unconditionally would
887- // cause us to send empty block statuses for every block that failed to be cached due to
888- // a memory shortage (which is an expected failure, unlike an uncaught exception).
889- addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus .empty)
890- } else {
891- removeBlockInternal(blockId, tellMaster = false )
892- }
876+ removeBlockInternal(blockId, tellMaster = false )
893877 logWarning(s " Putting block $blockId failed " )
894878 }
879+ res
880+ } finally {
881+ if (exceptionWasThrown) {
882+ logWarning(s " Putting block $blockId failed due to an exception " )
883+ // If an exception was thrown then it's possible that the code in `putBody` has already
884+ // notified the master about the availability of this block, so we need to send an update
885+ // to remove this block location.
886+ removeBlockInternal(blockId, tellMaster = tellMaster)
887+ // The `putBody` code may have also added a new block status to TaskMetrics, so we need
888+ // to cancel that out by overwriting it with an empty block status. We only do this if
889+ // the finally block was entered via an exception because doing this unconditionally would
890+ // cause us to send empty block statuses for every block that failed to be cached due to
891+ // a memory shortage (which is an expected failure, unlike an uncaught exception).
892+ addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus .empty)
893+ }
895894 }
896895 if (level.replication > 1 ) {
897896 logDebug(" Putting block %s with replication took %s"
0 commit comments