Commit f9a8ca5

[SPARK-42204][CORE] Add option to disable redundant logging of TaskMetrics internal accumulators in event logs
### What changes were proposed in this pull request?

This PR adds an option to JsonProtocol for excluding certain redundant accumulator information from Spark event logs in order to save space and processing time.

Several event log types contain both TaskMetrics and Accumulables, but there is redundancy in how the TaskMetrics data is stored:

- TaskMetrics are stored in a map called "Task Metrics" which maps from metric names to metric values.
- An "Accumulables" field contains information on accumulator updates from the task, but this field includes updates from the TaskMetrics internal accumulators (both the value from the task, plus a running "sum-so-far" from all of the tasks completed up to that point).

The redundant task metrics accumulables are not actually used by the Spark History Server: I verified this by reading AppStatusListener and SQLAppStatusListener. I believe that this redundancy was introduced back in [SPARK-10620](https://issues.apache.org/jira/browse/SPARK-10620), when Spark 1.x's separate TaskMetrics implementation was replaced by the current accumulator-based version.

In this PR, I add logic to exclude TaskMetrics internal accumulators when writing this field. The new `spark.eventLog.includeTaskMetricsAccumulators` configuration controls this behavior: it defaults to `false` (drop the redundant internal accumulator updates) and can be set to `true` to restore the old behavior of logging them. Although I think it is somewhat unlikely that third-party non-Spark consumers of the event logs rely on this redundant information, this changes a longstanding user-facing data format, so the flag is kept as a backwards-compatibility escape hatch for applications that depend on the old output.

### Why are the changes needed?

This change reduces the size of Spark event logs, especially for logs from applications that run many tasks. It should also slightly improve event log read and write speed (although I haven't tried to quantify this).

### Does this PR introduce _any_ user-facing change?

There are no user-facing changes in the Spark History Server. Dropping the redundant accumulator updates could be considered a user-facing change from the perspective of third-party code that does its own direct processing of Spark event logs, hence the escape-hatch configuration, which restores the previous output when set to `true`.

### How was this patch tested?

New unit tests in `JsonProtocolSuite`. Manual tests of event log size in `spark-shell` with a job that runs `spark.parallelize(1 to 1000, 1000).count()`. For this toy query, this PR's change shrank the uncompressed event log size by ~15%. The relative size reduction will be even greater once other issues like https://issues.apache.org/jira/browse/SPARK-42206 or https://issues.apache.org/jira/browse/SPARK-42203 are fixed. The relative reduction will be smaller for tasks with many SQL metrics because those accumulables cannot be excluded.

Closes #39763 from JoshRosen/SPARK-42204-remove-redundant-logging-of-taskmetrics-internal-accumulators-in-jsonprotocol.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Josh Rosen <[email protected]>
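As a usage illustration (not part of this commit): the sketch below enables event logging and opts back into the redundant TaskMetrics accumulator entries via the new flag. The event-log directory and application name are placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Hypothetical usage sketch: turn event logging on and set the new escape-hatch flag
// to "true" so the TaskMetrics internal accumulators are logged again (old behavior).
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events") // placeholder directory
  .set("spark.eventLog.includeTaskMetricsAccumulators", "true") // flag added by this PR

val spark = SparkSession.builder().config(conf).appName("event-log-demo").getOrCreate()
// Same toy job used in the manual size test from the PR description.
spark.sparkContext.parallelize(1 to 1000, 1000).count()
spark.stop()
```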
1 parent 3a4ea84 commit f9a8ca5

File tree

4 files changed: +194 -38 lines

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 12 additions & 0 deletions
@@ -271,6 +271,18 @@ package object config {
       .toSequence
       .createWithDefault(GarbageCollectionMetrics.OLD_GENERATION_BUILTIN_GARBAGE_COLLECTORS)
 
+  private[spark] val EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS =
+    ConfigBuilder("spark.eventLog.includeTaskMetricsAccumulators")
+      .doc("Whether to include TaskMetrics' underlying accumulator values in the event log (as " +
+        "part of the Task/Stage/Job metrics' 'Accumulables' fields. This configuration defaults " +
+        "to false because the TaskMetrics values are already logged in the 'Task Metrics' " +
+        "fields (so the accumulator updates are redundant). This flag exists only as a " +
+        "backwards-compatibility escape hatch for applications that might rely on the old " +
+        "behavior. See SPARK-42204 for details.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite")
       .version("1.0.0")

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 5 additions & 3 deletions
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.history.EventLogFileWriter
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, JsonProtocolOptions, Utils}
 
 /**
  * A SparkListener that logs events to persistent storage.
@@ -74,6 +74,8 @@ private[spark] class EventLoggingListener(
   private val liveStageExecutorMetrics =
     mutable.HashMap.empty[(Int, Int), mutable.HashMap[String, ExecutorMetrics]]
 
+  private[this] val jsonProtocolOptions = new JsonProtocolOptions(sparkConf)
+
   /**
    * Creates the log file in the configured log directory.
   */
@@ -84,7 +86,7 @@ private[spark] class EventLoggingListener(
 
   private def initEventLog(): Unit = {
     val metadata = SparkListenerLogStart(SPARK_VERSION)
-    val eventJson = JsonProtocol.sparkEventToJsonString(metadata)
+    val eventJson = JsonProtocol.sparkEventToJsonString(metadata, jsonProtocolOptions)
     logWriter.writeEvent(eventJson, flushLogger = true)
     if (testing && loggedEvents != null) {
       loggedEvents += eventJson
@@ -93,7 +95,7 @@ private[spark] class EventLoggingListener(
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false): Unit = {
-    val eventJson = JsonProtocol.sparkEventToJsonString(event)
+    val eventJson = JsonProtocol.sparkEventToJsonString(event, jsonProtocolOptions)
     logWriter.writeEvent(eventJson, flushLogger)
     if (testing) {
       loggedEvents += eventJson
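To make the new plumbing concrete (a sketch, not code from this commit): with the options object built once from the SparkConf, any event can be serialized through the new two-argument overload, mirroring what the listener now does instead of consulting the SparkConf per event. This assumes Spark-internal visibility (e.g. a test under `org.apache.spark.util`), since `JsonProtocol` and `JsonProtocolOptions` are `private[spark]`.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{JobSucceeded, SparkListenerJobEnd}
import org.apache.spark.util.{JsonProtocol, JsonProtocolOptions}

// Parse the config once, then reuse the options object for every event.
val conf = new SparkConf(loadDefaults = false)
  .set("spark.eventLog.includeTaskMetricsAccumulators", "true")
val options = new JsonProtocolOptions(conf)

val json: String =
  JsonProtocol.sparkEventToJsonString(SparkListenerJobEnd(0, 42L, JobSucceeded), options)
```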

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 82 additions & 26 deletions
@@ -28,6 +28,7 @@ import org.json4s.jackson.JsonMethods.compact
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
@@ -37,6 +38,16 @@ import org.apache.spark.storage._
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils.weakIntern
 
+/**
+ * Helper class for passing configuration options to JsonProtocol.
+ * We use this instead of passing SparkConf directly because it lets us avoid
+ * repeated re-parsing of configuration values on each read.
+ */
+private[spark] class JsonProtocolOptions(conf: SparkConf) {
+  val includeTaskMetricsAccumulators: Boolean =
+    conf.get(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS)
+}
+
 /**
  * Serializes SparkListener events to/from JSON. This protocol provides strong backwards-
  * and forwards-compatibility guarantees: any version of Spark should be able to read JSON output
@@ -55,30 +66,41 @@ import org.apache.spark.util.Utils.weakIntern
 private[spark] object JsonProtocol extends JsonUtils {
   // TODO: Remove this file and put JSON serialization into each individual class.
 
+  private[util]
+  val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false))
+
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
 
+  // Only for use in tests. Production code should use the two-argument overload defined below.
   def sparkEventToJsonString(event: SparkListenerEvent): String = {
+    sparkEventToJsonString(event, defaultOptions)
+  }
+
+  def sparkEventToJsonString(event: SparkListenerEvent, options: JsonProtocolOptions): String = {
     toJsonString { generator =>
-      writeSparkEventToJson(event, generator)
+      writeSparkEventToJson(event, generator, options)
     }
   }
 
-  def writeSparkEventToJson(event: SparkListenerEvent, g: JsonGenerator): Unit = {
+  def writeSparkEventToJson(
+      event: SparkListenerEvent,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        stageSubmittedToJson(stageSubmitted, g)
+        stageSubmittedToJson(stageSubmitted, g, options)
       case stageCompleted: SparkListenerStageCompleted =>
-        stageCompletedToJson(stageCompleted, g)
+        stageCompletedToJson(stageCompleted, g, options)
       case taskStart: SparkListenerTaskStart =>
-        taskStartToJson(taskStart, g)
+        taskStartToJson(taskStart, g, options)
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        taskGettingResultToJson(taskGettingResult, g)
+        taskGettingResultToJson(taskGettingResult, g, options)
       case taskEnd: SparkListenerTaskEnd =>
-        taskEndToJson(taskEnd, g)
+        taskEndToJson(taskEnd, g, options)
       case jobStart: SparkListenerJobStart =>
-        jobStartToJson(jobStart, g)
+        jobStartToJson(jobStart, g, options)
       case jobEnd: SparkListenerJobEnd =>
         jobEndToJson(jobEnd, g)
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
@@ -112,51 +134,64 @@ private[spark] object JsonProtocol extends JsonUtils {
     }
   }
 
-  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted, g: JsonGenerator): Unit = {
+  def stageSubmittedToJson(
+      stageSubmitted: SparkListenerStageSubmitted,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted)
     g.writeFieldName("Stage Info")
     // SPARK-42205: don't log accumulables in start events:
-    stageInfoToJson(stageSubmitted.stageInfo, g, includeAccumulables = false)
+    stageInfoToJson(stageSubmitted.stageInfo, g, options, includeAccumulables = false)
     Option(stageSubmitted.properties).foreach { properties =>
       g.writeFieldName("Properties")
       propertiesToJson(properties, g)
     }
     g.writeEndObject()
   }
 
-  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted, g: JsonGenerator): Unit = {
+  def stageCompletedToJson(
+      stageCompleted: SparkListenerStageCompleted,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted)
     g.writeFieldName("Stage Info")
-    stageInfoToJson(stageCompleted.stageInfo, g, includeAccumulables = true)
+    stageInfoToJson(stageCompleted.stageInfo, g, options, includeAccumulables = true)
     g.writeEndObject()
   }
 
-  def taskStartToJson(taskStart: SparkListenerTaskStart, g: JsonGenerator): Unit = {
+  def taskStartToJson(
+      taskStart: SparkListenerTaskStart,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart)
     g.writeNumberField("Stage ID", taskStart.stageId)
     g.writeNumberField("Stage Attempt ID", taskStart.stageAttemptId)
     g.writeFieldName("Task Info")
     // SPARK-42205: don't log accumulables in start events:
-    taskInfoToJson(taskStart.taskInfo, g, includeAccumulables = false)
+    taskInfoToJson(taskStart.taskInfo, g, options, includeAccumulables = false)
     g.writeEndObject()
   }
 
   def taskGettingResultToJson(
       taskGettingResult: SparkListenerTaskGettingResult,
-      g: JsonGenerator): Unit = {
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     val taskInfo = taskGettingResult.taskInfo
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult)
     g.writeFieldName("Task Info")
     // SPARK-42205: don't log accumulables in "task getting result" events:
-    taskInfoToJson(taskInfo, g, includeAccumulables = false)
+    taskInfoToJson(taskInfo, g, options, includeAccumulables = false)
     g.writeEndObject()
   }
 
-  def taskEndToJson(taskEnd: SparkListenerTaskEnd, g: JsonGenerator): Unit = {
+  def taskEndToJson(
+      taskEnd: SparkListenerTaskEnd,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd)
     g.writeNumberField("Stage ID", taskEnd.stageId)
@@ -165,7 +200,7 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeFieldName("Task End Reason")
     taskEndReasonToJson(taskEnd.reason, g)
     g.writeFieldName("Task Info")
-    taskInfoToJson(taskEnd.taskInfo, g, includeAccumulables = true)
+    taskInfoToJson(taskEnd.taskInfo, g, options, includeAccumulables = true)
     g.writeFieldName("Task Executor Metrics")
     executorMetricsToJson(taskEnd.taskExecutorMetrics, g)
     Option(taskEnd.taskMetrics).foreach { m =>
@@ -175,7 +210,10 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeEndObject()
   }
 
-  def jobStartToJson(jobStart: SparkListenerJobStart, g: JsonGenerator): Unit = {
+  def jobStartToJson(
+      jobStart: SparkListenerJobStart,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart)
     g.writeNumberField("Job ID", jobStart.jobId)
@@ -186,7 +224,7 @@ private[spark] object JsonProtocol extends JsonUtils {
     // the job was submitted: it is technically possible for a stage to belong to multiple
     // concurrent jobs, so this situation can arise even without races occurring between
     // event logging and stage completion.
-    jobStart.stageInfos.foreach(stageInfoToJson(_, g, includeAccumulables = true))
+    jobStart.stageInfos.foreach(stageInfoToJson(_, g, options, includeAccumulables = true))
     g.writeEndArray()
     g.writeArrayFieldStart("Stage IDs")
     jobStart.stageIds.foreach(g.writeNumber)
@@ -386,6 +424,7 @@ private[spark] object JsonProtocol extends JsonUtils {
   def stageInfoToJson(
       stageInfo: StageInfo,
       g: JsonGenerator,
+      options: JsonProtocolOptions,
       includeAccumulables: Boolean): Unit = {
     g.writeStartObject()
     g.writeNumberField("Stage ID", stageInfo.stageId)
@@ -404,7 +443,10 @@ private[spark] object JsonProtocol extends JsonUtils {
     stageInfo.failureReason.foreach(g.writeStringField("Failure Reason", _))
     g.writeFieldName("Accumulables")
     if (includeAccumulables) {
-      accumulablesToJson(stageInfo.accumulables.values, g)
+      accumulablesToJson(
+        stageInfo.accumulables.values,
+        g,
+        includeTaskMetricsAccumulators = options.includeTaskMetricsAccumulators)
     } else {
       g.writeStartArray()
       g.writeEndArray()
@@ -418,6 +460,7 @@ private[spark] object JsonProtocol extends JsonUtils {
   def taskInfoToJson(
       taskInfo: TaskInfo,
       g: JsonGenerator,
+      options: JsonProtocolOptions,
       includeAccumulables: Boolean): Unit = {
     g.writeStartObject()
     g.writeNumberField("Task ID", taskInfo.taskId)
@@ -435,21 +478,34 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeBooleanField("Killed", taskInfo.killed)
     g.writeFieldName("Accumulables")
     if (includeAccumulables) {
-      accumulablesToJson(taskInfo.accumulables, g)
+      accumulablesToJson(
+        taskInfo.accumulables,
+        g,
+        includeTaskMetricsAccumulators = options.includeTaskMetricsAccumulators)
     } else {
       g.writeStartArray()
       g.writeEndArray()
     }
     g.writeEndObject()
   }
 
-  private lazy val accumulableExcludeList = Set("internal.metrics.updatedBlockStatuses")
+  private[util] val accumulableExcludeList = Set(InternalAccumulator.UPDATED_BLOCK_STATUSES)
+
+  private[this] val taskMetricAccumulableNames = TaskMetrics.empty.nameToAccums.keySet.toSet
 
-  def accumulablesToJson(accumulables: Iterable[AccumulableInfo], g: JsonGenerator): Unit = {
+  def accumulablesToJson(
+      accumulables: Iterable[AccumulableInfo],
+      g: JsonGenerator,
+      includeTaskMetricsAccumulators: Boolean = true): Unit = {
     g.writeStartArray()
     accumulables
-      .filterNot(_.name.exists(accumulableExcludeList.contains))
-      .toList.sortBy(_.id).foreach(a => accumulableInfoToJson(a, g))
+      .filterNot { acc =>
+        acc.name.exists(accumulableExcludeList.contains) ||
+          (!includeTaskMetricsAccumulators && acc.name.exists(taskMetricAccumulableNames.contains))
+      }
+      .toList
+      .sortBy(_.id)
+      .foreach(a => accumulableInfoToJson(a, g))
     g.writeEndArray()
   }
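For reference (a sketch under the same Spark-internal visibility assumption as above): the names that the `includeTaskMetricsAccumulators = false` path filters out are the internal TaskMetrics accumulator names, which all carry the `internal.metrics.` prefix; SQL metrics and user-defined accumulators are not in that set and are therefore still logged.

```scala
import org.apache.spark.executor.TaskMetrics

// The filter above drops accumulables whose names appear in this set, e.g.
// "internal.metrics.executorRunTime" or "internal.metrics.resultSize".
val taskMetricAccumulableNames: Set[String] = TaskMetrics.empty.nameToAccums.keySet.toSet
taskMetricAccumulableNames.foreach(println)
```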
