Support eagerly kill redundant executors #4592
Conversation
    .createWithDefault(true)

  val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED =
    buildConf("spark.sql.finalWriteStageEagerlyKillExecutors.enabled")
It's valuable to introduce a new namespace, spark.sql.finalWriteStage.
| buildConf("spark.sql.finalWriteStageEagerlyKillExecutors.enabled") | |
| buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.enabled") |
Codecov Report
@@ Coverage Diff @@
## master #4592 +/- ##
============================================
+ Coverage 53.26% 53.30% +0.03%
Complexity 13 13
============================================
Files 577 577
Lines 31557 31568 +11
Branches 4244 4245 +1
============================================
+ Hits 16810 16827 +17
+ Misses 13161 13153 -8
- Partials 1586 1588 +2
... and 11 files with indirect coverage changes
    // - target executors > min executors
    val numActiveExecutors = sc.getExecutorIds().length
    val expectedCores = partitionSpecs.length
    val targetExecutors = (((expectedCores / executorCores) + 1) * factor).toInt
(expectedCores / executorCores) + 1 => math.ceil(expectedCores.toFloat / executorCores)
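A minimal sketch contrasting the two formulas, reusing the names from the snippet above with illustrative numbers:

```scala
// Integer division plus one over-allocates whenever expectedCores divides evenly.
val executorCores = 4
val expectedCores = 8  // e.g. 8 final-stage partitions
val viaPlusOne = (expectedCores / executorCores) + 1                  // 3 executors
val viaCeil = math.ceil(expectedCores.toFloat / executorCores).toInt  // 2 executors
```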
Keep at least 1 exec?
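One possible way to address this, assuming targetExecutors is computed as in the snippet above; reading the DRA minimum here is only an illustration:

```scala
// Hypothetical clamp: never drive the target below one executor,
// nor below the configured dynamic-allocation minimum.
val minExecutors = sc.getConf.getInt("spark.dynamicAllocation.minExecutors", 0)
val boundedTarget = math.max(targetExecutors, math.max(minExecutors, 1))
```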
    executorIdsToKill.toSeq
  }

  private def killExecutors(
org.apache.spark.SparkContext#killExecutors?
There is a story about DRA here. Since apache/spark#20604, org.apache.spark.SparkContext#killExecutors is not allowed with DRA on, so this PR hacks the internal interface to kill executors. I think that change is not very reasonable; it should be fine to kill executors with DRA on as long as the minimum number of executors is less than the target.
This is not always true.
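For context, a rough sketch of the two paths discussed here, assuming Spark 3.x internals; since both schedulerBackend and ExecutorAllocationClient are private[spark], code like this has to live in a file declared under package org.apache.spark:

```scala
// Public path, refused when dynamic allocation is on (since apache/spark#20604):
//   sc.killExecutors(executorIdsToKill)
// Internal path: go through the private[spark] allocation client without
// adjusting the DRA target, so DRA can still scale back up later.
sc.schedulerBackend match {
  case client: ExecutorAllocationClient =>
    client.killExecutors(
      executorIdsToKill,
      adjustTargetNumExecutors = false,
      countFailures = false,
      force = true)
  case _ => // local mode or a backend that cannot kill executors: do nothing
}
```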
 */
case class FinalStageResourceManager(session: SparkSession) extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED)) {
dynamicAllocation enabled?
Yeah, but at least the JIT should work better for shuffle read and scheduler-related code on the older executors.
It really depends on the machine; I'm not sure there is an easy way to find the bad machines.
We first kill executors according to their alive time and exclude the ones that hold shuffle data, so it should be safe for this case.
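On the dynamicAllocation question above, a hedged sketch of an extra guard the rule could apply; the shape mirrors the snippet earlier in the thread, and spark.dynamicAllocation.enabled is the standard Spark key:

```scala
// Sketch: skip the rule unless both the feature flag and Spark DRA are on,
// since killing executors only pays off when DRA can manage the remainder.
override def apply(plan: SparkPlan): SparkPlan = {
  val featureEnabled =
    conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED)
  val draEnabled =
    session.sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)
  if (!featureEnabled || !draEnabled) {
    plan
  } else {
    // ... locate the final write stage and kill redundant executors ...
    plan
  }
}
```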
 * limitations under the License.
 */

package org.apache.spark
Is this for unstable internal calls?
Yes, e.g., CoarseGrainedSchedulerBackend is declared private[spark].
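A minimal illustration of that pattern, assuming a helper file placed in the Spark package namespace (the object name is hypothetical):

```scala
// Declaring the file under org.apache.spark lets it see private[spark] members
// such as SparkContext.schedulerBackend and CoarseGrainedSchedulerBackend.
package org.apache.spark

import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

object SparkInternalsAccessor {
  /** Returns the coarse-grained backend when running on a cluster manager. */
  def coarseGrainedBackend(sc: SparkContext): Option[CoarseGrainedSchedulerBackend] =
    sc.schedulerBackend match {
      case b: CoarseGrainedSchedulerBackend => Some(b)
      case _ => None
    }
}
```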
Thanks for the review, merging to master!
Why are the changes needed?
This PR adds a new rule, FinalStageResourceManager, to eagerly kill redundant executors. We first get the final stage partition count, which is the number of cores actually required, then kill the redundant executors. The priority for killing executors follows the executors' alive time, excluding executors that hold shuffle data.
The reason for adding this feature is that if the previous stage uses many executors but the final stage needs far fewer, the final stage's tasks are scheduled randomly across all existing executors, which can waste resources: e.g., each executor runs only 1 or 2 tasks but holds 4 or 5 cores.
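A small worked example of that waste, with illustrative numbers; the formula mirrors the targetExecutors snippet in the review thread, and factor is assumed to be a configurable reduction ratio:

```scala
// Illustrative numbers only.
val previousStageExecutors = 50   // executors kept alive by the earlier, wide stage
val executorCores = 4
val finalStagePartitions = 8      // only 8 tasks remain for the final write stage
val factor = 1.0

// Without the rule: the 8 tasks can land on up to 8 different executors while
// 50 * 4 = 200 cores stay allocated. With the rule, roughly two executors suffice:
val targetExecutors =
  (math.ceil(finalStagePartitions.toDouble / executorCores) * factor).toInt  // 2
```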
How was this patch tested?
Tested manually.
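For anyone trying this out manually, a hypothetical way to enable the feature once the Kyuubi Spark SQL extension is installed; the key is the one from the diff above and may change if the suggested rename is adopted:

```scala
// Enable the rule for the current session (assumes the extension is loaded).
spark.conf.set("spark.sql.finalWriteStageEagerlyKillExecutors.enabled", "true")
```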