12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -259,6 +259,18 @@ package object config {
.toSequence
.createWithDefault(GarbageCollectionMetrics.OLD_GENERATION_BUILTIN_GARBAGE_COLLECTORS)

private[spark] val EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS =
ConfigBuilder("spark.eventLog.includeTaskMetricsAccumulators")
.doc("Whether to include TaskMetrics' underlying accumulator values in the event log (as " +
"part of the Task/Stage/Job metrics' 'Accumulables' fields). This configuration defaults " +
"to false because the TaskMetrics values are already logged in the 'Task Metrics' " +
"fields (so the accumulator updates are redundant). This flag exists only as a " +
"backwards-compatibility escape hatch for applications that might rely on the old " +
"behavior. See SPARK-42204 for details.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

private[spark] val EVENT_LOG_OVERWRITE =
ConfigBuilder("spark.eventLog.overwrite")
.version("1.0.0")
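As a usage sketch (not part of the diff), an application that wants the pre-SPARK-42204 behavior back could opt in through its SparkConf. Everything below except the last .set() call is hypothetical boilerplate; only the spark.eventLog.includeTaskMetricsAccumulators key comes from this PR.

import org.apache.spark.{SparkConf, SparkContext}

object EventLogOptInExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("event-log-accumulables-demo")      // illustrative app setup
      .setMaster("local[*]")
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "/tmp/spark-events") // directory must already exist
      // Opt back in to logging TaskMetrics accumulator updates in the
      // 'Accumulables' fields (the behavior prior to this change).
      .set("spark.eventLog.includeTaskMetricsAccumulators", "true")

    val sc = new SparkContext(conf)
    try {
      sc.parallelize(1 to 100).count() // generates job/stage/task events for the event log
    } finally {
      sc.stop()
    }
  }
}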
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.history.EventLogFileWriter
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.{JsonProtocol, Utils}
import org.apache.spark.util.{JsonProtocol, JsonProtocolOptions, Utils}

/**
* A SparkListener that logs events to persistent storage.
@@ -74,6 +74,8 @@ private[spark] class EventLoggingListener(
private val liveStageExecutorMetrics =
mutable.HashMap.empty[(Int, Int), mutable.HashMap[String, ExecutorMetrics]]

private[this] val jsonProtocolOptions = new JsonProtocolOptions(sparkConf)

/**
* Creates the log file in the configured log directory.
*/
@@ -84,7 +86,7 @@

private def initEventLog(): Unit = {
val metadata = SparkListenerLogStart(SPARK_VERSION)
val eventJson = JsonProtocol.sparkEventToJsonString(metadata)
val eventJson = JsonProtocol.sparkEventToJsonString(metadata, jsonProtocolOptions)
logWriter.writeEvent(eventJson, flushLogger = true)
if (testing && loggedEvents != null) {
loggedEvents += eventJson
@@ -93,7 +95,7 @@

/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false): Unit = {
val eventJson = JsonProtocol.sparkEventToJsonString(event)
val eventJson = JsonProtocol.sparkEventToJsonString(event, jsonProtocolOptions)
logWriter.writeEvent(eventJson, flushLogger)
if (testing) {
loggedEvents += eventJson
108 changes: 82 additions & 26 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -28,6 +28,7 @@ import org.json4s.jackson.JsonMethods.compact

import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.internal.config._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
@@ -37,6 +38,16 @@ import org.apache.spark.storage._
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils.weakIntern

/**
* Helper class for passing configuration options to JsonProtocol.
* We use this instead of passing SparkConf directly because it lets us avoid
* repeated re-parsing of configuration values on each read.
*/
private[spark] class JsonProtocolOptions(conf: SparkConf) {
@JoshRosen (Contributor, Author) commented on Jan 27, 2023:

This pattern is a bit weird, so I'd like to explain it in greater detail here:

To date, JsonProtocol has just been a big object with no configuration options of its own. All other event logging configurations take effect at higher levels, such as EventLoggingListener. We cannot implement this particular change at those levels, though: we shouldn't mutate the mutable TaskInfo or StageInfo objects referenced by the listener events, and adding copy() methods to them would be expensive.

JsonProtocol is currently implemented as a global object, so there's no straightforward way to make it easily configurable and testable. If I did something like "get the active SparkContext's configuration and read the configuration value" on each call, then this would destroy performance, because configuration reading is not cheap.

One approach would be to refactor this so that JsonProtocol is no longer a singleton: we could give it a proper constructor which accepts a SparkConf. That's a much larger refactoring than I want to undertake right now, though, and it would involve significant updates in a bunch of test code.

The approach taken here is to thread through a JsonProtocolOptions class which holds the configuration values. Right now it only holds a single value, but this is done as future-proofing in case we add additional configurability. The lone caller in EventLoggingListener constructs the JsonProtocolOptions using SparkConf, so the configuration is only read from the conf once (avoiding conf-reader performance overheads).

This is a more C-style approach than the usual OOP approach that we generally take, but I think it's okay in this limited internal context.

A contributor replied:

Sounds good to me.

val includeTaskMetricsAccumulators: Boolean =
conf.get(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS)
}
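To make the pattern described in the comment above concrete, here is a minimal sketch (not code from this PR) of how a caller inside the org.apache.spark namespace, such as EventLoggingListener, is expected to use the new class: the options are parsed from the SparkConf once, at construction time, and then reused for every serialized event. The package and class names below are hypothetical.

package org.apache.spark.example // hypothetical; only needed for access to private[spark] members

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerLogStart}
import org.apache.spark.util.{JsonProtocol, JsonProtocolOptions}

class ExampleEventSerializer(conf: SparkConf) {
  // Configuration is read from the conf exactly once, here, so the per-event
  // serialization path below never re-parses configuration values.
  private[this] val jsonProtocolOptions = new JsonProtocolOptions(conf)

  def serialize(event: SparkListenerEvent): String =
    // Production call path: the two-argument overload introduced in this PR.
    JsonProtocol.sparkEventToJsonString(event, jsonProtocolOptions)
}

object ExampleEventSerializer {
  def main(args: Array[String]): Unit = {
    val serializer = new ExampleEventSerializer(new SparkConf(false))
    println(serializer.serialize(SparkListenerLogStart("4.0.0")))
  }
}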

/**
* Serializes SparkListener events to/from JSON. This protocol provides strong backwards-
* and forwards-compatibility guarantees: any version of Spark should be able to read JSON output
@@ -55,30 +66,41 @@ import org.apache.spark.util.Utils.weakIntern
private[spark] object JsonProtocol extends JsonUtils {
// TODO: Remove this file and put JSON serialization into each individual class.

private[util]
val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false))

/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
* -------------------------------------------------- */

// Only for use in tests. Production code should use the two-argument overload defined below.
def sparkEventToJsonString(event: SparkListenerEvent): String = {
@JoshRosen (Contributor, Author):

The method using default configurations is retained to avoid changes to a bunch of test code across several suites.

A contributor replied:

Add a note for this sparkEventToJsonString that it is only for tests?

sparkEventToJsonString(event, defaultOptions)
}

def sparkEventToJsonString(event: SparkListenerEvent, options: JsonProtocolOptions): String = {
toJsonString { generator =>
writeSparkEventToJson(event, generator)
writeSparkEventToJson(event, generator, options)
}
}

def writeSparkEventToJson(event: SparkListenerEvent, g: JsonGenerator): Unit = {
def writeSparkEventToJson(
event: SparkListenerEvent,
g: JsonGenerator,
options: JsonProtocolOptions): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
stageSubmittedToJson(stageSubmitted, g)
stageSubmittedToJson(stageSubmitted, g, options)
case stageCompleted: SparkListenerStageCompleted =>
stageCompletedToJson(stageCompleted, g)
stageCompletedToJson(stageCompleted, g, options)
case taskStart: SparkListenerTaskStart =>
taskStartToJson(taskStart, g)
taskStartToJson(taskStart, g, options)
case taskGettingResult: SparkListenerTaskGettingResult =>
taskGettingResultToJson(taskGettingResult, g)
taskGettingResultToJson(taskGettingResult, g, options)
case taskEnd: SparkListenerTaskEnd =>
taskEndToJson(taskEnd, g)
taskEndToJson(taskEnd, g, options)
case jobStart: SparkListenerJobStart =>
jobStartToJson(jobStart, g)
jobStartToJson(jobStart, g, options)
case jobEnd: SparkListenerJobEnd =>
jobEndToJson(jobEnd, g)
case environmentUpdate: SparkListenerEnvironmentUpdate =>
@@ -112,51 +134,64 @@ private[spark] object JsonProtocol extends JsonUtils {
}
}

def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted, g: JsonGenerator): Unit = {
def stageSubmittedToJson(
stageSubmitted: SparkListenerStageSubmitted,
g: JsonGenerator,
options: JsonProtocolOptions): Unit = {
g.writeStartObject()
g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted)
g.writeFieldName("Stage Info")
// SPARK-42205: don't log accumulables in start events:
stageInfoToJson(stageSubmitted.stageInfo, g, includeAccumulables = false)
stageInfoToJson(stageSubmitted.stageInfo, g, options, includeAccumulables = false)
Option(stageSubmitted.properties).foreach { properties =>
g.writeFieldName("Properties")
propertiesToJson(properties, g)
}
g.writeEndObject()
}

def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted, g: JsonGenerator): Unit = {
def stageCompletedToJson(
stageCompleted: SparkListenerStageCompleted,
g: JsonGenerator,
options: JsonProtocolOptions): Unit = {
g.writeStartObject()
g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted)
g.writeFieldName("Stage Info")
stageInfoToJson(stageCompleted.stageInfo, g, includeAccumulables = true)
stageInfoToJson(stageCompleted.stageInfo, g, options, includeAccumulables = true)
g.writeEndObject()
}

def taskStartToJson(taskStart: SparkListenerTaskStart, g: JsonGenerator): Unit = {
def taskStartToJson(
taskStart: SparkListenerTaskStart,
g: JsonGenerator,
options: JsonProtocolOptions): Unit = {
g.writeStartObject()
g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart)
g.writeNumberField("Stage ID", taskStart.stageId)
g.writeNumberField("Stage Attempt ID", taskStart.stageAttemptId)
g.writeFieldName("Task Info")
// SPARK-42205: don't log accumulables in start events:
taskInfoToJson(taskStart.taskInfo, g, includeAccumulables = false)
taskInfoToJson(taskStart.taskInfo, g, options, includeAccumulables = false)
g.writeEndObject()
}

def taskGettingResultToJson(
taskGettingResult: SparkListenerTaskGettingResult,
g: JsonGenerator): Unit = {
g: JsonGenerator,
options: JsonProtocolOptions): Unit = {
val taskInfo = taskGettingResult.taskInfo
g.writeStartObject()
g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult)
g.writeFieldName("Task Info")
// SPARK-42205: don't log accumulables in "task getting result" events:
taskInfoToJson(taskInfo, g, includeAccumulables = false)
taskInfoToJson(taskInfo, g, options, includeAccumulables = false)
g.writeEndObject()
}

def taskEndToJson(taskEnd: SparkListenerTaskEnd, g: JsonGenerator): Unit = {
def taskEndToJson(
taskEnd: SparkListenerTaskEnd,
g: JsonGenerator,
options: JsonProtocolOptions): Unit = {
g.writeStartObject()
g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd)
g.writeNumberField("Stage ID", taskEnd.stageId)
@@ -165,7 +200,7 @@ private[spark] object JsonProtocol extends JsonUtils {
g.writeFieldName("Task End Reason")
taskEndReasonToJson(taskEnd.reason, g)
g.writeFieldName("Task Info")
taskInfoToJson(taskEnd.taskInfo, g, includeAccumulables = true)
taskInfoToJson(taskEnd.taskInfo, g, options, includeAccumulables = true)
g.writeFieldName("Task Executor Metrics")
executorMetricsToJson(taskEnd.taskExecutorMetrics, g)
Option(taskEnd.taskMetrics).foreach { m =>
@@ -175,7 +210,10 @@ private[spark] object JsonProtocol extends JsonUtils {
g.writeEndObject()
}

def jobStartToJson(jobStart: SparkListenerJobStart, g: JsonGenerator): Unit = {
def jobStartToJson(
jobStart: SparkListenerJobStart,
g: JsonGenerator,
options: JsonProtocolOptions): Unit = {
g.writeStartObject()
g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart)
g.writeNumberField("Job ID", jobStart.jobId)
@@ -186,7 +224,7 @@ private[spark] object JsonProtocol extends JsonUtils {
// the job was submitted: it is technically possible for a stage to belong to multiple
// concurrent jobs, so this situation can arise even without races occurring between
// event logging and stage completion.
jobStart.stageInfos.foreach(stageInfoToJson(_, g, includeAccumulables = true))
jobStart.stageInfos.foreach(stageInfoToJson(_, g, options, includeAccumulables = true))
g.writeEndArray()
g.writeArrayFieldStart("Stage IDs")
jobStart.stageIds.foreach(g.writeNumber)
@@ -386,6 +424,7 @@ private[spark] object JsonProtocol extends JsonUtils {
def stageInfoToJson(
stageInfo: StageInfo,
g: JsonGenerator,
options: JsonProtocolOptions,
includeAccumulables: Boolean): Unit = {
g.writeStartObject()
g.writeNumberField("Stage ID", stageInfo.stageId)
@@ -404,7 +443,10 @@ private[spark] object JsonProtocol extends JsonUtils {
stageInfo.failureReason.foreach(g.writeStringField("Failure Reason", _))
g.writeFieldName("Accumulables")
if (includeAccumulables) {
accumulablesToJson(stageInfo.accumulables.values, g)
accumulablesToJson(
stageInfo.accumulables.values,
g,
includeTaskMetricsAccumulators = options.includeTaskMetricsAccumulators)
} else {
g.writeStartArray()
g.writeEndArray()
@@ -418,6 +460,7 @@ private[spark] object JsonProtocol extends JsonUtils {
def taskInfoToJson(
taskInfo: TaskInfo,
g: JsonGenerator,
options: JsonProtocolOptions,
includeAccumulables: Boolean): Unit = {
g.writeStartObject()
g.writeNumberField("Task ID", taskInfo.taskId)
@@ -435,21 +478,34 @@ private[spark] object JsonProtocol extends JsonUtils {
g.writeBooleanField("Killed", taskInfo.killed)
g.writeFieldName("Accumulables")
if (includeAccumulables) {
accumulablesToJson(taskInfo.accumulables, g)
accumulablesToJson(
taskInfo.accumulables,
g,
includeTaskMetricsAccumulators = options.includeTaskMetricsAccumulators)
} else {
g.writeStartArray()
g.writeEndArray()
}
g.writeEndObject()
}

private lazy val accumulableExcludeList = Set("internal.metrics.updatedBlockStatuses")
private[util] val accumulableExcludeList = Set(InternalAccumulator.UPDATED_BLOCK_STATUSES)

private[this] val taskMetricAccumulableNames = TaskMetrics.empty.nameToAccums.keySet.toSet

def accumulablesToJson(accumulables: Iterable[AccumulableInfo], g: JsonGenerator): Unit = {
def accumulablesToJson(
accumulables: Iterable[AccumulableInfo],
g: JsonGenerator,
includeTaskMetricsAccumulators: Boolean = true): Unit = {
g.writeStartArray()
accumulables
.filterNot(_.name.exists(accumulableExcludeList.contains))
.toList.sortBy(_.id).foreach(a => accumulableInfoToJson(a, g))
.filterNot { acc =>
acc.name.exists(accumulableExcludeList.contains) ||
(!includeTaskMetricsAccumulators && acc.name.exists(taskMetricAccumulableNames.contains))
}
.toList
.sortBy(_.id)
.foreach(a => accumulableInfoToJson(a, g))
g.writeEndArray()
}
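
For illustration only, the combined filter above behaves roughly like the standalone predicate in the sketch below. The internal.metrics.* strings are representative examples of the TaskMetrics accumulator names that the real code derives from TaskMetrics.empty.nameToAccums; the object and method names here are hypothetical.

object AccumulableFilterSketch {
  // Approximation of the exclusion logic inside accumulablesToJson.
  private val excludeList = Set("internal.metrics.updatedBlockStatuses")
  private val taskMetricNames = Set(
    "internal.metrics.executorRunTime",
    "internal.metrics.resultSize")

  def shouldLog(name: Option[String], includeTaskMetricsAccumulators: Boolean): Boolean = {
    !(name.exists(excludeList.contains) ||
      (!includeTaskMetricsAccumulators && name.exists(taskMetricNames.contains)))
  }

  def main(args: Array[String]): Unit = {
    // With the new default (false), TaskMetrics-backed accumulators are dropped from the log...
    assert(!shouldLog(Some("internal.metrics.executorRunTime"), includeTaskMetricsAccumulators = false))
    // ...user-defined named accumulators are still logged...
    assert(shouldLog(Some("myApp.recordsProcessed"), includeTaskMetricsAccumulators = false))
    // ...and setting spark.eventLog.includeTaskMetricsAccumulators=true restores the old behavior.
    assert(shouldLog(Some("internal.metrics.executorRunTime"), includeTaskMetricsAccumulators = true))
  }
}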
