core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -85,6 +85,18 @@ class ExecutorMetrics private[spark] extends Serializable {
}
updated
}

private[spark] def allMetricsAreZero(): Boolean = {
var foundNonZero = false
var i = 0
while (i < metrics.length && !foundNonZero) {
if (metrics(i) != 0) {
foundNonZero = true
}
i += 1
}
!foundNonZero
}
}

private[spark] object ExecutorMetrics {
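For context (not part of the diff): a rough sketch of how the new allMetricsAreZero helper is expected to behave. The ExecutorMetrics constructors used below are private[spark], so a snippet like this only compiles inside Spark's own source tree (e.g. in a test), and the metric name is just an example.

import org.apache.spark.executor.ExecutorMetrics

// A freshly-constructed ExecutorMetrics has every counter at 0, so the helper returns true.
val empty = new ExecutorMetrics(Array.emptyLongArray)
assert(empty.allMetricsAreZero())

// Setting any single metric to a non-zero value flips the result to false.
val nonZero = new ExecutorMetrics(Map("JVMHeapMemory" -> 1024L))
assert(!nonZero.allMetricsAreZero())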
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -212,6 +212,19 @@ package object config {
.toSequence
.createWithDefault(GarbageCollectionMetrics.OLD_GENERATION_BUILTIN_GARBAGE_COLLECTORS)

private[spark] val EVENT_LOG_INCLUDE_ALL_ZERO_TASK_EXECUTOR_METRICS =
ConfigBuilder("spark.eventLog.includeAllZeroTaskExecutorMetrics")
.doc("Controls whether task end events will include the 'Task Executor Metrics' field " +
"even if all metric values are zero. In Spark 3.4.x and earlier these metrics would be " +
"logged even if all values were zero, but Spark 3.5.0 omits the metrics from the event " +
"to save space in logs. this flag does not impact Spark History Server behavior: it " +
"exists only as a backwards-compatibility escape hatch for user applications that " +
"might be directly parsing event logs and that relied on the old behavior. " +
"See SPARK-42206 for more details.")
.version("3.5.0")
.booleanConf
.createWithDefault(false)

private[spark] val EVENT_LOG_OVERWRITE =
ConfigBuilder("spark.eventLog.overwrite")
.version("1.0.0")
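Usage note (not part of the diff): an application whose tooling parses event logs directly and relies on the pre-3.5.0 layout could opt back into the old behavior roughly like this; the eventLog.enabled setting is shown only for completeness.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.includeAllZeroTaskExecutorMetrics", "true")
// or equivalently on spark-submit:
//   --conf spark.eventLog.includeAllZeroTaskExecutorMetrics=true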
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.history.EventLogFileWriter
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.{JsonProtocol, Utils}
import org.apache.spark.util.{JsonProtocol, JsonProtocolOptions, Utils}

/**
* A SparkListener that logs events to persistent storage.
@@ -74,6 +74,8 @@ private[spark] class EventLoggingListener(
private val liveStageExecutorMetrics =
mutable.HashMap.empty[(Int, Int), mutable.HashMap[String, ExecutorMetrics]]

private[this] val jsonProtocolOptions = new JsonProtocolOptions(sparkConf)

/**
* Creates the log file in the configured log directory.
*/
@@ -84,7 +86,7 @@

private def initEventLog(): Unit = {
val metadata = SparkListenerLogStart(SPARK_VERSION)
val eventJson = JsonProtocol.sparkEventToJsonString(metadata)
val eventJson = JsonProtocol.sparkEventToJsonString(metadata, jsonProtocolOptions)
logWriter.writeEvent(eventJson, flushLogger = true)
if (testing && loggedEvents != null) {
loggedEvents += eventJson
@@ -93,7 +95,7 @@

/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false): Unit = {
val eventJson = JsonProtocol.sparkEventToJsonString(event)
val eventJson = JsonProtocol.sparkEventToJsonString(event, jsonProtocolOptions)
logWriter.writeEvent(eventJson, flushLogger)
if (testing) {
loggedEvents += eventJson
46 changes: 39 additions & 7 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -31,6 +31,7 @@ import org.json4s.jackson.JsonMethods.compact

import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.internal.config._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
@@ -39,6 +40,16 @@ import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._
import org.apache.spark.util.Utils.weakIntern

/**
* Helper class for passing configuration options to JsonProtocol.
* We use this instead of passing SparkConf directly because it lets us avoid
* repeated re-parsing of configuration values on each read.
*/
private[spark] class JsonProtocolOptions(conf: SparkConf) {
[Review comment — @JoshRosen (Contributor, PR author), Jan 27, 2023]
This is the same flagging pattern used in #39763, so these PRs will conflict and will need updating after one of them merges. I've marked this PR as WIP pending discussion on how / whether we want to flag this change (perhaps I'm being overly conservative and we'd be okay with saying that third-party code directly consuming event logs should update itself to be robust to the new / changed format and not go out of our way to provide escape hatches for this use-case).

val includeAllZeroTaskExecutorMetrics: Boolean =
conf.get(EVENT_LOG_INCLUDE_ALL_ZERO_TASK_EXECUTOR_METRICS)
}

/**
* Serializes SparkListener events to/from JSON. This protocol provides strong backwards-
* and forwards-compatibility guarantees: any version of Spark should be able to read JSON output
@@ -60,13 +71,19 @@ private[spark] object JsonProtocol {
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false))

/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
* -------------------------------------------------- */

def sparkEventToJsonString(event: SparkListenerEvent): String = {
def sparkEventToJsonString(event: SparkListenerEvent): String = {
sparkEventToJsonString(event, defaultOptions)
}

def sparkEventToJsonString(event: SparkListenerEvent, options: JsonProtocolOptions): String = {
toJsonString { generator =>
writeSparkEventToJson(event, generator)
writeSparkEventToJson(event, generator, options)
}
}

@@ -79,7 +96,10 @@
new String(baos.toByteArray, StandardCharsets.UTF_8)
}

def writeSparkEventToJson(event: SparkListenerEvent, g: JsonGenerator): Unit = {
def writeSparkEventToJson(
event: SparkListenerEvent,
g: JsonGenerator,
options: JsonProtocolOptions): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
stageSubmittedToJson(stageSubmitted, g)
@@ -90,7 +110,7 @@
case taskGettingResult: SparkListenerTaskGettingResult =>
taskGettingResultToJson(taskGettingResult, g)
case taskEnd: SparkListenerTaskEnd =>
taskEndToJson(taskEnd, g)
taskEndToJson(taskEnd, g, options)
case jobStart: SparkListenerJobStart =>
jobStartToJson(jobStart, g)
case jobEnd: SparkListenerJobEnd =>
@@ -167,7 +187,10 @@
g.writeEndObject()
}

def taskEndToJson(taskEnd: SparkListenerTaskEnd, g: JsonGenerator): Unit = {
def taskEndToJson(
taskEnd: SparkListenerTaskEnd,
g: JsonGenerator,
options: JsonProtocolOptions): Unit = {
g.writeStartObject()
g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd)
g.writeNumberField("Stage ID", taskEnd.stageId)
@@ -177,8 +200,17 @@
taskEndReasonToJson(taskEnd.reason, g)
g.writeFieldName("Task Info")
taskInfoToJson(taskEnd.taskInfo, g)
g.writeFieldName("Task Executor Metrics")
executorMetricsToJson(taskEnd.taskExecutorMetrics, g)
// SPARK-42206: if all metrics are zero (which is often the case when
// spark.executor.metrics.pollingInterval = 0 (the default config) and tasks complete
// between executor heartbeats) then omit the metrics field in order to save space
// in the event log JSON. The Spark History Server already treats missing metrics
// as all zero values, so this change has no impact on history server UI reconstruction.
if (
options.includeAllZeroTaskExecutorMetrics ||
!taskEnd.taskExecutorMetrics.allMetricsAreZero()) {
g.writeFieldName("Task Executor Metrics")
executorMetricsToJson(taskEnd.taskExecutorMetrics, g)
}
Option(taskEnd.taskMetrics).foreach { m =>
g.writeFieldName("Task Metrics")
taskMetricsToJson(m, g)
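Since the SPARK-42206 comment above points out that the History Server already treats missing metrics as zeros, an external event-log consumer can apply the same rule. A minimal sketch using json4s (which Spark already depends on); the helper name taskExecutorMetricsOrZeros and its exact shape are illustrative, not part of this PR.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Reads the "Task Executor Metrics" object from one task-end event line,
// treating an omitted field (the new default) as an all-zero metrics map.
def taskExecutorMetricsOrZeros(eventLogLine: String): Map[String, Long] = {
  parse(eventLogLine) \ "Task Executor Metrics" match {
    case JObject(fields) =>
      fields.collect {
        case (name, JInt(v)) => name -> v.toLong
        case (name, JLong(v)) => name -> v
      }.toMap.withDefaultValue(0L)
    case _ =>
      Map.empty[String, Long].withDefaultValue(0L)
  }
}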
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.exceptions.TestFailedException

import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.internal.config._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
import org.apache.spark.resource._
@@ -43,7 +44,7 @@ import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage._

class JsonProtocolSuite extends SparkFunSuite {
import JsonProtocol.toJsonString
import JsonProtocol._
import JsonProtocolSuite._

test("SparkListenerEvent") {
@@ -368,6 +369,18 @@
assert(newMetrics.peakExecutionMemory == 0)
}

test("Task Executor Metrics backward compatibility") {
// The "Task Executor Metrics" field was introduced in Spark 3.0.0 in SPARK-23429
val oldJson = taskEndJsonString.removeField("Task Executor Metrics")
val oldTaskEnd = JsonProtocol.taskEndFromJson(oldJson)
val newTaskEnd = JsonProtocol.taskEndFromJson(taskEndJsonString)
assert(oldTaskEnd.taskExecutorMetrics != newTaskEnd.taskExecutorMetrics)
assert(oldTaskEnd.taskExecutorMetrics.isSet())
ExecutorMetricType.metricToOffset.keys.foreach { metric =>
assert(oldTaskEnd.taskExecutorMetrics.getMetricValue(metric) == 0)
}
}

test("StorageLevel backward compatibility") {
// "Use Off Heap" was added in Spark 3.4.0
val level = StorageLevel(
@@ -778,6 +791,26 @@
|}""".stripMargin
assert(JsonProtocol.sparkEventFromJson(unknownFieldsJson) === expected)
}

test("SPARK-42206: skip logging of all-zero Task Executor Metrics") {
val allZeroTaskExecutorMetrics = new ExecutorMetrics(Array.emptyLongArray)
[Review comment — Contributor]
Map.empty[String, Long] should also be OK for Scala 2.13 after #39772 is merged.

val taskEnd = taskEndFromJson(taskEndJsonString)
.copy(taskExecutorMetrics = allZeroTaskExecutorMetrics)

// Test new default behavior:
val newOptions = new JsonProtocolOptions(
new SparkConf().set(EVENT_LOG_INCLUDE_ALL_ZERO_TASK_EXECUTOR_METRICS, false))
val newJson = sparkEventToJsonString(taskEnd, newOptions)
assertEquals(taskEnd, taskEndFromJson(newJson))
assert(!newJson.has("Task Executor Metrics"))

// Test backwards compatibility flag:
val oldOptions = new JsonProtocolOptions(
new SparkConf().set(EVENT_LOG_INCLUDE_ALL_ZERO_TASK_EXECUTOR_METRICS, true))
val oldJson = sparkEventToJsonString(taskEnd, oldOptions)
assertEquals(taskEnd, taskEndFromJson(oldJson))
assert(oldJson.has("Task Executor Metrics"))
}
}

