Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class SQLAppStatusListener(
executionData.details = sqlStoreData.details
executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
executionData.modifiedConfigs = sqlStoreData.modifiedConfigs
executionData.metrics = sqlStoreData.metrics
executionData.addMetrics(sqlStoreData.metrics)
executionData.submissionTime = sqlStoreData.submissionTime
executionData.completionTime = sqlStoreData.completionTime
executionData.jobs = sqlStoreData.jobs
Expand All @@ -111,7 +111,7 @@ class SQLAppStatusListener(

// Record the accumulator IDs and metric types for the stages of this job, so that the code
// that keeps track of the metrics knows which accumulators to look at.
val accumIdsAndType = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
val accumIdsAndType = exec.metricAccumulatorIdToMetricType
if (accumIdsAndType.nonEmpty) {
event.stageInfos.foreach { stage =>
stageMetrics.put(stage.stageId, new LiveStageMetrics(stage.stageId, 0,
Expand Down Expand Up @@ -361,7 +361,7 @@ class SQLAppStatusListener(
exec.details = details
exec.physicalPlanDescription = physicalPlanDescription
exec.modifiedConfigs = modifiedConfigs
exec.metrics = sqlPlanMetrics
exec.addMetrics(sqlPlanMetrics)
exec.submissionTime = time
update(exec)
}
Expand All @@ -383,15 +383,15 @@ class SQLAppStatusListener(

val exec = getOrCreateExecution(executionId)
exec.physicalPlanDescription = physicalPlanDescription
exec.metrics ++= sqlPlanMetrics
exec.addMetrics(sqlPlanMetrics)
update(exec)
}

private def onAdaptiveSQLMetricUpdate(event: SparkListenerSQLAdaptiveSQLMetricUpdates): Unit = {
val SparkListenerSQLAdaptiveSQLMetricUpdates(executionId, sqlPlanMetrics) = event

val exec = getOrCreateExecution(executionId)
exec.metrics ++= sqlPlanMetrics
exec.addMetrics(sqlPlanMetrics)
update(exec)
}

Expand Down Expand Up @@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
var details: String = null
var physicalPlanDescription: String = null
var modifiedConfigs: Map[String, String] = _
var metrics = collection.Seq[SQLPlanMetric]()
private var _metrics = collection.Seq[SQLPlanMetric]()
def metrics: collection.Seq[SQLPlanMetric] = _metrics
// This mapping is shared across all LiveStageMetrics instances associated with
// this LiveExecutionData, helping to reduce memory overhead by avoiding waste
// from separate immutable maps with largely overlapping sets of entries.
val metricAccumulatorIdToMetricType = new mutable.HashMap[Long, String]()
var submissionTime = -1L
var completionTime: Option[Date] = None
var errorMessage: Option[String] = None
Expand Down Expand Up @@ -522,13 +527,19 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
metricsValues)
}

def addMetrics(newMetrics: collection.Seq[SQLPlanMetric]): Unit = {
_metrics ++= newMetrics
newMetrics.foreach { m =>
metricAccumulatorIdToMetricType.put(m.accumulatorId, m.metricType)
}
}
}

private class LiveStageMetrics(
val stageId: Int,
val attemptId: Int,
val numTasks: Int,
val accumIdsToMetricType: Map[Long, String]) {
val accumIdsToMetricType: mutable.Map[Long, String]) {

/**
* Mapping of task IDs to their respective index. Note this may contain more elements than the
Expand Down