@@ -314,8 +314,47 @@ class ExecutorAllocationManagerSuite
314314 assert(executorsPendingToRemove(manager).isEmpty)
315315 }
316316
317+ test (" Removing with various numExecutorsTarget condition" ) {
318+ sc = createSparkContext(5 , 12 , 5 )
319+ val manager = sc.executorAllocationManager.get
320+
321+ sc.listenerBus.postToAll(SparkListenerStageSubmitted (createStageInfo(0 , 8 )))
322+
323+ // Remove when numExecutorsTarget is the same as the current number of executors
324+ assert(addExecutors(manager) === 1 )
325+ assert(addExecutors(manager) === 2 )
326+ (1 to 8 ).map { i => createTaskInfo(i, i, s " $i" ) }.foreach {
327+ info => sc.listenerBus.postToAll(SparkListenerTaskStart (0 , 0 , info)) }
328+ assert(executorIds(manager).size === 8 )
329+ assert(numExecutorsTarget(manager) === 8 )
330+ assert(maxNumExecutorsNeeded(manager) == 8 )
331+ assert(! removeExecutor(manager, " 1" )) // won't work since numExecutorsTarget == numExecutors
332+
333+ // Remove executors when numExecutorsTarget is lower than current number of executors
334+ (1 to 3 ).map { i => createTaskInfo(i, i, s " $i" ) }.foreach {
335+ info => sc.listenerBus.postToAll(SparkListenerTaskEnd (0 , 0 , null , Success , info, null )) }
336+ adjustRequestedExecutors(manager)
337+ assert(executorIds(manager).size === 8 )
338+ assert(numExecutorsTarget(manager) === 5 )
339+ assert(maxNumExecutorsNeeded(manager) == 5 )
340+ assert(removeExecutor(manager, " 1" ))
341+ assert(removeExecutors(manager, Seq (" 2" , " 3" ))=== Seq (" 2" , " 3" ))
342+ onExecutorRemoved(manager, " 1" )
343+ onExecutorRemoved(manager, " 2" )
344+ onExecutorRemoved(manager, " 3" )
345+
346+ // numExecutorsTarget is lower than minNumExecutors
347+ sc.listenerBus.postToAll(
348+ SparkListenerTaskEnd (0 , 0 , null , Success , createTaskInfo(4 , 4 , " 4" ), null ))
349+ assert(executorIds(manager).size === 5 )
350+ assert(numExecutorsTarget(manager) === 5 )
351+ assert(maxNumExecutorsNeeded(manager) == 4 )
352+ assert(! removeExecutor(manager, " 4" )) // lower limit
353+ assert(addExecutors(manager) === 0 ) // upper limit
354+ }
355+
317356 test (" interleaving add and remove" ) {
318- sc = createSparkContext(5 , 10 , 5 )
357+ sc = createSparkContext(5 , 12 , 5 )
319358 val manager = sc.executorAllocationManager.get
320359 sc.listenerBus.postToAll(SparkListenerStageSubmitted (createStageInfo(0 , 1000 )))
321360
@@ -331,52 +370,59 @@ class ExecutorAllocationManagerSuite
331370 onExecutorAdded(manager, " 7" )
332371 onExecutorAdded(manager, " 8" )
333372 assert(executorIds(manager).size === 8 )
373+ assert(numExecutorsTarget(manager) === 8 )
334374
335- // Remove until limit
336- assert(removeExecutor(manager, " 1" ))
337- assert(removeExecutors(manager, Seq (" 2" , " 3" )) === Seq (" 2" , " 3" ))
338- assert(! removeExecutor(manager, " 4" )) // lower limit reached
339- assert(! removeExecutor(manager, " 5" ))
340- onExecutorRemoved(manager, " 1" )
341- onExecutorRemoved(manager, " 2" )
342- onExecutorRemoved(manager, " 3" )
343- assert(executorIds(manager).size === 5 )
344375
345- // Add until limit
346- assert(addExecutors (manager) === 2 ) // upper limit reached
347- assert(addExecutors (manager) === 0 )
348- assert( ! removeExecutor(manager, " 4 " )) // still at lower limit
349- assert((manager, Seq ( " 5 " )) !== Seq ( " 5 " ))
376+ // Remove when numTargetExecutors is equal to the current number of executors
377+ assert(! removeExecutor (manager, " 1 " ))
378+ assert(removeExecutors (manager, Seq ( " 2 " , " 3 " )) !== Seq ( " 2 " , " 3 " ) )
379+
380+ // Remove until limit
350381 onExecutorAdded(manager, " 9" )
351382 onExecutorAdded(manager, " 10" )
352383 onExecutorAdded(manager, " 11" )
353384 onExecutorAdded(manager, " 12" )
354- onExecutorAdded( manager, " 13 " )
355- assert(executorIds (manager).size === 10 )
385+ assert(executorIds( manager).size === 12 )
386+ assert(numExecutorsTarget (manager) === 8 )
356387
357- // Remove succeeds again, now that we are no longer at the lower limit
358- assert(removeExecutors(manager, Seq (" 4" , " 5" , " 6" )) === Seq (" 4" , " 5" , " 6" ))
359- assert(removeExecutor(manager, " 7" ))
360- assert(executorIds(manager).size === 10 )
361- assert(addExecutors(manager) === 0 )
388+ assert(removeExecutor(manager, " 1" ))
389+ assert(removeExecutors(manager, Seq (" 2" , " 3" , " 4" )) === Seq (" 2" , " 3" , " 4" ))
390+ assert(! removeExecutor(manager, " 5" )) // lower limit reached
391+ assert(! removeExecutor(manager, " 6" ))
392+ onExecutorRemoved(manager, " 1" )
393+ onExecutorRemoved(manager, " 2" )
394+ onExecutorRemoved(manager, " 3" )
362395 onExecutorRemoved(manager, " 4" )
363- onExecutorRemoved(manager, " 5" )
364396 assert(executorIds(manager).size === 8 )
365397
366- // Number of executors pending restarts at 1
367- assert(numExecutorsToAdd(manager) === 1 )
368- assert(addExecutors(manager) === 0 )
369- assert(executorIds(manager).size === 8 )
370- onExecutorRemoved(manager, " 6" )
371- onExecutorRemoved(manager, " 7" )
398+ // Add until limit
399+ assert(! removeExecutor(manager, " 7" )) // still at lower limit
400+ assert((manager, Seq (" 8" )) !== Seq (" 8" ))
401+ onExecutorAdded(manager, " 13" )
372402 onExecutorAdded(manager, " 14" )
373403 onExecutorAdded(manager, " 15" )
374- assert(executorIds(manager).size === 8 )
375- assert(addExecutors(manager) === 0 ) // still at upper limit
376404 onExecutorAdded(manager, " 16" )
405+ assert(executorIds(manager).size === 12 )
406+
407+ // Remove succeeds again, now that we are no longer at the lower limit
408+ assert(removeExecutors(manager, Seq (" 5" , " 6" , " 7" )) === Seq (" 5" , " 6" , " 7" ))
409+ assert(removeExecutor(manager, " 8" ))
410+ assert(executorIds(manager).size === 12 )
411+ onExecutorRemoved(manager, " 5" )
412+ onExecutorRemoved(manager, " 6" )
413+ assert(executorIds(manager).size === 10 )
414+ assert(numExecutorsToAdd(manager) === 4 )
415+ onExecutorRemoved(manager, " 9" )
416+ onExecutorRemoved(manager, " 10" )
417+ assert(addExecutors(manager) === 4 ) // at upper limit
377418 onExecutorAdded(manager, " 17" )
419+ onExecutorAdded(manager, " 18" )
378420 assert(executorIds(manager).size === 10 )
379- assert(numExecutorsTarget(manager) === 10 )
421+ assert(addExecutors(manager) === 0 ) // still at upper limit
422+ onExecutorAdded(manager, " 19" )
423+ onExecutorAdded(manager, " 20" )
424+ assert(executorIds(manager).size === 12 )
425+ assert(numExecutorsTarget(manager) === 12 )
380426 }
381427
382428 test(" starting/canceling add timer" ) {
@@ -915,12 +961,17 @@ class ExecutorAllocationManagerSuite
915961 onExecutorAdded(manager, " third" )
916962 onExecutorAdded(manager, " fourth" )
917963 onExecutorAdded(manager, " fifth" )
918- assert(executorIds(manager) === Set (" first" , " second" , " third" , " fourth" , " fifth" ))
964+ onExecutorAdded(manager, " sixth" )
965+ onExecutorAdded(manager, " seventh" )
966+ onExecutorAdded(manager, " eighth" )
967+ assert(executorIds(manager) === Set (" first" , " second" , " third" , " fourth" , " fifth" ,
968+ " sixth" , " seventh" , " eighth" ))
919969
920970 removeExecutor(manager, " first" )
921971 removeExecutors(manager, Seq (" second" , " third" ))
922972 assert(executorsPendingToRemove(manager) === Set (" first" , " second" , " third" ))
923- assert(executorIds(manager) === Set (" first" , " second" , " third" , " fourth" , " fifth" ))
973+ assert(executorIds(manager) === Set (" first" , " second" , " third" , " fourth" , " fifth" ,
974+ " sixth" , " seventh" , " eighth" ))
924975
925976
926977 // Cluster manager lost will make all the live executors lost, so here simulate this behavior
0 commit comments