Skip to content

Commit 18fe510

Browse files
author
Reza Safi
committed
Removing timestamp from case class per reviewers' request and adding a synchronized block
1 parent 7a21efc commit 18fe510

File tree

2 files changed

+36
-26
lines changed

2 files changed

+36
-26
lines changed

core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ private[spark] case class ProcfsMetrics(
3737
pythonVmemTotal: Long,
3838
pythonRSSTotal: Long,
3939
otherVmemTotal: Long,
40-
otherRSSTotal: Long,
41-
timeStamp: Long)
40+
otherRSSTotal: Long)
4241

4342

4443
// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
@@ -49,7 +48,8 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
4948
private val pageSize = computePageSize()
5049
private var isAvailable: Boolean = isProcfsAvailable
5150
private val pid = computePid()
52-
var cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0, 0)
51+
var cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
52+
var lastimeMetricsComputed = 0L
5353
private val HEARTBEAT_INTERVAL_MS = if (testing) {
5454
0
5555
} else {
@@ -186,54 +186,64 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
186186
if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
187187
allMetrics.copy(
188188
jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
189-
jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem),
190-
timeStamp = System.currentTimeMillis
189+
jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem)
191190
)
192191
}
193192
else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
194193
allMetrics.copy(
195194
pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
196-
pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem),
197-
timeStamp = System.currentTimeMillis
195+
pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem)
198196
)
199197
}
200198
else {
201199
allMetrics.copy(
202200
otherVmemTotal = allMetrics.otherVmemTotal + vmem,
203-
otherRSSTotal = allMetrics.otherRSSTotal + (rssMem),
204-
timeStamp = System.currentTimeMillis
201+
otherRSSTotal = allMetrics.otherRSSTotal + (rssMem)
205202
)
206203
}
207204
}
208205
} catch {
209206
case f: IOException =>
210207
logWarning("There was a problem with reading" +
211208
" the stat file of the process. ", f)
212-
ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
209+
ProcfsMetrics(0, 0, 0, 0, 0, 0)
213210
}
214211
}
215212

213+
private[spark] def isCacheValid(): Boolean = {
214+
val lastMetricComputation = System.currentTimeMillis() - lastimeMetricsComputed
215+
// ToDo: Should we make this configurable?
216+
return Math.min(1000, HEARTBEAT_INTERVAL_MS) > lastMetricComputation
217+
}
218+
216219
private[spark] def computeAllMetrics(): ProcfsMetrics = {
217220
if (!isAvailable) {
218-
return ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
221+
lastimeMetricsComputed = System.currentTimeMillis
222+
cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
223+
return ProcfsMetrics(0, 0, 0, 0, 0, 0)
219224
}
220-
val lastMetricComputation = System.currentTimeMillis() - cachedAllMetric.timeStamp
221-
// Check whether we have computed the metrics in the past 1s
222-
// ToDo: Should we make this configurable?
223-
if(lastMetricComputation > Math.min(1000, HEARTBEAT_INTERVAL_MS)) {
224-
val pids = computeProcessTree
225-
var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
226-
for (p <- pids) {
227-
allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
228-
// if we had an error getting any of the metrics, we don't
229-
// want to report partial metrics, as that would be misleading.
230-
if (!isAvailable) {
231-
cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
225+
226+
if (!isCacheValid) {
227+
this.synchronized {
228+
if (isCacheValid) {
232229
return cachedAllMetric
233230
}
231+
val pids = computeProcessTree
232+
var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0)
233+
for (p <- pids) {
234+
allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
235+
// if we had an error getting any of the metrics, we don't
236+
// want to report partial metrics, as that would be misleading.
237+
if (!isAvailable) {
238+
lastimeMetricsComputed = System.currentTimeMillis
239+
cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
240+
return cachedAllMetric
241+
}
242+
}
243+
lastimeMetricsComputed = System.currentTimeMillis
244+
cachedAllMetric = allMetrics
245+
allMetrics
234246
}
235-
cachedAllMetric = allMetrics
236-
allMetrics
237247
}
238248
else {
239249
cachedAllMetric

core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class ProcfsMetricsGetterSuite extends SparkFunSuite {
2525
val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
2626

2727
test("testGetProcessInfo") {
28-
var r = ProcfsMetrics(0, 0, 0, 0, 0, 0, 0)
28+
var r = ProcfsMetrics(0, 0, 0, 0, 0, 0)
2929
r = p.addProcfsMetricsFromOneProcess(r, 26109)
3030
assert(r.jvmVmemTotal == 4769947648L)
3131
assert(r.jvmRSSTotal == 262610944)

0 commit comments

Comments
 (0)