1717
1818package org .apache .spark .scheduler .mesos
1919
20- import java .util .Date
20+ import java .util .{ Collection , Collections , Date }
2121
22- import org . scalatest . mock . MockitoSugar
22+ import scala . collection . JavaConverters . _
2323
24+ import org .apache .mesos .Protos .Value .{Scalar , Type }
25+ import org .apache .mesos .Protos ._
26+ import org .apache .mesos .SchedulerDriver
27+ import org .mockito .Mockito ._
28+ import org .mockito .{ArgumentCaptor , Matchers }
29+ import org .scalatest .mock .MockitoSugar
2430import org .apache .spark .deploy .Command
2531import org .apache .spark .deploy .mesos .MesosDriverDescription
2632import org .apache .spark .scheduler .cluster .mesos ._
@@ -29,7 +35,7 @@ import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
2935
3036class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
3137
32- private val command = new Command (" mainClass" , Seq (" arg" ), null , null , null , null )
38+ private val command = new Command (" mainClass" , Seq (" arg" ), Map (), Seq (), Seq (), Seq () )
3339
3440 test(" can queue drivers" ) {
3541 val conf = new SparkConf ()
@@ -72,4 +78,56 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
7278 val state = scheduler.getSchedulerState()
7379 assert(state.queuedDrivers.isEmpty)
7480 }
81+
82+ test(" can handle multiple roles" ) {
83+ val conf = new SparkConf ()
84+ conf.setMaster(" mesos://localhost:5050" )
85+ conf.setAppName(" spark mesos" )
86+ val scheduler = new MesosClusterScheduler (
87+ new BlackHoleMesosClusterPersistenceEngineFactory , conf) {
88+ override def start (): Unit = { ready = true }
89+ }
90+ scheduler.start()
91+ val driver = mock[SchedulerDriver ]
92+ val response = scheduler.submitDriver(
93+ new MesosDriverDescription (" d1" , " jar" , 1500 , 1 , true ,
94+ command,
95+ Map ((" spark.mesos.executor.home" , " test" ), (" spark.app.name" , " test" )),
96+ " s1" ,
97+ new Date ()))
98+ assert(response.success)
99+ val offer = Offer .newBuilder()
100+ .addResources(
101+ Resource .newBuilder().setRole(" *" )
102+ .setScalar(Scalar .newBuilder().setValue(1 ).build()).setName(" cpus" ).setType(Type .SCALAR ))
103+ .addResources(
104+ Resource .newBuilder().setRole(" *" )
105+ .setScalar(Scalar .newBuilder().setValue(1000 ).build()).setName(" mem" ).setType(Type .SCALAR ))
106+ .addResources(
107+ Resource .newBuilder().setRole(" role2" )
108+ .setScalar(Scalar .newBuilder().setValue(1 ).build()).setName(" cpus" ).setType(Type .SCALAR ))
109+ .addResources(
110+ Resource .newBuilder().setRole(" role2" )
111+ .setScalar(Scalar .newBuilder().setValue(500 ).build()).setName(" mem" ).setType(Type .SCALAR ))
112+ .setId(OfferID .newBuilder().setValue(" o1" ).build())
113+ .setFrameworkId(FrameworkID .newBuilder().setValue(" f1" ).build())
114+ .setSlaveId(SlaveID .newBuilder().setValue(" s1" ).build())
115+ .setHostname(" host1" )
116+ .build()
117+
118+ val capture = ArgumentCaptor .forClass(classOf [Collection [TaskInfo ]])
119+
120+ when(
121+ driver.launchTasks(
122+ Matchers .eq(Collections .singleton(offer.getId)),
123+ capture.capture())
124+ ).thenReturn(Status .valueOf(1 ))
125+
126+ scheduler.resourceOffers(driver, List (offer).asJava)
127+
128+ verify(driver, times(1 )).launchTasks(
129+ Matchers .eq(Collections .singleton(offer.getId)),
130+ capture.capture()
131+ )
132+ }
75133}
0 commit comments