@@ -41,11 +41,11 @@ private[spark] case class ProcfsMetrics(
4141
4242// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
4343// project.
44- private [spark] class ProcfsMetricsGetter (val procfsDir : String = " /proc/" ) extends Logging {
45- val procfsStatFile = " stat"
46- val testing = sys.env.contains(" SPARK_TESTING" ) || sys.props.contains(" spark.testing" )
47- val pageSize = computePageSize()
48- var isAvailable : Boolean = isProcfsAvailable
44+ private [spark] class ProcfsMetricsGetter (procfsDir : String = " /proc/" ) extends Logging {
45+ private val procfsStatFile = " stat"
46+ private val testing = sys.env.contains(" SPARK_TESTING" ) || sys.props.contains(" spark.testing" )
47+ private val pageSize = computePageSize()
48+ private var isAvailable : Boolean = isProcfsAvailable
4949 private val pid = computePid()
5050
5151 private lazy val isProcfsAvailable : Boolean = {
@@ -170,32 +170,33 @@ private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") exten
170170 // http://man7.org/linux/man-pages/man5/proc.5.html
171171 try {
172172 val pidDir = new File (procfsDir, pid.toString)
173- Utils .tryWithResource(new InputStreamReader (
174- new FileInputStream (
175- new File (pidDir, procfsStatFile)), Charset .forName(" UTF-8" ))) { fReader =>
176- val in = new BufferedReader (fReader)
177- val procInfo = in.readLine
178- val procInfoSplit = procInfo.split(" " )
179- val vmem = procInfoSplit(22 ).toLong
180- val rssMem = procInfoSplit(23 ).toLong * pageSize
181- if (procInfoSplit(1 ).toLowerCase(Locale .US ).contains(" java" )) {
182- allMetrics.copy(
183- jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
184- jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem)
185- )
186- }
187- else if (procInfoSplit(1 ).toLowerCase(Locale .US ).contains(" python" )) {
188- allMetrics.copy(
189- pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
190- pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem)
191- )
192- }
193- else {
194- allMetrics.copy(
195- otherVmemTotal = allMetrics.otherVmemTotal + vmem,
196- otherRSSTotal = allMetrics.otherRSSTotal + (rssMem)
197- )
198- }
173+ def openReader (): BufferedReader = {
174+ val f = new File (new File (procfsDir, pid.toString), procfsStatFile)
175+ new BufferedReader (new InputStreamReader (new FileInputStream (f), Charset .forName(" UTF-8" )))
176+ }
177+ Utils .tryWithResource(openReader) { in =>
178+ val procInfo = in.readLine
179+ val procInfoSplit = procInfo.split(" " )
180+ val vmem = procInfoSplit(22 ).toLong
181+ val rssMem = procInfoSplit(23 ).toLong * pageSize
182+ if (procInfoSplit(1 ).toLowerCase(Locale .US ).contains(" java" )) {
183+ allMetrics.copy(
184+ jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
185+ jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem)
186+ )
187+ }
188+ else if (procInfoSplit(1 ).toLowerCase(Locale .US ).contains(" python" )) {
189+ allMetrics.copy(
190+ pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
191+ pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem)
192+ )
193+ }
194+ else {
195+ allMetrics.copy(
196+ otherVmemTotal = allMetrics.otherVmemTotal + vmem,
197+ otherRSSTotal = allMetrics.otherRSSTotal + (rssMem)
198+ )
199+ }
199200 }
200201 } catch {
201202 case f : IOException =>
@@ -213,6 +214,11 @@ private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") exten
213214 var allMetrics = ProcfsMetrics (0 , 0 , 0 , 0 , 0 , 0 )
214215 for (p <- pids) {
215216 allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
217+ // if we had an error getting any of the metrics, we don't want to report partial metrics, as
218+ // that would be misleading.
219+ if (! isAvailable) {
220+ return ProcfsMetrics (0 , 0 , 0 , 0 , 0 , 0 )
221+ }
216222 }
217223 allMetrics
218224 }
0 commit comments