 */
 package org.apache.spark.scheduler.cluster.kubernetes

-import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}

 import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList}
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
-import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource, Resource}
-import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
-import org.mockito.Mockito.{verify, when}
+import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
+import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.Matchers.{any, eq => mockitoEq}
+import org.mockito.Mockito.{doNothing, never, verify, when}
 import org.scalatest.BeforeAndAfter
+import org.scalatest.mock.MockitoSugar._
+import scala.collection.JavaConverters._

 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.{LiveListenerBus, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

 private[spark] class KubernetesClusterSchedulerBackendSuite
     extends SparkFunSuite with BeforeAndAfter {
@@ -39,22 +44,31 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
   private val NAMESPACE = "test-namespace"
   private val SPARK_DRIVER_HOST = "localhost"
   private val SPARK_DRIVER_PORT = 7077
+  private val POD_ALLOCATION_INTERVAL = 60L
+  private val DRIVER_URL = RpcEndpointAddress(
+    SPARK_DRIVER_HOST, SPARK_DRIVER_PORT, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

   private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
   private type LABELLED_PODS = FilterWatchListDeletable[
-      Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
+    Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
   private type IN_NAMESPACE_PODS = NonNamespaceOperation[
-      Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+    Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]

   @Mock
   private var sparkContext: SparkContext = _

+  @Mock
+  private var listenerBus: LiveListenerBus = _
+
   @Mock
   private var taskSchedulerImpl: TaskSchedulerImpl = _

   @Mock
   private var allocatorExecutor: ScheduledExecutorService = _

+  @Mock
+  private var requestExecutorsService: ExecutorService = _
+
   @Mock
   private var executorPodFactory: ExecutorPodFactory = _

@@ -79,11 +93,17 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
   @Mock
   private var rpcEnv: RpcEnv = _

+  @Mock
+  private var driverEndpointRef: RpcEndpointRef = _
+
   @Mock
   private var executorPodsWatch: Watch = _

   private var sparkConf: SparkConf = _
   private var executorPodsWatcherArgument: ArgumentCaptor[Watcher[Pod]] = _
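+  // Captors record the runnables and RPC endpoint that the backend hands off during start(),
+  // letting the tests below drive allocation and executor requests synchronously.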
+  private var allocatorRunnable: ArgumentCaptor[Runnable] = _
+  private var requestExecutorRunnable: ArgumentCaptor[Runnable] = _
+  private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _

   private val driverPod = new PodBuilder()
     .withNewMetadata()
@@ -101,8 +121,13 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
       .set(KUBERNETES_NAMESPACE, NAMESPACE)
       .set("spark.driver.host", SPARK_DRIVER_HOST)
       .set("spark.driver.port", SPARK_DRIVER_PORT.toString)
+      .set(KUBERNETES_ALLOCATION_BATCH_DELAY, POD_ALLOCATION_INTERVAL)
     executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
+    allocatorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
+    requestExecutorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
+    driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
     when(sparkContext.conf).thenReturn(sparkConf)
+    when(sparkContext.listenerBus).thenReturn(listenerBus)
     when(taskSchedulerImpl.sc).thenReturn(sparkContext)
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations)
@@ -111,26 +136,164 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     when(podOperations.inNamespace(NAMESPACE)).thenReturn(podsInNamespace)
     when(podsInNamespace.withName(DRIVER_POD_NAME)).thenReturn(podsWithDriverName)
     when(podsWithDriverName.get()).thenReturn(driverPod)
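+    // The allocator runnable is captured instead of actually being scheduled, so each test
+    // can trigger allocation rounds deterministically by invoking run() itself.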
+    when(allocatorExecutor.scheduleWithFixedDelay(
+      allocatorRunnable.capture(),
+      mockitoEq(0L),
+      mockitoEq(POD_ALLOCATION_INTERVAL),
+      mockitoEq(TimeUnit.SECONDS))).thenReturn(null)
+    // Creating Futures in Scala backed by a Java executor service resolves to running
+    // ExecutorService#execute (as opposed to submit)
+    doNothing().when(requestExecutorsService).execute(requestExecutorRunnable.capture())
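+    // Capture the driver endpoint so tests can deliver RegisterExecutor messages to it directly.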
+    when(rpcEnv.setupEndpoint(
+      mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
+      .thenReturn(driverEndpointRef)
   }

   test("Basic lifecycle expectations when starting and stopping the scheduler.") {
     val scheduler = newSchedulerBackend(true)
     scheduler.start()
     verify(shuffleManager).start(APP_ID)
     assert(executorPodsWatcherArgument.getValue != null)
+    assert(allocatorRunnable.getValue != null)
     scheduler.stop()
     verify(shuffleManager).stop()
     verify(executorPodsWatch).close()
   }

163+ test(" Static allocation should request executors upon first allocator run." ) {
164+ sparkConf
165+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE , 2 )
166+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES , 2 )
167+ val scheduler = newSchedulerBackend(true )
168+ scheduler.start()
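+    // Run the captured executor-request task; the mocked executor service never runs it itself.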
+    requestExecutorRunnable.getValue.run()
+    val firstExecutorPod = new PodBuilder()
+      .withNewMetadata()
+      .withName("pod1")
+      .endMetadata()
+      .build()
+    val secondExecutorPod = new PodBuilder()
+      .withNewMetadata()
+      .withName("pod2")
+      .endMetadata()
+      .build()
+    when(podOperations.create(firstExecutorPod))
+      .thenReturn(firstExecutorPod)
+    when(podOperations.create(secondExecutorPod))
+      .thenReturn(secondExecutorPod)
+    when(executorPodFactory.createExecutorPod(
+      "1",
+      APP_ID,
+      DRIVER_URL,
+      sparkConf.getExecutorEnv,
+      driverPod,
+      Map.empty)).thenReturn(firstExecutorPod)
+    when(executorPodFactory.createExecutorPod(
+      "2",
+      APP_ID,
+      DRIVER_URL,
+      sparkConf.getExecutorEnv,
+      driverPod,
+      Map.empty)).thenReturn(secondExecutorPod)
+    allocatorRunnable.getValue.run()
+    verify(podOperations).create(firstExecutorPod)
+    verify(podOperations).create(secondExecutorPod)
+  }
+
203+ test(" Killing executors deletes the executor pods" ) {
204+ sparkConf
205+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE , 2 )
206+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES , 2 )
207+ val scheduler = newSchedulerBackend(true )
208+ scheduler.start()
209+ requestExecutorRunnable.getValue.run()
210+ val firstExecutorPod = new PodBuilder ()
211+ .withNewMetadata()
212+ .withName(" pod1" )
213+ .endMetadata()
214+ .build()
215+ val secondExecutorPod = new PodBuilder ()
216+ .withNewMetadata()
217+ .withName(" pod2" )
218+ .endMetadata()
219+ .build()
220+ when(podOperations.create(any(classOf [Pod ])))
221+ .thenAnswer(AdditionalAnswers .returnsFirstArg())
222+ when(executorPodFactory.createExecutorPod(
223+ " 1" ,
224+ APP_ID ,
225+ DRIVER_URL ,
226+ sparkConf.getExecutorEnv,
227+ driverPod,
228+ Map .empty)).thenReturn(firstExecutorPod)
229+ when(executorPodFactory.createExecutorPod(
230+ " 2" ,
231+ APP_ID ,
232+ DRIVER_URL ,
233+ sparkConf.getExecutorEnv,
234+ driverPod,
235+ Map .empty)).thenReturn(secondExecutorPod)
236+ allocatorRunnable.getValue.run()
237+ scheduler.doKillExecutors(Seq (" 1" , " 2" ))
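+    // doKillExecutors submits an async task to the request-executors service; run the
+    // most recently captured runnable to carry out the deletion.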
+    requestExecutorRunnable.getAllValues.asScala.last.run()
+    verify(podOperations).delete(firstExecutorPod)
+    verify(podOperations).delete(secondExecutorPod)
+  }
+
243+ test(" Executors should be requested in batches." ) {
244+ sparkConf
245+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE , 1 )
246+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES , 2 )
247+ val scheduler = newSchedulerBackend(true )
248+ scheduler.start()
249+ requestExecutorRunnable.getValue.run()
250+ val firstExecutorPod = new PodBuilder ()
251+ .withNewMetadata()
252+ .withName(" pod1" )
253+ .endMetadata()
254+ .build()
255+ val secondExecutorPod = new PodBuilder ()
256+ .withNewMetadata()
257+ .withName(" pod2" )
258+ .endMetadata()
259+ .build()
260+ when(podOperations.create(any(classOf [Pod ])))
261+ .thenAnswer(AdditionalAnswers .returnsFirstArg())
262+ when(executorPodFactory.createExecutorPod(
263+ " 1" ,
264+ APP_ID ,
265+ DRIVER_URL ,
266+ sparkConf.getExecutorEnv,
267+ driverPod,
268+ Map .empty)).thenReturn(firstExecutorPod)
269+ when(executorPodFactory.createExecutorPod(
270+ " 2" ,
271+ APP_ID ,
272+ DRIVER_URL ,
273+ sparkConf.getExecutorEnv,
274+ driverPod,
275+ Map .empty)).thenReturn(secondExecutorPod)
276+ allocatorRunnable.getValue.run()
277+ verify(podOperations).create(firstExecutorPod)
278+ verify(podOperations, never()).create(secondExecutorPod)
+    val registerFirstExecutorMessage = RegisterExecutor(
+      "1", mock[RpcEndpointRef], "localhost", 1, Map.empty[String, String])
+    when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
+    driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
+      .apply(registerFirstExecutorMessage)
+    allocatorRunnable.getValue.run()
+    verify(podOperations).create(secondExecutorPod)
+  }
+
   private def newSchedulerBackend(externalShuffle: Boolean): KubernetesClusterSchedulerBackend = {
     new KubernetesClusterSchedulerBackend(
       taskSchedulerImpl,
       rpcEnv,
       executorPodFactory,
       if (externalShuffle) Some(shuffleManager) else None,
       kubernetesClient,
-      allocatorExecutor)
+      allocatorExecutor,
+      requestExecutorsService)
   }

 }