From 6441f0624dfcda9c7193a64bfb416a145b5aabdf Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 2 Nov 2016 14:00:16 -0700 Subject: [PATCH 1/9] Re-use same instance for empty metrics UI data objects. --- .../org/apache/spark/ui/jobs/UIData.scala | 79 ++++++++++++++----- 1 file changed, 60 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index f4a04609c4c69..6ce5814cb29cc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import scala.collection.mutable.{HashMap, LinkedHashMap} import org.apache.spark.JobExecutionStatus -import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.executor._ import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} import org.apache.spark.util.AccumulatorContext import org.apache.spark.util.collection.OpenHashSet @@ -147,9 +147,8 @@ private[spark] object UIData { memoryBytesSpilled = m.memoryBytesSpilled, diskBytesSpilled = m.diskBytesSpilled, peakExecutionMemory = m.peakExecutionMemory, - inputMetrics = InputMetricsUIData(m.inputMetrics.bytesRead, m.inputMetrics.recordsRead), - outputMetrics = - OutputMetricsUIData(m.outputMetrics.bytesWritten, m.outputMetrics.recordsWritten), + inputMetrics = InputMetricsUIData(m.inputMetrics), + outputMetrics = OutputMetricsUIData(m.outputMetrics), shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics), shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics)) } @@ -197,8 +196,32 @@ private[spark] object UIData { shuffleWriteMetrics: ShuffleWriteMetricsUIData) case class InputMetricsUIData(bytesRead: Long, recordsRead: Long) + object InputMetricsUIData { + def apply(metrics: InputMetrics): InputMetricsUIData = { + if (metrics.bytesRead == 0 && metrics.recordsRead == 0) { + EMPTY + } else { + new InputMetricsUIData( + bytesRead = metrics.bytesRead, + recordsRead = metrics.recordsRead) + } + } + val EMPTY = InputMetricsUIData(0, 0) + } case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long) + object OutputMetricsUIData { + def apply(metrics: OutputMetrics): OutputMetricsUIData = { + if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0) { + EMPTY + } else { + new OutputMetricsUIData( + bytesWritten = metrics.bytesWritten, + recordsWritten = metrics.recordsWritten) + } + } + val EMPTY = OutputMetricsUIData(0, 0) + } case class ShuffleReadMetricsUIData( remoteBlocksFetched: Long, @@ -212,17 +235,30 @@ private[spark] object UIData { object ShuffleReadMetricsUIData { def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = { - new ShuffleReadMetricsUIData( - remoteBlocksFetched = metrics.remoteBlocksFetched, - localBlocksFetched = metrics.localBlocksFetched, - remoteBytesRead = metrics.remoteBytesRead, - localBytesRead = metrics.localBytesRead, - fetchWaitTime = metrics.fetchWaitTime, - recordsRead = metrics.recordsRead, - totalBytesRead = metrics.totalBytesRead, - totalBlocksFetched = metrics.totalBlocksFetched - ) + if ( + metrics.remoteBlocksFetched == 0 && + metrics.localBlocksFetched == 0 && + metrics.remoteBytesRead == 0 && + metrics.localBytesRead == 0 && + metrics.fetchWaitTime == 0 && + metrics.recordsRead == 0 && + metrics.totalBytesRead == 0 && + metrics.totalBlocksFetched == 0) { + EMPTY + } else { + new ShuffleReadMetricsUIData( + remoteBlocksFetched = 
metrics.remoteBlocksFetched, + localBlocksFetched = metrics.localBlocksFetched, + remoteBytesRead = metrics.remoteBytesRead, + localBytesRead = metrics.localBytesRead, + fetchWaitTime = metrics.fetchWaitTime, + recordsRead = metrics.recordsRead, + totalBytesRead = metrics.totalBytesRead, + totalBlocksFetched = metrics.totalBlocksFetched + ) + } } + val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0) } case class ShuffleWriteMetricsUIData( @@ -232,12 +268,17 @@ private[spark] object UIData { object ShuffleWriteMetricsUIData { def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = { - new ShuffleWriteMetricsUIData( - bytesWritten = metrics.bytesWritten, - recordsWritten = metrics.recordsWritten, - writeTime = metrics.writeTime - ) + if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0 && metrics.writeTime == 0) { + EMPTY + } else { + new ShuffleWriteMetricsUIData( + bytesWritten = metrics.bytesWritten, + recordsWritten = metrics.recordsWritten, + writeTime = metrics.writeTime + ) + } } + val EMPTY = ShuffleWriteMetricsUIData(0, 0, 0) } } From ade86db901127bf13c0e0bdc3f09c933a093bb76 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 2 Nov 2016 17:12:09 -0700 Subject: [PATCH 2/9] Change TaskInfo.accumulables into an immutable List. --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../main/scala/org/apache/spark/scheduler/TaskInfo.scala | 4 +--- core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 2 +- .../main/scala/org/apache/spark/util/JsonProtocol.scala | 2 +- .../apache/spark/ui/jobs/JobProgressListenerSuite.scala | 2 +- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 7 ++----- .../apache/spark/sql/execution/ui/SQLListenerSuite.scala | 2 +- 7 files changed, 8 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f2517401cb76b..90f3ff79d3f5a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1089,7 +1089,7 @@ class DAGScheduler( // To avoid UI cruft, ignore cases where value wasn't updated if (acc.name.isDefined && !updates.isZero) { stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value)) - event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value)) + event.taskInfo.accumulables ::= acc.toInfo(Some(updates.value), Some(acc.value)) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index eeb7963c9e610..d74b3e23fcd28 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -17,8 +17,6 @@ package org.apache.spark.scheduler -import scala.collection.mutable.ListBuffer - import org.apache.spark.TaskState import org.apache.spark.TaskState.TaskState import org.apache.spark.annotation.DeveloperApi @@ -54,7 +52,7 @@ class TaskInfo( * accumulable to be updated multiple times in a single task or for two accumulables with the * same name but different IDs to exist in a task. 
*/ - val accumulables = ListBuffer[AccumulableInfo]() + var accumulables: List[AccumulableInfo] = Nil /** * The time when the task has completed successfully (including the time to remotely fetch diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 6ce5814cb29cc..e6e6440a7dc32 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -170,7 +170,7 @@ private[spark] object UIData { speculative = taskInfo.speculative ) newTaskInfo.gettingResultTime = taskInfo.gettingResultTime - newTaskInfo.accumulables ++= taskInfo.accumulables.filter { + newTaskInfo.accumulables = taskInfo.accumulables.filter { accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER) } newTaskInfo.finishTime = taskInfo.finishTime diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index c11eb3ffa4601..5effca50ad2d7 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -713,7 +713,7 @@ private[spark] object JsonProtocol { taskInfo.finishTime = finishTime taskInfo.failed = failed taskInfo.killed = killed - accumulables.foreach { taskInfo.accumulables += _ } + taskInfo.accumulables = accumulables.toList taskInfo } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 8418fa74d2c63..0a22e7e7ac9c5 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -403,7 +403,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with internal = false, countFailedValues = false, metadata = None) - taskInfo.accumulables ++= Seq(internalAccum, sqlAccum, userAccum) + taskInfo.accumulables = List(internalAccum, sqlAccum, userAccum) val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo) assert(newTaskInfo.accumulables === Seq(userAccum)) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index d5146d70ebaa3..7aad5c6717b51 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -788,11 +788,8 @@ private[spark] object JsonProtocolSuite extends Assertions { private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = { val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) - val (acc1, acc2, acc3) = - (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)) - taskInfo.accumulables += acc1 - taskInfo.accumulables += acc2 - taskInfo.accumulables += acc3 + taskInfo.accumulables = + List(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)) taskInfo } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index 19b6d2603129c..f6bc3ffa0ed10 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -374,7 +374,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None) val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None) val taskInfo = createTaskInfo(0, 0) - taskInfo.accumulables ++= Seq(sqlMetricInfo, nonSqlMetricInfo) + taskInfo.accumulables = List(sqlMetricInfo, nonSqlMetricInfo) val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null) listener.onOtherEvent(executionStart) listener.onJobStart(jobStart) From 7e05630e9a78c455db8c8c499f0590c864624e05 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 2 Nov 2016 17:27:06 -0700 Subject: [PATCH 3/9] Intern hostname and executor id strings in blockManagerId and taskInfo JSON protocol. --- .../main/scala/org/apache/spark/util/JsonProtocol.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 5effca50ad2d7..03f19b10616f8 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -694,8 +694,8 @@ private[spark] object JsonProtocol { val index = (json \ "Index").extract[Int] val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1) val launchTime = (json \ "Launch Time").extract[Long] - val executorId = (json \ "Executor ID").extract[String] - val host = (json \ "Host").extract[String] + val executorId = (json \ "Executor ID").extract[String].intern() + val host = (json \ "Host").extract[String].intern() val taskLocality = TaskLocality.withName((json \ "Locality").extract[String]) val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false) val gettingResultTime = (json \ "Getting Result Time").extract[Long] @@ -885,8 +885,8 @@ private[spark] object JsonProtocol { if (json == JNothing) { return null } - val executorId = (json \ "Executor ID").extract[String] - val host = (json \ "Host").extract[String] + val executorId = (json \ "Executor ID").extract[String].intern() + val host = (json \ "Host").extract[String].intern() val port = (json \ "Port").extract[Int] BlockManagerId(executorId, host, port) } From 738cb5a43bce8b7e49035f3935104db23a6541c6 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 2 Nov 2016 19:15:57 -0700 Subject: [PATCH 4/9] Add MiMa exclude. --- project/MimaExcludes.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 350b144f8294b..12f7ed202b9db 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -86,7 +86,10 @@ object MimaExcludes { // [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness. 
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=") + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="), + + // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables") ) } From f8aee5ddf0a01d4ce5133a460e5c6c3cd6e3cefd Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Nov 2016 11:04:37 -0700 Subject: [PATCH 5/9] Make EMPTY values private. --- core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index e6e6440a7dc32..c10b0385ca26c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -206,7 +206,7 @@ private[spark] object UIData { recordsRead = metrics.recordsRead) } } - val EMPTY = InputMetricsUIData(0, 0) + private val EMPTY = InputMetricsUIData(0, 0) } case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long) @@ -220,7 +220,7 @@ private[spark] object UIData { recordsWritten = metrics.recordsWritten) } } - val EMPTY = OutputMetricsUIData(0, 0) + private val EMPTY = OutputMetricsUIData(0, 0) } case class ShuffleReadMetricsUIData( @@ -258,7 +258,7 @@ private[spark] object UIData { ) } } - val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0) + private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0) } case class ShuffleWriteMetricsUIData( @@ -278,7 +278,7 @@ private[spark] object UIData { ) } } - val EMPTY = ShuffleWriteMetricsUIData(0, 0, 0) + private val EMPTY = ShuffleWriteMetricsUIData(0, 0, 0) } } From 4c867f10ff5d8d80c7e8d44b4702785c0a017b64 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 3 Nov 2016 13:58:50 -0700 Subject: [PATCH 6/9] Avoid binary compatibility change. 
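This commit keeps the externally visible `accumulables: ListBuffer[AccumulableInfo]` signature on TaskInfo while switching the internal storage to an immutable list, by exposing the buffer as a lazily built copy of the private field. A distilled sketch of that shim, with a simplified element type (this is not the actual Spark code, only the shape of the change in the diff below):

    import scala.collection.mutable.ListBuffer

    class TaskInfoShimSketch {
      // internal storage becomes an immutable list (small per-task footprint)
      private[this] var _accumulables: List[String] = Nil

      // legacy-typed view: same ListBuffer signature as before, but it is a
      // lazily built copy, so mutating it no longer updates the internal list
      lazy val accumulables: ListBuffer[String] = ListBuffer(_accumulables: _*)

      // Spark-internal writers update the private list instead
      private def record(a: String): Unit = { _accumulables ::= a }
    }

The trade-off is that the exposed buffer is a one-time snapshot, so external code that previously appended to `taskInfo.accumulables` would silently stop affecting the task's recorded state; presumably that is why this approach is reverted two commits later in favor of an explicit setter.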
--- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../main/scala/org/apache/spark/scheduler/TaskInfo.scala | 9 ++++++++- .../apache/spark/status/api/v1/AllStagesResource.scala | 2 +- .../org/apache/spark/ui/jobs/JobProgressListener.scala | 2 +- .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- .../src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 2 +- .../main/scala/org/apache/spark/util/JsonProtocol.scala | 4 ++-- .../org/apache/spark/InternalAccumulatorSuite.scala | 2 +- .../apache/spark/ui/jobs/JobProgressListenerSuite.scala | 4 ++-- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 ++-- project/MimaExcludes.scala | 5 +---- .../org/apache/spark/sql/execution/ui/SQLListener.scala | 2 +- .../apache/spark/sql/execution/ui/SQLListenerSuite.scala | 2 +- 13 files changed, 23 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 90f3ff79d3f5a..306b090176ebd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1089,7 +1089,7 @@ class DAGScheduler( // To avoid UI cruft, ignore cases where value wasn't updated if (acc.name.isDefined && !updates.isZero) { stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value)) - event.taskInfo.accumulables ::= acc.toInfo(Some(updates.value), Some(acc.value)) + event.taskInfo._accumulables ::= acc.toInfo(Some(updates.value), Some(acc.value)) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index d74b3e23fcd28..7f52f05e5b5f3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import scala.collection.mutable.ListBuffer + import org.apache.spark.TaskState import org.apache.spark.TaskState.TaskState import org.apache.spark.annotation.DeveloperApi @@ -52,7 +54,12 @@ class TaskInfo( * accumulable to be updated multiple times in a single task or for two accumulables with the * same name but different IDs to exist in a task. */ - var accumulables: List[AccumulableInfo] = Nil + lazy val accumulables: ListBuffer[AccumulableInfo] = ListBuffer(_accumulables: _*) + + // Note: all internal uses should use _accumulables; the above `accumulables` is only exposed + // for backwards-compatibility purposes for non-Spark-users of this class. + // See SPARK-18236 for discussion. 
+ private[spark] var _accumulables: List[AccumulableInfo] = Nil /** * The time when the task has completed successfully (including the time to remotely fetch diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index acb7c23079681..a09e11f6b958e 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -146,7 +146,7 @@ private[v1] object AllStagesResource { host = uiData.taskInfo.host, taskLocality = uiData.taskInfo.taskLocality.toString(), speculative = uiData.taskInfo.speculative, - accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo }, + accumulatorUpdates = uiData.taskInfo._accumulables.map { convertAccumulableInfo }, errorMessage = uiData.errorMessage, taskMetrics = uiData.metrics.map { convertUiTaskMetrics } ) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 83dc5d874589e..45ce175df3cb9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -361,7 +361,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { new StageUIData }) - for (accumulableInfo <- info.accumulables) { + for (accumulableInfo <- info._accumulables) { stageData.accumulables(accumulableInfo.id) = accumulableInfo } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 8c7cefe200739..69f3e0f353442 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -905,7 +905,7 @@ private[ui] class TaskDataSource( val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) val gettingResultTime = getGettingResultTime(info, currentTime) - val externalAccumulableReadable = info.accumulables + val externalAccumulableReadable = info._accumulables .filterNot(_.internal) .flatMap { a => (a.name, a.update) match { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index c10b0385ca26c..6e4f925c24803 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -170,7 +170,7 @@ private[spark] object UIData { speculative = taskInfo.speculative ) newTaskInfo.gettingResultTime = taskInfo.gettingResultTime - newTaskInfo.accumulables = taskInfo.accumulables.filter { + newTaskInfo._accumulables = taskInfo._accumulables.filter { accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER) } newTaskInfo.finishTime = taskInfo.finishTime diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 03f19b10616f8..0e83d79d44657 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -281,7 +281,7 @@ private[spark] object JsonProtocol { ("Finish Time" -> taskInfo.finishTime) ~ ("Failed" -> taskInfo.failed) ~ ("Killed" -> taskInfo.killed) ~ - ("Accumulables" -> 
JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson))) + ("Accumulables" -> JArray(taskInfo._accumulables.toList.map(accumulableInfoToJson))) } def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = { @@ -713,7 +713,7 @@ private[spark] object JsonProtocol { taskInfo.finishTime = finishTime taskInfo.failed = failed taskInfo.killed = killed - taskInfo.accumulables = accumulables.toList + taskInfo._accumulables = accumulables.toList taskInfo } diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 840f55ce2f6e5..195d429dea9b0 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -66,7 +66,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { assert(stageAccum.value.get.asInstanceOf[Long] === numPartitions) // The accumulator should be updated locally on each task val taskAccumValues = taskInfos.map { taskInfo => - val taskAccum = findTestAccum(taskInfo.accumulables) + val taskAccum = findTestAccum(taskInfo._accumulables) assert(taskAccum.update.isDefined) assert(taskAccum.update.get.asInstanceOf[Long] === 1L) taskAccum.value.get.asInstanceOf[Long] diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 0a22e7e7ac9c5..8843038e4c74a 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -403,9 +403,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with internal = false, countFailedValues = false, metadata = None) - taskInfo.accumulables = List(internalAccum, sqlAccum, userAccum) + taskInfo._accumulables = List(internalAccum, sqlAccum, userAccum) val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo) - assert(newTaskInfo.accumulables === Seq(userAccum)) + assert(newTaskInfo._accumulables === Seq(userAccum)) } } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 7aad5c6717b51..33f155b7632b1 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -596,7 +596,7 @@ private[spark] object JsonProtocolSuite extends Assertions { assert(info1.gettingResultTime === info2.gettingResultTime) assert(info1.finishTime === info2.finishTime) assert(info1.failed === info2.failed) - assert(info1.accumulables === info2.accumulables) + assert(info1._accumulables === info2._accumulables) } private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) { @@ -788,7 +788,7 @@ private[spark] object JsonProtocolSuite extends Assertions { private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = { val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) - taskInfo.accumulables = + taskInfo._accumulables = List(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)) taskInfo } diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 12f7ed202b9db..350b144f8294b 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -86,10 +86,7 @@ object 
MimaExcludes { // [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness. ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="), - - // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer - ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables") + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=") ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 60f13432d78d2..1d5b504de1b7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -347,7 +347,7 @@ class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI) taskEnd.taskInfo.taskId, taskEnd.stageId, taskEnd.stageAttemptId, - taskEnd.taskInfo.accumulables.flatMap { a => + taskEnd.taskInfo._accumulables.flatMap { a => // Filter out accumulators that are not SQL metrics // For now we assume all SQL metrics are Long's that have been JSON serialized as String's if (a.metadata == Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index f6bc3ffa0ed10..c0084543622ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -374,7 +374,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None) val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None) val taskInfo = createTaskInfo(0, 0) - taskInfo.accumulables = List(sqlMetricInfo, nonSqlMetricInfo) + taskInfo._accumulables = List(sqlMetricInfo, nonSqlMetricInfo) val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null) listener.onOtherEvent(executionStart) listener.onJobStart(jobStart) From 383824361685de9b8b922413c03df4aa4125197d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 7 Nov 2016 09:43:38 -0800 Subject: [PATCH 7/9] Revert "Avoid binary compatibility change." This reverts commit 4c867f10ff5d8d80c7e8d44b4702785c0a017b64. 
--- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../main/scala/org/apache/spark/scheduler/TaskInfo.scala | 9 +-------- .../apache/spark/status/api/v1/AllStagesResource.scala | 2 +- .../org/apache/spark/ui/jobs/JobProgressListener.scala | 2 +- .../main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- .../src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 2 +- .../main/scala/org/apache/spark/util/JsonProtocol.scala | 4 ++-- .../org/apache/spark/InternalAccumulatorSuite.scala | 2 +- .../apache/spark/ui/jobs/JobProgressListenerSuite.scala | 4 ++-- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 ++-- project/MimaExcludes.scala | 5 ++++- .../org/apache/spark/sql/execution/ui/SQLListener.scala | 2 +- .../apache/spark/sql/execution/ui/SQLListenerSuite.scala | 2 +- 13 files changed, 19 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 306b090176ebd..90f3ff79d3f5a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1089,7 +1089,7 @@ class DAGScheduler( // To avoid UI cruft, ignore cases where value wasn't updated if (acc.name.isDefined && !updates.isZero) { stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value)) - event.taskInfo._accumulables ::= acc.toInfo(Some(updates.value), Some(acc.value)) + event.taskInfo.accumulables ::= acc.toInfo(Some(updates.value), Some(acc.value)) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 7f52f05e5b5f3..d74b3e23fcd28 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -17,8 +17,6 @@ package org.apache.spark.scheduler -import scala.collection.mutable.ListBuffer - import org.apache.spark.TaskState import org.apache.spark.TaskState.TaskState import org.apache.spark.annotation.DeveloperApi @@ -54,12 +52,7 @@ class TaskInfo( * accumulable to be updated multiple times in a single task or for two accumulables with the * same name but different IDs to exist in a task. */ - lazy val accumulables: ListBuffer[AccumulableInfo] = ListBuffer(_accumulables: _*) - - // Note: all internal uses should use _accumulables; the above `accumulables` is only exposed - // for backwards-compatibility purposes for non-Spark-users of this class. - // See SPARK-18236 for discussion. 
- private[spark] var _accumulables: List[AccumulableInfo] = Nil + var accumulables: List[AccumulableInfo] = Nil /** * The time when the task has completed successfully (including the time to remotely fetch diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index a09e11f6b958e..acb7c23079681 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -146,7 +146,7 @@ private[v1] object AllStagesResource { host = uiData.taskInfo.host, taskLocality = uiData.taskInfo.taskLocality.toString(), speculative = uiData.taskInfo.speculative, - accumulatorUpdates = uiData.taskInfo._accumulables.map { convertAccumulableInfo }, + accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo }, errorMessage = uiData.errorMessage, taskMetrics = uiData.metrics.map { convertUiTaskMetrics } ) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 45ce175df3cb9..83dc5d874589e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -361,7 +361,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { new StageUIData }) - for (accumulableInfo <- info._accumulables) { + for (accumulableInfo <- info.accumulables) { stageData.accumulables(accumulableInfo.id) = accumulableInfo } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 69f3e0f353442..8c7cefe200739 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -905,7 +905,7 @@ private[ui] class TaskDataSource( val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) val gettingResultTime = getGettingResultTime(info, currentTime) - val externalAccumulableReadable = info._accumulables + val externalAccumulableReadable = info.accumulables .filterNot(_.internal) .flatMap { a => (a.name, a.update) match { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 6e4f925c24803..c10b0385ca26c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -170,7 +170,7 @@ private[spark] object UIData { speculative = taskInfo.speculative ) newTaskInfo.gettingResultTime = taskInfo.gettingResultTime - newTaskInfo._accumulables = taskInfo._accumulables.filter { + newTaskInfo.accumulables = taskInfo.accumulables.filter { accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER) } newTaskInfo.finishTime = taskInfo.finishTime diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 0e83d79d44657..03f19b10616f8 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -281,7 +281,7 @@ private[spark] object JsonProtocol { ("Finish Time" -> taskInfo.finishTime) ~ ("Failed" -> taskInfo.failed) ~ ("Killed" -> taskInfo.killed) ~ - ("Accumulables" -> 
JArray(taskInfo._accumulables.toList.map(accumulableInfoToJson))) + ("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson))) } def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = { @@ -713,7 +713,7 @@ private[spark] object JsonProtocol { taskInfo.finishTime = finishTime taskInfo.failed = failed taskInfo.killed = killed - taskInfo._accumulables = accumulables.toList + taskInfo.accumulables = accumulables.toList taskInfo } diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 195d429dea9b0..840f55ce2f6e5 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -66,7 +66,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { assert(stageAccum.value.get.asInstanceOf[Long] === numPartitions) // The accumulator should be updated locally on each task val taskAccumValues = taskInfos.map { taskInfo => - val taskAccum = findTestAccum(taskInfo._accumulables) + val taskAccum = findTestAccum(taskInfo.accumulables) assert(taskAccum.update.isDefined) assert(taskAccum.update.get.asInstanceOf[Long] === 1L) taskAccum.value.get.asInstanceOf[Long] diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 8843038e4c74a..0a22e7e7ac9c5 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -403,9 +403,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with internal = false, countFailedValues = false, metadata = None) - taskInfo._accumulables = List(internalAccum, sqlAccum, userAccum) + taskInfo.accumulables = List(internalAccum, sqlAccum, userAccum) val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo) - assert(newTaskInfo._accumulables === Seq(userAccum)) + assert(newTaskInfo.accumulables === Seq(userAccum)) } } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 33f155b7632b1..7aad5c6717b51 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -596,7 +596,7 @@ private[spark] object JsonProtocolSuite extends Assertions { assert(info1.gettingResultTime === info2.gettingResultTime) assert(info1.finishTime === info2.finishTime) assert(info1.failed === info2.failed) - assert(info1._accumulables === info2._accumulables) + assert(info1.accumulables === info2.accumulables) } private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) { @@ -788,7 +788,7 @@ private[spark] object JsonProtocolSuite extends Assertions { private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = { val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) - taskInfo._accumulables = + taskInfo.accumulables = List(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)) taskInfo } diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 350b144f8294b..12f7ed202b9db 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -86,7 +86,10 @@ object 
MimaExcludes { // [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness. ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=") + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="), + + // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables") ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 1d5b504de1b7f..60f13432d78d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -347,7 +347,7 @@ class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI) taskEnd.taskInfo.taskId, taskEnd.stageId, taskEnd.stageAttemptId, - taskEnd.taskInfo._accumulables.flatMap { a => + taskEnd.taskInfo.accumulables.flatMap { a => // Filter out accumulators that are not SQL metrics // For now we assume all SQL metrics are Long's that have been JSON serialized as String's if (a.metadata == Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index c0084543622ae..f6bc3ffa0ed10 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -374,7 +374,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None) val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None) val taskInfo = createTaskInfo(0, 0) - taskInfo._accumulables = List(sqlMetricInfo, nonSqlMetricInfo) + taskInfo.accumulables = List(sqlMetricInfo, nonSqlMetricInfo) val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null) listener.onOtherEvent(executionStart) listener.onJobStart(jobStart) From 4c7067e740a35a5cd3ee8fa1ff2b23289110917a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 7 Nov 2016 10:43:59 -0800 Subject: [PATCH 8/9] Accept binary compatibility break and expose nicer interface. 
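The interface this commit settles on is a read-only `Seq` getter backed by a private field, plus a narrowly scoped setter for Spark-internal callers. A self-contained sketch of the pattern and its use (the package and element type are illustrative, chosen only so that `private[spark]` compiles; the real field holds AccumulableInfo values):

    package org.apache.spark.sketch

    class TaskInfoSketch {
      private[this] var _accumulables: Seq[String] = Nil

      // public, read-only view: outside callers can no longer mutate in place
      def accumulables: Seq[String] = _accumulables

      // only Spark-internal code (listeners, JSON replay) may replace the value
      private[spark] def setAccumulables(newAccumulables: Seq[String]): Unit = {
        _accumulables = newAccumulables
      }
    }

    object TaskInfoSketchUsage {
      // mirrors how the UI code in the diff below filters accumulables and writes them back
      def dropFlagged(info: TaskInfoSketch): Unit =
        info.setAccumulables(info.accumulables.filterNot(_.startsWith("internal.")))
    }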
--- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++- .../main/scala/org/apache/spark/scheduler/TaskInfo.scala | 8 +++++++- core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 4 ++-- .../main/scala/org/apache/spark/util/JsonProtocol.scala | 2 +- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 90f3ff79d3f5a..7fde34d8974c0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1089,7 +1089,8 @@ class DAGScheduler( // To avoid UI cruft, ignore cases where value wasn't updated if (acc.name.isDefined && !updates.isZero) { stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value)) - event.taskInfo.accumulables ::= acc.toInfo(Some(updates.value), Some(acc.value)) + event.taskInfo.setAccumulables( + acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index d74b3e23fcd28..59680139e7af3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -52,7 +52,13 @@ class TaskInfo( * accumulable to be updated multiple times in a single task or for two accumulables with the * same name but different IDs to exist in a task. */ - var accumulables: List[AccumulableInfo] = Nil + def accumulables: Seq[AccumulableInfo] = _accumulables + + private[this] var _accumulables: Seq[AccumulableInfo] = Nil + + private[spark] def setAccumulables(newAccumulables: Seq[AccumulableInfo]): Unit = { + _accumulables = newAccumulables + } /** * The time when the task has completed successfully (including the time to remotely fetch diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index c10b0385ca26c..9ce8542f02791 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -170,9 +170,9 @@ private[spark] object UIData { speculative = taskInfo.speculative ) newTaskInfo.gettingResultTime = taskInfo.gettingResultTime - newTaskInfo.accumulables = taskInfo.accumulables.filter { + newTaskInfo.setAccumulables(taskInfo.accumulables.filter { accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER) - } + }) newTaskInfo.finishTime = taskInfo.finishTime newTaskInfo.failed = taskInfo.failed newTaskInfo diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 03f19b10616f8..b94a6d699e662 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -713,7 +713,7 @@ private[spark] object JsonProtocol { taskInfo.finishTime = finishTime taskInfo.failed = failed taskInfo.killed = killed - taskInfo.accumulables = accumulables.toList + taskInfo.setAccumulables(accumulables) taskInfo } From 96621633534e84eb8e9a33c4e7afbec62b776430 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 7 Nov 2016 11:12:18 -0800 Subject: [PATCH 9/9] Fix test compilation. 
--- .../org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 2 +- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 ++-- .../org/apache/spark/sql/execution/ui/SQLListenerSuite.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 0a22e7e7ac9c5..da853f1be8b95 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -403,7 +403,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with internal = false, countFailedValues = false, metadata = None) - taskInfo.accumulables = List(internalAccum, sqlAccum, userAccum) + taskInfo.setAccumulables(List(internalAccum, sqlAccum, userAccum)) val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo) assert(newTaskInfo.accumulables === Seq(userAccum)) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 7aad5c6717b51..85da79180fd0b 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -788,8 +788,8 @@ private[spark] object JsonProtocolSuite extends Assertions { private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = { val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) - taskInfo.accumulables = - List(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)) + taskInfo.setAccumulables( + List(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true))) taskInfo } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index f6bc3ffa0ed10..948a155457b65 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -374,7 +374,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None) val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None) val taskInfo = createTaskInfo(0, 0) - taskInfo.accumulables = List(sqlMetricInfo, nonSqlMetricInfo) + taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo)) val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null) listener.onOtherEvent(executionStart) listener.onJobStart(jobStart)
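Taken together, the series trims Spark UI and HistoryServer memory with two small, reusable techniques: collapsing all-zero metrics objects into one shared immutable instance instead of allocating a fresh object per task, and interning the small set of host and executor-ID strings that event-log replay would otherwise duplicate millions of times. A standalone sketch of both techniques (class and field names here are illustrative, not Spark's):

    // raw per-task metrics as they arrive from the executor
    final class RawInputMetrics(val bytesRead: Long, val recordsRead: Long)

    // immutable UI-side copy kept for the lifetime of the history UI
    case class InputStatsUI(bytesRead: Long, recordsRead: Long)

    object InputStatsUI {
      private val EMPTY = InputStatsUI(0, 0)

      // all-zero metrics collapse to the single shared EMPTY instance
      def apply(m: RawInputMetrics): InputStatsUI =
        if (m.bytesRead == 0 && m.recordsRead == 0) EMPTY
        else InputStatsUI(m.bytesRead, m.recordsRead)
    }

    object DedupSketch {
      // a long event log repeats a handful of host / executor-ID strings over and
      // over; intern() makes every parsed occurrence share one canonical String
      def canonicalHost(raw: String): String = raw.intern()

      def main(args: Array[String]): Unit = {
        val a = InputStatsUI(new RawInputMetrics(0, 0))
        val b = InputStatsUI(new RawInputMetrics(0, 0))
        assert(a eq b)   // both point at the shared EMPTY object
        val h1 = canonicalHost(new String("host-1"))
        val h2 = canonicalHost(new String("host-1"))
        assert(h1 eq h2) // interned strings share one reference
      }
    }

In Spark itself the corresponding changes live in UIData.scala (patches 1 and 5) and JsonProtocol.scala (patch 3) above.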