[SPARK-16654][CORE] Add UI coverage for Application Level Blacklisting #16346
Conversation
Builds on top of work in SPARK-8425 to update Application Level Blacklisting in the scheduler. Adds a UI to these patches by:

- defining new listener events for blacklisting and unblacklisting nodes and executors;
- sending said events at the relevant points in BlacklistTracker;
- adding JSON (de)serialization code for these events;
- augmenting the Executors UI page to show which, and how many, executors are blacklisted;
- adding a unit test to make sure events are being fired;
- adding HistoryServerSuite coverage to verify that the SHS reads these events correctly;
- updating the Executor UI to show Blacklisted/Active/Dead as a tri-state in Executors.

Updates .rat-excludes to pass tests.

@username squito
ok to test

Test build #70393 has finished for PR 16346 at commit
    s"has timed out")
    nodeIdToBlacklistExpiryTime --= nodesToUnblacklist
    nodesToUnblacklist.foreach { node =>
      nodeIdToBlacklistExpiryTime.remove(node)
This is already done two lines above?
Yes, that's an error. The foreach block is correct so that we can send an event on the following line.
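The fix described above (drop the bulk `--=` and do the removal inside the loop so an event can be posted per node) can be sketched as follows. The event case class, `RecordingBus`, and the map contents are simplified, hypothetical stand-ins for Spark's actual `LiveListenerBus` and scheduler types:

```scala
import scala.collection.mutable

// Hypothetical stand-ins; the real code uses Spark's LiveListenerBus and events.
case class SparkListenerNodeUnblacklisted(time: Long, hostId: String)
class RecordingBus {
  val posted = mutable.Buffer[Any]()
  def post(event: Any): Unit = posted += event
}

val listenerBus = new RecordingBus
val nodeIdToBlacklistExpiryTime = mutable.Map("host1" -> 100L, "host2" -> 9999L)
val now = 500L

// Materialize the expired nodes first, then remove each one inside the loop
// (instead of a separate bulk `--=`) so the unblacklist event can be posted
// for every removed node.
val nodesToUnblacklist = nodeIdToBlacklistExpiryTime.collect {
  case (node, expiry) if expiry < now => node
}.toSeq
nodesToUnblacklist.foreach { node =>
  nodeIdToBlacklistExpiryTime.remove(node)
  listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
}
```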
    val node = failuresInTaskSet.node
    executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
    listenerBus.post(
      SparkListenerExecutorBlacklisted(now, exec, newTotal))
nit: fits in previous line
Done.
    clock: Clock = new SystemClock()) extends Logging {

    def this(sc: SparkContext) = {
      this(sc.listenerBus, sc.getConf)
Probably ok to use sc.conf here to avoid another copy of the conf instance.
Done.
    @DeveloperApi
    case class SparkListenerNodeBlacklisted(
        time: Long,
Hmm... I don't remember nodeId being a thing in Spark. Do you mean a hostname here, or ip address, or something else? Should probably use a better name, or at least document what it is.
(I see BlacklistTracker uses that, but as a user, I don't know what "nodeId" is.)
We've been using "node" and "host" interchangeably in this change set. To be more clear, I'll write this as hostId here and in SparkListenerNodeUnblacklisted.
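The rename settled on here would look roughly like the sketch below. This omits Spark's `@DeveloperApi` annotation and the `SparkListenerEvent` parent trait so it stands alone; the `executorFailures` field is assumed from the surrounding change set rather than copied verbatim:

```scala
// Sketch of the renamed listener events (hostId instead of nodeId).
case class SparkListenerNodeBlacklisted(
    time: Long,
    hostId: String,
    executorFailures: Int)

case class SparkListenerNodeUnblacklisted(time: Long, hostId: String)

val blacklisted = SparkListenerNodeBlacklisted(0L, "host1", 2)
val unblacklisted = SparkListenerNodeUnblacklisted(1L, "host1")
```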
    /*
    Implicitly blacklist every executor associated with this node, and show this in the UI.
    */
    activeStorageStatusList.foreach{status =>
nit: foreach { status =>
Done.
    they may have been blacklisted initially (either explicitly through executor blacklisting
    or implicitly through node blacklisting). Show this in the UI.
    */
    activeStorageStatusList.foreach{status =>
nit: foreach { status =>
Done.
    clock.setTime(0)
    blacklist = new BlacklistTracker(conf, clock)
    blacklist = new BlacklistTracker(null, conf, clock)
If this tracker is ever used, won't you get NPEs because the listener bus is null?
This basically means you're forcing every test to call configureBlacklistAndScheduler() manually. So either remove this initialization, or do this properly, and just call configureBlacklistAndScheduler() for tests that need to override the default config.
Since I don't see any test actually passing any custom config parameters, might as well do the initialization properly here and get rid of configureBlacklistAndScheduler() altogether.
Correct. For now, I've left configureBlacklistAndScheduler() in place, since the two tests at the end do not call it.
    // We don't directly use the application blacklist, but its presence triggers blacklisting
    // within the taskset.
    val blacklistTrackerOpt = Some(new BlacklistTracker(conf, clock))
    val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, clock))
Same comment regarding the null listener bus. I'd be more confident if you provided something here (even if it's a mocked bus you never actually check).
Sure.
    override def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted)
      : Unit = synchronized {
    /*
nit: the preference is to use line comments for these.
Done.
Test build #71226 has finished for PR 16346 at commit
sorry I am only just thinking of this now, but I think it would also be nice for that executor summary table to also list the blacklisted executors. Otherwise, just some minor changes.
(nevermind, I didn't notice the "Blacklisted" column)
    }

    override def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted)
      : Unit = synchronized {
nit: move param to second line, and double-indent. (and the other added methods in this file)

    override def onExecutorBlacklisted(
        executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = synchronized {
Done.
I know the Spark project has some style standards. It would be great if these could be expressed programmatically, and if failing to meet these standards caused a compile time error.
You can run dev/scalastyle to check, but it still misses some things.
    new TaskSetBlacklist(conf, stageId, clock)
    }

    def configureBlacklistAndScheduler(confs: (String, String)*): Unit = {
since you're never passing in confs, couldn't you get rid of this method, and just change the initialization in beforeEach to

    listenerBusMock = mock[LiveListenerBus]
    blacklist = new BlacklistTracker(listenerBusMock, conf, clock)
Done. The code is cleaner this way.
    }

    test("Blacklisting individual tasks and checking for SparkListenerEvents") {
      configureBlacklistAndScheduler()
I think the rest of the test code here has changed a lot since you first started working on this, but I'd avoid putting in a whole test case just for this. Instead, I'd look at the existing test cases, find all occurrences of blacklist.isNodeBlacklisted and blacklist.isExecutorBlacklisted, and add appropriate verify(listenerBusMock) calls around all of those.
Certainly this doesn't need a dedicated test. But there will be a large number - dozens - of listenerBusMock checks for blacklisting and unblacklisting. I'm working on this now, but there will be a substantial diff here.
A freebie is that by checking for the listener bus events we're also asserting that executors and nodes are being blacklisted with the correct number of failures, and we're making that number explicit.
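The verify-around-the-existing-checks approach can be sketched with a recording stub in place of Mockito's `verify`; the event shapes, ids, and failure counts below are illustrative stand-ins, not copied from the suite:

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-ins for the Spark listener event types.
case class SparkListenerExecutorBlacklisted(time: Long, executorId: String, taskFailures: Int)
case class SparkListenerNodeBlacklisted(time: Long, hostId: String, executorFailures: Int)

// Recording stub standing in for a Mockito mock of LiveListenerBus.
class RecordingBus {
  val posted = mutable.Buffer[Any]()
  def post(event: Any): Unit = posted += event
}

val listenerBusMock = new RecordingBus

// Pretend the tracker just blacklisted executor "1" after 4 task failures,
// then node "hostA" after 2 executor failures.
listenerBusMock.post(SparkListenerExecutorBlacklisted(0L, "1", 4))
listenerBusMock.post(SparkListenerNodeBlacklisted(0L, "hostA", 2))

// Alongside each existing isExecutorBlacklisted/isNodeBlacklisted check, also
// assert the matching event was posted with an explicit failure count.
assert(listenerBusMock.posted.contains(SparkListenerExecutorBlacklisted(0L, "1", 4)))
assert(listenerBusMock.posted.contains(SparkListenerNodeBlacklisted(0L, "hostA", 2)))
```

This is what makes the failure counts explicit in the tests: the event carries the count, so every verification also pins down how many failures triggered the blacklisting.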
cc @ajbozarth as you were interested in this earlier

I haven't been able to take a detailed look at the code, but all of my UI concerns seem to have been addressed in the previous PR. I am wondering what you mean by having the summary table list the blacklisted executors, though @squito

@ajbozarth whoops, I hadn't noticed the extra "Blacklisted" column in the summary table at the top -- I was thinking we'd add another row for blacklisted executors. But I actually think the current version is better. So ignore that comment :) (mentioned the same to @jsoltren directly)

Test build #71541 has finished for PR 16346 at commit
squito left a comment
a couple of really minor things left, otherwise lgtm!
    }

    private def updateExecutorBlacklist(
        eid: String, isBlacklisted: Boolean): Unit = {
nit: when multi-line, each arg goes on its own line
Fixed.
    : Unit = synchronized {
    /*
    Implicitly blacklist every executor associated with this node, and show this in the UI.
    */
nit: single line comment //
Fixed.
    new TaskSetBlacklist(conf, stageId, clock)
    }

    def configureBlacklistAndScheduler: Unit = {
now that you're doing the equivalent thing in beforeEach, this entire method is unnecessary. beforeEach gets called automatically for each test so you don't need to call this explicitly.
sorry I was unclear about this earlier.
Makes sense, fixed.
The test failures look real and from the changes you've made, can you take a look?
Strikes nodeId and replaces with hostId in resources for JSON tests.
The test failures are due to s/nodeId/hostId/ in the JSON code but not the tests. This is passing locally now with the changes I'm about to push to this PR. Thanks.
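For illustration, the shape of the event JSON after the rename might look like the sketch below. The field names here are assumed from the discussion, not copied from Spark's JsonProtocol, so the exact casing and event name may differ in the real test resources:

```scala
// Illustrative only: post-rename event JSON is assumed to carry a
// "hostId" field where "nodeId" used to be.
val nodeBlacklistedJson =
  """{
    |  "Event": "SparkListenerNodeBlacklisted",
    |  "time": 0,
    |  "hostId": "node1",
    |  "executorFailures": 2
    |}""".stripMargin
```

The point of the failing tests was exactly this mismatch: the serializer emitted the new field name while the checked-in JSON resources still used the old one.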
Test build #71609 has finished for PR 16346 at commit

merged to master, thanks @jsoltren!
Builds on top of work in SPARK-8425 to update Application Level Blacklisting in the scheduler.

What changes were proposed in this pull request?

Adds a UI to these patches by:

- defining new listener events for blacklisting and unblacklisting nodes and executors;
- sending said events at the relevant points in BlacklistTracker;
- adding JSON (de)serialization code for these events;
- augmenting the Executors UI page to show which, and how many, executors are blacklisted;
- adding a unit test to make sure events are being fired;
- adding HistoryServerSuite coverage to verify that the SHS reads these events correctly;
- updating the Executor UI to show Blacklisted/Active/Dead as a tri-state in Executors.

Updates .rat-excludes to pass tests.

@username squito

How was this patch tested?

./dev/run-tests



testOnly org.apache.spark.util.JsonProtocolSuite
testOnly org.apache.spark.scheduler.BlacklistTrackerSuite
testOnly org.apache.spark.deploy.history.HistoryServerSuite

https://github.com/jsoltren/jose-utils/blob/master/blacklist/test-blacklist.sh