Skip to content

Commit c5d76fd

Browse files
committed
address comments
1 parent 5951c9e commit c5d76fd

File tree

1 file changed

+73
-84
lines changed

1 file changed

+73
-84
lines changed

core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala

Lines changed: 73 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818
package org.apache.spark.status
1919

2020
import org.apache.spark.{SparkConf, SparkFunSuite}
21-
import org.apache.spark.status.LiveEntityHelpers.makeNegative
22-
import org.apache.spark.status.api.v1
23-
import org.apache.spark.status.api.v1.{InputMetrics, OutputMetrics, ShuffleReadMetrics, ShuffleWriteMetrics}
21+
import org.apache.spark.executor.TaskMetrics
22+
import org.apache.spark.scheduler.{TaskInfo, TaskLocality}
2423
import org.apache.spark.util.{Distribution, Utils}
2524
import org.apache.spark.util.kvstore._
2625

@@ -79,78 +78,62 @@ class AppStatusStoreSuite extends SparkFunSuite {
7978
assert(store.count(classOf[CachedQuantile]) === 2)
8079
}
8180

82-
private def createAppStore(store: KVStore, live: Boolean = false): AppStatusStore = {
81+
private def createAppStore(live: Boolean, disk: Boolean): AppStatusStore = {
8382
val conf = new SparkConf()
8483
if (live) {
8584
AppStatusStore.createLiveStore(conf)
85+
}
86+
87+
val store: KVStore = if (disk) {
88+
val testDir = Utils.createTempDir()
89+
val diskStore = KVUtils.open(testDir, getClass.getName)
90+
new ElementTrackingStore(diskStore, conf)
8691
} else {
87-
new AppStatusStore(store)
92+
new ElementTrackingStore(new InMemoryStore, conf)
8893
}
94+
new AppStatusStore(store)
8995
}
9096

91-
test("SPARK-26260: task summary should contain only successful tasks' metrics") {
92-
val testDir = Utils.createTempDir()
93-
val diskStore = KVUtils.open(testDir, getClass.getName)
94-
val inMemoryStore = new InMemoryStore
95-
96-
val historyDiskAppStore = createAppStore(diskStore)
97-
val historyInMemoryAppStore = createAppStore(inMemoryStore)
98-
val liveAppStore = createAppStore(inMemoryStore, live = true)
99-
100-
Seq(historyDiskAppStore, historyInMemoryAppStore, liveAppStore).foreach { appStore =>
97+
Seq(
98+
"disk" -> createAppStore(disk = true, live = false),
99+
"in memory" -> createAppStore(disk = false, live = false),
100+
"in memory live" -> createAppStore(disk = false, live = true)
101+
).foreach { case (hint, appStore) =>
102+
test(s"SPARK-26260: summary should contain only successful tasks' metrics (store = $hint") {
101103
val store = appStore.store
104+
102105
// Success and failed tasks metrics
103106
for (i <- 0 to 5) {
104-
if (i % 2 == 1) {
105-
store.write(newTaskData(i, status = "FAILED"))
107+
if (i % 2 == 0) {
108+
writeTaskDataToStore(i, store, "FAILED")
106109
} else {
107-
store.write(newTaskData(i, status = "SUCCESS"))
110+
writeTaskDataToStore(i, store, "SUCCESS")
108111
}
109112
}
110-
// Running tasks metrics (default metrics, positive metrics)
113+
114+
// Running tasks metrics (-1 = no metrics reported, positive = metrics have been reported)
111115
Seq(-1, 6).foreach { metric =>
112-
store.write(newTaskData(metric, status = "RUNNING"))
116+
writeTaskDataToStore(metric, store, "RUNNING")
113117
}
114118

115119
/**
116120
* Following are the tasks metrics,
117-
* 0, 2, 4 => Success
118-
* 1, 3, 5 => Failed
121+
* 1, 3, 5 => Success
122+
* 0, 2, 4 => Failed
119123
* -1, 6 => Running
120124
*
121-
* Task summary will consider (0, 2, 4) only
125+
* Task summary will consider (1, 3, 5) only
122126
*/
123127
val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
124128

125-
val values = Array(0.0, 2.0, 4.0)
129+
val values = Array(1.0, 3.0, 5.0)
126130

127131
val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
128132
dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
129133
assert(expected === actual)
130134
}
131135
appStore.close()
132136
}
133-
Utils.deleteRecursively(testDir)
134-
}
135-
136-
test("SPARK-26260: task summary should be empty for non-successful tasks") {
137-
// This test will check for 0 metric value for failed task
138-
val testDir = Utils.createTempDir()
139-
val diskStore = KVUtils.open(testDir, getClass.getName)
140-
val inMemoryStore = new InMemoryStore
141-
142-
val historyDiskAppStore = createAppStore(diskStore)
143-
val historyInMemoryAppStore = createAppStore(inMemoryStore)
144-
val liveAppStore = createAppStore(inMemoryStore, live = true)
145-
146-
Seq(historyDiskAppStore, historyInMemoryAppStore, liveAppStore).foreach { appStore =>
147-
val store = appStore.store
148-
(0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
149-
val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
150-
assert(summary.size === 0)
151-
appStore.close()
152-
}
153-
Utils.deleteRecursively(testDir)
154137
}
155138

156139
private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = {
@@ -170,49 +153,55 @@ class AppStatusStoreSuite extends SparkFunSuite {
170153
}
171154

172155
private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = {
173-
174-
val metrics = new v1.TaskMetrics(
156+
new TaskDataWrapper(
157+
i.toLong, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, true,
158+
i, i, i, i, i, i, i, i, i, i,
175159
i, i, i, i, i, i, i, i, i, i,
176-
new InputMetrics(i, i),
177-
new OutputMetrics(i, i),
178-
new ShuffleReadMetrics(i, i, i, i, i, i, i),
179-
new ShuffleWriteMetrics(i, i, i))
160+
i, i, i, i, stageId, attemptId)
161+
}
180162

181-
val hasMetrics = i >= 0
163+
private def writeTaskDataToStore(i: Int, store: KVStore, status: String): Unit = {
164+
val liveTask = new LiveTask(new TaskInfo( i.toLong, i, i, i.toLong, i.toString,
165+
i.toString, TaskLocality.ANY, false), stageId, attemptId, None)
182166

183-
val taskMetrics: v1.TaskMetrics = if (hasMetrics && status != "SUCCESS") {
184-
makeNegative(metrics)
185-
} else {
186-
metrics
167+
if (status == "SUCCESS") {
168+
liveTask.info.finishTime = 1L
169+
} else if (status == "FAILED") {
170+
liveTask.info.failed = true
171+
liveTask.info.finishTime = 1L
187172
}
188173

189-
new TaskDataWrapper(
190-
i.toLong, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None,
191-
hasMetrics,
192-
taskMetrics.executorDeserializeTime,
193-
taskMetrics.executorDeserializeCpuTime,
194-
taskMetrics.executorRunTime,
195-
taskMetrics.executorCpuTime,
196-
taskMetrics.resultSize,
197-
taskMetrics.jvmGcTime,
198-
taskMetrics.resultSerializationTime,
199-
taskMetrics.memoryBytesSpilled,
200-
taskMetrics.diskBytesSpilled,
201-
taskMetrics.peakExecutionMemory,
202-
taskMetrics.inputMetrics.bytesRead,
203-
taskMetrics.inputMetrics.recordsRead,
204-
taskMetrics.outputMetrics.bytesWritten,
205-
taskMetrics.outputMetrics.recordsWritten,
206-
taskMetrics.shuffleReadMetrics.remoteBlocksFetched,
207-
taskMetrics.shuffleReadMetrics.localBlocksFetched,
208-
taskMetrics.shuffleReadMetrics.fetchWaitTime,
209-
taskMetrics.shuffleReadMetrics.remoteBytesRead,
210-
taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk,
211-
taskMetrics.shuffleReadMetrics.localBytesRead,
212-
taskMetrics.shuffleReadMetrics.recordsRead,
213-
taskMetrics.shuffleWriteMetrics.bytesWritten,
214-
taskMetrics.shuffleWriteMetrics.writeTime,
215-
taskMetrics.shuffleWriteMetrics.recordsWritten,
216-
stageId, attemptId)
174+
val taskMetrics = getTaskMetrics(i)
175+
liveTask.updateMetrics(taskMetrics)
176+
liveTask.write(store.asInstanceOf[ElementTrackingStore], 1L)
177+
}
178+
179+
private def getTaskMetrics(i: Int): TaskMetrics = {
180+
val taskMetrics = new TaskMetrics()
181+
taskMetrics.setExecutorDeserializeTime(i)
182+
taskMetrics.setExecutorDeserializeCpuTime(i)
183+
taskMetrics.setExecutorRunTime(i)
184+
taskMetrics.setExecutorCpuTime(i)
185+
taskMetrics.setResultSize(i)
186+
taskMetrics.setJvmGCTime(i)
187+
taskMetrics.setResultSerializationTime(i)
188+
taskMetrics.incMemoryBytesSpilled(i)
189+
taskMetrics.incDiskBytesSpilled(i)
190+
taskMetrics.incPeakExecutionMemory(i)
191+
taskMetrics.inputMetrics.incBytesRead(i)
192+
taskMetrics.inputMetrics.incRecordsRead(i)
193+
taskMetrics.outputMetrics.setBytesWritten(i)
194+
taskMetrics.outputMetrics.setRecordsWritten(i)
195+
taskMetrics.shuffleReadMetrics.incRemoteBlocksFetched(i)
196+
taskMetrics.shuffleReadMetrics.incLocalBlocksFetched(i)
197+
taskMetrics.shuffleReadMetrics.incFetchWaitTime(i)
198+
taskMetrics.shuffleReadMetrics.incRemoteBytesRead(i)
199+
taskMetrics.shuffleReadMetrics.incRemoteBytesReadToDisk(i)
200+
taskMetrics.shuffleReadMetrics.incLocalBytesRead(i)
201+
taskMetrics.shuffleReadMetrics.incRecordsRead(i)
202+
taskMetrics.shuffleWriteMetrics.incBytesWritten(i)
203+
taskMetrics.shuffleWriteMetrics.incWriteTime(i)
204+
taskMetrics.shuffleWriteMetrics.incRecordsWritten(i)
205+
taskMetrics
217206
}
218207
}

0 commit comments

Comments
 (0)