
Commit 6ff8e5c

Address comments and add logging.
1 parent df355cd commit 6ff8e5c

14 files changed: +88 −72 lines

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
@@ -130,7 +130,7 @@ private[master] class Master(
   private val restServer =
     if (restServerEnabled) {
       val port = conf.getInt("spark.master.rest.port", 6066)
-      Some(new StandaloneRestServer(host, port, self, masterUrl, conf))
+      Some(new StandaloneRestServer(host, port, conf, self, masterUrl))
     } else {
       None
     }

core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala

Lines changed: 3 additions & 3 deletions
@@ -30,9 +30,9 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /*
  * A dispatcher that is responsible for managing and launching drivers, and is intended to
- * be used for Mesos cluster mode. The dispatcher ls launched by the user in the cluster,
+ * be used for Mesos cluster mode. The dispatcher is launched by the user in the cluster,
  * which it launches a [[MesosRestServer]] for listening for driver requests, and launches a
- * [[MesoClusterScheduler]] to launch these drivers in the Mesos cluster.
+ * [[MesosClusterScheduler]] to launch these drivers in the Mesos cluster.
  *
  * A typical new driver lifecycle is the following:
  *
@@ -141,7 +141,7 @@ private[mesos] object MesosClusterDispatcher extends spark.Logging {
 
     propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
 
-    def parse(args: List[String]): Unit = args match {
+    private def parse(args: List[String]): Unit = args match {
       case ("--host" | "-h") :: value :: tail =>
         Utils.checkHost(value, "Please use hostname " + value)
         host = value

core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala

Lines changed: 3 additions & 5 deletions
@@ -19,8 +19,6 @@ package org.apache.spark.deploy.mesos
 
 import java.util.Date
 
-import scala.collection.mutable
-
 import org.apache.spark.deploy.Command
 
 /**
@@ -34,13 +32,13 @@ import org.apache.spark.deploy.Command
  * @param command The command to launch the driver.
  * @param schedulerProperties Extra properties to pass the Mesos scheduler
  */
-private[spark] class MesosDriverDescription(
+private[spark] case class MesosDriverDescription(
     val jarUrl: String,
     val mem: Int,
-    val cores: Int,
+    val cores: Double,
     val supervise: Boolean,
     val command: Command,
-    val schedulerProperties: mutable.HashMap[String, String])
+    val schedulerProperties: Map[String, String])
   extends Serializable {
 
   var submissionId: Option[String] = None
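
Turning MesosDriverDescription into a case class with an immutable Map buys structural equality, a compiler-generated copy method, and safety from callers mutating scheduler properties after submission. A minimal sketch of those properties, using an invented stand-in type rather than the real Spark class:

case class DriverDesc(jarUrl: String, mem: Int, cores: Double, props: Map[String, String])

val d1 = DriverDesc("http://example.com/app.jar", 512, 1.0, Map("spark.app.name" -> "demo"))
val d2 = d1.copy(cores = 2.0)                 // generated copy method
assert(d1 == d1.copy())                       // structural, not reference, equality
assert((d1.props + ("k" -> "v")) ne d1.props) // immutable Map: updates yield a new map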

core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala

Lines changed: 9 additions & 7 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.ui.{UIUtils, WebUIPage}
 private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
   def render(request: HttpServletRequest): Seq[Node] = {
     val state = parent.scheduler.getState()
-    val queuedHeaders = Seq("DriverID", "Submit Date", "Description")
+    val queuedHeaders = Seq("DriverID", "Submit Date", "Main Class", "Driver resources")
     val driverHeaders = queuedHeaders ++
       Seq("Start Date", "Mesos Slave ID", "State")
     val retryHeaders = Seq("DriverID", "Submit Date", "Description") ++
@@ -61,6 +61,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
       <td>{submission.submissionId}</td>
       <td>{submission.submissionDate}</td>
       <td>{submission.command.mainClass}</td>
+      <td>cpus: {submission.cores}, mem: {submission.mem}</td>
     </tr>
   }
 
@@ -91,21 +92,22 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
       return ""
     }
     val sb = new StringBuilder
-    sb.append(s"State: ${status.get.getState}")
+    val s = status.get
+    sb.append(s"State: ${s.getState}")
     if (status.get.hasMessage) {
-      sb.append(s", Message: ${status.get.getMessage}")
+      sb.append(s", Message: ${s.getMessage}")
     }
     if (status.get.hasHealthy) {
-      sb.append(s", Healthy: ${status.get.getHealthy}")
+      sb.append(s", Healthy: ${s.getHealthy}")
     }
     if (status.get.hasSource) {
-      sb.append(s", Source: ${status.get.getSource}")
+      sb.append(s", Source: ${s.getSource}")
     }
     if (status.get.hasReason) {
-      sb.append(s", Reason: ${status.get.getReason}")
+      sb.append(s", Reason: ${s.getReason}")
     }
     if (status.get.hasTimestamp) {
-      sb.append(s", Time: ${status.get.getTimestamp}")
+      sb.append(s", Time: ${s.getTimestamp}")
     }
     sb.toString()
   }
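
Binding val s = status.get once removes most of the repeated unwrapping, though the if (status.get.hasX) guards still call .get. A sketch of how the same method could lean on Option.foreach and drop .get entirely; the hasX/getX pairs mirror the diff, but this is an illustration, not what the commit ships:

val sb = new StringBuilder
status.foreach { s =>
  sb.append(s"State: ${s.getState}")
  if (s.hasMessage) sb.append(s", Message: ${s.getMessage}")
  if (s.hasHealthy) sb.append(s", Healthy: ${s.getHealthy}")
}
sb.toString()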

core/src/main/scala/org/apache/spark/deploy/rest/RestServer.scala

Lines changed: 4 additions & 4 deletions
@@ -51,14 +51,14 @@ private[spark] abstract class RestServer extends Logging {
   val host: String
   val requestedPort: Int
   val masterConf: SparkConf
-  val submitRequestServlet: SubmitRequestServlet
-  val killRequestServlet: KillRequestServlet
-  val statusRequestServlet: StatusRequestServlet
+  def submitRequestServlet: SubmitRequestServlet
+  def killRequestServlet: KillRequestServlet
+  def statusRequestServlet: StatusRequestServlet
 
   private var _server: Option[Server] = None
 
   // A mapping from URL prefixes to servlets that serve them. Exposed for testing.
-  protected val baseContext = s"/$RestServer.PROTOCOL_VERSION/submissions"
+  protected val baseContext = s"/${RestServer.PROTOCOL_VERSION}/submissions"
   protected val contextToServlet = Map[String, RestServlet](
     s"$baseContext/create/*" -> submitRequestServlet,
     s"$baseContext/kill/*" -> killRequestServlet,

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 9 additions & 7 deletions
@@ -45,21 +45,23 @@ import org.apache.spark.deploy.ClientArguments._
  *
  * @param host the address this server should bind to
  * @param requestedPort the port this server will attempt to bind to
+ * @param masterConf the conf used by the Master
  * @param masterActor reference to the Master actor to which requests can be sent
  * @param masterUrl the URL of the Master new drivers will attempt to connect to
- * @param masterConf the conf used by the Master
  */
 private[deploy] class StandaloneRestServer(
     val host: String,
     val requestedPort: Int,
+    val masterConf: SparkConf,
     masterActor: ActorRef,
-    masterUrl: String,
-    val masterConf: SparkConf)
+    masterUrl: String)
   extends RestServer {
-
-  val submitRequestServlet = new StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf)
-  val killRequestServlet = new StandaloneKillRequestServlet(masterActor, masterConf)
-  val statusRequestServlet = new StandaloneStatusRequestServlet(masterActor, masterConf)
+  def submitRequestServlet: SubmitRequestServlet =
+    new StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf)
+  def killRequestServlet: KillRequestServlet =
+    new StandaloneKillRequestServlet(masterActor, masterConf)
+  def statusRequestServlet: StatusRequestServlet =
+    new StandaloneStatusRequestServlet(masterActor, masterConf)
 }
 
 /**

core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala

Lines changed: 11 additions & 8 deletions
@@ -20,15 +20,15 @@ package org.apache.spark.deploy.rest.mesos
 import java.io.File
 import javax.servlet.http.HttpServletResponse
 
-import scala.collection.mutable
-
 import org.apache.spark.deploy.Command
 import org.apache.spark.deploy.mesos.MesosDriverDescription
 import org.apache.spark.deploy.rest._
 import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
 import org.apache.spark.util.Utils
 import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
 
+import scala.collection.mutable
+
 
 /**
  * A server that responds to requests submitted by the [[RestClient]].
@@ -44,9 +44,12 @@ private[spark] class MesosRestServer(
     val masterConf: SparkConf,
     scheduler: MesosClusterScheduler)
   extends RestServer {
-  val submitRequestServlet = new MesosSubmitRequestServlet(scheduler, masterConf)
-  val killRequestServlet = new MesosKillRequestServlet(scheduler, masterConf)
-  val statusRequestServlet = new MesosStatusRequestServlet(scheduler, masterConf)
+  def submitRequestServlet: SubmitRequestServlet =
+    new MesosSubmitRequestServlet(scheduler, masterConf)
+  def killRequestServlet: KillRequestServlet =
+    new MesosKillRequestServlet(scheduler, masterConf)
+  def statusRequestServlet: StatusRequestServlet =
+    new MesosStatusRequestServlet(scheduler, masterConf)
 }
 
 private[mesos] class MesosSubmitRequestServlet(
@@ -56,7 +59,7 @@ private[mesos] class MesosSubmitRequestServlet(
 
   private val DEFAULT_SUPERVISE = false
   private val DEFAULT_MEMORY = 512 // mb
-  private val DEFAULT_CORES = 1
+  private val DEFAULT_CORES = 1.0
 
   /**
    * Build a driver description from the fields specified in the submit request.
@@ -113,11 +116,11 @@ private[mesos] class MesosSubmitRequestServlet(
       mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
     val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
     val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
-    val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
+    val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
 
     new MesosDriverDescription(
       appResource, actualDriverMemory, actualDriverCores,
-      actualSuperviseDriver, command, schedulerProperties)
+      actualSuperviseDriver, command, schedulerProperties.toMap)
   }
 
   protected override def handleSubmit(
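
The Int-to-Double move matters because Mesos models cpus as a scalar resource, so fractional requests such as half a core are legal. A small check with hypothetical input:

val driverCores: Option[String] = Some("0.5")
val cores = driverCores.map(_.toDouble).getOrElse(1.0) // 0.5
// The old _.toInt would throw NumberFormatException on "0.5".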

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/DriverQueue.scala

Lines changed: 2 additions & 1 deletion
@@ -26,6 +26,7 @@ import org.apache.spark.deploy.mesos.MesosDriverDescription
  * This queue automatically stores the state after each pop/push
  * so it can be recovered later.
  * This queue is also bounded and rejects offers when it's full.
+ * This class is not thread-safe, and we expect the caller to handle synchronizing state.
  * @param state Mesos state abstraction to fetch persistent state.
  */
 private[mesos] class DriverQueue(state: MesosClusterPersistenceEngine, capacity: Int) {
@@ -42,7 +43,7 @@ private[mesos] class DriverQueue(state: MesosClusterPersistenceEngine, capacity:
     count = queue.size
   }
 
-  def isFull = count >= capacity
+  def isFull: Boolean = count >= capacity
 
   def size: Int = count
 
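
Annotating isFull: Boolean follows the usual Scala practice of giving non-private members explicit result types: with inference, a later edit to the body can silently change the visible signature. A sketch of the hazard, not code from this commit:

class BoundedQueue(capacity: Int) {
  private var count = 0
  // Inferred as Boolean today, but if the body ever became `capacity - count`
  // the public type would silently flip to Int and break callers.
  def isFullInferred = count >= capacity
  // The annotation pins the contract at the definition site.
  def isFull: Boolean = count >= capacity
}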

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/LaunchedDrivers.scala

Lines changed: 3 additions & 7 deletions
@@ -22,6 +22,7 @@ import org.apache.mesos.Protos.SlaveID
 
 /**
  * Tracks all the launched or running drivers in the Mesos cluster scheduler.
+ * This class is not thread-safe, and we expect the caller to handle synchronizing state.
  * @param state Persistence engine to store tasks state.
  */
 private[mesos] class LaunchedDrivers(state: MesosClusterPersistenceEngine) {
@@ -51,19 +52,14 @@ private[mesos] class LaunchedDrivers(state: MesosClusterPersistenceEngine) {
   def contains(submissionId: String): Boolean = drivers.contains(submissionId)
 
   def remove(submissionId: String): Option[MesosClusterTaskState] = {
-    if (pendingRecover.contains(submissionId)) {
-      pendingRecover.remove(submissionId)
-    }
-
+    pendingRecover.remove(submissionId)
     val removedState = drivers.remove(submissionId)
     state.expunge(submissionId)
     removedState
   }
 
   def set(submissionId: String, newState: MesosClusterTaskState): Unit = {
-    if (pendingRecover.contains(newState.taskId.getValue)) {
-      pendingRecover.remove(newState.taskId.getValue)
-    }
+    pendingRecover.remove(newState.taskId.getValue)
     drivers(submissionId) = newState
     state.persist(submissionId, newState)
   }
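
The dropped contains checks are safe because mutable.Map.remove already tolerates absent keys: it returns an Option rather than throwing. A quick standalone check:

import scala.collection.mutable

val pendingRecover = mutable.HashMap("driver-1" -> "state")
pendingRecover.remove("missing")  // None: absent key is a no-op, no exception
pendingRecover.remove("driver-1") // Some("state"): entry deleted and returned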

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 24 additions & 3 deletions
@@ -329,20 +329,34 @@ private[spark] class MesosClusterSchedulerDriver(
     options
   }
 
-  private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double)
+  private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) {
+    override def toString(): String = {
+      s"Offer id: ${offer.getId.getValue}, cpu: $cpu, mem: $mem"
+    }
+  }
 
   override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
+    def printOffers(offers: Iterable[ResourceOffer]): String = {
+      val builder = new StringBuilder()
+      offers.foreach { o =>
+        builder.append(o).append("\n")
+      }
+      builder.toString()
+    }
+
     val currentOffers = offers.map { o =>
       new ResourceOffer(
         o,
         getResource(o.getResourcesList, "cpus"),
         getResource(o.getResourcesList, "mem"))
     }
+    logTrace(s"Received offers from Mesos: ${printOffers(currentOffers)}")
     val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
     val currentTime = new Date()
+
     def scheduleTasks(
         taskFunc: () => (Option[MesosDriverDescription], Option[RetryState]),
-        scheduledCallback: (String) => Unit) {
+        scheduledCallback: (String) => Unit): Unit = {
      var nextItem = taskFunc()
      // TODO: We should not stop scheduling at the very first task
      // that cannot be scheduled. Instead we should exhaust the
@@ -352,11 +366,14 @@
         val (submission, retryState) = (nextItem._1.get, nextItem._2)
         val driverCpu = submission.cores
         val driverMem = submission.mem
+        logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
         val offerOption = currentOffers.find { o =>
           o.cpu >= driverCpu && o.mem >= driverMem
         }
 
         if (offerOption.isEmpty) {
+          logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId.get}," +
+            s"cpu: $driverCpu, mem: $driverMem")
           return
         }
 
@@ -371,9 +388,10 @@
           .setName("mem").setType(Value.Type.SCALAR)
           .setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
         val commandInfo = buildCommand(submission)
+        val appName = submission.schedulerProperties("spark.app.name")
         val taskInfo = TaskInfo.newBuilder()
           .setTaskId(taskId)
-          .setName(s"driver for ${submission.command.mainClass}")
+          .setName(s"Driver for $appName")
           .setSlaveId(offer.offer.getSlaveId)
           .setCommand(commandInfo)
           .addResources(cpuResource)
@@ -387,6 +405,9 @@
             tasks(offer.offer.getId)
           }
         queuedTasks += taskInfo
+        logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
+          submission.submissionId.get)
+
         launchedDrivers.set(
           submission.submissionId.get,
           new MesosClusterTaskState(submission, taskId, offer.offer.getSlaveId,
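
The scheduling loop pairs each driver with the first offer whose remaining cpu and memory cover it, which is why the TODO notes it stops at the first unschedulable task. A self-contained sketch of that first-fit step; FakeOffer is an invented stand-in for the Mesos proto, and the resource deduction is an assumption suggested by cpu and mem being vars:

final case class FakeOffer(id: String, var cpu: Double, var mem: Double)

val offers = Seq(FakeOffer("o1", 0.5, 256.0), FakeOffer("o2", 2.0, 2048.0))
val (driverCpu, driverMem) = (1.0, 512.0)

offers.find(o => o.cpu >= driverCpu && o.mem >= driverMem) match {
  case Some(o) =>
    o.cpu -= driverCpu // assumed: deduct so later drivers see the remainder
    o.mem -= driverMem
    println(s"Using offer ${o.id}, remaining cpu: ${o.cpu}, mem: ${o.mem}")
  case None =>
    println("No single offer fits the driver; scheduling stops this round")
}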
