From c92d463d3d8abf13db968282e01734158d7fa6a2 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 26 Jan 2023 16:47:53 -0800
Subject: [PATCH 1/7] Refactor: replace hardcoded string with constant

---
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 75dab8dc5358b..76224f39dbb8e 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -432,7 +432,7 @@ private[spark] object JsonProtocol {
     g.writeEndObject()
   }
 
-  private lazy val accumulableExcludeList = Set("internal.metrics.updatedBlockStatuses")
+  private lazy val accumulableExcludeList = Set(InternalAccumulator.UPDATED_BLOCK_STATUSES)
 
   def accumulablesToJson(accumulables: Iterable[AccumulableInfo], g: JsonGenerator): Unit = {
     g.writeStartArray()

From aea48e7096534258b0b8f70e06dc9385a021ce65 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 26 Jan 2023 16:53:08 -0800
Subject: [PATCH 2/7] No need for it to be a lazy val.

---
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 76224f39dbb8e..ea0114e215c0b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -432,7 +432,7 @@ private[spark] object JsonProtocol {
     g.writeEndObject()
   }
 
-  private lazy val accumulableExcludeList = Set(InternalAccumulator.UPDATED_BLOCK_STATUSES)
+  private[this] val accumulableExcludeList = Set(InternalAccumulator.UPDATED_BLOCK_STATUSES)
 
   def accumulablesToJson(accumulables: Iterable[AccumulableInfo], g: JsonGenerator): Unit = {
     g.writeStartArray()
From 97809491de164b075ef7df86d1a2d97011f10889 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 26 Jan 2023 17:48:02 -0800
Subject: [PATCH 3/7] Flagged-off implementation.

---
 .../scheduler/EventLoggingListener.scala      |   8 +-
 .../org/apache/spark/util/JsonProtocol.scala  | 110 +++++++++++++-----
 .../apache/spark/util/JsonProtocolSuite.scala |  19 ++-
 3 files changed, 97 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index b52a0f2f999dd..ec92368994390 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.history.EventLogFileWriter
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, JsonProtocolOptions, Utils}
 
 /**
  * A SparkListener that logs events to persistent storage.
@@ -74,6 +74,8 @@ private[spark] class EventLoggingListener(
   private val liveStageExecutorMetrics =
     mutable.HashMap.empty[(Int, Int), mutable.HashMap[String, ExecutorMetrics]]
 
+  private[this] val jsonProtocolOptions = new JsonProtocolOptions(sparkConf)
+
   /**
    * Creates the log file in the configured log directory.
   */
@@ -84,7 +86,7 @@ private[spark] class EventLoggingListener(
 
   private def initEventLog(): Unit = {
     val metadata = SparkListenerLogStart(SPARK_VERSION)
-    val eventJson = JsonProtocol.sparkEventToJsonString(metadata)
+    val eventJson = JsonProtocol.sparkEventToJsonString(metadata, jsonProtocolOptions)
     logWriter.writeEvent(eventJson, flushLogger = true)
     if (testing && loggedEvents != null) {
       loggedEvents += eventJson
@@ -93,7 +95,7 @@ private[spark] class EventLoggingListener(
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false): Unit = {
-    val eventJson = JsonProtocol.sparkEventToJsonString(event)
+    val eventJson = JsonProtocol.sparkEventToJsonString(event, jsonProtocolOptions)
     logWriter.writeEvent(eventJson, flushLogger)
     if (testing) {
       loggedEvents += eventJson
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ea0114e215c0b..8d59833255b15 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -39,6 +39,15 @@ import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils.weakIntern
 
+/**
+ * Helper class for passing configuration options to JsonProtocol.
+ * We use this instead of passing SparkConf directly because it lets us avoid
+ * repeated re-parsing of configuration values on each read.
+ */
+private[spark] class JsonProtocolOptions(conf: SparkConf) {
+  val includeTaskMetricsAccumulables: Boolean = true
+}
+
 /**
  * Serializes SparkListener events to/from JSON. This protocol provides strong backwards-
  * and forwards-compatibility guarantees: any version of Spark should be able to read JSON output
@@ -60,13 +69,19 @@ private[spark] object JsonProtocol {
   private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
     .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
 
+  val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false))
+
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
 
   def sparkEventToJsonString(event: SparkListenerEvent): String = {
+    sparkEventToJsonString(event, defaultOptions)
+  }
+
+  def sparkEventToJsonString(event: SparkListenerEvent, options: JsonProtocolOptions): String = {
     toJsonString { generator =>
-      writeSparkEventToJson(event, generator)
+      writeSparkEventToJson(event, generator, options)
     }
   }
 
@@ -79,20 +94,23 @@ private[spark] object JsonProtocol {
     new String(baos.toByteArray, StandardCharsets.UTF_8)
   }
 
-  def writeSparkEventToJson(event: SparkListenerEvent, g: JsonGenerator): Unit = {
+  def writeSparkEventToJson(
+      event: SparkListenerEvent,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        stageSubmittedToJson(stageSubmitted, g)
+        stageSubmittedToJson(stageSubmitted, g, options)
       case stageCompleted: SparkListenerStageCompleted =>
-        stageCompletedToJson(stageCompleted, g)
+        stageCompletedToJson(stageCompleted, g, options)
      case taskStart: SparkListenerTaskStart =>
-        taskStartToJson(taskStart, g)
+        taskStartToJson(taskStart, g, options)
      case taskGettingResult: SparkListenerTaskGettingResult =>
-        taskGettingResultToJson(taskGettingResult, g)
+        taskGettingResultToJson(taskGettingResult, g, options)
      case taskEnd: SparkListenerTaskEnd =>
-        taskEndToJson(taskEnd, g)
+        taskEndToJson(taskEnd, g, options)
      case jobStart: SparkListenerJobStart =>
-        jobStartToJson(jobStart, g)
+        jobStartToJson(jobStart, g, options)
      case jobEnd: SparkListenerJobEnd =>
        jobEndToJson(jobEnd, g)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
@@ -126,11 +144,14 @@ private[spark] object JsonProtocol {
     }
   }
 
-  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted, g: JsonGenerator): Unit = {
+  def stageSubmittedToJson(
+      stageSubmitted: SparkListenerStageSubmitted,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted)
     g.writeFieldName("Stage Info")
-    stageInfoToJson(stageSubmitted.stageInfo, g)
+    stageInfoToJson(stageSubmitted.stageInfo, g, options)
     Option(stageSubmitted.properties).foreach { properties =>
       g.writeFieldName("Properties")
       propertiesToJson(properties, g)
@@ -138,36 +159,46 @@ private[spark] object JsonProtocol {
     g.writeEndObject()
   }
 
-  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted, g: JsonGenerator): Unit = {
+  def stageCompletedToJson(
+      stageCompleted: SparkListenerStageCompleted,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted)
     g.writeFieldName("Stage Info")
-    stageInfoToJson(stageCompleted.stageInfo, g)
+    stageInfoToJson(stageCompleted.stageInfo, g, options)
     g.writeEndObject()
   }
 
-  def taskStartToJson(taskStart: SparkListenerTaskStart, g: JsonGenerator): Unit = {
+  def taskStartToJson(
+      taskStart: SparkListenerTaskStart,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart)
     g.writeNumberField("Stage ID", taskStart.stageId)
     g.writeNumberField("Stage Attempt ID", taskStart.stageAttemptId)
     g.writeFieldName("Task Info")
-    taskInfoToJson(taskStart.taskInfo, g)
+    taskInfoToJson(taskStart.taskInfo, g, options)
     g.writeEndObject()
   }
 
   def taskGettingResultToJson(
       taskGettingResult: SparkListenerTaskGettingResult,
-      g: JsonGenerator): Unit = {
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     val taskInfo = taskGettingResult.taskInfo
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult)
     g.writeFieldName("Task Info")
-    taskInfoToJson(taskInfo, g)
+    taskInfoToJson(taskInfo, g, options)
     g.writeEndObject()
   }
 
-  def taskEndToJson(taskEnd: SparkListenerTaskEnd, g: JsonGenerator): Unit = {
+  def taskEndToJson(
+      taskEnd: SparkListenerTaskEnd,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd)
     g.writeNumberField("Stage ID", taskEnd.stageId)
@@ -176,7 +207,7 @@ private[spark] object JsonProtocol {
     g.writeFieldName("Task End Reason")
     taskEndReasonToJson(taskEnd.reason, g)
     g.writeFieldName("Task Info")
-    taskInfoToJson(taskEnd.taskInfo, g)
+    taskInfoToJson(taskEnd.taskInfo, g, options)
     g.writeFieldName("Task Executor Metrics")
     executorMetricsToJson(taskEnd.taskExecutorMetrics, g)
     Option(taskEnd.taskMetrics).foreach { m =>
@@ -186,13 +217,16 @@ private[spark] object JsonProtocol {
     g.writeEndObject()
   }
 
-  def jobStartToJson(jobStart: SparkListenerJobStart, g: JsonGenerator): Unit = {
+  def jobStartToJson(
+      jobStart: SparkListenerJobStart,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart)
     g.writeNumberField("Job ID", jobStart.jobId)
     g.writeNumberField("Submission Time", jobStart.time)
     g.writeArrayFieldStart("Stage Infos")  // Added in Spark 1.2.0
-    jobStart.stageInfos.foreach(stageInfoToJson(_, g))
+    jobStart.stageInfos.foreach(stageInfoToJson(_, g, options))
     g.writeEndArray()
     g.writeArrayFieldStart("Stage IDs")
     jobStart.stageIds.foreach(g.writeNumber)
@@ -388,7 +422,10 @@ private[spark] object JsonProtocol {
    * JSON serialization methods for classes SparkListenerEvents depend on |
    * -------------------------------------------------------------------- */
 
-  def stageInfoToJson(stageInfo: StageInfo, g: JsonGenerator): Unit = {
+  def stageInfoToJson(
+      stageInfo: StageInfo,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeNumberField("Stage ID", stageInfo.stageId)
     g.writeNumberField("Stage Attempt ID", stageInfo.attemptNumber)
@@ -405,14 +442,20 @@ private[spark] object JsonProtocol {
     stageInfo.completionTime.foreach(g.writeNumberField("Completion Time", _))
     stageInfo.failureReason.foreach(g.writeStringField("Failure Reason", _))
     g.writeFieldName("Accumulables")
-    accumulablesToJson(stageInfo.accumulables.values, g)
+    accumulablesToJson(
+      stageInfo.accumulables.values,
+      g,
+      includeTaskMetricsAccumulables = options.includeTaskMetricsAccumulables)
     g.writeNumberField("Resource Profile Id", stageInfo.resourceProfileId)
     g.writeBooleanField("Shuffle Push Enabled", stageInfo.isShufflePushEnabled)
     g.writeNumberField("Shuffle Push Mergers Count", stageInfo.shuffleMergerCount)
     g.writeEndObject()
   }
 
-  def taskInfoToJson(taskInfo: TaskInfo, g: JsonGenerator): Unit = {
+  def taskInfoToJson(
+      taskInfo: TaskInfo,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeNumberField("Task ID", taskInfo.taskId)
     g.writeNumberField("Index", taskInfo.index)
@@ -428,17 +471,30 @@ private[spark] object JsonProtocol {
     g.writeBooleanField("Failed", taskInfo.failed)
     g.writeBooleanField("Killed", taskInfo.killed)
     g.writeFieldName("Accumulables")
-    accumulablesToJson(taskInfo.accumulables, g)
+    accumulablesToJson(
+      taskInfo.accumulables,
+      g,
+      includeTaskMetricsAccumulables = options.includeTaskMetricsAccumulables)
     g.writeEndObject()
   }
 
   private[this] val accumulableExcludeList = Set(InternalAccumulator.UPDATED_BLOCK_STATUSES)
 
-  def accumulablesToJson(accumulables: Iterable[AccumulableInfo], g: JsonGenerator): Unit = {
+  private[this] val taskMetricAccumulableNames = TaskMetrics.empty.nameToAccums.keySet.toSet
+
+  def accumulablesToJson(
+      accumulables: Iterable[AccumulableInfo],
+      g: JsonGenerator,
+      includeTaskMetricsAccumulables: Boolean = true): Unit = {
     g.writeStartArray()
     accumulables
-      .filterNot(_.name.exists(accumulableExcludeList.contains))
-      .toList.sortBy(_.id).foreach(a => accumulableInfoToJson(a, g))
+      .filterNot { acc =>
+        acc.name.exists(accumulableExcludeList.contains) ||
+          (!includeTaskMetricsAccumulables && acc.name.exists(taskMetricAccumulableNames.contains))
+      }
+      .toList
+      .sortBy(_.id)
+      .foreach(a => accumulableInfoToJson(a, g))
     g.writeEndArray()
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index be8a165d2d2e4..41e1bc74fd7c4 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -43,7 +43,7 @@ import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage._
 
 class JsonProtocolSuite extends SparkFunSuite {
-  import JsonProtocol.toJsonString
+  import JsonProtocol._
   import JsonProtocolSuite._
 
   test("SparkListenerEvent") {
@@ -271,7 +271,7 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("StageInfo backward compatibility (details, accumulables)") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L)
-    val newJson = toJsonString(JsonProtocol.stageInfoToJson(info, _))
+    val newJson = toJsonString(JsonProtocol.stageInfoToJson(info, _, defaultOptions))
 
     // Fields added after 1.0.0.
     assert(info.details.nonEmpty)
@@ -289,7 +289,7 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("StageInfo resourceProfileId") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L, 5)
-    val json = toJsonString(JsonProtocol.stageInfoToJson(info, _))
+    val json = toJsonString(JsonProtocol.stageInfoToJson(info, _, defaultOptions))
 
     // Fields added after 1.0.0.
     assert(info.details.nonEmpty)
@@ -466,7 +466,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown",
         resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
-    val oldEvent = toJsonString(JsonProtocol.jobStartToJson(jobStart, _)).removeField("Stage Infos")
+    val oldEvent = sparkEventToJsonString(jobStart).removeField("Stage Infos")
     val expectedJobStart =
       SparkListenerJobStart(10, jobSubmissionTime, dummyStageInfos, properties)
     assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
@@ -478,8 +478,7 @@ class JsonProtocolSuite extends SparkFunSuite {
 
     val stageIds = Seq[Int](1, 2, 3, 4)
     val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40L, x * 50L))
     val jobStart = SparkListenerJobStart(11, jobSubmissionTime, stageInfos, properties)
-    val oldStartEvent = toJsonString(JsonProtocol.jobStartToJson(jobStart, _))
-      .removeField("Submission Time")
+    val oldStartEvent = sparkEventToJsonString(jobStart).removeField("Submission Time")
     val expectedJobStart = SparkListenerJobStart(11, -1, stageInfos, properties)
     assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldStartEvent))
@@ -513,7 +512,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     // Prior to Spark 1.4.0, StageInfo did not have the "Parent IDs" property
     val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 3), "details",
       resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
-    val oldStageInfo = toJsonString(JsonProtocol.stageInfoToJson(stageInfo, _))
+    val oldStageInfo = toJsonString(JsonProtocol.stageInfoToJson(stageInfo, _, defaultOptions))
       .removeField("Parent IDs")
     val expectedStageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq.empty, "details",
       resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
@@ -783,7 +782,7 @@ class JsonProtocolSuite extends SparkFunSuite {
 
 private[spark] object JsonProtocolSuite extends Assertions {
   import InternalAccumulator._
-  import JsonProtocol.toJsonString
+  import JsonProtocol._
 
   private val mapper = new ObjectMapper()
 
@@ -833,7 +832,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
 
   private def testStageInfo(info: StageInfo): Unit = {
     val newInfo = JsonProtocol.stageInfoFromJson(
-      toJsonString(JsonProtocol.stageInfoToJson(info, _)))
+      toJsonString(JsonProtocol.stageInfoToJson(info, _, defaultOptions)))
     assertEquals(info, newInfo)
   }
 
@@ -857,7 +856,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
 
   private def testTaskInfo(info: TaskInfo): Unit = {
     val newInfo = JsonProtocol.taskInfoFromJson(
-      toJsonString(JsonProtocol.taskInfoToJson(info, _)))
+      toJsonString(JsonProtocol.taskInfoToJson(info, _, defaultOptions)))
     assertEquals(info, newInfo)
   }
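[Editor's note -- not part of the patch series.] PATCH 3/7 establishes the caller-side pattern that the rest of the series relies on: parse the SparkConf into a JsonProtocolOptions object once, then hand it to the new two-argument sparkEventToJsonString overload, as EventLoggingListener now does. The sketch below is illustrative only; it assumes code living inside the org.apache.spark packages (JsonProtocol and JsonProtocolOptions are private[spark]), and the writeEvents helper is hypothetical, not something the patch adds.

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.SparkListenerEvent
    import org.apache.spark.util.{JsonProtocol, JsonProtocolOptions}

    // Hypothetical caller: build the options once so the conf is not re-parsed per event.
    def writeEvents(conf: SparkConf, events: Seq[SparkListenerEvent]): Seq[String] = {
      val options = new JsonProtocolOptions(conf)
      events.map(event => JsonProtocol.sparkEventToJsonString(event, options))
    }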
From cb6435c881d214618e4593d411598d2a7bb634d7 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 26 Jan 2023 19:05:56 -0800
Subject: [PATCH 4/7] Implement flag and add tests.

---
 .../spark/internal/config/package.scala       |  12 ++
 .../org/apache/spark/util/JsonProtocol.scala  |  14 ++-
 .../apache/spark/util/JsonProtocolSuite.scala | 106 ++++++++++++++++++
 3 files changed, 126 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index be210cfe59b3d..99a62e2b545bb 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -212,6 +212,18 @@ package object config {
       .toSequence
       .createWithDefault(GarbageCollectionMetrics.OLD_GENERATION_BUILTIN_GARBAGE_COLLECTORS)
 
+  private[spark] val EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS =
+    ConfigBuilder("spark.eventLog.includeTaskMetricsAccumulators")
+      .doc("Whether to include TaskMetrics' underlying accumulator values in the event log (as " +
+        "part of the Task/Stage/Job metrics' 'Accumulables' fields. This configuration defaults " +
+        "to false because the TaskMetrics values are already logged in the 'Task Metrics' " +
+        "fields (so the accumulator updates are redundant). This flag exists only as a " +
+        "backwards-compatibility escape hatch for applications that might rely on the old " +
+        "behavior. See SPARK-42204 for details.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite")
       .version("1.0.0")

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8d59833255b15..6d3da426f5320 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -31,6 +31,7 @@ import org.json4s.jackson.JsonMethods.compact
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
@@ -45,7 +46,8 @@ import org.apache.spark.util.Utils.weakIntern
  * repeated re-parsing of configuration values on each read.
  */
 private[spark] class JsonProtocolOptions(conf: SparkConf) {
-  val includeTaskMetricsAccumulables: Boolean = true
+  val includeTaskMetricsAccumulators: Boolean =
+    conf.get(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS)
 }
 
 /**
@@ -445,7 +447,7 @@ private[spark] object JsonProtocol {
     accumulablesToJson(
       stageInfo.accumulables.values,
       g,
-      includeTaskMetricsAccumulables = options.includeTaskMetricsAccumulables)
+      includeTaskMetricsAccumulators = options.includeTaskMetricsAccumulators)
     g.writeNumberField("Resource Profile Id", stageInfo.resourceProfileId)
     g.writeBooleanField("Shuffle Push Enabled", stageInfo.isShufflePushEnabled)
     g.writeNumberField("Shuffle Push Mergers Count", stageInfo.shuffleMergerCount)
     g.writeEndObject()
@@ -474,23 +476,23 @@ private[spark] object JsonProtocol {
     accumulablesToJson(
       taskInfo.accumulables,
       g,
-      includeTaskMetricsAccumulables = options.includeTaskMetricsAccumulables)
+      includeTaskMetricsAccumulators = options.includeTaskMetricsAccumulators)
     g.writeEndObject()
   }
 
-  private[this] val accumulableExcludeList = Set(InternalAccumulator.UPDATED_BLOCK_STATUSES)
+  private[util] val accumulableExcludeList = Set(InternalAccumulator.UPDATED_BLOCK_STATUSES)
 
   private[this] val taskMetricAccumulableNames = TaskMetrics.empty.nameToAccums.keySet.toSet
 
   def accumulablesToJson(
       accumulables: Iterable[AccumulableInfo],
       g: JsonGenerator,
-      includeTaskMetricsAccumulables: Boolean = true): Unit = {
+      includeTaskMetricsAccumulators: Boolean = true): Unit = {
     g.writeStartArray()
     accumulables
       .filterNot { acc =>
         acc.name.exists(accumulableExcludeList.contains) ||
-          (!includeTaskMetricsAccumulables && acc.name.exists(taskMetricAccumulableNames.contains))
+          (!includeTaskMetricsAccumulators && acc.name.exists(taskMetricAccumulableNames.contains))
       }
       .toList
       .sortBy(_.id)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 41e1bc74fd7c4..5bef4eca41cab 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.exceptions.TestFailedException
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
 import org.apache.spark.resource._
@@ -777,6 +778,111 @@ class JsonProtocolSuite extends SparkFunSuite {
       |}""".stripMargin
     assert(JsonProtocol.sparkEventFromJson(unknownFieldsJson) === expected)
   }
+
+  test("SPARK-42204: spark.eventLog.includeTaskMetricsAccumulators config") {
+    val includeConf = new JsonProtocolOptions(
+      new SparkConf().set(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS, true))
+    val excludeConf = new JsonProtocolOptions(
+      new SparkConf().set(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS, false))
+
+    val taskMetricsAccumulables = TaskMetrics
+      .empty
+      .nameToAccums
+      .filterKeys(!JsonProtocol.accumulableExcludeList.contains(_))
+      .values
+      .map(_.toInfo(Some(1), None))
+      .toSeq
+
+    val taskInfoWithTaskMetricsAccums = makeTaskInfo(222L, 333, 1, 333, 444L, false)
+    taskInfoWithTaskMetricsAccums.setAccumulables(taskMetricsAccumulables)
+    val taskInfoWithoutTaskMetricsAccums = makeTaskInfo(222L, 333, 1, 333, 444L, false)
+    taskInfoWithoutTaskMetricsAccums.setAccumulables(Seq.empty)
+
+    val stageInfoWithTaskMetricsAccums = makeStageInfo(100, 200, 300, 400L, 500L)
+    stageInfoWithTaskMetricsAccums.accumulables.clear()
+    stageInfoWithTaskMetricsAccums.accumulables ++= taskMetricsAccumulables.map(x => (x.id, x))
+    val stageInfoWithoutTaskMetricsAccums = makeStageInfo(100, 200, 300, 400L, 500L)
+    stageInfoWithoutTaskMetricsAccums.accumulables.clear()
+
+    // Test events which should be impacted by the config. Due to SPARK-42205 we need to test
+    // start events in addition to end events.
+
+    // TaskStart
+    {
+      val originalEvent = SparkListenerTaskStart(111, 0, taskInfoWithTaskMetricsAccums)
+      assertEquals(
+        originalEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, includeConf)))
+      val trimmedEvent = originalEvent.copy(taskInfo = taskInfoWithoutTaskMetricsAccums)
+      assertEquals(
+        trimmedEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, excludeConf)))
+    }
+
+    // TaskEnd
+    {
+      val originalEvent = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
+        taskInfoWithTaskMetricsAccums,
+        new ExecutorMetrics(Array(12L, 23L, 45L, 67L, 78L, 89L,
+          90L, 123L, 456L, 789L, 40L, 20L, 20L, 10L, 20L, 10L, 301L)),
+        makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, 0,
+          hasHadoopInput = false, hasOutput = false))
+      assertEquals(
+        originalEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, includeConf)))
+      val trimmedEvent = originalEvent.copy(taskInfo = taskInfoWithoutTaskMetricsAccums)
+      assertEquals(
+        trimmedEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, excludeConf)))
+    }
+
+    // StageSubmitted
+    {
+      val originalEvent = SparkListenerStageSubmitted(stageInfoWithTaskMetricsAccums, properties)
+      assertEquals(
+        originalEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, includeConf)))
+      val trimmedEvent = originalEvent.copy(stageInfo = stageInfoWithoutTaskMetricsAccums)
+      assertEquals(
+        trimmedEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, excludeConf)))
+    }
+
+    // StageCompleted
+    {
+      val originalEvent = SparkListenerStageCompleted(stageInfoWithTaskMetricsAccums)
+      assertEquals(
+        originalEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, includeConf)))
+      val trimmedEvent = originalEvent.copy(stageInfo = stageInfoWithoutTaskMetricsAccums)
+      assertEquals(
+        trimmedEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, excludeConf)))
+    }
+
+    // JobStart
+    {
+      val originalEvent =
+        SparkListenerJobStart(1, 1, Seq(stageInfoWithTaskMetricsAccums), properties)
+      assertEquals(
+        originalEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, includeConf)))
+      val trimmedEvent = originalEvent.copy(stageInfos = Seq(stageInfoWithoutTaskMetricsAccums))
+      assertEquals(
+        trimmedEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, excludeConf)))
+    }
+
+    // ExecutorMetricsUpdate events should be unaffected by the config:
+    val executorMetricsUpdate =
+      SparkListenerExecutorMetricsUpdate("0", Seq((0, 0, 0, taskMetricsAccumulables)))
+    assert(
+      sparkEventToJsonString(executorMetricsUpdate, includeConf) ===
+      sparkEventToJsonString(executorMetricsUpdate, excludeConf))
+    assertEquals(
+      JsonProtocol.sparkEventFromJson(sparkEventToJsonString(executorMetricsUpdate, includeConf)),
+      executorMetricsUpdate)
+  }
 }
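[Editor's note -- not part of the patch series.] With PATCH 4/7, the TaskMetrics accumulator updates are dropped from the event log's 'Accumulables' arrays by default, since the same values already appear in the 'Task Metrics' fields. An application that still depends on the old output can opt back in through the new flag. A minimal sketch under stated assumptions: the event-log directory below is a placeholder, and only the last setting is introduced by this patch.

    import org.apache.spark.SparkConf

    // Hypothetical opt-in to the pre-SPARK-42204 event-log output.
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs:///spark-events")
      .set("spark.eventLog.includeTaskMetricsAccumulators", "true")

The same flag can also be passed on the command line, e.g. spark-submit --conf spark.eventLog.includeTaskMetricsAccumulators=true.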
From ab539bf696538b83ed27e74dbce5860e23ab1afc Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 3 Jul 2024 19:24:32 -0700
Subject: [PATCH 5/7] 3.5.0 -> 4.0.0 for conf version

---
 .../main/scala/org/apache/spark/internal/config/package.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 71a8632936974..9eee293ee2a40 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -267,7 +267,7 @@ package object config {
         "fields (so the accumulator updates are redundant). This flag exists only as a " +
         "backwards-compatibility escape hatch for applications that might rely on the old " +
         "behavior. See SPARK-42204 for details.")
-      .version("3.5.0")
+      .version("4.0.0")
       .booleanConf
       .createWithDefault(false)
 

From bbb0ade1941fc16ca0932c8ef1c39d8bbe61062f Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 3 Jul 2024 19:27:07 -0700
Subject: [PATCH 6/7] Minor review comments

---
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a93f95a8e16d3..901f0eb022eb3 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -66,12 +66,14 @@ private[spark] class JsonProtocolOptions(conf: SparkConf) {
 private[spark] object JsonProtocol extends JsonUtils {
   // TODO: Remove this file and put JSON serialization into each individual class.
 
+  private[util]
   val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false))
 
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
 
+  // Only for use in tests. Production code should use the two-argument overload defined below.
   def sparkEventToJsonString(event: SparkListenerEvent): String = {
     sparkEventToJsonString(event, defaultOptions)
   }

From c6c03762a66561f6d2ea90f497e60709ae404bc1 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 4 Sep 2024 16:17:15 -0700
Subject: [PATCH 7/7] empty commit to re-trigger tests
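[Editor's note -- not part of the patch series.] The new configuration only affects serialization; the read path (sparkEventFromJson) takes no options, so event logs written with either setting remain parseable, which is exactly what the round-trip assertions in the new test rely on. A minimal round-trip sketch, assuming an `event` and a `conf` are in scope inside Spark code (these APIs are private[spark]):

    // Write with explicit options, read back without any -- mirrors the new test's pattern.
    val json = JsonProtocol.sparkEventToJsonString(event, new JsonProtocolOptions(conf))
    val parsed = JsonProtocol.sparkEventFromJson(json)  // no options needed on the read path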