@@ -217,7 +217,7 @@ private[spark] class BlockManager(
217217 logInfo(s " Reporting ${blockInfoManager.size} blocks to the master. " )
218218 for ((blockId, info) <- blockInfoManager.entries) {
219219 val status = getCurrentBlockStatus(blockId, info)
220- if (! tryToReportBlockStatus(blockId, info , status)) {
220+ if (info.tellMaster && ! tryToReportBlockStatus(blockId, status)) {
221221 logError(s " Failed to report $blockId to master; giving up. " )
222222 return
223223 }
@@ -298,7 +298,7 @@ private[spark] class BlockManager(
298298
299299 /**
300300 * Get the BlockStatus for the block identified by the given ID, if it exists.
301- * NOTE: This is mainly for testing, and it doesn't fetch information from external block store .
301+ * NOTE: This is mainly for testing.
302302 */
303303 def getStatus (blockId : BlockId ): Option [BlockStatus ] = {
304304 blockInfoManager.get(blockId).map { info =>
@@ -333,10 +333,9 @@ private[spark] class BlockManager(
333333 */
334334 private def reportBlockStatus (
335335 blockId : BlockId ,
336- info : BlockInfo ,
337336 status : BlockStatus ,
338337 droppedMemorySize : Long = 0L ): Unit = {
339- val needReregister = ! tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
338+ val needReregister = ! tryToReportBlockStatus(blockId, status, droppedMemorySize)
340339 if (needReregister) {
341340 logInfo(s " Got told to re-register updating block $blockId" )
342341 // Re-registering will report our new block for free.
@@ -352,17 +351,12 @@ private[spark] class BlockManager(
352351 */
353352 private def tryToReportBlockStatus (
354353 blockId : BlockId ,
355- info : BlockInfo ,
356354 status : BlockStatus ,
357355 droppedMemorySize : Long = 0L ): Boolean = {
358- if (info.tellMaster) {
359- val storageLevel = status.storageLevel
360- val inMemSize = Math .max(status.memSize, droppedMemorySize)
361- val onDiskSize = status.diskSize
362- master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
363- } else {
364- true
365- }
356+ val storageLevel = status.storageLevel
357+ val inMemSize = Math .max(status.memSize, droppedMemorySize)
358+ val onDiskSize = status.diskSize
359+ master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
366360 }
367361
368362 /**
@@ -374,7 +368,7 @@ private[spark] class BlockManager(
374368 info.synchronized {
375369 info.level match {
376370 case null =>
377- BlockStatus ( StorageLevel . NONE , memSize = 0L , diskSize = 0L )
371+ BlockStatus .empty
378372 case level =>
379373 val inMem = level.useMemory && memoryStore.contains(blockId)
380374 val onDisk = level.useDisk && diskStore.contains(blockId)
@@ -565,8 +559,9 @@ private[spark] class BlockManager(
565559 // Give up trying anymore locations. Either we've tried all of the original locations,
566560 // or we've refreshed the list of locations from the master, and have still
567561 // hit failures after trying locations from the refreshed list.
568- throw new BlockFetchException (s " Failed to fetch block after " +
569- s " ${totalFailureCount} fetch failures. Most recent failure cause: " , e)
562+ logWarning(s " Failed to fetch block after $totalFailureCount fetch failures. " +
563+ s " Most recent failure cause: " , e)
564+ return None
570565 }
571566
572567 logWarning(s " Failed to fetch remote block $blockId " +
@@ -807,12 +802,10 @@ private[spark] class BlockManager(
807802 // Now that the block is in either the memory or disk store,
808803 // tell the master about it.
809804 info.size = size
810- if (tellMaster) {
811- reportBlockStatus(blockId, info, putBlockStatus)
812- }
813- Option (TaskContext .get()).foreach { c =>
814- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
805+ if (tellMaster && info.tellMaster) {
806+ reportBlockStatus(blockId, putBlockStatus)
815807 }
808+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
816809 }
817810 logDebug(" Put block %s locally took %s" .format(blockId, Utils .getUsedTimeMs(startTimeMs)))
818811 if (level.replication > 1 ) {
@@ -961,15 +954,12 @@ private[spark] class BlockManager(
961954 val putBlockStatus = getCurrentBlockStatus(blockId, info)
962955 val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
963956 if (blockWasSuccessfullyStored) {
964- // Now that the block is in either the memory, externalBlockStore, or disk store,
965- // tell the master about it.
957+ // Now that the block is in either the memory or disk store, tell the master about it.
966958 info.size = size
967- if (tellMaster) {
968- reportBlockStatus(blockId, info, putBlockStatus)
969- }
970- Option (TaskContext .get()).foreach { c =>
971- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
959+ if (tellMaster && info.tellMaster) {
960+ reportBlockStatus(blockId, putBlockStatus)
972961 }
962+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
973963 logDebug(" Put block %s locally took %s" .format(blockId, Utils .getUsedTimeMs(startTimeMs)))
974964 if (level.replication > 1 ) {
975965 val remoteStartTime = System .currentTimeMillis
@@ -1271,12 +1261,10 @@ private[spark] class BlockManager(
12711261
12721262 val status = getCurrentBlockStatus(blockId, info)
12731263 if (info.tellMaster) {
1274- reportBlockStatus(blockId, info, status, droppedMemorySize)
1264+ reportBlockStatus(blockId, status, droppedMemorySize)
12751265 }
12761266 if (blockIsUpdated) {
1277- Option (TaskContext .get()).foreach { c =>
1278- c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
1279- }
1267+ addUpdatedBlockStatusToTaskMetrics(blockId, status)
12801268 }
12811269 status.storageLevel
12821270 }
@@ -1316,21 +1304,31 @@ private[spark] class BlockManager(
13161304 // The block has already been removed; do nothing.
13171305 logWarning(s " Asked to remove block $blockId, which does not exist " )
13181306 case Some (info) =>
1319- // Removals are idempotent in disk store and memory store. At worst, we get a warning.
1320- val removedFromMemory = memoryStore.remove(blockId)
1321- val removedFromDisk = diskStore.remove(blockId)
1322- if (! removedFromMemory && ! removedFromDisk) {
1323- logWarning(s " Block $blockId could not be removed as it was not found in either " +
1324- " the disk, memory, or external block store" )
1325- }
1326- blockInfoManager.removeBlock(blockId)
1327- val removeBlockStatus = getCurrentBlockStatus(blockId, info)
1328- if (tellMaster && info.tellMaster) {
1329- reportBlockStatus(blockId, info, removeBlockStatus)
1330- }
1331- Option (TaskContext .get()).foreach { c =>
1332- c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
1333- }
1307+ removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
1308+ addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus .empty)
1309+ }
1310+ }
1311+
1312+ /**
1313+ * Internal version of [[removeBlock() ]] which assumes that the caller already holds a write
1314+ * lock on the block.
1315+ */
1316+ private def removeBlockInternal (blockId : BlockId , tellMaster : Boolean ): Unit = {
1317+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
1318+ val removedFromMemory = memoryStore.remove(blockId)
1319+ val removedFromDisk = diskStore.remove(blockId)
1320+ if (! removedFromMemory && ! removedFromDisk) {
1321+ logWarning(s " Block $blockId could not be removed as it was not found on disk or in memory " )
1322+ }
1323+ blockInfoManager.removeBlock(blockId)
1324+ if (tellMaster) {
1325+ reportBlockStatus(blockId, BlockStatus .empty)
1326+ }
1327+ }
1328+
1329+ private def addUpdatedBlockStatusToTaskMetrics (blockId : BlockId , status : BlockStatus ): Unit = {
1330+ Option (TaskContext .get()).foreach { c =>
1331+ c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
13341332 }
13351333 }
13361334
0 commit comments