Skip to content

Commit 0c1650c

Browse files
committed
resolve comments
1 parent 3c4c84f commit 0c1650c

File tree

2 files changed

+11
-13
lines changed

2 files changed

+11
-13
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ case class AdaptiveSparkPlanExec(
6363
@transient context: AdaptiveExecutionContext,
6464
@transient preprocessingRules: Seq[Rule[SparkPlan]],
6565
@transient isSubquery: Boolean)
66-
extends LeafExecNode
67-
with AdaptiveSparkPlanHelper {
66+
extends LeafExecNode {
6867

6968
@transient private val lock = new Object()
7069

@@ -136,7 +135,7 @@ case class AdaptiveSparkPlanExec(
136135

137136
private def collectSQLMetrics(plan: SparkPlan): Seq[SQLMetric] = {
138137
val metrics = new mutable.ArrayBuffer[SQLMetric]()
139-
collect(plan) {
138+
plan.collect {
140139
case p: SparkPlan =>
141140
p.metrics.map { case metric =>
142141
metrics += metric._2
@@ -164,9 +163,6 @@ case class AdaptiveSparkPlanExec(
164163
currentPhysicalPlan = result.newPlan
165164
if (result.newStages.nonEmpty) {
166165
stagesToReplace = result.newStages ++ stagesToReplace
167-
if (isSubquery) {
168-
onUpdateAccumulator(collectSQLMetrics(this))
169-
}
170166
executionId.foreach(onUpdatePlan)
171167

172168
// Start materialization of all new stages.
@@ -503,6 +499,9 @@ case class AdaptiveSparkPlanExec(
503499
* Notify the listeners of the physical plan change.
504500
*/
505501
private def onUpdatePlan(executionId: Long): Unit = {
502+
if (isSubquery) {
503+
onUpdateAccumulator(collectSQLMetrics(currentPhysicalPlan))
504+
}
506505
context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
507506
executionId,
508507
SQLExecution.getQueryExecution(executionId).toString,

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -348,11 +348,10 @@ class SQLAppStatusListener(
348348
val SparkListenerSQLAdaptiveAccumUpdates(executionId, accumIdsToMetricType) = event
349349

350350
val stages = liveExecutions.get(executionId).stages
351-
accumIdsToMetricType.map { case (accumulatorId, metricType) =>
352-
stages.foreach { stageId =>
353-
val liveStageMetric = stageMetrics.get(stageId)
354-
liveStageMetric.accumIdsToMetricType += (accumulatorId -> metricType)
355-
}
351+
stages.foreach { stageId =>
352+
val liveStageMetric = stageMetrics.get(stageId)
353+
stageMetrics.put(stageId, liveStageMetric.copy(
354+
accumIdsToMetricType = liveStageMetric.accumIdsToMetricType ++ accumIdsToMetricType))
356355
}
357356
}
358357

@@ -478,11 +477,11 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
478477

479478
}
480479

481-
private class LiveStageMetrics(
480+
private case class LiveStageMetrics(
482481
val stageId: Int,
483482
val attemptId: Int,
484483
val numTasks: Int,
485-
var accumIdsToMetricType: Map[Long, String]) {
484+
val accumIdsToMetricType: Map[Long, String]) {
486485

487486
/**
488487
* Mapping of task IDs to their respective index. Note this may contain more elements than the

0 commit comments

Comments
 (0)