Skip to content

Commit 2f6cca5

Browse files
JoshRosenMridul Muralidharan
authored andcommitted
[SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType
### What changes were proposed in this pull request? This PR aims to reduce the memory consumption of `LiveStageMetrics.accumIdsToMetricType`, which should help to reduce driver memory usage when running complex SQL queries that contain many operators and run many jobs. In SQLAppStatusListener, the LiveStageMetrics.accumIdsToMetricType field holds a map which is used to look up the type of accumulators in order to perform conditional processing of a stage’s metrics. Currently, that field is derived from `LiveExecutionData.metrics`, which contains metrics for _all_ operators used anywhere in the query. Whenever a job is submitted, we construct a fresh map containing all metrics that have ever been registered for that SQL query. If a query runs a single job, this isn't an issue: in that case, all `LiveStageMetrics` instances will hold the same immutable `accumIdsToMetricType`. The problem arises if we have a query that runs many jobs (e.g. a complex query with many joins which gets divided into many jobs due to AQE): in that case, each job submission results in a new `accumIdsToMetricType` map being created. This PR fixes this by changing `accumIdsToMetricType` to be a mutable `mutable.HashMap` which is shared across all `LivestageMetrics` instances belonging to the same `LiveExecutionData`. The modified classes are `private` and are used only in SQLAppStatusListener, so I don't think this change poses any realistic risk of binary incompatibility risks to third party code. ### Why are the changes needed? Addresses one contributing factor behind high driver memory / OOMs when executing complex queries. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests. To demonstrate memory reduction, I performed manual benchmarking and heap dump inspection using benchmark that ran copies of a complex query: each test query launches ~200 jobs (so at least 200 stages) and contains ~3800 total operators, resulting in a huge number metric accumulators. Prior to this PR's fix, ~3700 LiveStageMetrics instances (from multiple concurrent runs of the query) consumed a combined ~3.3 GB of heap. After this PR's fix, I observed negligible memory usage from LiveStageMetrics. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43250 from JoshRosen/reduce-accum-ids-to-metric-type-mem-overhead. Authored-by: Josh Rosen <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent 6f46ea2 commit 2f6cca5

File tree

1 file changed

+18
-7
lines changed

1 file changed

+18
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class SQLAppStatusListener(
9595
executionData.details = sqlStoreData.details
9696
executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
9797
executionData.modifiedConfigs = sqlStoreData.modifiedConfigs
98-
executionData.metrics = sqlStoreData.metrics
98+
executionData.addMetrics(sqlStoreData.metrics)
9999
executionData.submissionTime = sqlStoreData.submissionTime
100100
executionData.completionTime = sqlStoreData.completionTime
101101
executionData.jobs = sqlStoreData.jobs
@@ -111,7 +111,7 @@ class SQLAppStatusListener(
111111

112112
// Record the accumulator IDs and metric types for the stages of this job, so that the code
113113
// that keeps track of the metrics knows which accumulators to look at.
114-
val accumIdsAndType = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
114+
val accumIdsAndType = exec.metricAccumulatorIdToMetricType
115115
if (accumIdsAndType.nonEmpty) {
116116
event.stageInfos.foreach { stage =>
117117
stageMetrics.put(stage.stageId, new LiveStageMetrics(stage.stageId, 0,
@@ -361,7 +361,7 @@ class SQLAppStatusListener(
361361
exec.details = details
362362
exec.physicalPlanDescription = physicalPlanDescription
363363
exec.modifiedConfigs = modifiedConfigs
364-
exec.metrics = sqlPlanMetrics
364+
exec.addMetrics(sqlPlanMetrics)
365365
exec.submissionTime = time
366366
update(exec)
367367
}
@@ -383,15 +383,15 @@ class SQLAppStatusListener(
383383

384384
val exec = getOrCreateExecution(executionId)
385385
exec.physicalPlanDescription = physicalPlanDescription
386-
exec.metrics ++= sqlPlanMetrics
386+
exec.addMetrics(sqlPlanMetrics)
387387
update(exec)
388388
}
389389

390390
private def onAdaptiveSQLMetricUpdate(event: SparkListenerSQLAdaptiveSQLMetricUpdates): Unit = {
391391
val SparkListenerSQLAdaptiveSQLMetricUpdates(executionId, sqlPlanMetrics) = event
392392

393393
val exec = getOrCreateExecution(executionId)
394-
exec.metrics ++= sqlPlanMetrics
394+
exec.addMetrics(sqlPlanMetrics)
395395
update(exec)
396396
}
397397

@@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
490490
var details: String = null
491491
var physicalPlanDescription: String = null
492492
var modifiedConfigs: Map[String, String] = _
493-
var metrics = collection.Seq[SQLPlanMetric]()
493+
private var _metrics = collection.Seq[SQLPlanMetric]()
494+
def metrics: collection.Seq[SQLPlanMetric] = _metrics
495+
// This mapping is shared across all LiveStageMetrics instances associated with
496+
// this LiveExecutionData, helping to reduce memory overhead by avoiding waste
497+
// from separate immutable maps with largely overlapping sets of entries.
498+
val metricAccumulatorIdToMetricType = new mutable.HashMap[Long, String]()
494499
var submissionTime = -1L
495500
var completionTime: Option[Date] = None
496501
var errorMessage: Option[String] = None
@@ -522,13 +527,19 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
522527
metricsValues)
523528
}
524529

530+
def addMetrics(newMetrics: collection.Seq[SQLPlanMetric]): Unit = {
531+
_metrics ++= newMetrics
532+
newMetrics.foreach { m =>
533+
metricAccumulatorIdToMetricType.put(m.accumulatorId, m.metricType)
534+
}
535+
}
525536
}
526537

527538
private class LiveStageMetrics(
528539
val stageId: Int,
529540
val attemptId: Int,
530541
val numTasks: Int,
531-
val accumIdsToMetricType: Map[Long, String]) {
542+
val accumIdsToMetricType: mutable.Map[Long, String]) {
532543

533544
/**
534545
* Mapping of task IDs to their respective index. Note this may contain more elements than the

0 commit comments

Comments
 (0)