Skip to content

Commit 245221d

Browse files
author
Reza Safi
committed
[SPARK-24958] Add executors' process tree total memory information to heartbeat signals.
Spark executors' process tree total memory information can be really useful. Currently such information are not available. The goal of this PR is to compute such information for each executor, add these information to the heartbeat signals, and compute the peaks at the driver. This PR is tested by running the current unit tests and also some added ones. I have also tested this on our internal cluster and have verified the results.
1 parent 062f5d7 commit 245221d

19 files changed

+537
-190
lines changed

core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala

Lines changed: 90 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,38 @@ import scala.collection.mutable
2525
import scala.collection.mutable.ArrayBuffer
2626
import scala.collection.mutable.Queue
2727

28-
import org.apache.spark.internal.Logging
28+
import org.apache.spark.SparkEnv
29+
import org.apache.spark.internal.{config, Logging}
2930

31+
private[spark] case class ProcfsBasedSystemsMetrics(jvmVmemTotal: Long,
32+
jvmRSSTotal: Long,
33+
pythonVmemTotal: Long,
34+
pythonRSSTotal: Long,
35+
otherVmemTotal: Long,
36+
otherRSSTotal: Long)
3037

3138
// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
3239
// project.
33-
class ProcfsBasedSystems extends ProcessTreeMetrics with Logging {
34-
val procfsDir = "/proc/"
40+
private[spark] class ProcfsBasedSystems extends Logging {
41+
var procfsDir = "/proc/"
42+
val procfsStatFile = "stat"
43+
var pageSize = 0
3544
var isAvailable: Boolean = isItProcfsBased
36-
val pid: Int = computePid()
37-
val ptree: scala.collection.mutable.Map[ Int, Set[Int]] =
45+
private val pid: Int = computePid()
46+
private val ptree: scala.collection.mutable.Map[ Int, Set[Int]] =
3847
scala.collection.mutable.Map[ Int, Set[Int]]()
39-
val PROCFS_STAT_FILE = "stat"
40-
var latestJVMVmemTotal: Long = 0
41-
var latestJVMRSSTotal: Long = 0
42-
var latestPythonVmemTotal: Long = 0
43-
var latestPythonRSSTotal: Long = 0
44-
var latestOtherVmemTotal: Long = 0
45-
var latestOtherRSSTotal: Long = 0
4648

47-
createProcessTree
49+
var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
50+
private var latestJVMVmemTotal: Long = 0
51+
private var latestJVMRSSTotal: Long = 0
52+
private var latestPythonVmemTotal: Long = 0
53+
private var latestPythonRSSTotal: Long = 0
54+
private var latestOtherVmemTotal: Long = 0
55+
private var latestOtherRSSTotal: Long = 0
4856

49-
def isItProcfsBased: Boolean = {
57+
computeProcessTree()
58+
59+
private def isItProcfsBased: Boolean = {
5060
val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
5161
if (testing) {
5262
return true
@@ -59,14 +69,14 @@ class ProcfsBasedSystems extends ProcessTreeMetrics with Logging {
5969
catch {
6070
case f: FileNotFoundException => return false
6171
}
62-
63-
val shouldLogStageExecutorProcessTreeMetrics = org.apache.spark.SparkEnv.get.conf.
64-
getBoolean("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled", true)
65-
true && shouldLogStageExecutorProcessTreeMetrics
72+
val shouldLogStageExecutorMetrics =
73+
SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
74+
val shouldLogStageExecutorProcessTreeMetrics =
75+
SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
76+
shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
6677
}
6778

68-
69-
def computePid(): Int = {
79+
private def computePid(): Int = {
7080
if (!isAvailable) {
7181
return -1;
7282
}
@@ -75,7 +85,7 @@ class ProcfsBasedSystems extends ProcessTreeMetrics with Logging {
7585
// https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
7686
val cmd = Array("bash", "-c", "echo $PPID")
7787
val length = 10
78-
var out: Array[Byte] = Array.fill[Byte](length)(0)
88+
val out: Array[Byte] = Array.fill[Byte](length)(0)
7989
Runtime.getRuntime.exec(cmd).getInputStream.read(out)
8090
val pid = Integer.parseInt(new String(out, "UTF-8").trim)
8191
return pid;
@@ -92,11 +102,18 @@ class ProcfsBasedSystems extends ProcessTreeMetrics with Logging {
92102
}
93103
}
94104

105+
private def computePageSize(): Unit = {
106+
val cmd = Array("getconf", "PAGESIZE")
107+
val out: Array[Byte] = Array.fill[Byte](10)(0)
108+
Runtime.getRuntime.exec(cmd).getInputStream.read(out)
109+
pageSize = Integer.parseInt(new String(out, "UTF-8").trim)
110+
}
95111

96-
def createProcessTree(): Unit = {
112+
private def computeProcessTree(): Unit = {
97113
if (!isAvailable) {
98114
return
99115
}
116+
computePageSize
100117
val queue: Queue[Int] = new Queue[Int]()
101118
queue += pid
102119
while( !queue.isEmpty ) {
@@ -112,35 +129,37 @@ class ProcfsBasedSystems extends ProcessTreeMetrics with Logging {
112129
}
113130
}
114131

115-
116-
def updateProcessTree(): Unit = {
117-
if (!isAvailable) {
118-
return
119-
}
120-
val queue: Queue[Int] = new Queue[Int]()
121-
queue += pid
122-
while( !queue.isEmpty ) {
123-
val p = queue.dequeue()
124-
val c = getChildPIds(p)
125-
if(!c.isEmpty) {
126-
queue ++= c
127-
val preChildren = ptree.get(p)
128-
preChildren match {
129-
case Some(children) => if (!c.toSet.equals(children)) {
130-
val diff: Set[Int] = children -- c.toSet
131-
ptree.update(p, c.toSet )
132-
diff.foreach(ptree.remove(_))
133-
}
134-
case None => ptree.update(p, c.toSet )
135-
}
132+
private def getChildPIds(pid: Int): ArrayBuffer[Int] = {
133+
try {
134+
val cmd = Array("pgrep", "-P", pid.toString)
135+
val input = Runtime.getRuntime.exec(cmd).getInputStream
136+
val childPidsInByte: mutable.ArrayBuffer[Byte] = new mutable.ArrayBuffer()
137+
var d = input.read()
138+
while (d != -1) {
139+
childPidsInByte.append(d.asInstanceOf[Byte])
140+
d = input.read()
136141
}
137-
else {
138-
ptree.update(p, Set[Int]())
142+
input.close()
143+
val childPids = new String(childPidsInByte.toArray, "UTF-8").split("\n")
144+
val childPidsInInt: ArrayBuffer[Int] = new ArrayBuffer[Int]()
145+
for (p <- childPids) {
146+
if (p != "") {
147+
childPidsInInt += Integer.parseInt(p)
148+
}
139149
}
150+
childPidsInInt
151+
} catch {
152+
case e: IOException => logDebug("IO Exception when trying to compute process tree." +
153+
" As a result reporting of ProcessTree metrics is stopped")
154+
isAvailable = false
155+
return new mutable.ArrayBuffer()
156+
case _ => logDebug("Some exception occurred when trying to compute process tree." +
157+
" As a result reporting of ProcessTree metrics is stopped")
158+
isAvailable = false
159+
return new mutable.ArrayBuffer()
140160
}
141161
}
142162

143-
144163
/**
145164
* Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrive the memory
146165
* info. I tried that but found it not correct during tests, so I used normal string analysis
@@ -152,7 +171,7 @@ class ProcfsBasedSystems extends ProcessTreeMetrics with Logging {
152171
val pidDir: File = new File(procfsDir, pid.toString)
153172
val fReader = new InputStreamReader(
154173
new FileInputStream(
155-
new File(pidDir, PROCFS_STAT_FILE)), Charset.forName("UTF-8"))
174+
new File(pidDir, procfsStatFile)), Charset.forName("UTF-8"))
156175
val in: BufferedReader = new BufferedReader(fReader)
157176
val procInfo = in.readLine
158177
in.close
@@ -172,16 +191,19 @@ class ProcfsBasedSystems extends ProcessTreeMetrics with Logging {
172191
latestOtherRSSTotal += procInfoSplit(23).toLong }
173192
}
174193
} catch {
175-
case f: FileNotFoundException => return null
194+
case f: FileNotFoundException =>
176195
}
177196
}
178197

198+
def updateAllMetrics(): Unit = {
199+
allMetrics = computeAllMetrics
200+
}
179201

180-
def getOtherRSSInfo(): Long = {
202+
private def computeAllMetrics(): ProcfsBasedSystemsMetrics = {
181203
if (!isAvailable) {
182-
return -1
204+
return ProcfsBasedSystemsMetrics(-1, -1, -1, -1, -1, -1)
183205
}
184-
updateProcessTree
206+
computeProcessTree
185207
val pids = ptree.keySet
186208
latestJVMRSSTotal = 0
187209
latestJVMVmemTotal = 0
@@ -190,82 +212,57 @@ class ProcfsBasedSystems extends ProcessTreeMetrics with Logging {
190212
latestOtherRSSTotal = 0
191213
latestOtherVmemTotal = 0
192214
for (p <- pids) {
193-
getProcessInfo(p)
215+
getProcessInfo(p)
194216
}
195-
latestOtherRSSTotal
217+
ProcfsBasedSystemsMetrics(
218+
getJVMVirtualMemInfo,
219+
getJVMRSSInfo,
220+
getPythonVirtualMemInfo,
221+
getPythonRSSInfo,
222+
getOtherVirtualMemInfo,
223+
getOtherRSSInfo)
224+
196225
}
197226

227+
def getOtherRSSInfo(): Long = {
228+
if (!isAvailable) {
229+
return -1
230+
}
231+
latestOtherRSSTotal*pageSize
232+
}
198233

199234
def getOtherVirtualMemInfo(): Long = {
200235
if (!isAvailable) {
201236
return -1
202237
}
203-
// We won't call updateProcessTree and also compute total virtual memory here
204-
// since we already did all of this when we computed RSS info
205238
latestOtherVmemTotal
206239
}
207240

208-
209241
def getJVMRSSInfo(): Long = {
210242
if (!isAvailable) {
211243
return -1
212244
}
213-
latestJVMRSSTotal
245+
latestJVMRSSTotal*pageSize
214246
}
215247

216-
217248
def getJVMVirtualMemInfo(): Long = {
218249
if (!isAvailable) {
219250
return -1
220251
}
221252
latestJVMVmemTotal
222253
}
223254

224-
225255
def getPythonRSSInfo(): Long = {
226256
if (!isAvailable) {
227257
return -1
228258
}
229-
latestPythonRSSTotal
259+
latestPythonRSSTotal*pageSize
230260
}
231261

232-
233262
def getPythonVirtualMemInfo(): Long = {
234263
if (!isAvailable) {
235264
return -1
236265
}
237266
latestPythonVmemTotal
238267
}
239-
240-
241-
def getChildPIds(pid: Int): ArrayBuffer[Int] = {
242-
try {
243-
val cmd = Array("pgrep", "-P", pid.toString)
244-
val input = Runtime.getRuntime.exec(cmd).getInputStream
245-
val childPidsInByte: mutable.ArrayBuffer[Byte] = new mutable.ArrayBuffer()
246-
var d = input.read()
247-
while (d != -1) {
248-
childPidsInByte.append(d.asInstanceOf[Byte])
249-
d = input.read()
250-
}
251-
input.close()
252-
val childPids = new String(childPidsInByte.toArray, "UTF-8").split("\n")
253-
val childPidsInInt: ArrayBuffer[Int] = new ArrayBuffer[Int]()
254-
for (p <- childPids) {
255-
if (p != "") {
256-
childPidsInInt += Integer.parseInt(p)
257-
}
258-
}
259-
childPidsInInt
260-
} catch {
261-
case e: IOException => logDebug("IO Exception when trying to compute process tree." +
262-
" As a result reporting of ProcessTree metrics is stopped")
263-
isAvailable = false
264-
return new mutable.ArrayBuffer()
265-
case _ => logDebug("Some exception occurred when trying to compute process tree. As a result" +
266-
" reporting of ProcessTree metrics is stopped")
267-
isAvailable = false
268-
return new mutable.ArrayBuffer()
269-
}
270-
}
271268
}

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ package object config {
7474
.booleanConf
7575
.createWithDefault(false)
7676

77+
private[spark] val EVENT_LOG_PROCESS_TREE_METRICS =
78+
ConfigBuilder("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled")
79+
.booleanConf
80+
.createWithDefault(false)
81+
7782
private[spark] val EVENT_LOG_OVERWRITE =
7883
ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)
7984

core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.metrics
1919
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
2020
import javax.management.ObjectName
2121

22-
import org.apache.spark.executor.{ProcessTreeMetrics, ProcfsBasedSystems}
22+
import org.apache.spark.executor.ProcfsBasedSystems
2323
import org.apache.spark.memory.MemoryManager
2424

2525
/**
@@ -62,37 +62,38 @@ case object JVMOffHeapMemory extends ExecutorMetricType {
6262

6363
case object ProcessTreeJVMRSSMemory extends ExecutorMetricType {
6464
override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
65-
ExecutorMetricType.pTreeInfo.getJVMRSSInfo()
65+
ExecutorMetricType.pTreeInfo.updateAllMetrics()
66+
ExecutorMetricType.pTreeInfo.allMetrics.jvmRSSTotal
6667
}
6768
}
6869

6970
case object ProcessTreeJVMVMemory extends ExecutorMetricType {
7071
override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
71-
ExecutorMetricType.pTreeInfo.getJVMVirtualMemInfo()
72+
ExecutorMetricType.pTreeInfo.allMetrics.jvmVmemTotal
7273
}
7374
}
7475

7576
case object ProcessTreePythonRSSMemory extends ExecutorMetricType {
7677
override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
77-
ExecutorMetricType.pTreeInfo.getPythonRSSInfo()
78+
ExecutorMetricType.pTreeInfo.allMetrics.pythonRSSTotal
7879
}
7980
}
8081

8182
case object ProcessTreePythonVMemory extends ExecutorMetricType {
8283
override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
83-
ExecutorMetricType.pTreeInfo.getPythonVirtualMemInfo()
84+
ExecutorMetricType.pTreeInfo.allMetrics.pythonVmemTotal
8485
}
8586
}
8687

8788
case object ProcessTreeOtherRSSMemory extends ExecutorMetricType {
8889
override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
89-
ExecutorMetricType.pTreeInfo.getOtherRSSInfo()
90+
ExecutorMetricType.pTreeInfo.allMetrics.otherRSSTotal
9091
}
9192
}
9293

9394
case object ProcessTreeOtherVMemory extends ExecutorMetricType {
9495
override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
95-
ExecutorMetricType.pTreeInfo.getOtherVirtualMemInfo()
96+
ExecutorMetricType.pTreeInfo.allMetrics.otherVmemTotal
9697
}
9798
}
9899

@@ -121,7 +122,7 @@ case object MappedPoolMemory extends MBeanExecutorMetricType(
121122
"java.nio:type=BufferPool,name=mapped")
122123

123124
private[spark] object ExecutorMetricType {
124-
final val pTreeInfo: ProcessTreeMetrics = new ProcfsBasedSystems
125+
final val pTreeInfo = new ProcfsBasedSystems
125126

126127
// List of all executor metric types
127128
val values = IndexedSeq(

core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,19 @@
11
[ {
2+
"id" : "application_1538416563558_0014",
3+
"name" : "PythonBisectingKMeansExample",
4+
"attempts" : [ {
5+
"startTime" : "2018-10-02T00:42:39.580GMT",
6+
"endTime" : "2018-10-02T00:44:02.338GMT",
7+
"lastUpdated" : "",
8+
"duration" : 82758,
9+
"sparkUser" : "root",
10+
"completed" : true,
11+
"appSparkVersion" : "2.5.0-SNAPSHOT",
12+
"lastUpdatedEpoch" : 0,
13+
"startTimeEpoch" : 1538440959580,
14+
"endTimeEpoch" : 1538441042338
15+
} ]
16+
}, {
217
"id" : "application_1506645932520_24630151",
318
"name" : "Spark shell",
419
"attempts" : [ {

0 commit comments

Comments
 (0)