1717
1818package org .apache .spark .scheduler .mesos
1919
20- import org .apache .spark .executor .MesosExecutorBackend
21- import org .scalatest .FunSuite
22- import org .apache .spark .{SparkConf , SparkContext , LocalSparkContext }
23- import org .apache .spark .scheduler .{SparkListenerExecutorAdded , LiveListenerBus ,
24- TaskDescription , WorkerOffer , TaskSchedulerImpl }
25- import org .apache .spark .scheduler .cluster .ExecutorInfo
26- import org .apache .spark .scheduler .cluster .mesos .{MemoryUtils , MesosSchedulerBackend }
27- import org .apache .mesos .SchedulerDriver
28- import org .apache .mesos .Protos .{ExecutorInfo => MesosExecutorInfo , _ }
29- import org .apache .mesos .Protos .Value .Scalar
30- import org .easymock .{Capture , EasyMock }
3120import java .nio .ByteBuffer
3221import java .util .Collections
3322import java .util
34- import org .scalatest .mock .EasyMockSugar
3523
3624import scala .collection .mutable
3725import scala .collection .mutable .ArrayBuffer
3826
39- class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
27+ import org .scalatest .{BeforeAndAfter , FunSuite }
28+ import org .scalatest .mock .MockitoSugar
29+ import org .apache .mesos .SchedulerDriver
30+ import org .apache .mesos .Protos ._
31+ import org .apache .mesos .Protos .Value .Scalar
32+ import org .mockito .Mockito ._
33+ import org .mockito .Matchers ._
34+ import org .mockito .{Matchers , ArgumentCaptor }
35+
36+ import org .apache .spark .{SparkConf , SparkContext , LocalSparkContext }
37+ import org .apache .spark .executor .MesosExecutorBackend
38+ import org .apache .spark .scheduler .{SparkListenerExecutorAdded , LiveListenerBus , TaskDescription ,
39+ WorkerOffer , TaskSchedulerImpl }
40+ import org .apache .spark .scheduler .cluster .ExecutorInfo
41+ import org .apache .spark .scheduler .cluster .mesos .{MemoryUtils , MesosSchedulerBackend }
42+
43+ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with MockitoSugar {
4044
4145 test(" check spark-class location correctly" ) {
4246 val conf = new SparkConf
4347 conf.set(" spark.mesos.executor.home" , " /mesos-home" )
4448
45- val listenerBus = EasyMock .createMock(classOf [LiveListenerBus ])
46- listenerBus.post(SparkListenerExecutorAdded (EasyMock .anyLong, " s1" , new ExecutorInfo (" host1" , 2 , Map .empty)))
47- EasyMock .replay(listenerBus)
48-
49- val sc = EasyMock .createMock(classOf [SparkContext ])
50- EasyMock .expect(sc.getSparkHome()).andReturn(Option (" /spark-home" )).anyTimes()
51- EasyMock .expect(sc.conf).andReturn(conf).anyTimes()
52- EasyMock .expect(sc.executorEnvs).andReturn(new mutable.HashMap ).anyTimes()
53- EasyMock .expect(sc.executorMemory).andReturn(100 ).anyTimes()
54- EasyMock .expect(sc.listenerBus).andReturn(listenerBus)
55- EasyMock .replay(sc)
56- val taskScheduler = EasyMock .createMock(classOf [TaskSchedulerImpl ])
57- EasyMock .expect(taskScheduler.CPUS_PER_TASK ).andReturn(2 ).anyTimes()
58- EasyMock .replay(taskScheduler)
49+ val listenerBus = mock[LiveListenerBus ]
50+ listenerBus.post(
51+ SparkListenerExecutorAdded (anyLong, " s1" , new ExecutorInfo (" host1" , 2 , Map .empty)))
52+
53+ val sc = mock[SparkContext ]
54+ when(sc.getSparkHome()).thenReturn(Option (" /spark-home" ))
55+
56+ when(sc.conf).thenReturn(conf)
57+ when(sc.executorEnvs).thenReturn(new mutable.HashMap [String , String ])
58+ when(sc.executorMemory).thenReturn(100 )
59+ when(sc.listenerBus).thenReturn(listenerBus)
60+ val taskScheduler = mock[TaskSchedulerImpl ]
61+ when(taskScheduler.CPUS_PER_TASK ).thenReturn(2 )
5962
6063 val mesosSchedulerBackend = new MesosSchedulerBackend (taskScheduler, sc, " master" )
6164
@@ -84,20 +87,19 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
8487 .setSlaveId(SlaveID .newBuilder().setValue(s " s ${id.toString}" )).setHostname(s " host ${id.toString}" ).build()
8588 }
8689
87- val driver = EasyMock .createMock( classOf [SchedulerDriver ])
88- val taskScheduler = EasyMock .createMock( classOf [TaskSchedulerImpl ])
90+ val driver = mock [SchedulerDriver ]
91+ val taskScheduler = mock [TaskSchedulerImpl ]
8992
90- val listenerBus = EasyMock .createMock( classOf [LiveListenerBus ])
91- listenerBus.post(SparkListenerExecutorAdded ( EasyMock .anyLong, " s1 " , new ExecutorInfo ( " host1 " , 2 , Map .empty)))
92- EasyMock .replay(listenerBus )
93+ val listenerBus = mock [LiveListenerBus ]
94+ listenerBus.post(
95+ SparkListenerExecutorAdded (anyLong, " s1 " , new ExecutorInfo ( " host1 " , 2 , Map .empty)) )
9396
94- val sc = EasyMock .createMock(classOf [SparkContext ])
95- EasyMock .expect(sc.executorMemory).andReturn(100 ).anyTimes()
96- EasyMock .expect(sc.getSparkHome()).andReturn(Option (" /path" )).anyTimes()
97- EasyMock .expect(sc.executorEnvs).andReturn(new mutable.HashMap ).anyTimes()
98- EasyMock .expect(sc.conf).andReturn(new SparkConf ).anyTimes()
99- EasyMock .expect(sc.listenerBus).andReturn(listenerBus)
100- EasyMock .replay(sc)
97+ val sc = mock[SparkContext ]
98+ when(sc.executorMemory).thenReturn(100 )
99+ when(sc.getSparkHome()).thenReturn(Option (" /path" ))
100+ when(sc.executorEnvs).thenReturn(new mutable.HashMap [String , String ])
101+ when(sc.conf).thenReturn(new SparkConf )
102+ when(sc.listenerBus).thenReturn(listenerBus)
101103
102104 val minMem = MemoryUtils .calculateTotalMemory(sc).toInt
103105 val minCpu = 4
@@ -121,25 +123,24 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
121123 2
122124 ))
123125 val taskDesc = new TaskDescription (1L , 0 , " s1" , " n1" , 0 , ByteBuffer .wrap(new Array [Byte ](0 )))
124- EasyMock .expect(taskScheduler.resourceOffers(EasyMock .eq(expectedWorkerOffers))).andReturn(Seq (Seq (taskDesc)))
125- EasyMock .expect(taskScheduler.CPUS_PER_TASK ).andReturn(2 ).anyTimes()
126- EasyMock .replay(taskScheduler)
126+ when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq (Seq (taskDesc)))
127+ when(taskScheduler.CPUS_PER_TASK ).thenReturn(2 )
127128
128- val capture = new Capture [util.Collection [TaskInfo ]]
129- EasyMock .expect (
129+ val capture = ArgumentCaptor .forClass( classOf [util.Collection [TaskInfo ]])
130+ when (
130131 driver.launchTasks(
131- EasyMock .eq(Collections .singleton(mesosOffers.get(0 ).getId)),
132- EasyMock .capture(capture ),
133- EasyMock .anyObject (classOf [Filters ])
132+ Matchers .eq(Collections .singleton(mesosOffers.get(0 ).getId)),
133+ capture .capture(),
134+ any (classOf [Filters ])
134135 )
135- ).andReturn(Status .valueOf(1 )).once
136- EasyMock .expect(driver.declineOffer(mesosOffers.get(1 ).getId)).andReturn(Status .valueOf(1 )).times(1 )
137- EasyMock .expect(driver.declineOffer(mesosOffers.get(2 ).getId)).andReturn(Status .valueOf(1 )).times(1 )
138- EasyMock .replay(driver)
136+ ).thenReturn(Status .valueOf(1 ))
137+ when(driver.declineOffer(mesosOffers.get(1 ).getId)).thenReturn(Status .valueOf(1 ))
138+ when(driver.declineOffer(mesosOffers.get(2 ).getId)).thenReturn(Status .valueOf(1 ))
139139
140140 backend.resourceOffers(driver, mesosOffers)
141141
142- EasyMock .verify(driver)
142+ verify(driver, times(1 )).declineOffer(mesosOffers.get(1 ).getId)
143+ verify(driver, times(1 )).declineOffer(mesosOffers.get(2 ).getId)
143144 assert(capture.getValue.size() == 1 )
144145 val taskInfo = capture.getValue.iterator().next()
145146 assert(taskInfo.getName.equals(" n1" ))
@@ -151,15 +152,13 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
151152 // Unwanted resources offered on an existing node. Make sure they are declined
152153 val mesosOffers2 = new java.util.ArrayList [Offer ]
153154 mesosOffers2.add(createOffer(1 , minMem, minCpu))
154- EasyMock .reset(taskScheduler)
155- EasyMock .reset(driver)
156- EasyMock .expect(taskScheduler.resourceOffers(EasyMock .anyObject(classOf [Seq [WorkerOffer ]])).andReturn(Seq (Seq ())))
157- EasyMock .expect(taskScheduler.CPUS_PER_TASK ).andReturn(2 ).anyTimes()
158- EasyMock .replay(taskScheduler)
159- EasyMock .expect(driver.declineOffer(mesosOffers2.get(0 ).getId)).andReturn(Status .valueOf(1 )).times(1 )
160- EasyMock .replay(driver)
155+ reset(taskScheduler)
156+ reset(driver)
157+ when(taskScheduler.resourceOffers(any(classOf [Seq [WorkerOffer ]]))).thenReturn(Seq (Seq ()))
158+ when(taskScheduler.CPUS_PER_TASK ).thenReturn(2 )
159+ when(driver.declineOffer(mesosOffers2.get(0 ).getId)).thenReturn(Status .valueOf(1 ))
161160
162161 backend.resourceOffers(driver, mesosOffers2)
163- EasyMock . verify(driver)
162+ verify(driver, times( 1 )).declineOffer(mesosOffers2.get( 0 ).getId )
164163 }
165164}
0 commit comments