-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-17365][Core] Remove/Kill multiple executors together to reduce RPC call time. #15152
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… contention [SPARK-17365][Core] Return executorIds which are actually killed while killing multiple executors Fix import wrap and remove unused import Add mima exclude and refactor few methods.
|
ok to test |
|
add to whitelist |
|
Test build #65609 has finished for PR 15152 at commit
|
|
Test build #65618 has finished for PR 15152 at commit
|
|
Test build #65655 has finished for PR 15152 at commit
|
|
PySpark unit tests failure unrelated. |
|
retest this please |
|
Test build #65675 has finished for PR 15152 at commit
|
|
@vanzin, @tgravescs Can you review these changes. I will resolve the MiMa exclude merge conflicts once the PR is ready to be accepted - as these are being added often. Thanks. |
vanzin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks ok, a few minor things to fix though.
| def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) | ||
| def killExecutor(executorId: String): Boolean = { | ||
| val killedExecutors = killExecutors(Seq(executorId)) | ||
| killedExecutors.nonEmpty && killedExecutors(0).equals(executorId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I seriously hope the backend is not killing some random executor instead of the one that was asked...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't. Just added an additional check here to be sure. I need to change the short circuit operator here to &.
| .filter { id => force || !scheduler.isExecutorBusy(id) } | ||
| executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace } | ||
|
|
||
| logInfo(s"Requesting to kill filtered executor(s) ${executorsToKill.mkString(", ")}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds weird. The following looks slightly better: "Actual list of executors to be killed is ...".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one sounds better.
| val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread) | ||
|
|
||
| killResponse.flatMap(killSuccessful => | ||
| Future.successful ( if (killSuccessful) executorsToKill else Seq.empty[String]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: no space before '('
| assert(executorsPendingToRemove(manager).contains("2")) | ||
| assert(executorsPendingToRemove(manager).contains("3")) | ||
| assert(!removeExecutor(manager, "100")) // remove non-existent executors | ||
| assert(!(removeExecutors(manager, Seq("101", "102")) == Seq("101", "102"))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
!= doesn't work?
|
|
||
| // Keep removing until the limit is reached | ||
| assert(executorsPendingToRemove(manager).isEmpty) | ||
| assert(removeExecutors(manager, Seq("1")) == Seq("1")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use === in all these.
|
|
||
| def canCreate(masterURL: String): Boolean = masterURL == "myDummyLocalExternalClusterManager" | ||
|
|
||
| override def createTaskScheduler(sc: SparkContext, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each parameter on a separate line when breaking. Happens in a bunch of methods here.
| ): Boolean = sc.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount) | ||
|
|
||
| override def requestExecutors(numAdditionalExecutors: Int): Boolean = | ||
| sc.requestExecutors(numAdditionalExecutors) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation
|
@vanzin - fixed minor nits and indentation. |
|
Test build #65729 has finished for PR 15152 at commit
|
| def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) | ||
| def killExecutor(executorId: String): Boolean = { | ||
| val killedExecutors = killExecutors(Seq(executorId)) | ||
| killedExecutors.nonEmpty & killedExecutors(0).equals(executorId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be short-circuited, otherwise it will throw an exception if the list is empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah! The earlier one was fine. Missed it.
|
Test build #65740 has finished for PR 15152 at commit
|
|
LGTM. Merging to master. |
What changes were proposed in this pull request?
We are killing multiple executors together instead of iterating over expensive RPC calls to kill single executor.
How was this patch tested?
Executed sample spark job to observe executors being killed/removed with dynamic allocation enabled.