@@ -1089,7 +1089,8 @@ class DAGScheduler(
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && !updates.isZero) {
stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
event.taskInfo.setAccumulables(
acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
}
}
} catch {
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -17,8 +17,6 @@

package org.apache.spark.scheduler

import scala.collection.mutable.ListBuffer

import org.apache.spark.TaskState
import org.apache.spark.TaskState.TaskState
import org.apache.spark.annotation.DeveloperApi
@@ -54,7 +52,13 @@ class TaskInfo(
* accumulable to be updated multiple times in a single task or for two accumulables with the
* same name but different IDs to exist in a task.
*/
val accumulables = ListBuffer[AccumulableInfo]()
def accumulables: Seq[AccumulableInfo] = _accumulables

private[this] var _accumulables: Seq[AccumulableInfo] = Nil

Contributor Author:
In order to maintain binary compatibility, I could rewrite this to be a lazy val that returns a ListBuffer formed from the "real" accumulables, which can remain private. I might go ahead and do that just to avoid any chance of incompatibility-related problems, although I don't anticipate this being an issue in practice.
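
A minimal sketch of that lazy-val alternative (an editorial illustration, not what the PR does), with a stand-in element type in place of AccumulableInfo so it is self-contained:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch of the alternative discussed above -- not the approach the
// PR takes. `Elem` stands in for AccumulableInfo to keep the example self-contained.
class TaskInfoCompatSketch[Elem] {
  private[this] var _accumulables: Seq[Elem] = Nil

  // Keeps the old ListBuffer-typed accessor shape; built lazily from the private
  // state. Note it is a one-time snapshot: calls to setAccumulables after the
  // first access are not reflected in an already-materialized buffer.
  lazy val accumulables: ListBuffer[Elem] = ListBuffer(_accumulables: _*)

  def setAccumulables(newAccumulables: Seq[Elem]): Unit = {
    _accumulables = newAccumulables
  }
}
```

The snapshot behavior of the lazy val is the main trade-off against simply exposing the Seq and adding a MiMa exclude.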


private[spark] def setAccumulables(newAccumulables: Seq[AccumulableInfo]): Unit = {
_accumulables = newAccumulables
}

/**
* The time when the task has completed successfully (including the time to remotely fetch
83 changes: 62 additions & 21 deletions core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.collection.mutable.{HashMap, LinkedHashMap}

import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.executor._
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
import org.apache.spark.util.AccumulatorContext
import org.apache.spark.util.collection.OpenHashSet
@@ -147,9 +147,8 @@ private[spark] object UIData {
memoryBytesSpilled = m.memoryBytesSpilled,
diskBytesSpilled = m.diskBytesSpilled,
peakExecutionMemory = m.peakExecutionMemory,
inputMetrics = InputMetricsUIData(m.inputMetrics.bytesRead, m.inputMetrics.recordsRead),
outputMetrics =
OutputMetricsUIData(m.outputMetrics.bytesWritten, m.outputMetrics.recordsWritten),
inputMetrics = InputMetricsUIData(m.inputMetrics),
outputMetrics = OutputMetricsUIData(m.outputMetrics),
shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
}
@@ -171,9 +170,9 @@ private[spark] object UIData {
speculative = taskInfo.speculative
)
newTaskInfo.gettingResultTime = taskInfo.gettingResultTime
newTaskInfo.accumulables ++= taskInfo.accumulables.filter {
newTaskInfo.setAccumulables(taskInfo.accumulables.filter {
accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
}
})
newTaskInfo.finishTime = taskInfo.finishTime
newTaskInfo.failed = taskInfo.failed
newTaskInfo
@@ -197,8 +196,32 @@ private[spark] object UIData {
shuffleWriteMetrics: ShuffleWriteMetricsUIData)

case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
object InputMetricsUIData {
def apply(metrics: InputMetrics): InputMetricsUIData = {
if (metrics.bytesRead == 0 && metrics.recordsRead == 0) {
EMPTY
} else {
new InputMetricsUIData(
bytesRead = metrics.bytesRead,
recordsRead = metrics.recordsRead)
}
}
private val EMPTY = InputMetricsUIData(0, 0)
}

case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long)
object OutputMetricsUIData {
def apply(metrics: OutputMetrics): OutputMetricsUIData = {
if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0) {

Contributor:
I assume the else block is more common? If so, it would be good to invert the condition.

Contributor Author:
For OutputMetrics, I'd actually assume the opposite: these metrics refer to bytes written to an external system, not bytes written to shuffle, so the majority of tasks won't have non-zero values for this metric (all but the last stage in a multi-stage job, for example).

Contributor:
Sounds good.

EMPTY
} else {
new OutputMetricsUIData(
bytesWritten = metrics.bytesWritten,
recordsWritten = metrics.recordsWritten)
}
}
private val EMPTY = OutputMetricsUIData(0, 0)
}

case class ShuffleReadMetricsUIData(
remoteBlocksFetched: Long,
@@ -212,17 +235,30 @@

object ShuffleReadMetricsUIData {
def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = {
new ShuffleReadMetricsUIData(
remoteBlocksFetched = metrics.remoteBlocksFetched,
localBlocksFetched = metrics.localBlocksFetched,
remoteBytesRead = metrics.remoteBytesRead,
localBytesRead = metrics.localBytesRead,
fetchWaitTime = metrics.fetchWaitTime,
recordsRead = metrics.recordsRead,
totalBytesRead = metrics.totalBytesRead,
totalBlocksFetched = metrics.totalBlocksFetched
)
if (
metrics.remoteBlocksFetched == 0 &&
metrics.localBlocksFetched == 0 &&
metrics.remoteBytesRead == 0 &&
metrics.localBytesRead == 0 &&
metrics.fetchWaitTime == 0 &&
metrics.recordsRead == 0 &&
metrics.totalBytesRead == 0 &&
metrics.totalBlocksFetched == 0) {
EMPTY
} else {
new ShuffleReadMetricsUIData(
remoteBlocksFetched = metrics.remoteBlocksFetched,
localBlocksFetched = metrics.localBlocksFetched,
remoteBytesRead = metrics.remoteBytesRead,
localBytesRead = metrics.localBytesRead,
fetchWaitTime = metrics.fetchWaitTime,
recordsRead = metrics.recordsRead,
totalBytesRead = metrics.totalBytesRead,
totalBlocksFetched = metrics.totalBlocksFetched
)
}
}
private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0)
}

case class ShuffleWriteMetricsUIData(
@@ -232,12 +268,17 @@ private[spark] object UIData {

object ShuffleWriteMetricsUIData {
def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = {
new ShuffleWriteMetricsUIData(
bytesWritten = metrics.bytesWritten,
recordsWritten = metrics.recordsWritten,
writeTime = metrics.writeTime
)
if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0 && metrics.writeTime == 0) {
EMPTY
} else {
new ShuffleWriteMetricsUIData(
bytesWritten = metrics.bytesWritten,
recordsWritten = metrics.recordsWritten,
writeTime = metrics.writeTime
)
}
}
private val EMPTY = ShuffleWriteMetricsUIData(0, 0, 0)
}

}
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -694,8 +694,8 @@ private[spark] object JsonProtocol {
val index = (json \ "Index").extract[Int]
val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
val launchTime = (json \ "Launch Time").extract[Long]
val executorId = (json \ "Executor ID").extract[String]
val host = (json \ "Host").extract[String]
val executorId = (json \ "Executor ID").extract[String].intern()
val host = (json \ "Host").extract[String].intern()
val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
val gettingResultTime = (json \ "Getting Result Time").extract[Long]
@@ -713,7 +713,7 @@
taskInfo.finishTime = finishTime
taskInfo.failed = failed
taskInfo.killed = killed
accumulables.foreach { taskInfo.accumulables += _ }
taskInfo.setAccumulables(accumulables)
taskInfo
}

@@ -885,8 +885,8 @@ private[spark] object JsonProtocol {
if (json == JNothing) {
return null
}
val executorId = (json \ "Executor ID").extract[String]
val host = (json \ "Host").extract[String]
val executorId = (json \ "Executor ID").extract[String].intern()
val host = (json \ "Host").extract[String].intern()

Contributor:
In general, interning can be dangerous. I don't expect issues with host, but executorId (for long-running jobs) can essentially OOM the driver.

Contributor:
Scratch that; the JDK 7 improvements help.

Member:
From reading http://java-performance.info/string-intern-in-java-6-7-8/, it seems significantly safer in Java 7 (I also had the impression from tribal lore that intern() had been a bad idea in older JVMs), but not necessarily great for performance if you're going to intern hundreds of thousands of strings. But if this is a pretty targeted use for a hotspot, it seems OK.
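
Purely illustrative (an editorial sketch using plain JVM strings, no Spark types): each replayed JSON event otherwise produces its own copy of the same executor-ID and host text, while intern() collapses equal strings to one canonical instance that the rebuilt objects can share.

```scala
// Each parsed event yields a fresh String object; intern() returns the single
// canonical instance from the JVM string pool, so equal values are shared.
object InternSketch extends App {
  val fromEventA = new String("exec-1") // simulates a value parsed from one event
  val fromEventB = new String("exec-1") // the same text parsed from another event

  assert(fromEventA ne fromEventB)                   // two distinct heap objects
  assert(fromEventA.intern() eq fromEventB.intern()) // one shared instance after interning
}
```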

val port = (json \ "Port").extract[Int]
BlockManagerId(executorId, host, port)
}
@@ -403,7 +403,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
internal = false,
countFailedValues = false,
metadata = None)
taskInfo.accumulables ++= Seq(internalAccum, sqlAccum, userAccum)
taskInfo.setAccumulables(List(internalAccum, sqlAccum, userAccum))

val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
assert(newTaskInfo.accumulables === Seq(userAccum))
@@ -788,11 +788,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL,
speculative)
val (acc1, acc2, acc3) =
(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true))
taskInfo.accumulables += acc1
taskInfo.accumulables += acc2
taskInfo.accumulables += acc3
taskInfo.setAccumulables(
List(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)))
taskInfo
}

5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
@@ -86,7 +86,10 @@ object MimaExcludes {
// [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness.
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),

// [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables")

Contributor:
Hmm, I guess MiMa still fails to ignore @DeveloperApi changes?

Contributor Author:
Yep, that's intentional; we removed the logic for excluding that annotation in #11751. The rationale is discussed in https://issues.apache.org/jira/browse/SPARK-13920:

"Our MIMA binary compatibility checks currently ignore APIs which are marked as @Experimental or @DeveloperApi, but I don't think this makes sense. Even if those annotations reserve the right to break binary compatibility, we should still avoid compatibility breaks whenever possible and should be informed explicitly when compatibility breaks."

)
}

@@ -374,7 +374,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None)
val taskInfo = createTaskInfo(0, 0)
taskInfo.accumulables ++= Seq(sqlMetricInfo, nonSqlMetricInfo)
taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo))
val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null)
listener.onOtherEvent(executionStart)
listener.onJobStart(jobStart)