-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-25449][CORE] Heartbeat shouldn't include accumulators for zero metrics #22473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@zsxwing for review |
|
add to whitelist |
| taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) | ||
| accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators())) | ||
| accumUpdates += | ||
| ((taskRunner.taskId, taskRunner.task.metrics.accumulators().filterNot(_.isZero))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a flag for this behavior change?
|
Test build #96301 has finished for PR 22473 at commit
|
|
Test build #96284 has finished for PR 22473 at commit
|
|
Test build #96279 has finished for PR 22473 at commit
|
|
Test build #96303 has finished for PR 22473 at commit
|
zsxwing
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments. For the test, I suggest we refactor codes to make it easy to test. E.g., we can add the following method and test it directly to make the test simple:
private def collectAccumulatorUpdates(
taskRunners: Iterable[TaskRunner]): Array[(Long, Seq[AccumulatorV2[_, _]])] = {
val curGCTime = computeTotalGcTime()
val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
for (taskRunner <- taskRunners) {
if (taskRunner.task != null) {
taskRunner.task.metrics.mergeShuffleReadMetrics()
taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
val accumulatorsToReport =
if (conf.getBoolean(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key, true)) {
taskRunner.task.metrics.accumulators().filterNot(_.isZero)
} else {
taskRunner.task.metrics.accumulators()
}
accumUpdates += ((taskRunner.taskId, accumulatorsToReport))
}
}
accumUpdates.toArray
}
|
|
||
| // Whether to load classes in user jars before those in Spark jars | ||
| private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false) | ||
| private val userClassPathFirst = conf.getBoolean(EXECUTOR_USER_CLASS_PATH_FIRST.key, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)
| // Executor for the heartbeat task. | ||
| private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, | ||
| "executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) | ||
| "executor-heartbeater", conf.getTimeAsMs(EXECUTOR_HEARTBEAT_INTERVAL.key, "10s")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: conf.get(EXECUTOR_HEARTBEAT_INTERVAL). Could you search the whole code base and update them as well?
| * heartbeats about 10 minutes because the heartbeat interval is 10s. | ||
| */ | ||
| private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60) | ||
| private val HEARTBEAT_MAX_FAILURES = conf.getInt(EXECUTOR_HEARTBEAT_MAX_FAILURES.key, 60) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| try { | ||
| val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( | ||
| message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) | ||
| message, RpcTimeout(conf, EXECUTOR_HEARTBEAT_INTERVAL.key, "10s")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a new apply method to object RpcTimeout to support ConfigEntry?
| taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) | ||
| accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators())) | ||
| val accumulatorsToReport = | ||
| if (conf.getBoolean(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key, true)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I would prefer to keep this config value close to HEARTBEAT_MAX_FAILURES to avoid searching it in configs every heartbeat.
| ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional | ||
|
|
||
| private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS = | ||
| ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics").booleanConf.createWithDefault(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe call it spark.executor.heartbeat.dropZeroAccumulatorUpdates? externalAccums may contain user accumulators and not all of them are metrics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also please call internal() to indicate that this is not a public config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question -- when would you not want this to be true? It's already changing behavior here, but what's the case where you need a safety valve to go back? it's just not broadcasting changes that can't matter because they're zero?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@srowen Since the user can see these accumulator updates in the public API SparkListenerExecutorMetricsUpdate, I would prefer to add a flag in case someone really needs these zero updates. E.g., a user may use the listener API to get all accumulators used in a task. After this change, they cannot get them until the task finishes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, it's an internal-only safety-valve flag.
| .createWithDefaultString("10s") | ||
|
|
||
| private[spark] val EXECUTOR_HEARTBEAT_MAX_FAILURES = | ||
| ConfigBuilder("spark.executor.heartbeat.maxFailures").intConf.createWithDefault(60) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: call internal() to indicate that this is not a public config.
| } | ||
|
|
||
| private def heartbeatZeroMetricTest(dropZeroMetrics: Boolean): Unit = { | ||
| val c = "spark.executor.heartbeat.dropZeroMetrics" -> dropZeroMetrics.toString |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key
| f(executor, heartbeats) | ||
| } | ||
|
|
||
| private def invokeReportHeartbeat(executor: Executor): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can mixin org.scalatest.PrivateMethodTester to replace this method, such as
val reportHeartBeat = PrivateMethod[Long]('reportHeartBeat)
...
executor.invokePrivate(reportHeartBeat())
|
Pushed updates. @zsxwing , I can add refactor Executor.scala so testing is easier, but I had hoped that doing it this way would allow more testing for reportHeartbeat in the future. We have pretty good coverage for receiving heartbeats but not much for sending. |
zsxwing
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except some nits
|
|
||
| val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s") | ||
| val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") | ||
| val executorTimeoutThreshold = getTimeAsMs("spark.network.timeout", "120s") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you change getTimeAsMs back to getTimeAsSeconds? There is a slight difference when the user doesn't specify the time unit. getTimeAsMs uses ms as default, while getTimeAsSeconds uses seconds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part in the code only does some validation logic on the values and possibly throws an error. Because we changed it to use get(EXECUTOR_HEARTBEAT_INTERVAL), which returns the value in ms, I wanted the other one to return value in ms as well so we're comparing values in the same unit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you use getTimeAsSeconds and manually convert it to ms?
| require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " + | ||
| s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " + | ||
| s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.") | ||
| s"spark.network.timeout=${executorTimeoutThreshold}ms must be no less than the value of " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "ms" -> "s" once you address the above comment
|
Test build #96704 has finished for PR 22473 at commit
|
|
retest this please |
|
Test build #96705 has finished for PR 22473 at commit
|
|
Test build #96712 has finished for PR 22473 at commit
|
|
Looks like |
|
LGTM pending tests |
|
Test build #96771 has finished for PR 22473 at commit
|
|
Thanks! Merging to master. |
… metrics ## What changes were proposed in this pull request? Heartbeat shouldn't include accumulators for zero metrics. Heartbeats sent from executors to the driver every 10 seconds contain metrics and are generally on the order of a few KBs. However, for large jobs with lots of tasks, heartbeats can be on the order of tens of MBs, causing tasks to die with heartbeat failures. We can mitigate this by not sending zero metrics to the driver. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Closes apache#22473 from mukulmurthy/25449-heartbeat. Authored-by: Mukul Murthy <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
… metrics ## What changes were proposed in this pull request? Heartbeat shouldn't include accumulators for zero metrics. Heartbeats sent from executors to the driver every 10 seconds contain metrics and are generally on the order of a few KBs. However, for large jobs with lots of tasks, heartbeats can be on the order of tens of MBs, causing tasks to die with heartbeat failures. We can mitigate this by not sending zero metrics to the driver. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Closes apache#22473 from mukulmurthy/25449-heartbeat. Authored-by: Mukul Murthy <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
## What changes were proposed in this pull request? Right now as we cast the heartbeat interval to seconds, any value less than 1 second will be casted to 0. This PR just backports the changes of the heartbeat interval in #22473 from master. ## How was this patch tested? Jenkins Closes #24329 from zsxwing/SPARK-27419. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? Right now as we cast the heartbeat interval to seconds, any value less than 1 second will be casted to 0. This PR just backports the changes of the heartbeat interval in apache#22473 from master. ## How was this patch tested? Jenkins Closes apache#24329 from zsxwing/SPARK-27419. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? Right now as we cast the heartbeat interval to seconds, any value less than 1 second will be casted to 0. This PR just backports the changes of the heartbeat interval in apache#22473 from master. ## How was this patch tested? Jenkins Closes apache#24329 from zsxwing/SPARK-27419. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? Right now as we cast the heartbeat interval to seconds, any value less than 1 second will be casted to 0. This PR just backports the changes of the heartbeat interval in apache#22473 from master. ## How was this patch tested? Jenkins Closes apache#24329 from zsxwing/SPARK-27419. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? Right now as we cast the heartbeat interval to seconds, any value less than 1 second will be casted to 0. This PR just backports the changes of the heartbeat interval in apache/spark#22473 from master. ## How was this patch tested? Jenkins Closes #24329 from zsxwing/SPARK-27419. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 53658ab)
## What changes were proposed in this pull request? Right now as we cast the heartbeat interval to seconds, any value less than 1 second will be casted to 0. This PR just backports the changes of the heartbeat interval in apache/spark#22473 from master. ## How was this patch tested? Jenkins Closes #24329 from zsxwing/SPARK-27419. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Heartbeat shouldn't include accumulators for zero metrics.
Heartbeats sent from executors to the driver every 10 seconds contain metrics and are generally on the order of a few KBs. However, for large jobs with lots of tasks, heartbeats can be on the order of tens of MBs, causing tasks to die with heartbeat failures. We can mitigate this by not sending zero metrics to the driver.
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.