 
 package org.apache.spark.deploy.mesos
 
-import akka.actor.{Props, ActorSystem, Actor}
-
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.Some
-
-import org.apache.spark.deploy.rest.MesosRestServer
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, IntParam, Utils}
-
-import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.util.{IntParam, Utils}
 
-import org.apache.spark.deploy.DriverDescription
-import org.apache.spark.deploy.master.DriverInfo
-import org.apache.spark.deploy.master.DriverState.DriverState
-import org.apache.spark.deploy.master.DriverState
-import org.apache.spark.deploy.worker.DriverRunner
-
-import java.io.{IOException, File}
-import java.util.Date
-import java.text.SimpleDateFormat
+import java.io.File
 import org.apache.spark.deploy.mesos.ui.MesosClusterUI
-import org.apache.spark.deploy.mesos.Messages.{DispatcherStateResponse, RequestDispatcherState}
-
-private[deploy] object Messages {
-  case object RequestDispatcherState
-
-  case class DispatcherStateResponse(
-      activeDrivers: Iterable[DriverInfo],
-      completedDrivers: Iterable[DriverInfo]) {
-  }
-}
+import org.apache.spark.deploy.rest.MesosRestServer
+import org.apache.spark.scheduler.cluster.mesos.{ClusterScheduler, MesosClusterScheduler}
 
 /*
  * A dispatcher actor that is responsible for managing drivers, that is intended to
@@ -58,221 +34,46 @@ private[deploy] object Messages {
 private[spark] class MesosClusterDispatcher(
     host: String,
     serverPort: Int,
-    actorPort: Int,
     webUiPort: Int,
-    systemName: String,
-    actorName: String,
     conf: SparkConf,
-    masterUrl: String,
-    workDirPath: Option[String] = None) extends Actor with ActorLogReceive with Logging {
-  val server = new MesosRestServer(host, serverPort, self, conf, masterUrl)
+    scheduler: ClusterScheduler) extends Logging {
+
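+  // Driver state is now tracked by the scheduler; the REST endpoint below hands
+  // incoming submissions to it instead of messaging a dispatcher actor.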
+  val server = new MesosRestServer(host, serverPort, conf, scheduler)
 
   val dispatcherPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
 
-  lazy val workDir: File = {
-    val dir = workDirPath.map(new File(_)).getOrElse(new File(sparkHome, "work"))
-
-    // Attempt to remove the work directory if it exists on startup.
-    // This is to avoid unbounded growing the work directory as drivers
-    // are only deleted when it is over the retained count while it's running.
-    // We don't fail startup if we are not able to remove, as this is
-    // a short-term solution.
-    try {
-      if (dir.exists()) {
-        dir.delete()
-      }
-    } catch {
-      case e: IOException =>
-        logError("Unable to remove work directory " + workDir, e)
-    }
-
-    try {
-      // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
-      // So attempting to create and then check if directory was created or not.
-      dir.mkdirs()
-      if (!dir.exists() || !dir.isDirectory) {
-        logError("Failed to create work directory " + dir)
-        System.exit(1)
-      }
-      assert(dir.isDirectory)
-    } catch {
-      case e: Exception =>
-        logError("Failed to create work directory " + dir, e)
-        System.exit(1)
-    }
-    dir
-  }
-
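+  // The web UI is now backed by the scheduler's view of driver state rather
+  // than dispatcher-local bookkeeping (the removed maps above).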
   val webUi = new MesosClusterUI(
-    self, new SecurityManager(conf), webUiPort, conf, workDir, dispatcherPublicAddress)
-
-  val runners = new HashMap[String, DriverRunner]
-  val drivers = new HashMap[String, DriverInfo]
-  val completedDrivers = new ArrayBuffer[DriverInfo]
-  val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
-  var nextDriverNumber = 0
-  val securityManager = new SecurityManager(conf)
-
-  def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
+    new SecurityManager(conf), webUiPort, conf, dispatcherPublicAddress, scheduler)
 
   val sparkHome =
     new File(sys.env.get("SPARK_HOME").getOrElse("."))
 
-  val akkaUrl = AkkaUtils.address(
-    AkkaUtils.protocol(context.system),
-    systemName,
-    host,
-    actorPort,
-    actorName)
-
-  def newDriverId(submitDate: Date): String = {
-    val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
-    nextDriverNumber += 1
-    appId
-  }
-
-  def createDriver(desc: DriverDescription): DriverInfo = {
-    val now = System.currentTimeMillis()
-    val date = new Date(now)
-    new DriverInfo(now, newDriverId(date), desc, date)
-  }
-
-  override def preStart() {
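+  // Bring up the REST endpoint and bind the web UI; invoked once from main() below.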
+  def start() {
     server.start()
     webUi.bind()
   }
 
-  override def postStop() {
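+  // Shut down the UI and REST endpoint; running drivers are the scheduler's
+  // responsibility, so there are no local runners left to kill here.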
+  def stop() {
     webUi.stop()
     server.stop()
-    runners.values.foreach(_.kill())
-  }
-
-  override def receiveWithLogging = {
-    case RequestSubmitDriver(driverDescription) => {
-      val driverInfo = createDriver(driverDescription)
-      val runner = new DriverRunner(conf, driverInfo.id, workDir,
-        sparkHome, driverDescription, self, akkaUrl, securityManager)
-      runners(driverInfo.id) = runner
-      drivers(driverInfo.id) = driverInfo
-      runner.start()
-      sender ! SubmitDriverResponse(true, Option(driverInfo.id), "")
-    }
-
-    case RequestKillDriver(driverId) => {
-      if (!drivers.contains(driverId)) {
-        if (completedDrivers.exists(_.id == driverId)) {
-          sender ! KillDriverResponse(driverId, false, "Driver already completed")
-        } else {
-          sender ! KillDriverResponse(driverId, false, "Unknown driver")
-        }
-      } else {
-        runners(driverId).kill()
-        sender ! KillDriverResponse(driverId, true, "")
-      }
-    }
-
-    case RequestDriverStatus(driverId) => {
-      drivers.get(driverId).orElse(completedDrivers.find(_.id == driverId)) match {
-        case Some(driver) =>
-          sender ! DriverStatusResponse(found = true, Some(driver.state),
-            None, None, driver.exception)
-        case None =>
-          sender ! DriverStatusResponse(found = false, None, None, None, None)
-      }
-    }
-
-    case DriverStateChanged(driverId, state, exception) => {
-      logDriverChange(driverId, state, exception)
-      state match {
-        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
-          removeDriver(driverId, state, exception)
-        case _ =>
-          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
-      }
-    }
-
-    case RequestDispatcherState => {
-      sender ! DispatcherStateResponse(drivers.values, completedDrivers)
-    }
-  }
-
-  def logDriverChange(driverId: String, state: DriverState, exception: Option[Exception]) {
-    state match {
-      case DriverState.ERROR =>
-        logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
-      case DriverState.FAILED =>
-        logWarning(s"Driver $driverId exited with failure")
-      case DriverState.FINISHED =>
-        logInfo(s"Driver $driverId exited successfully")
-      case DriverState.KILLED =>
-        logInfo(s"Driver $driverId was killed by user")
-      case _ =>
-        logDebug(s"Driver $driverId changed state to $state")
-    }
-  }
-
-  def removeDriver(driverId: String, state: DriverState, exception: Option[Exception]) {
-    if (completedDrivers.size >= RETAINED_DRIVERS) {
-      val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
-      for (i <- 0 to (toRemove - 1)) {
-        val driverId = completedDrivers(i).id
-        try {
-          new File(workDir, driverId).delete()
-        } catch {
-          case e: Exception =>
-            logWarning("Unable to remove work dir for completed driver " + driverId, e)
-        }
-      }
-      completedDrivers.trimStart(toRemove)
-    }
-    val driverInfo = drivers.remove(driverId).get
-    driverInfo.exception = exception
-    driverInfo.state = state
-    completedDrivers += driverInfo
   }
 }
 
 object MesosClusterDispatcher {
   def main(args: Array[String]) {
     val conf = new SparkConf
-    val clusterArgs = new ClusterDispatcherArguments(args, conf)
-    val actorSystem = startSystemAndActor(clusterArgs)
-    Runtime.getRuntime().addShutdownHook(new Thread("MesosClusterDispatcherShutdownHook") {
-      override def run() = {
-        // Makes sure we shut down the actor, which will kill all the drivers.
-        actorSystem.shutdown()
-        actorSystem.awaitTermination()
-      }
-    })
-    actorSystem.awaitTermination()
-  }
-
-  def startSystemAndActor(
-      args: ClusterDispatcherArguments): ActorSystem = {
-    // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
-    val conf = new SparkConf
-    val systemName = "spark-mesos-cluster"
-    val actorName = "MesosClusterDispatcher"
-    val securityMgr = new SecurityManager(conf)
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-      systemName, args.host, 0, conf, securityMgr)
-    actorSystem.actorOf(
-      Props(classOf[MesosClusterDispatcher],
-        args.host,
-        args.port,
-        boundPort,
-        args.webUiPort,
-        systemName,
-        actorName,
-        conf,
-        args.masterUrl,
-        None),
-      name = actorName)
-    actorSystem
+    val dispatcherArgs = new ClusterDispatcherArguments(args, conf)
+    val scheduler = new MesosClusterScheduler(conf)
+    scheduler.start()
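+    // The running scheduler is shared with the dispatcher, which passes it on
+    // to both the REST server and the web UI.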
+    new MesosClusterDispatcher(
+      dispatcherArgs.host,
+      dispatcherArgs.port,
+      dispatcherArgs.webUiPort,
+      conf,
+      scheduler).start()
   }
 
   class ClusterDispatcherArguments(args: Array[String], conf: SparkConf) {