Skip to content

Commit a97d220

Browse files
committed
update metrics when removing blocks
1 parent 816f359 commit a97d220

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,9 +1264,12 @@ private[spark] class BlockManager(
12641264
"the disk, memory, or external block store")
12651265
}
12661266
blockInfoManager.removeBlock(blockId)
1267+
val removeBlockStatus = getCurrentBlockStatus(blockId, info)
12671268
if (tellMaster && info.tellMaster) {
1268-
val status = getCurrentBlockStatus(blockId, info)
1269-
reportBlockStatus(blockId, info, status)
1269+
reportBlockStatus(blockId, info, removeBlockStatus)
1270+
}
1271+
Option(TaskContext.get()).foreach { c =>
1272+
c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, removeBlockStatus)))
12701273
}
12711274
}
12721275
}

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
928928
assert(!store.diskStore.contains("list3"), "list3 was in disk store")
929929
assert(!store.diskStore.contains("list4"), "list4 was in disk store")
930930
assert(!store.diskStore.contains("list5"), "list5 was in disk store")
931+
932+
// remove block - list2 should be removed from disk
933+
val updatedBlocks6 = getUpdatedBlocks {
934+
store.removeBlock(
935+
"list2", tellMaster = true)
936+
}
937+
assert(updatedBlocks6.size === 1)
938+
assert(updatedBlocks6.head._1 === TestBlockId("list2"))
939+
assert(updatedBlocks6.head._2.storageLevel == StorageLevel.NONE)
940+
assert(!store.diskStore.contains("list2"), "list2 was in disk store")
931941
}
932942

933943
test("query block statuses") {

0 commit comments

Comments
 (0)