Skip to content

Commit 4659f4a

Browse files
author
Reza Safi
committed
Update the use of the process builder and apply other review comments
1 parent 6e65360 commit 4659f4a

File tree

7 files changed

+36
-38
lines changed

7 files changed

+36
-38
lines changed

core/src/main/scala/org/apache/spark/Heartbeater.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,11 @@ private[spark] class Heartbeater(
6060
}
6161

6262
/**
63-
* Get the current executor level metrics. These are returned as an array
63+
* Get the current executor level metrics. These are returned as an array, with the index
64+
* determined by ExecutorMetricType.metricToOffset
6465
*/
6566
def getCurrentMetrics(): ExecutorMetrics = {
67+
6668
val metrics = new Array[Long](ExecutorMetricType.numMetrics)
6769
var offset = 0
6870
ExecutorMetricType.metricGetters.foreach { metric =>

core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.metrics.ExecutorMetricType
2727
*/
2828
@DeveloperApi
2929
class ExecutorMetrics private[spark] extends Serializable {
30-
30+
// Metrics are indexed by ExecutorMetricType.metricToOffset
3131
private val metrics = new Array[Long](ExecutorMetricType.numMetrics)
3232
// the first element is initialized to -1, indicating that the values for the array
3333
// haven't been set yet.

core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -138,19 +138,22 @@ private[spark] class ProcfsMetricsGetter(
138138
}
139139
val stdoutThread = Utils.processStreamByLine("read stdout for pgrep",
140140
process.getInputStream, appendChildPid)
141-
val error = process.getErrorStream
142-
var errorString = ""
143-
(0 until error.available()).foreach { i =>
144-
errorString += error.read()
145-
}
141+
val errorStringBuilder = new StringBuilder()
142+
val stdErrThread = Utils.processStreamByLine(
143+
"stderr for pgrep",
144+
process.getErrorStream,
145+
{ line =>
146+
errorStringBuilder.append(line)
147+
})
146148
val exitCode = process.waitFor()
147149
stdoutThread.join()
150+
stdErrThread.join()
151+
val errorString = errorStringBuilder.toString()
148152
// pgrep will have exit code of 1 if there are more than one child process
149153
// and it will have an exit code of 2 if there is no child process
150154
if (exitCode != 0 && exitCode > 2) {
151155
val cmd = builder.command().toArray.mkString(" ")
152-
logWarning(s"Process $cmd" +
153-
s" exited with code $exitCode, with stderr:" + s"${errorString} ")
156+
logWarning(s"Process $cmd exited with code $exitCode and stderr: $errorString")
154157
throw new SparkException(s"Process $cmd exited with code $exitCode")
155158
}
156159
childPidsInInt
@@ -165,12 +168,9 @@ private[spark] class ProcfsMetricsGetter(
165168

166169
def addProcfsMetricsFromOneProcess(
167170
allMetrics: ProcfsMetrics,
168-
pid: Int):
169-
ProcfsMetrics = {
171+
pid: Int): ProcfsMetrics = {
170172

171-
// Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrieve the memory
172-
// info. I tried that but found it not correct during tests, so I used normal string analysis
173-
// instead. The computation of RSS and Vmem are based on proc(5):
173+
// The computation of RSS and Vmem are based on proc(5):
174174
// http://man7.org/linux/man-pages/man5/proc.5.html
175175
try {
176176
val pidDir = new File(procfsDir, pid.toString)
@@ -180,28 +180,25 @@ private[spark] class ProcfsMetricsGetter(
180180
Utils.tryWithResource( new BufferedReader(fReader)) { in =>
181181
val procInfo = in.readLine
182182
val procInfoSplit = procInfo.split(" ")
183-
if (procInfoSplit != null) {
184-
val vmem = procInfoSplit(22).toLong
185-
val rssPages = procInfoSplit(23).toLong
186-
if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
187-
return allMetrics.copy(
188-
jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
189-
jvmRSSTotal = allMetrics.jvmRSSTotal + (rssPages*pageSize)
190-
)
191-
}
192-
else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
193-
return allMetrics.copy(
194-
pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
195-
pythonRSSTotal = allMetrics.pythonRSSTotal + (rssPages*pageSize)
196-
)
197-
}
198-
return allMetrics.copy(
199-
otherVmemTotal = allMetrics.otherVmemTotal + vmem,
200-
otherRSSTotal = allMetrics.otherRSSTotal + (rssPages*pageSize)
201-
)
183+
val vmem = procInfoSplit(22).toLong
184+
val rssMem = procInfoSplit(23).toLong*pageSize
185+
if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
186+
allMetrics.copy(
187+
jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
188+
jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem)
189+
)
190+
}
191+
else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
192+
allMetrics.copy(
193+
pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
194+
pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem)
195+
)
202196
}
203197
else {
204-
return ProcfsMetrics(0, 0, 0, 0, 0, 0)
198+
allMetrics.copy(
199+
otherVmemTotal = allMetrics.otherVmemTotal + vmem,
200+
otherRSSTotal = allMetrics.otherRSSTotal + (rssMem)
201+
)
205202
}
206203
}
207204
}

core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,8 @@ private[spark] object ExecutorMetricType {
146146
val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
147147
metricGetters.foreach { m =>
148148
var metricInSet = 0
149-
while (metricInSet < m.names.length) {
150-
definedMetricsAndOffset += (m.names(metricInSet) -> (metricInSet + numberOfMetrics))
151-
metricInSet += 1
149+
(0 until m.names.length).foreach { idx =>
150+
definedMetricsAndOffset += (m.names(idx) -> (idx + numberOfMetrics))
152151
}
153152
numberOfMetrics += m.names.length
154153
}

core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.spark.SparkFunSuite
2222

2323
class ProcfsMetricsGetterSuite extends SparkFunSuite {
2424

25-
val p = new ProcfsMetricsGetter(getTestResourcePath("ProcessTree"), 4096L)
25+
val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"), 4096L)
2626

2727
test("testGetProcessInfo") {
2828
var r = ProcfsMetrics(0, 0, 0, 0, 0, 0)

0 commit comments

Comments
 (0)