@@ -37,46 +37,52 @@ import org.apache.spark.deploy.worker.DriverRunner
 import java.io.{IOException, File}
 import java.util.Date
 import java.text.SimpleDateFormat
-import scala.collection.mutable
+import org.apache.spark.deploy.mesos.ui.MesosClusterUI
+import org.apache.spark.deploy.mesos.Messages.{DispatcherStateResponse, RequestDispatcherState}
+
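+// Internal messages exchanged between the dispatcher actor and its web UI:
+// the UI sends RequestDispatcherState and the dispatcher replies with a
+// DispatcherStateResponse snapshot of active and completed drivers.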
+private[deploy] object Messages {
+  case object RequestDispatcherState
+
+  case class DispatcherStateResponse(
+      activeDrivers: Iterable[DriverInfo],
+      completedDrivers: Iterable[DriverInfo]) {
+  }
+}

 /*
  * A dispatcher actor that is responsible for managing drivers and is intended to
  * be used for Mesos cluster mode.
  * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
  * a daemon to launch drivers as Mesos frameworks upon request.
  */
-class MesosClusterDispatcher(
+private[spark] class MesosClusterDispatcher(
     host: String,
     serverPort: Int,
     actorPort: Int,
+    webUiPort: Int,
     systemName: String,
     actorName: String,
     conf: SparkConf,
     masterUrl: String,
     workDirPath: Option[String] = None) extends Actor with ActorLogReceive with Logging {
   val server = new MesosRestServer(host, serverPort, self, conf, masterUrl)

-  val runners = new HashMap[String, DriverRunner]
-  val drivers = new HashMap[String, DriverInfo]
-  val completedDrivers = new ArrayBuffer[DriverInfo]
-  val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
-  var nextDriverNumber = 0
-
-  var workDir: File = null
-
-  def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
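+  // Public address handed to the dispatcher web UI: SPARK_PUBLIC_DNS when set,
+  // otherwise the bind host.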
+  val dispatcherPublicAddress = {
+    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    if (envVar != null) envVar else host
+  }

-  def createWorkDir() {
-    workDir = workDirPath.map(new File(_)).getOrElse(new File(sparkHome, "work"))
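+  // The work directory is now resolved lazily, replacing createWorkDir():
+  // any existing directory is deleted on a best-effort basis, then (re)created,
+  // and the process exits if creation fails.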
+  lazy val workDir: File = {
+    val dir = workDirPath.map(new File(_)).getOrElse(new File(sparkHome, "work"))

     // Attempt to remove the work directory if it exists on startup.
     // This avoids unbounded growth of the work directory, since driver directories
     // are only deleted once the retained count is exceeded while the dispatcher is running.
     // We don't fail startup if the removal does not succeed, as this is
     // a short-term solution.
     try {
-      if (workDir.exists()) {
-        workDir.delete()
+      if (dir.exists()) {
+        dir.delete()
       }
     } catch {
       case e: IOException =>
@@ -86,19 +92,31 @@ class MesosClusterDispatcher(
     try {
       // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
       // So attempting to create and then check if directory was created or not.
-      workDir.mkdirs()
-      if (!workDir.exists() || !workDir.isDirectory) {
-        logError("Failed to create work directory " + workDir)
+      dir.mkdirs()
+      if (!dir.exists() || !dir.isDirectory) {
+        logError("Failed to create work directory " + dir)
         System.exit(1)
       }
-      assert(workDir.isDirectory)
+      assert(dir.isDirectory)
     } catch {
       case e: Exception =>
-        logError("Failed to create work directory " + workDir, e)
+        logError("Failed to create work directory " + dir, e)
         System.exit(1)
     }
+    dir
   }

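+  // Web UI for the dispatcher: backed by this actor (self) for state queries,
+  // bound in preStart() and stopped in postStop().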
+  val webUi = new MesosClusterUI(
+    self, new SecurityManager(conf), webUiPort, conf, workDir, dispatcherPublicAddress)
+
+  val runners = new HashMap[String, DriverRunner]
+  val drivers = new HashMap[String, DriverInfo]
+  val completedDrivers = new ArrayBuffer[DriverInfo]
+  val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
+  var nextDriverNumber = 0
+
+  def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
+
   val sparkHome =
     new File(sys.env.get("SPARK_HOME").getOrElse("."))

@@ -122,11 +140,12 @@ class MesosClusterDispatcher(
   }

   override def preStart() {
-    createWorkDir()
     server.start()
+    webUi.bind()
   }

   override def postStop() {
+    webUi.stop()
     server.stop()
     runners.values.foreach(_.kill())
   }
@@ -173,6 +192,10 @@ class MesosClusterDispatcher(
           throw new Exception(s"Received unexpected state update for driver $driverId: $state")
       }
     }
+
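+    // Reply to web UI state requests with a snapshot of the current drivers.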
+    case RequestDispatcherState => {
+      sender ! DispatcherStateResponse(drivers.values, completedDrivers)
+    }
   }

   def removeDriver(driverId: String, state: DriverState, exception: Option[Exception]) {
@@ -225,6 +248,7 @@ object MesosClusterDispatcher {
       args.host,
       args.port,
       boundPort,
+      args.webUiPort,
       systemName,
       actorName,
       conf,
@@ -237,6 +261,7 @@ object MesosClusterDispatcher {
   class ClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
     var host = Utils.localHostName()
     var port = 7077
+    var webUiPort = 8081
     var masterUrl: String = null

     parse(args.toList)
@@ -251,6 +276,10 @@ object MesosClusterDispatcher {
         port = value
         parse(tail)

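+      // Flag for the dispatcher web UI port (defaults to 8081).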
+      case ("--webui-port") :: IntParam(value) :: tail =>
+        webUiPort = value
+        parse(tail)
+
       case ("--master" | "-m") :: value :: tail =>
         if (!value.startsWith("mesos://")) {
           System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
@@ -283,6 +312,7 @@ object MesosClusterDispatcher {
       "Options:\n" +
       "  -h HOST, --host HOST      Hostname to listen on\n" +
       "  -p PORT, --port PORT      Port to listen on (default: 7077)\n" +
+      "  --webui-port WEBUI_PORT   WebUI Port to listen on (default: 8081)\n" +
       "  -m --master MASTER        URI for connecting to Mesos master\n")
     System.exit(exitCode)
   }