
Commit a82ea25

Add tests for StorageStatusListener
1 parent 8773b01 commit a82ea25

2 files changed: 153 additions, 1 deletion


core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler._
 @DeveloperApi
 class StorageStatusListener extends SparkListener {
   // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
-  private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+  private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()

   def storageStatusList = executorIdToStorageStatus.values.toSeq

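The only change to the production class widens the visibility of executorIdToStorageStatus from private to private[storage], so the suite added below, which sits in the same org.apache.spark.storage package, can inspect the map directly. A minimal sketch of how Scala's package-qualified access modifier behaves; the names are illustrative and not part of the commit:

package org.example.storage

import scala.collection.mutable

// private[storage] exposes the member to everything under the enclosing
// "storage" package (including a test suite) while hiding it from outside callers.
class Tracker {
  private[storage] val state = mutable.Map[String, Int]()
}

// A class in the same package can read and update the field directly.
class TrackerSuite {
  val tracker = new Tracker
  tracker.state("executor-1") = 42
  assert(tracker.state.size == 1)
}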
Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.storage

import org.scalatest.FunSuite
import org.apache.spark.Success
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._

/**
 * Test the behavior of StorageStatusListener in response to all relevant events.
 */
class StorageStatusListenerSuite extends FunSuite {
  private val bm1 = BlockManagerId("big", "dog", 1, 1)
  private val bm2 = BlockManagerId("fat", "duck", 2, 2)
  private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
  private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)

  test("block manager added/removed") {
    val listener = new StorageStatusListener

    // Block manager add
    assert(listener.executorIdToStorageStatus.size === 0)
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
    assert(listener.executorIdToStorageStatus.size === 1)
    assert(listener.executorIdToStorageStatus.get("big").isDefined)
    assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
    assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
    assert(listener.executorIdToStorageStatus.size === 2)
    assert(listener.executorIdToStorageStatus.get("fat").isDefined)
    assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
    assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)

    // Block manager remove
    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
    assert(listener.executorIdToStorageStatus.size === 1)
    assert(!listener.executorIdToStorageStatus.get("big").isDefined)
    assert(listener.executorIdToStorageStatus.get("fat").isDefined)
    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
    assert(listener.executorIdToStorageStatus.size === 0)
    assert(!listener.executorIdToStorageStatus.get("big").isDefined)
    assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
  }

  test("task end without updated blocks") {
    val listener = new StorageStatusListener
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
    val taskMetrics = new TaskMetrics

    // Task end with no updated blocks
    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
  }

  test("task end with updated blocks") {
    val listener = new StorageStatusListener
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
    val taskMetrics1 = new TaskMetrics
    val taskMetrics2 = new TaskMetrics
    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
    taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
    taskMetrics2.updatedBlocks = Some(Seq(block3))

    // Task end with new blocks
    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))

    // Task end with dropped blocks
    val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
    val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
    val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
    taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
    taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
  }

  test("unpersist RDD") {
    val listener = new StorageStatusListener
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
    val taskMetrics1 = new TaskMetrics
    val taskMetrics2 = new TaskMetrics
    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
    taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
    taskMetrics2.updatedBlocks = Some(Seq(block3))
    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)

    // Unpersist RDD
    listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
    listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
    listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
  }
}
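The suite drives the listener by calling its callbacks directly with hand-built SparkListener events. In a running application the same listener would instead be registered on the listener bus and receive these events from the scheduler. A rough sketch of that wiring, assuming the SparkContext.addSparkListener API available in this Spark version; the object name, master URL, and app name are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageStatusListener

object StorageStatusDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("storage-status-demo")
    val sc = new SparkContext(conf)

    // Register the listener so it receives block manager added/removed, task end,
    // and unpersist events from the live listener bus rather than synthetic ones.
    val listener = new StorageStatusListener
    sc.addSparkListener(listener)

    // Cache an RDD and run a job; the listener should then report cached blocks.
    sc.parallelize(1 to 100).persist().count()
    println(listener.storageStatusList.map(_.blocks.size).sum)

    sc.stop()
  }
}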
