Skip to content

Commit ff151af

Browse files
committed
add metrics when removing blocks
1 parent f12f11e commit ff151af

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1120,10 +1120,15 @@ private[spark] class BlockManager(
11201120
"the disk, memory, or external block store")
11211121
}
11221122
blockInfo.remove(blockId)
1123+
val status = getCurrentBlockStatus(blockId, info)
11231124
if (tellMaster && info.tellMaster) {
1124-
val status = getCurrentBlockStatus(blockId, info)
11251125
reportBlockStatus(blockId, info, status)
11261126
}
1127+
Option(TaskContext.get()).foreach { tc =>
1128+
val metrics = tc.taskMetrics()
1129+
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
1130+
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
1131+
}
11271132
}
11281133
} else {
11291134
// The block has already been removed; do nothing.

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
895895
val list = List.fill(2)(new Array[Byte](2000))
896896
val bigList = List.fill(8)(new Array[Byte](2000))
897897

898+
def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = {
899+
val context = TaskContext.empty()
900+
try {
901+
TaskContext.setTaskContext(context)
902+
task
903+
} finally {
904+
TaskContext.unset()
905+
}
906+
context.taskMetrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
907+
}
908+
898909
// 1 updated block (i.e. list1)
899910
val updatedBlocks1 =
900911
store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
@@ -954,6 +965,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
954965
assert(!store.diskStore.contains("list3"), "list3 was in disk store")
955966
assert(!store.diskStore.contains("list4"), "list4 was in disk store")
956967
assert(!store.diskStore.contains("list5"), "list5 was in disk store")
968+
969+
// remove block - list2 should be removed from disk
970+
val updatedBlocks6 = getUpdatedBlocks {
971+
store.removeBlock(
972+
"list2", tellMaster = true)
973+
}
974+
assert(updatedBlocks6.size === 1)
975+
assert(updatedBlocks6.head._1 === TestBlockId("list2"))
976+
assert(updatedBlocks6.head._2.storageLevel == StorageLevel.NONE)
977+
assert(!store.diskStore.contains("list2"), "list2 was in disk store")
957978
}
958979

959980
test("query block statuses") {

0 commit comments

Comments
 (0)