@@ -187,9 +187,6 @@ public class CapacityScheduler extends
187187
188188 private WorkflowPriorityMappingsManager workflowPriorityMappingsMgr ;
189189
190- // timeout to join when we stop this service
191- protected final long THREAD_JOIN_TIMEOUT_MS = 1000 ;
192-
193190 private PreemptionManager preemptionManager = new PreemptionManager ();
194191
195192 private volatile boolean isLazyPreemptionEnabled = false ;
@@ -227,27 +224,14 @@ public Configuration getConf() {
227224 private ResourceCalculator calculator ;
228225 private boolean usePortForNodeName ;
229226
230- private boolean scheduleAsynchronously ;
231- @ VisibleForTesting
232- protected List <AsyncScheduleThread > asyncSchedulerThreads ;
233- private ResourceCommitterService resourceCommitterService ;
227+ private AsyncSchedulingConfiguration asyncSchedulingConf ;
234228 private RMNodeLabelsManager labelManager ;
235229 private AppPriorityACLsManager appPriorityACLManager ;
236230 private boolean multiNodePlacementEnabled ;
237231
238232 private boolean printedVerboseLoggingForAsyncScheduling ;
239233 private boolean appShouldFailFast ;
240234
241- /**
242- * EXPERT
243- */
244- private long asyncScheduleInterval ;
245- private static final String ASYNC_SCHEDULER_INTERVAL =
246- CapacitySchedulerConfiguration .SCHEDULE_ASYNCHRONOUSLY_PREFIX
247- + ".scheduling-interval-ms" ;
248- private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5 ;
249- private long asyncMaxPendingBacklogs ;
250-
251235 private CSMaxRunningAppsEnforcer maxRunningEnforcer ;
252236
253237 public CapacityScheduler () {
@@ -376,27 +360,7 @@ private ResourceCalculator initResourceCalculator() {
376360 }
377361
378362 private void initAsyncSchedulingProperties () {
379- scheduleAsynchronously = this .conf .getScheduleAynschronously ();
380- asyncScheduleInterval = this .conf .getLong (ASYNC_SCHEDULER_INTERVAL ,
381- DEFAULT_ASYNC_SCHEDULER_INTERVAL );
382-
383- // number of threads for async scheduling
384- int maxAsyncSchedulingThreads = this .conf .getInt (
385- CapacitySchedulerConfiguration .SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD , 1 );
386- maxAsyncSchedulingThreads = Math .max (maxAsyncSchedulingThreads , 1 );
387-
388- if (scheduleAsynchronously ) {
389- asyncSchedulerThreads = new ArrayList <>();
390- for (int i = 0 ; i < maxAsyncSchedulingThreads ; i ++) {
391- asyncSchedulerThreads .add (new AsyncScheduleThread (this ));
392- }
393- resourceCommitterService = new ResourceCommitterService (this );
394- asyncMaxPendingBacklogs = this .conf .getInt (
395- CapacitySchedulerConfiguration .
396- SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS ,
397- CapacitySchedulerConfiguration .
398- DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS );
399- }
363+ this .asyncSchedulingConf = new AsyncSchedulingConfiguration (conf , this );
400364 }
401365
402366 private void initMultiNodePlacement () {
@@ -419,8 +383,8 @@ private void printSchedulerInitialized() {
419383 getResourceCalculator ().getClass (),
420384 getMinimumResourceCapability (),
421385 getMaximumResourceCapability (),
422- scheduleAsynchronously ,
423- asyncScheduleInterval ,
386+ asyncSchedulingConf . isScheduleAsynchronously () ,
387+ asyncSchedulingConf . getAsyncScheduleInterval () ,
424388 multiNodePlacementEnabled ,
425389 assignMultipleEnabled ,
426390 maxAssignPerHeartbeat ,
@@ -431,15 +395,7 @@ private void startSchedulerThreads() {
431395 writeLock .lock ();
432396 try {
433397 activitiesManager .start ();
434- if (scheduleAsynchronously ) {
435- Preconditions .checkNotNull (asyncSchedulerThreads ,
436- "asyncSchedulerThreads is null" );
437- for (Thread t : asyncSchedulerThreads ) {
438- t .start ();
439- }
440-
441- resourceCommitterService .start ();
442- }
398+ asyncSchedulingConf .startThreads ();
443399 } finally {
444400 writeLock .unlock ();
445401 }
@@ -465,14 +421,7 @@ public void serviceStop() throws Exception {
465421 writeLock .lock ();
466422 try {
467423 this .activitiesManager .stop ();
468- if (scheduleAsynchronously && asyncSchedulerThreads != null ) {
469- for (Thread t : asyncSchedulerThreads ) {
470- t .interrupt ();
471- t .join (THREAD_JOIN_TIMEOUT_MS );
472- }
473- resourceCommitterService .interrupt ();
474- resourceCommitterService .join (THREAD_JOIN_TIMEOUT_MS );
475- }
424+ asyncSchedulingConf .serviceStopInvoked ();
476425 } finally {
477426 writeLock .unlock ();
478427 }
@@ -539,7 +488,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext)
539488 }
540489
541490 long getAsyncScheduleInterval () {
542- return asyncScheduleInterval ;
491+ return asyncSchedulingConf . getAsyncScheduleInterval () ;
543492 }
544493
545494 private final static Random random = new Random (System .currentTimeMillis ());
@@ -671,6 +620,11 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
671620 Thread .sleep (cs .getAsyncScheduleInterval ());
672621 }
673622
623+ @ VisibleForTesting
624+ public void setAsyncSchedulingConf (AsyncSchedulingConfiguration conf ) {
625+ this .asyncSchedulingConf = conf ;
626+ }
627+
674628 static class AsyncScheduleThread extends Thread {
675629
676630 private final CapacityScheduler cs ;
@@ -692,7 +646,7 @@ public void run() {
692646 } else {
693647 // Don't run schedule if we have some pending backlogs already
694648 if (cs .getAsyncSchedulingPendingBacklogs ()
695- > cs .asyncMaxPendingBacklogs ) {
649+ > cs .asyncSchedulingConf . getAsyncMaxPendingBacklogs () ) {
696650 Thread .sleep (1 );
697651 } else {
698652 schedule (cs );
@@ -1479,7 +1433,7 @@ protected void nodeUpdate(RMNode rmNode) {
14791433 }
14801434
14811435 // Try to do scheduling
1482- if (!scheduleAsynchronously ) {
1436+ if (!asyncSchedulingConf . isScheduleAsynchronously () ) {
14831437 writeLock .lock ();
14841438 try {
14851439 // reset allocation and reservation stats before we start doing any
@@ -2291,8 +2245,8 @@ private void addNode(RMNode nodeManager) {
22912245 "Added node " + nodeManager .getNodeAddress () + " clusterResource: "
22922246 + clusterResource );
22932247
2294- if (scheduleAsynchronously && getNumClusterNodes () == 1 ) {
2295- for (AsyncScheduleThread t : asyncSchedulerThreads ) {
2248+ if (asyncSchedulingConf . isScheduleAsynchronously () && getNumClusterNodes () == 1 ) {
2249+ for (AsyncScheduleThread t : asyncSchedulingConf . asyncSchedulerThreads ) {
22962250 t .beginSchedule ();
22972251 }
22982252 }
@@ -2340,11 +2294,7 @@ private void removeNode(RMNode nodeInfo) {
23402294 new ResourceLimits (clusterResource ));
23412295 int numNodes = nodeTracker .nodeCount ();
23422296
2343- if (scheduleAsynchronously && numNodes == 0 ) {
2344- for (AsyncScheduleThread t : asyncSchedulerThreads ) {
2345- t .suspendSchedule ();
2346- }
2347- }
2297+ asyncSchedulingConf .nodeRemoved (numNodes );
23482298
23492299 LOG .info (
23502300 "Removed node " + nodeInfo .getNodeAddress () + " clusterResource: "
@@ -3092,9 +3042,9 @@ public void submitResourceCommitRequest(Resource cluster,
30923042 return ;
30933043 }
30943044
3095- if (scheduleAsynchronously ) {
3045+ if (asyncSchedulingConf . isScheduleAsynchronously () ) {
30963046 // Submit to a commit thread and commit it async-ly
3097- resourceCommitterService .addNewCommitRequest (request );
3047+ asyncSchedulingConf . resourceCommitterService .addNewCommitRequest (request );
30983048 } else {
30993049 // Otherwise do it sync-ly.
31003050 tryCommit (cluster , request , true );
@@ -3339,10 +3289,7 @@ public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
33393289 }
33403290
33413291 public int getAsyncSchedulingPendingBacklogs () {
3342- if (scheduleAsynchronously ) {
3343- return resourceCommitterService .getPendingBacklogs ();
3344- }
3345- return 0 ;
3292+ return asyncSchedulingConf .getPendingBacklogs ();
33463293 }
33473294
33483295 @ Override
@@ -3483,7 +3430,7 @@ public boolean isMultiNodePlacementEnabled() {
34833430 }
34843431
34853432 public int getNumAsyncSchedulerThreads () {
3486- return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads . size ();
3433+ return asyncSchedulingConf . getNumAsyncSchedulerThreads ();
34873434 }
34883435
34893436 @ VisibleForTesting
@@ -3503,4 +3450,109 @@ public boolean placementConstraintEnabled() {
35033450 public void setQueueManager (CapacitySchedulerQueueManager qm ) {
35043451 this .queueManager = qm ;
35053452 }
3453+
3454+ @ VisibleForTesting
3455+ public List <AsyncScheduleThread > getAsyncSchedulerThreads () {
3456+ return asyncSchedulingConf .getAsyncSchedulerThreads ();
3457+ }
3458+
3459+ static class AsyncSchedulingConfiguration {
3460+ // timeout to join when we stop this service
3461+ private static final long THREAD_JOIN_TIMEOUT_MS = 1000 ;
3462+
3463+ @ VisibleForTesting
3464+ protected List <AsyncScheduleThread > asyncSchedulerThreads ;
3465+ private ResourceCommitterService resourceCommitterService ;
3466+
3467+ private long asyncScheduleInterval ;
3468+ private static final String ASYNC_SCHEDULER_INTERVAL =
3469+ CapacitySchedulerConfiguration .SCHEDULE_ASYNCHRONOUSLY_PREFIX
3470+ + ".scheduling-interval-ms" ;
3471+ private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5 ;
3472+ private long asyncMaxPendingBacklogs ;
3473+
3474+ private final boolean scheduleAsynchronously ;
3475+
3476+ AsyncSchedulingConfiguration (CapacitySchedulerConfiguration conf ,
3477+ CapacityScheduler cs ) {
3478+ this .scheduleAsynchronously = conf .getScheduleAynschronously ();
3479+ if (this .scheduleAsynchronously ) {
3480+ this .asyncScheduleInterval = conf .getLong (
3481+ CapacitySchedulerConfiguration .SCHEDULE_ASYNCHRONOUSLY_INTERVAL ,
3482+ CapacitySchedulerConfiguration .DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL );
3483+ // number of threads for async scheduling
3484+ int maxAsyncSchedulingThreads = conf .getInt (
3485+ CapacitySchedulerConfiguration .SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD ,
3486+ 1 );
3487+ maxAsyncSchedulingThreads = Math .max (maxAsyncSchedulingThreads , 1 );
3488+ this .asyncMaxPendingBacklogs = conf .getInt (
3489+ CapacitySchedulerConfiguration .
3490+ SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS ,
3491+ CapacitySchedulerConfiguration .
3492+ DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS );
3493+
3494+ this .asyncSchedulerThreads = new ArrayList <>();
3495+ for (int i = 0 ; i < maxAsyncSchedulingThreads ; i ++) {
3496+ asyncSchedulerThreads .add (new AsyncScheduleThread (cs ));
3497+ }
3498+ this .resourceCommitterService = new ResourceCommitterService (cs );
3499+ }
3500+ }
3501+ public boolean isScheduleAsynchronously () {
3502+ return scheduleAsynchronously ;
3503+ }
3504+ public long getAsyncScheduleInterval () {
3505+ return asyncScheduleInterval ;
3506+ }
3507+ public long getAsyncMaxPendingBacklogs () {
3508+ return asyncMaxPendingBacklogs ;
3509+ }
3510+
3511+ public void startThreads () {
3512+ if (scheduleAsynchronously ) {
3513+ Preconditions .checkNotNull (asyncSchedulerThreads ,
3514+ "asyncSchedulerThreads is null" );
3515+ for (Thread t : asyncSchedulerThreads ) {
3516+ t .start ();
3517+ }
3518+
3519+ resourceCommitterService .start ();
3520+ }
3521+ }
3522+
3523+ public void serviceStopInvoked () throws InterruptedException {
3524+ if (scheduleAsynchronously && asyncSchedulerThreads != null ) {
3525+ for (Thread t : asyncSchedulerThreads ) {
3526+ t .interrupt ();
3527+ t .join (THREAD_JOIN_TIMEOUT_MS );
3528+ }
3529+ resourceCommitterService .interrupt ();
3530+ resourceCommitterService .join (THREAD_JOIN_TIMEOUT_MS );
3531+ }
3532+ }
3533+
3534+ public void nodeRemoved (int numNodes ) {
3535+ if (scheduleAsynchronously && numNodes == 0 ) {
3536+ for (AsyncScheduleThread t : asyncSchedulerThreads ) {
3537+ t .suspendSchedule ();
3538+ }
3539+ }
3540+ }
3541+
3542+ public int getPendingBacklogs () {
3543+ if (scheduleAsynchronously ) {
3544+ return resourceCommitterService .getPendingBacklogs ();
3545+ }
3546+ return 0 ;
3547+ }
3548+
3549+ public int getNumAsyncSchedulerThreads () {
3550+ return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads .size ();
3551+ }
3552+
3553+ @ VisibleForTesting
3554+ public List <AsyncScheduleThread > getAsyncSchedulerThreads () {
3555+ return asyncSchedulerThreads ;
3556+ }
3557+ }
35063558}
0 commit comments