diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2cc2fd9ef0712..538272dc00db6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1120,10 +1120,15 @@ private[spark] class BlockManager( "the disk, memory, or external block store") } blockInfo.remove(blockId) + val status = getCurrentBlockStatus(blockId, info) if (tellMaster && info.tellMaster) { - val status = getCurrentBlockStatus(blockId, info) reportBlockStatus(blockId, info, status) } + Option(TaskContext.get()).foreach { tc => + val metrics = tc.taskMetrics() + val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status))) + } } } else { // The block has already been removed; do nothing. diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 53991d8a1aede..c00591fa371aa 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -895,6 +895,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list = List.fill(2)(new Array[Byte](2000)) val bigList = List.fill(8)(new Array[Byte](2000)) + def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = { + val context = TaskContext.empty() + try { + TaskContext.setTaskContext(context) + task + } finally { + TaskContext.unset() + } + context.taskMetrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + } + // 1 updated block (i.e. list1) val updatedBlocks1 = store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) @@ -954,6 +965,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(!store.diskStore.contains("list3"), "list3 was in disk store") assert(!store.diskStore.contains("list4"), "list4 was in disk store") assert(!store.diskStore.contains("list5"), "list5 was in disk store") + + // remove block - list2 should be removed from disk + val updatedBlocks6 = getUpdatedBlocks { + store.removeBlock( + "list2", tellMaster = true) + } + assert(updatedBlocks6.size === 1) + assert(updatedBlocks6.head._1 === TestBlockId("list2")) + assert(updatedBlocks6.head._2.storageLevel == StorageLevel.NONE) + assert(!store.diskStore.contains("list2"), "list2 was in disk store") } test("query block statuses") {