From 3751e67892bbd6e63978883f89486a955b149331 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 27 Jan 2023 00:04:03 -0800 Subject: [PATCH 1/4] Add missing backward compatibility test for Task Executor Metrics field --- .../org/apache/spark/util/JsonProtocolSuite.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index be8a165d2d2e4..c3e27f855d98b 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -43,7 +43,7 @@ import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage._ class JsonProtocolSuite extends SparkFunSuite { - import JsonProtocol.toJsonString + import JsonProtocol._ import JsonProtocolSuite._ test("SparkListenerEvent") { @@ -368,6 +368,18 @@ class JsonProtocolSuite extends SparkFunSuite { assert(newMetrics.peakExecutionMemory == 0) } + test("Task Executor Metrics backward compatibility") { + // The "Task Executor Metrics" field was introduced in Spark 3.0.0 in SPARK-23429 + val oldJson = taskEndJsonString.removeField("Task Executor Metrics") + val oldTaskEnd = JsonProtocol.taskEndFromJson(oldJson) + val newTaskEnd = JsonProtocol.taskEndFromJson(taskEndJsonString) + assert(oldTaskEnd.taskExecutorMetrics != newTaskEnd.taskExecutorMetrics) + assert(oldTaskEnd.taskExecutorMetrics.isSet()) + ExecutorMetricType.metricToOffset.keys.foreach { metric => + assert(oldTaskEnd.taskExecutorMetrics.getMetricValue(metric) == 0) + } + } + test("StorageLevel backward compatibility") { // "Use Off Heap" was added in Spark 3.4.0 val level = StorageLevel( From 06f1b2002010d570d0c4257347e14f01296406a7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 27 Jan 2023 00:24:42 -0800 Subject: [PATCH 2/4] skip logging --- .../org/apache/spark/executor/ExecutorMetrics.scala | 12 ++++++++++++ .../scala/org/apache/spark/util/JsonProtocol.scala | 11 +++++++++-- .../org/apache/spark/util/JsonProtocolSuite.scala | 9 +++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala index 486e59652218b..e4ee69f82dd5c 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala @@ -85,6 +85,18 @@ class ExecutorMetrics private[spark] extends Serializable { } updated } + + private[spark] def allMetricsAreZero(): Boolean = { + var foundNonZero = false + var i = 0 + while (i < metrics.length && !foundNonZero) { + if (metrics(i) != 0) { + foundNonZero = true + } + i += 1 + } + !foundNonZero + } } private[spark] object ExecutorMetrics { diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 75dab8dc5358b..cabcfa9bbeb95 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -177,8 +177,15 @@ private[spark] object JsonProtocol { taskEndReasonToJson(taskEnd.reason, g) g.writeFieldName("Task Info") taskInfoToJson(taskEnd.taskInfo, g) - g.writeFieldName("Task Executor Metrics") - executorMetricsToJson(taskEnd.taskExecutorMetrics, g) + // SPARK-42206: if all metrics are zero (which is often the case when + // 
spark.executor.metrics.pollingInterval = 0 (the default config) and tasks complete + // between executor heartbeats) then omit the metrics field in order to save space + // in the event log JSON. The Spark History Server already treats missing metrics + // as all zero values, so this change has no impact on history server UI reconstruction. + if (!taskEnd.taskExecutorMetrics.allMetricsAreZero()) { + g.writeFieldName("Task Executor Metrics") + executorMetricsToJson(taskEnd.taskExecutorMetrics, g) + } Option(taskEnd.taskMetrics).foreach { m => g.writeFieldName("Task Metrics") taskMetricsToJson(m, g) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index c3e27f855d98b..ecd7eb94fa33a 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -790,6 +790,15 @@ class JsonProtocolSuite extends SparkFunSuite { |}""".stripMargin assert(JsonProtocol.sparkEventFromJson(unknownFieldsJson) === expected) } + + test("SPARK-42206: skip logging of all-zero Task Executor Metrics") { + val allZeroTaskExecutorMetrics = new ExecutorMetrics(Map.empty[String, Long]) + val taskEnd = taskEndFromJson(taskEndJsonString) + .copy(taskExecutorMetrics = allZeroTaskExecutorMetrics) + val json = sparkEventToJsonString(taskEnd) + assertEquals(taskEnd, taskEndFromJson(json)) + assert(!json.has("Task Executor Metrics")) + } } From fe82737ee7f0ede24b95e8dd10d7964f9009a998 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 27 Jan 2023 00:43:06 -0800 Subject: [PATCH 3/4] Add flag --- .../spark/internal/config/package.scala | 13 +++++++ .../scheduler/EventLoggingListener.scala | 8 ++-- .../org/apache/spark/util/JsonProtocol.scala | 37 ++++++++++++++++--- .../apache/spark/util/JsonProtocolSuite.scala | 18 +++++++-- 4 files changed, 64 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index be210cfe59b3d..009a57e3df453 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -212,6 +212,19 @@ package object config { .toSequence .createWithDefault(GarbageCollectionMetrics.OLD_GENERATION_BUILTIN_GARBAGE_COLLECTORS) + private[spark] val EVENT_LOG_INCLUDE_ALL_ZERO_TASK_EXECUTOR_METRICS = + ConfigBuilder("spark.eventLog.includeAllZeroTaskExecutorMetrics") + .doc("Controls whether task end events will include the 'Task Executor Metrics' field " + + "even if all metric values are zero. In Spark 3.4.x and earlier these metrics would be " + + "logged even if all values were zero, but Spark 3.5.0 omits the metrics from the event " + + "to save space in logs. this flag does not impact Spark History Server behavior: it " + + "exists only as a backwards-compatibility escape hatch for user applications that " + + "might be directly parsing event logs and that relied on the old behavior. 
" + + "See SPARK-42206 for more details.") + .version("3.5.0") + .booleanConf + .createWithDefault(false) + private[spark] val EVENT_LOG_OVERWRITE = ConfigBuilder("spark.eventLog.overwrite") .version("1.0.0") diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index b52a0f2f999dd..ec92368994390 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -31,7 +31,7 @@ import org.apache.spark.deploy.history.EventLogFileWriter import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -import org.apache.spark.util.{JsonProtocol, Utils} +import org.apache.spark.util.{JsonProtocol, JsonProtocolOptions, Utils} /** * A SparkListener that logs events to persistent storage. @@ -74,6 +74,8 @@ private[spark] class EventLoggingListener( private val liveStageExecutorMetrics = mutable.HashMap.empty[(Int, Int), mutable.HashMap[String, ExecutorMetrics]] + private[this] val jsonProtocolOptions = new JsonProtocolOptions(sparkConf) + /** * Creates the log file in the configured log directory. */ @@ -84,7 +86,7 @@ private[spark] class EventLoggingListener( private def initEventLog(): Unit = { val metadata = SparkListenerLogStart(SPARK_VERSION) - val eventJson = JsonProtocol.sparkEventToJsonString(metadata) + val eventJson = JsonProtocol.sparkEventToJsonString(metadata, jsonProtocolOptions) logWriter.writeEvent(eventJson, flushLogger = true) if (testing && loggedEvents != null) { loggedEvents += eventJson @@ -93,7 +95,7 @@ private[spark] class EventLoggingListener( /** Log the event as JSON. */ private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false): Unit = { - val eventJson = JsonProtocol.sparkEventToJsonString(event) + val eventJson = JsonProtocol.sparkEventToJsonString(event, jsonProtocolOptions) logWriter.writeEvent(eventJson, flushLogger) if (testing) { loggedEvents += eventJson diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index cabcfa9bbeb95..357f095128e85 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -31,6 +31,7 @@ import org.json4s.jackson.JsonMethods.compact import org.apache.spark._ import org.apache.spark.executor._ +import org.apache.spark.internal.config._ import org.apache.spark.metrics.ExecutorMetricType import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope} import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest} @@ -39,6 +40,16 @@ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.storage._ import org.apache.spark.util.Utils.weakIntern +/** + * Helper class for passing configuration options to JsonProtocol. + * We use this instead of passing SparkConf directly because it lets us avoid + * repeated re-parsing of configuration values on each read. + */ +private[spark] class JsonProtocolOptions(conf: SparkConf) { + val includeAllZeroTaskExecutorMetrics: Boolean = + conf.get(EVENT_LOG_INCLUDE_ALL_ZERO_TASK_EXECUTOR_METRICS) +} + /** * Serializes SparkListener events to/from JSON. 
This protocol provides strong backwards- * and forwards-compatibility guarantees: any version of Spark should be able to read JSON output @@ -60,13 +71,19 @@ private[spark] object JsonProtocol { private val mapper = new ObjectMapper().registerModule(DefaultScalaModule) .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false)) + /** ------------------------------------------------- * * JSON serialization methods for SparkListenerEvents | * -------------------------------------------------- */ - def sparkEventToJsonString(event: SparkListenerEvent): String = { + def sparkEventToJsonString(event: SparkListenerEvent): String = { + sparkEventToJsonString(event, defaultOptions) + } + + def sparkEventToJsonString(event: SparkListenerEvent, options: JsonProtocolOptions): String = { toJsonString { generator => - writeSparkEventToJson(event, generator) + writeSparkEventToJson(event, generator, options) } } @@ -79,7 +96,10 @@ private[spark] object JsonProtocol { new String(baos.toByteArray, StandardCharsets.UTF_8) } - def writeSparkEventToJson(event: SparkListenerEvent, g: JsonGenerator): Unit = { + def writeSparkEventToJson( + event: SparkListenerEvent, + g: JsonGenerator, + options: JsonProtocolOptions): Unit = { event match { case stageSubmitted: SparkListenerStageSubmitted => stageSubmittedToJson(stageSubmitted, g) @@ -90,7 +110,7 @@ private[spark] object JsonProtocol { case taskGettingResult: SparkListenerTaskGettingResult => taskGettingResultToJson(taskGettingResult, g) case taskEnd: SparkListenerTaskEnd => - taskEndToJson(taskEnd, g) + taskEndToJson(taskEnd, g, options) case jobStart: SparkListenerJobStart => jobStartToJson(jobStart, g) case jobEnd: SparkListenerJobEnd => @@ -167,7 +187,10 @@ private[spark] object JsonProtocol { g.writeEndObject() } - def taskEndToJson(taskEnd: SparkListenerTaskEnd, g: JsonGenerator): Unit = { + def taskEndToJson( + taskEnd: SparkListenerTaskEnd, + g: JsonGenerator, + options: JsonProtocolOptions): Unit = { g.writeStartObject() g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) g.writeNumberField("Stage ID", taskEnd.stageId) @@ -182,7 +205,9 @@ private[spark] object JsonProtocol { // between executor heartbeats) then omit the metrics field in order to save space // in the event log JSON. The Spark History Server already treats missing metrics // as all zero values, so this change has no impact on history server UI reconstruction. 
- if (!taskEnd.taskExecutorMetrics.allMetricsAreZero()) { + if ( + options.includeAllZeroTaskExecutorMetrics || + !taskEnd.taskExecutorMetrics.allMetricsAreZero()) { g.writeFieldName("Task Executor Metrics") executorMetricsToJson(taskEnd.taskExecutorMetrics, g) } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index ecd7eb94fa33a..731e142dd3ccd 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -33,6 +33,7 @@ import org.scalatest.exceptions.TestFailedException import org.apache.spark._ import org.apache.spark.executor._ +import org.apache.spark.internal.config._ import org.apache.spark.metrics.ExecutorMetricType import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope} import org.apache.spark.resource._ @@ -795,9 +796,20 @@ class JsonProtocolSuite extends SparkFunSuite { val allZeroTaskExecutorMetrics = new ExecutorMetrics(Map.empty[String, Long]) val taskEnd = taskEndFromJson(taskEndJsonString) .copy(taskExecutorMetrics = allZeroTaskExecutorMetrics) - val json = sparkEventToJsonString(taskEnd) - assertEquals(taskEnd, taskEndFromJson(json)) - assert(!json.has("Task Executor Metrics")) + + // Test new default behavior: + val newOptions = new JsonProtocolOptions( + new SparkConf().set(EVENT_LOG_INCLUDE_ALL_ZERO_TASK_EXECUTOR_METRICS, false)) + val newJson = sparkEventToJsonString(taskEnd, newOptions) + assertEquals(taskEnd, taskEndFromJson(newJson)) + assert(!newJson.has("Task Executor Metrics")) + + // Test backwards compatibility flag: + val oldOptions = new JsonProtocolOptions( + new SparkConf().set(EVENT_LOG_INCLUDE_ALL_ZERO_TASK_EXECUTOR_METRICS, true)) + val oldJson = sparkEventToJsonString(taskEnd, oldOptions) + assertEquals(taskEnd, taskEndFromJson(oldJson)) + assert(oldJson.has("Task Executor Metrics")) } } From ffbb0c001325bfb39ddb956f07da69b534e7d212 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 27 Jan 2023 11:29:36 -0800 Subject: [PATCH 4/4] Fix Scala 2.13 compilation. --- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 731e142dd3ccd..b75f005b98c98 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -793,7 +793,7 @@ class JsonProtocolSuite extends SparkFunSuite { } test("SPARK-42206: skip logging of all-zero Task Executor Metrics") { - val allZeroTaskExecutorMetrics = new ExecutorMetrics(Map.empty[String, Long]) + val allZeroTaskExecutorMetrics = new ExecutorMetrics(Array.emptyLongArray) val taskEnd = taskEndFromJson(taskEndJsonString) .copy(taskExecutorMetrics = allZeroTaskExecutorMetrics)
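
As an illustration of the compatibility contract relied on above (a missing "Task Executor Metrics" field means every metric value is zero), the following is a minimal sketch of how an external event-log consumer might read the now-optional field. It is not part of this patch: the TaskEndMetricsReader object and its method name are hypothetical, and it only assumes jackson-databind (already a Spark dependency) on the classpath.

    import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

    object TaskEndMetricsReader {
      private val mapper = new ObjectMapper()

      // Parses one event-log line and returns the task-end executor metric values.
      // A missing "Task Executor Metrics" field is interpreted as "all metrics are zero"
      // and therefore mapped to an empty map, mirroring History Server behavior.
      def readTaskExecutorMetrics(eventLogLine: String): Map[String, Long] = {
        val event: JsonNode = mapper.readTree(eventLogLine)
        val metricsNode = event.get("Task Executor Metrics")
        if (metricsNode == null) {
          Map.empty
        } else {
          val builder = Map.newBuilder[String, Long]
          val fields = metricsNode.fields()
          while (fields.hasNext) {
            val entry = fields.next()
            builder += entry.getKey -> entry.getValue.asLong()
          }
          builder.result()
        }
      }
    }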
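
Similarly, a hedged usage sketch (not part of the patch) of how an application that depends on the pre-change event-log layout could opt back in via the flag added in PATCH 3/4; spark.eventLog.enabled is the standard event-logging switch, and the new key name is taken from the config definition above.

    import org.apache.spark.SparkConf

    // Restore unconditional logging of "Task Executor Metrics", accepting larger event logs.
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.includeAllZeroTaskExecutorMetrics", "true")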