Skip to content

Commit ad10d28

Browse files
committed
Address code review comments, change event logging to stage end.
1 parent 5d6ae1c commit ad10d28

20 files changed

+642
-452
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ package org.apache.spark.executor
1919

2020
import java.io.{File, NotSerializableException}
2121
import java.lang.Thread.UncaughtExceptionHandler
22-
import java.lang.management.ManagementFactory
22+
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
2323
import java.net.{URI, URL}
2424
import java.nio.ByteBuffer
2525
import java.util.Properties
2626
import java.util.concurrent._
2727
import javax.annotation.concurrent.GuardedBy
28+
import javax.management.ObjectName
2829

2930
import scala.collection.JavaConverters._
3031
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -36,7 +37,7 @@ import org.apache.spark._
3637
import org.apache.spark.deploy.SparkHadoopUtil
3738
import org.apache.spark.internal.Logging
3839
import org.apache.spark.internal.config._
39-
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
40+
import org.apache.spark.memory.{MemoryManager, SparkOutOfMemoryError, TaskMemoryManager}
4041
import org.apache.spark.rpc.RpcTimeout
4142
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
4243
import org.apache.spark.shuffle.FetchFailedException
@@ -71,6 +72,12 @@ private[spark] class Executor(
7172

7273
private val conf = env.conf
7374

75+
// BufferPoolMXBean for direct memory
76+
private val directBufferPool = Executor.getBufferPool(Executor.DIRECT_BUFFER_POOL_NAME)
77+
78+
// BufferPoolMXBean for mapped memory
79+
private val mappedBufferPool = Executor.getBufferPool(Executor.MAPPED_BUFFER_POOL_NAME)
80+
7481
// No ip or host:port - just hostname
7582
Utils.checkHost(executorHostname)
7683
// must not have port specified.
@@ -773,10 +780,8 @@ private[spark] class Executor(
773780
val curGCTime = computeTotalGcTime()
774781

775782
// get executor level memory metrics
776-
val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
777-
ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
778-
env.memoryManager.onHeapExecutionMemoryUsed, env.memoryManager.offHeapExecutionMemoryUsed,
779-
env.memoryManager.onHeapStorageMemoryUsed, env.memoryManager.offHeapStorageMemoryUsed)
783+
val executorUpdates = Executor.getCurrentExecutorMetrics(env.memoryManager,
784+
directBufferPool, mappedBufferPool)
780785

781786
for (taskRunner <- runningTasks.values().asScala) {
782787
if (taskRunner.task != null) {
@@ -814,4 +819,43 @@ private[spark] object Executor {
814819
// task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
815820
// used instead.
816821
val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]
822+
823+
val DIRECT_BUFFER_POOL_NAME = "direct"
824+
val MAPPED_BUFFER_POOL_NAME = "mapped"
825+
826+
/** Get the BufferPoolMXBean for the specified buffer pool. */
827+
def getBufferPool(pool: String): BufferPoolMXBean = {
828+
val name = new ObjectName("java.nio:type=BufferPool,name=" + pool)
829+
ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer,
830+
name.toString, classOf[BufferPoolMXBean])
831+
}
832+
833+
/**
834+
* Get the current executor level memory metrics.
835+
*
836+
* @param memoryManager the memory manager
837+
* @param direct the direct memory buffer pool
838+
* @param mapped the mapped memory buffer pool
839+
* @return the executor memory metrics
840+
*/
841+
def getCurrentExecutorMetrics(
842+
memoryManager: MemoryManager,
843+
direct: BufferPoolMXBean,
844+
mapped: BufferPoolMXBean) : ExecutorMetrics = {
845+
val onHeapExecutionMemoryUsed = memoryManager.onHeapExecutionMemoryUsed
846+
val offHeapExecutionMemoryUsed = memoryManager.offHeapExecutionMemoryUsed
847+
val onHeapStorageMemoryUsed = memoryManager.onHeapStorageMemoryUsed
848+
val offHeapStorageMemoryUsed = memoryManager.offHeapStorageMemoryUsed
849+
new ExecutorMetrics(System.currentTimeMillis(),
850+
ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
851+
ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed(),
852+
onHeapExecutionMemoryUsed,
853+
offHeapExecutionMemoryUsed,
854+
onHeapStorageMemoryUsed,
855+
offHeapStorageMemoryUsed,
856+
onHeapExecutionMemoryUsed + onHeapStorageMemoryUsed, // on heap unified memory
857+
offHeapExecutionMemoryUsed + offHeapStorageMemoryUsed, // off heap unified memory
858+
direct.getMemoryUsed,
859+
mapped.getMemoryUsed)
860+
}
817861
}

core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,29 @@ import org.apache.spark.annotation.DeveloperApi
2626
* This is sent to the driver periodically (on executor heartbeat), to provide
2727
* information about each executor's metrics.
2828
*
29-
* @param timestamp the time the metrics were collected
30-
* @param jvmUsedMemory the amount of JVM used memory for the executor
29+
* @param timestamp the time the metrics were collected, or -1 for Spark history
30+
* log events which are logged when a stage has completed
31+
* @param jvmUsedHeapMemory the amount of JVM used heap memory for the executor
32+
* @param jvmUsedNonHeapMemory the amount of JVM used non-heap memory for the executor
3133
* @param onHeapExecutionMemory the amount of on heap execution memory used
3234
* @param offHeapExecutionMemory the amount of off heap execution memory used
3335
* @param onHeapStorageMemory the amount of on heap storage memory used
3436
* @param offHeapStorageMemory the amount of off heap storage memory used
37+
* @param onHeapUnifiedMemory the amount of on heap unified region memory used
38+
* @param offHeapUnifiedMemory the amount of off heap unified region memory used
39+
* @param directMemory the amount of direct memory used
40+
* @param mappedMemory the amount of mapped memory used
3541
*/
3642
@DeveloperApi
3743
class ExecutorMetrics private[spark] (
3844
val timestamp: Long,
39-
val jvmUsedMemory: Long,
45+
val jvmUsedHeapMemory: Long,
46+
val jvmUsedNonHeapMemory: Long,
4047
val onHeapExecutionMemory: Long,
4148
val offHeapExecutionMemory: Long,
4249
val onHeapStorageMemory: Long,
43-
val offHeapStorageMemory: Long) extends Serializable
50+
val offHeapStorageMemory: Long,
51+
val onHeapUnifiedMemory: Long,
52+
val offHeapUnifiedMemory: Long,
53+
val directMemory: Long,
54+
val mappedMemory: Long) extends Serializable

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.commons.lang3.SerializationUtils
3535

3636
import org.apache.spark._
3737
import org.apache.spark.broadcast.Broadcast
38-
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
38+
import org.apache.spark.executor.{Executor, ExecutorMetrics, TaskMetrics}
3939
import org.apache.spark.internal.Logging
4040
import org.apache.spark.internal.config
4141
import org.apache.spark.network.util.JavaUtils
@@ -214,6 +214,12 @@ class DAGScheduler(
214214
private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat,
215215
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
216216

217+
/** BufferPoolMXBean for direct memory */
218+
private val directBufferPool = Executor.getBufferPool(Executor.DIRECT_BUFFER_POOL_NAME)
219+
220+
/** BufferPoolMXBean for mapped memory */
221+
private val mappedBufferPool = Executor.getBufferPool(Executor.MAPPED_BUFFER_POOL_NAME)
222+
217223
/**
218224
* Called by the TaskSetManager to report task's starting.
219225
*/
@@ -1766,12 +1772,8 @@ class DAGScheduler(
17661772
/** Reports heartbeat metrics for the driver. */
17671773
private def reportHeartBeat(): Unit = {
17681774
// get driver memory metrics
1769-
val driverUpdates = new ExecutorMetrics(System.currentTimeMillis(),
1770-
ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
1771-
sc.env.memoryManager.onHeapExecutionMemoryUsed,
1772-
sc.env.memoryManager.offHeapExecutionMemoryUsed,
1773-
sc.env.memoryManager.onHeapStorageMemoryUsed,
1774-
sc.env.memoryManager.offHeapStorageMemoryUsed)
1775+
val driverUpdates = Executor.getCurrentExecutorMetrics(
1776+
sc.env.memoryManager, directBufferPool, mappedBufferPool)
17751777
val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
17761778
listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
17771779
Some(driverUpdates)))

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,9 @@ private[spark] class EventLoggingListener(
9494
// Visible for tests only.
9595
private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
9696

97-
// Peak metric values for each executor
98-
private var peakExecutorMetrics = new mutable.HashMap[String, PeakExecutorMetrics]()
97+
// map of live stages, to peak executor metrics for the stage
98+
private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int),
99+
mutable.HashMap[String, PeakExecutorMetrics]]()
99100

100101
/**
101102
* Creates the log file in the configured log directory.
@@ -162,7 +163,8 @@ private[spark] class EventLoggingListener(
162163
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
163164
logEvent(event)
164165
// clear the peak metrics when a new stage starts
165-
peakExecutorMetrics.values.foreach(_.reset())
166+
liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()),
167+
new mutable.HashMap[String, PeakExecutorMetrics]())
166168
}
167169

168170
override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)
@@ -177,6 +179,27 @@ private[spark] class EventLoggingListener(
177179

178180
// Events that trigger a flush
179181
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
182+
// log the peak executor metrics for the stage, for each executor
183+
val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
184+
val executorMap = liveStageExecutorMetrics.remove(
185+
(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
186+
executorMap.foreach {
187+
executorEntry => {
188+
for ((executorId, peakExecutorMetrics) <- executorEntry) {
189+
val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.jvmUsedHeapMemory,
190+
peakExecutorMetrics.jvmUsedNonHeapMemory, peakExecutorMetrics.onHeapExecutionMemory,
191+
peakExecutorMetrics.offHeapExecutionMemory, peakExecutorMetrics.onHeapStorageMemory,
192+
peakExecutorMetrics.offHeapStorageMemory, peakExecutorMetrics.onHeapUnifiedMemory,
193+
peakExecutorMetrics.offHeapUnifiedMemory, peakExecutorMetrics.directMemory,
194+
peakExecutorMetrics.mappedMemory)
195+
val executorUpdate = new SparkListenerExecutorMetricsUpdate(
196+
executorId, accumUpdates, Some(executorMetrics))
197+
logEvent(executorUpdate)
198+
}
199+
}
200+
}
201+
202+
// log stage completed event
180203
logEvent(event, flushLogger = true)
181204
}
182205

@@ -205,12 +228,10 @@ private[spark] class EventLoggingListener(
205228
}
206229
override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
207230
logEvent(event, flushLogger = true)
208-
peakExecutorMetrics.put(event.executorId, new PeakExecutorMetrics())
209231
}
210232

211233
override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
212234
logEvent(event, flushLogger = true)
213-
peakExecutorMetrics.remove(event.executorId)
214235
}
215236

216237
override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
@@ -244,19 +265,13 @@ private[spark] class EventLoggingListener(
244265
}
245266
}
246267

247-
/**
248-
* Log if there is a new peak value for one of the memory metrics for the given executor.
249-
* Metrics are cleared out when a new stage is started in onStageSubmitted, so this will
250-
* log new peak memory metric values per executor per stage.
251-
*/
252268
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
253-
var log: Boolean = false
269+
// For the active stages, record any new peak values for the memory metrics for the executor
254270
event.executorUpdates.foreach { executorUpdates =>
255-
val peakMetrics = peakExecutorMetrics.getOrElseUpdate(event.execId, new PeakExecutorMetrics())
256-
if (peakMetrics.compareAndUpdate(executorUpdates)) {
257-
val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
258-
logEvent(new SparkListenerExecutorMetricsUpdate(event.execId, accumUpdates,
259-
event.executorUpdates), flushLogger = true)
271+
liveStageExecutorMetrics.values.foreach { peakExecutorMetrics =>
272+
val peakMetrics = peakExecutorMetrics.getOrElseUpdate(
273+
event.execId, new PeakExecutorMetrics())
274+
peakMetrics.compareAndUpdate(executorUpdates)
260275
}
261276
}
262277
}

core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala

Lines changed: 63 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,40 @@ import org.apache.spark.executor.ExecutorMetrics
2121
import org.apache.spark.status.api.v1.PeakMemoryMetrics
2222

2323
/**
24-
* Records the peak values for executor level metrics. If jvmUsedMemory is -1, then no values have
25-
* been recorded yet.
24+
* Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no
25+
* values have been recorded yet.
2626
*/
2727
private[spark] class PeakExecutorMetrics {
28-
private var jvmUsedMemory = -1L;
29-
private var onHeapExecutionMemory = 0L
30-
private var offHeapExecutionMemory = 0L
31-
private var onHeapStorageMemory = 0L
32-
private var offHeapStorageMemory = 0L
33-
private var onHeapUnifiedMemory = 0L
34-
private var offHeapUnifiedMemory = 0L
28+
private var _jvmUsedHeapMemory = -1L;
29+
private var _jvmUsedNonHeapMemory = 0L;
30+
private var _onHeapExecutionMemory = 0L
31+
private var _offHeapExecutionMemory = 0L
32+
private var _onHeapStorageMemory = 0L
33+
private var _offHeapStorageMemory = 0L
34+
private var _onHeapUnifiedMemory = 0L
35+
private var _offHeapUnifiedMemory = 0L
36+
private var _directMemory = 0L
37+
private var _mappedMemory = 0L
38+
39+
def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory
40+
41+
def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory
42+
43+
def onHeapExecutionMemory: Long = _onHeapExecutionMemory
44+
45+
def offHeapExecutionMemory: Long = _offHeapExecutionMemory
46+
47+
def onHeapStorageMemory: Long = _onHeapStorageMemory
48+
49+
def offHeapStorageMemory: Long = _offHeapStorageMemory
50+
51+
def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory
52+
53+
def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory
54+
55+
def directMemory: Long = _directMemory
56+
57+
def mappedMemory: Long = _mappedMemory
3558

3659
/**
3760
* Compare the specified memory values with the saved peak executor memory
@@ -43,36 +66,44 @@ private[spark] class PeakExecutorMetrics {
4366
def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = {
4467
var updated: Boolean = false
4568

46-
if (executorMetrics.jvmUsedMemory > jvmUsedMemory) {
47-
jvmUsedMemory = executorMetrics.jvmUsedMemory
69+
if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) {
70+
_jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory
71+
updated = true
72+
}
73+
if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) {
74+
_jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory
75+
updated = true
76+
}
77+
if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) {
78+
_onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory
4879
updated = true
4980
}
50-
if (executorMetrics.onHeapExecutionMemory > onHeapExecutionMemory) {
51-
onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory
81+
if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) {
82+
_offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory
5283
updated = true
5384
}
54-
if (executorMetrics.offHeapExecutionMemory > offHeapExecutionMemory) {
55-
offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory
85+
if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) {
86+
_onHeapStorageMemory = executorMetrics.onHeapStorageMemory
5687
updated = true
5788
}
58-
if (executorMetrics.onHeapStorageMemory > onHeapStorageMemory) {
59-
onHeapStorageMemory = executorMetrics.onHeapStorageMemory
89+
if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
90+
_offHeapStorageMemory = executorMetrics.offHeapStorageMemory
6091
updated = true
6192
}
62-
if (executorMetrics.offHeapStorageMemory > offHeapStorageMemory) {
63-
offHeapStorageMemory = executorMetrics.offHeapStorageMemory
93+
if (executorMetrics.onHeapUnifiedMemory > _onHeapUnifiedMemory) {
94+
_onHeapUnifiedMemory = executorMetrics.onHeapUnifiedMemory
6495
updated = true
6596
}
66-
val newOnHeapUnifiedMemory = (executorMetrics.onHeapExecutionMemory +
67-
executorMetrics.onHeapStorageMemory)
68-
if (newOnHeapUnifiedMemory > onHeapUnifiedMemory) {
69-
onHeapUnifiedMemory = newOnHeapUnifiedMemory
97+
if (executorMetrics.offHeapUnifiedMemory > _offHeapUnifiedMemory) {
98+
_offHeapUnifiedMemory = executorMetrics.offHeapUnifiedMemory
7099
updated = true
71100
}
72-
val newOffHeapUnifiedMemory = (executorMetrics.offHeapExecutionMemory +
73-
executorMetrics.offHeapStorageMemory)
74-
if ( newOffHeapUnifiedMemory > offHeapUnifiedMemory) {
75-
offHeapUnifiedMemory = newOffHeapUnifiedMemory
101+
if (executorMetrics.directMemory > _directMemory) {
102+
_directMemory = executorMetrics.directMemory
103+
updated = true
104+
}
105+
if (executorMetrics.mappedMemory > _mappedMemory) {
106+
_mappedMemory = executorMetrics.mappedMemory
76107
updated = true
77108
}
78109

@@ -84,23 +115,13 @@ private[spark] class PeakExecutorMetrics {
84115
* values set.
85116
*/
86117
def getPeakMemoryMetrics: Option[PeakMemoryMetrics] = {
87-
if (jvmUsedMemory < 0) {
118+
if (_jvmUsedHeapMemory < 0) {
88119
None
89120
} else {
90-
Some(new PeakMemoryMetrics(jvmUsedMemory, onHeapExecutionMemory,
91-
offHeapExecutionMemory, onHeapStorageMemory, offHeapStorageMemory,
92-
onHeapUnifiedMemory, offHeapUnifiedMemory))
121+
Some(new PeakMemoryMetrics(_jvmUsedHeapMemory, _jvmUsedNonHeapMemory,
122+
_onHeapExecutionMemory, _offHeapExecutionMemory, _onHeapStorageMemory,
123+
_offHeapStorageMemory, _onHeapUnifiedMemory, _offHeapUnifiedMemory,
124+
_directMemory, _mappedMemory))
93125
}
94126
}
95-
96-
/** Clears/resets the saved peak values. */
97-
def reset(): Unit = {
98-
jvmUsedMemory = -1L;
99-
onHeapExecutionMemory = 0L
100-
offHeapExecutionMemory = 0L
101-
onHeapStorageMemory = 0L
102-
offHeapStorageMemory = 0L
103-
onHeapUnifiedMemory = 0L
104-
offHeapUnifiedMemory = 0L
105-
}
106127
}

0 commit comments

Comments
 (0)