 package org.apache.spark.scheduler.cluster.mesos
 
 import java.text.SimpleDateFormat
-
 import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
 import java.util.{List => JList}
 import java.util.{Collections, Date}
-
 import org.apache.mesos.{SchedulerDriver, Scheduler}
 import org.apache.mesos.Protos._
-
 import org.apache.spark.deploy.DriverDescription
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.master.DriverState.DriverState
-
 import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils
-
 import scala.collection.mutable
 import scala.collection.JavaConversions._
 import org.apache.mesos.Protos.Environment.Variable
+import org.apache.spark.SparkException
+import java.io.File
 
+case class DriverRequest(desc: DriverDescription, conf: SparkConf)
 
 private[spark] class DriverSubmission(
     val submissionId: String,
-    val desc: DriverDescription,
+    val req: DriverRequest,
     val submitDate: Date) {
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[DriverSubmission]
@@ -76,7 +74,7 @@ private[spark] case class ClusterSchedulerState(
     finishedDrivers: Iterable[ClusterTaskState])
 
 private[spark] trait ClusterScheduler {
-  def submitDriver(desc: DriverDescription): SubmitResponse
+  def submitDriver(desc: DriverRequest): SubmitResponse
   def killDriver(submissionId: String): KillResponse
   def getStatus(submissionId: String): StatusResponse
   def getState(): ClusterSchedulerState
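
// Usage sketch (not part of this diff): wrapping an existing DriverDescription in the
// new DriverRequest before calling ClusterScheduler.submitDriver. The object name,
// helper name, and config value below are illustrative assumptions, not code from
// this change.

object DriverRequestSketch {
  import org.apache.spark.SparkConf
  import org.apache.spark.deploy.DriverDescription

  def wrapRequest(desc: DriverDescription): DriverRequest = {
    // Per-submission conf that buildCommand now reads (spark.executor.uri /
    // spark.mesos.executor.home) instead of a scheduler-wide setting.
    val driverConf = new SparkConf(loadDefaults = false)
      .set("spark.mesos.executor.home", "/opt/spark") // placeholder path
    DriverRequest(desc, driverConf)
  }
}
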
@@ -88,7 +86,6 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
   val capacity = conf.getInt("spark.mesos.driver.capacity", 200)
-  val executorUri = conf.getOption("spark.executor.uri")
   val stateLock = new Object
   val launchedDrivers = new mutable.HashMap[String, ClusterTaskState]()
 
@@ -105,10 +102,10 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
       createDateFormat.format(submitDate), nextDriverNumber.incrementAndGet())
   }
 
-  def submitDriver(desc: DriverDescription): SubmitResponse = {
+  def submitDriver(req: DriverRequest): SubmitResponse = {
     val submitDate: Date = new Date()
     val submissionId: String = newDriverId(submitDate)
-    val submission = new DriverSubmission(submissionId, desc, submitDate)
+    val submission = new DriverSubmission(submissionId, req, submitDate)
     if (queue.offer(submission)) {
       SubmitResponse(submissionId, true, None)
     } else {
@@ -147,9 +144,16 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
     markRegistered()
   }
 
-  private def buildCommand(desc: DriverDescription): CommandInfo = {
+  private def buildCommand(req: DriverRequest): CommandInfo = {
+
+    val desc = req.desc
+
+    val cleanedJarUrl = desc.jarUrl.stripPrefix("file:")
+
+    logInfo(s"jarUrl: $cleanedJarUrl")
+
     val builder = CommandInfo.newBuilder()
-      .addUris(CommandInfo.URI.newBuilder().setValue(desc.jarUrl).build())
+      .addUris(CommandInfo.URI.newBuilder().setValue(cleanedJarUrl).build())
 
     val entries =
       (conf.getOption("spark.executor.extraLibraryPath").toList ++ desc.command.libraryPathEntries)
@@ -160,11 +164,7 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
       ""
     }
 
-    val stringBuilder = new StringBuilder
-    stringBuilder
-      .append(desc.command.mainClass)
-      .append(" ")
-      .append(desc.command.arguments.mkString("\"", "\"", "\""))
+    // TODO: add support for more spark-submit parameters
 
     val envBuilder = Environment.newBuilder()
     desc.command.environment.foreach {
@@ -175,14 +175,35 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
 
     builder.setEnvironment(envBuilder.build())
 
+    val executorUri = req.conf.getOption("spark.executor.uri")
     if (executorUri.isDefined) {
       builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
+
       val basename = executorUri.get.split('/').last.split('.').head
+      val cmd =
+        Seq("bin/spark-submit",
+          "--class", desc.command.mainClass,
+          "--master", s"mesos://${conf.get("spark.master")}",
+          s"../${desc.jarUrl.split("/").last}")
+          .mkString(" ")
+
       builder.setValue(
-        s"cd $basename*; $prefixEnv ${stringBuilder.toString()}")
+        s"cd $basename*; $prefixEnv $cmd")
     } else {
+      val executorSparkHome = req.conf.getOption("spark.mesos.executor.home")
+        .getOrElse {
+          throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+        }
+
+      val cmd =
+        Seq(new File(executorSparkHome, "./bin/spark-submit"),
+          "--class", desc.command.mainClass,
+          "--master", s"mesos://${conf.get("spark.master")}",
+          desc.jarUrl.split("/").last)
+          .mkString(" ")
+
       builder.setValue(
-        s"$prefixEnv ${stringBuilder.toString()}")
+        s"$prefixEnv $cmd")
     }
 
     builder.build
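
// Sketch (not part of this diff) of the command strings the two branches above end up
// producing. All concrete values (master address, main class, jar name, executor home)
// are placeholders.

object BuildCommandSketch {
  import java.io.File

  def main(args: Array[String]): Unit = {
    val master = "host:5050"                // placeholder for conf.get("spark.master")
    val mainClass = "com.example.Main"      // placeholder main class
    val jarUrl = "http://repo/jars/app.jar" // placeholder driver jar URL

    // spark.executor.uri branch: run spark-submit from the fetched, extracted distribution.
    val withUri = Seq("bin/spark-submit",
      "--class", mainClass,
      "--master", s"mesos://$master",
      s"../${jarUrl.split("/").last}").mkString(" ")

    // spark.mesos.executor.home branch: run spark-submit from a pre-installed Spark home.
    val withHome = Seq(new File("/opt/spark", "./bin/spark-submit"),
      "--class", mainClass,
      "--master", s"mesos://$master",
      jarUrl.split("/").last).mkString(" ")

    println(withUri)  // bin/spark-submit --class com.example.Main --master mesos://host:5050 ../app.jar
    println(withHome) // /opt/spark/./bin/spark-submit --class com.example.Main --master mesos://host:5050 app.jar
  }
}
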
@@ -203,8 +224,8 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
 
     var remainingOffers = offers
 
-    val driverCpu = submission.desc.cores
-    val driverMem = submission.desc.mem
+    val driverCpu = submission.req.desc.cores
+    val driverMem = submission.req.desc.mem
 
     // Should use the passed in driver cpu and memory.
     val offerOption = offers.find { o =>
@@ -216,15 +237,18 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
     val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
 
     val cpuResource = Resource.newBuilder()
-      .setName("cpus").setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
+      .setName("cpus").setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
 
     val memResource = Resource.newBuilder()
-      .setName("mem").setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
+      .setName("mem").setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
 
-    val commandInfo = buildCommand(submission.desc)
+    val commandInfo = buildCommand(submission.req)
 
     val taskInfo = TaskInfo.newBuilder()
       .setTaskId(taskId)
+      .setName(s"driver for ${submission.req.desc.command.mainClass}")
       .setSlaveId(offer.getSlaveId)
       .setCommand(commandInfo)
       .addResources(cpuResource)
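
// Sketch (not part of this diff): the resource-building pattern used above, with the
// value type set explicitly to SCALAR. The amounts are placeholders.

object ScalarResourceSketch {
  import org.apache.mesos.Protos.{Resource, Value}

  def scalarResource(name: String, amount: Double): Resource =
    Resource.newBuilder()
      .setName(name)
      .setType(Value.Type.SCALAR)
      .setScalar(Value.Scalar.newBuilder().setValue(amount))
      .build()

  def main(args: Array[String]): Unit = {
    println(scalarResource("cpus", 1.0))   // placeholder driver cores
    println(scalarResource("mem", 1024.0)) // placeholder driver memory (MB)
  }
}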