Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.mesos.ui

import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.scheduler.cluster.mesos.{MesosClusterSubmissionState, MesosClusterRetryState}
import org.apache.spark.ui.{UIUtils, WebUIPage}


private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") {

override def render(request: HttpServletRequest): Seq[Node] = {
val driverId = request.getParameter("id")
require(driverId != null && driverId.nonEmpty, "Missing id parameter")

val state = parent.scheduler.getDriverState(driverId)
if (state.isEmpty) {
val content =
<div>
<p>Cannot find driver {driverId}</p>
</div>
return UIUtils.basicSparkPage(content, s"Details for Job $driverId")
}
val driverState = state.get
val driverHeaders = Seq("Driver property", "Value")
val schedulerHeaders = Seq("Scheduler property", "Value")
val commandEnvHeaders = Seq("Command environment variable", "Value")
val launchedHeaders = Seq("Launched property", "Value")
val commandHeaders = Seq("Comamnd property", "Value")
val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count")
val driverDescription = Iterable.apply(driverState.description)
val submissionState = Iterable.apply(driverState.submissionState)
val command = Iterable.apply(driverState.description.command)
val schedulerProperties = Iterable.apply(driverState.description.schedulerProperties)
val commandEnv = Iterable.apply(driverState.description.command.environment)
val driverTable =
UIUtils.listingTable(driverHeaders, driverRow, driverDescription)
val commandTable =
UIUtils.listingTable(commandHeaders, commandRow, command)
val commandEnvTable =
UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv)
val schedulerTable =
UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties)
val launchedTable =
UIUtils.listingTable(launchedHeaders, launchedRow, submissionState)
val retryTable =
UIUtils.listingTable(
retryHeaders, retryRow, Iterable.apply(driverState.description.retryState))
val content =
<p>Driver state information for driver id {driverId}</p>
<a href="/">Back to Drivers</a>
<div class="row-fluid">
<div class="span12">
<h4>Driver state: {driverState.state}</h4>
<h4>Driver properties</h4>
{driverTable}
<h4>Driver command</h4>
{commandTable}
<h4>Driver command environment</h4>
{commandEnvTable}
<h4>Scheduler properties</h4>
{schedulerTable}
<h4>Launched state</h4>
{launchedTable}
<h4>Retry state</h4>
{retryTable}
</div>
</div>;

UIUtils.basicSparkPage(content, s"Details for Job $driverId")
}

private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = {
submissionState.map { state =>
<tr>
<td>Mesos Slave ID</td>
<td>{state.slaveId.getValue}</td>
</tr>
<tr>
<td>Mesos Task ID</td>
<td>{state.taskId.getValue}</td>
</tr>
<tr>
<td>Launch Time</td>
<td>{state.startDate}</td>
</tr>
<tr>
<td>Finish Time</td>
<td>{state.finishDate.map(_.toString).getOrElse("")}</td>
</tr>
<tr>
<td>Last Task Status</td>
<td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td>
</tr>
}.getOrElse(Seq[Node]())
}

private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = {
properties.map { case (k, v) =>
<tr>
<td>{k}</td><td>{v}</td>
</tr>
}.toSeq
}

private def commandRow(command: Command): Seq[Node] = {
<tr>
<td>Main class</td><td>{command.mainClass}</td>
</tr>
<tr>
<td>Arguments</td><td>{command.arguments.mkString(" ")}</td>
</tr>
<tr>
<td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td>
</tr>
<tr>
<td>Java options</td><td>{command.javaOpts.mkString((" "))}</td>
</tr>
<tr>
<td>Library path entries</td><td>{command.libraryPathEntries.mkString((" "))}</td>
</tr>
}

private def driverRow(driver: MesosDriverDescription): Seq[Node] = {
<tr>
<td>Name</td><td>{driver.name}</td>
</tr>
<tr>
<td>Id</td><td>{driver.submissionId}</td>
</tr>
<tr>
<td>Cores</td><td>{driver.cores}</td>
</tr>
<tr>
<td>Memory</td><td>{driver.mem}</td>
</tr>
<tr>
<td>Submitted</td><td>{driver.submissionDate}</td>
</tr>
<tr>
<td>Supervise</td><td>{driver.supervise}</td>
</tr>
}

private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = {
retryState.map { state =>
<tr>
<td>
{state.lastFailureStatus}
</td>
<td>
{state.nextRetry}
</td>
<td>
{state.retries}
</td>
</tr>
}.getOrElse(Seq[Node]())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,19 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
}

private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
val id = submission.submissionId
<tr>
<td>{submission.submissionId}</td>
<td><a href={s"driver?id=$id"}>{id}</a></td>
<td>{submission.submissionDate}</td>
<td>{submission.command.mainClass}</td>
<td>cpus: {submission.cores}, mem: {submission.mem}</td>
</tr>
}

private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
val id = state.driverDescription.submissionId
<tr>
<td>{state.driverDescription.submissionId}</td>
<td><a href={s"driver?id=$id"}>{id}</a></td>
<td>{state.driverDescription.submissionDate}</td>
<td>{state.driverDescription.command.mainClass}</td>
<td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
Expand All @@ -77,8 +79,9 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
}

private def retryRow(submission: MesosDriverDescription): Seq[Node] = {
val id = submission.submissionId
<tr>
<td>{submission.submissionId}</td>
<td><a href={s"driver?id=$id"}>{id}</a></td>
<td>{submission.submissionDate}</td>
<td>{submission.command.mainClass}</td>
<td>{submission.retryState.get.lastFailureStatus}</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ private[spark] class MesosClusterUI(

override def initialize() {
attachPage(new MesosClusterPage(this))
attachPage(new DriverPage(this))
attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static"))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private[spark] class MesosRestServer(
new MesosStatusRequestServlet(scheduler, masterConf)
}

private[deploy] class MesosSubmitRequestServlet(
private[mesos] class MesosSubmitRequestServlet(
scheduler: MesosClusterScheduler,
conf: SparkConf)
extends SubmitRequestServlet {
Expand Down Expand Up @@ -139,7 +139,7 @@ private[deploy] class MesosSubmitRequestServlet(
}
}

private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
extends KillRequestServlet {
protected override def handleKill(submissionId: String): KillSubmissionResponse = {
val k = scheduler.killDriver(submissionId)
Expand All @@ -148,7 +148,7 @@ private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler,
}
}

private[deploy] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
extends StatusRequestServlet {
protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
val d = scheduler.getDriverStatus(submissionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ private[spark] class MesosClusterSubmissionState(
val taskId: TaskID,
val slaveId: SlaveID,
var mesosTaskStatus: Option[TaskStatus],
var startDate: Date)
var startDate: Date,
var finishDate: Option[Date])
extends Serializable {

def copy(): MesosClusterSubmissionState = {
new MesosClusterSubmissionState(
driverDescription, taskId, slaveId, mesosTaskStatus, startDate)
driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate)
}
}

Expand Down Expand Up @@ -95,6 +96,14 @@ private[spark] class MesosClusterSchedulerState(
val finishedDrivers: Iterable[MesosClusterSubmissionState],
val pendingRetryDrivers: Iterable[MesosDriverDescription])

/**
* The full state of a Mesos driver, that is being used to display driver information on the UI.
*/
private[spark] class MesosDriverState(
val state: String,
val description: MesosDriverDescription,
val submissionState: Option[MesosClusterSubmissionState] = None)

/**
* A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode
* as Mesos tasks in a Mesos cluster.
Expand Down Expand Up @@ -233,6 +242,22 @@ private[spark] class MesosClusterScheduler(
s
}

/**
* Gets the driver state to be displayed on the Web UI.
*/
def getDriverState(submissionId: String): Option[MesosDriverState] = {
stateLock.synchronized {
queuedDrivers.find(_.submissionId.equals(submissionId))
.map(d => new MesosDriverState("QUEUED", d))
.orElse(launchedDrivers.get(submissionId)
.map(d => new MesosDriverState("RUNNING", d.driverDescription, Some(d))))
.orElse(finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId))
.map(d => new MesosDriverState("FINISHED", d.driverDescription, Some(d))))
.orElse(pendingRetryDrivers.find(_.submissionId.equals(submissionId))
.map(d => new MesosDriverState("RETRYING", d)))
}
}

private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity

/**
Expand Down Expand Up @@ -439,7 +464,7 @@ private[spark] class MesosClusterScheduler(
logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
None, new Date())
None, new Date(), None)
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
afterLaunchCallback(submission.submissionId)
Expand Down Expand Up @@ -534,6 +559,7 @@ private[spark] class MesosClusterScheduler(
// Check if the driver is supervise enabled and can be relaunched.
if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
removeFromLaunchedDrivers(taskId)
state.finishDate = Some(new Date())
val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
val (retries, waitTimeSec) = retryState
.map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
Expand All @@ -546,6 +572,7 @@ private[spark] class MesosClusterScheduler(
pendingRetryDriversState.persist(taskId, newDriverDescription)
} else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) {
removeFromLaunchedDrivers(taskId)
state.finishDate = Some(new Date())
if (finishedDrivers.size >= retainedDrivers) {
val toRemove = math.max(retainedDrivers / 10, 1)
finishedDrivers.trimStart(toRemove)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}

import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.spark.executor.MesosExecutorBackend
Expand Down Expand Up @@ -56,7 +56,7 @@ private[spark] class MesosSchedulerBackend(

// The listener bus to publish executor added/removed events.
val listenerBus = sc.listenerBus

private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)

@volatile var appId: String = _
Expand Down