-          { providerConfig.map(e => <li>{e._1}: {e._2}</li>) }
+          {providerConfig.map { case (k, v) => <li>{k}: {v}</li> }}
{
if (allApps.size > 0) {
@@ -75,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Last Updated")
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val uiAddress = "/history/" + info.id
+ val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
val startTime = UIUtils.formatDate(info.startTime)
val endTime = UIUtils.formatDate(info.endTime)
val duration = UIUtils.formatDuration(info.endTime - info.startTime)
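For reference, a minimal standalone sketch of the pattern this file now follows: every history link is derived from one shared constant instead of a repeated "/history" literal. The object and helper below are illustrative stand-ins, not the actual HistoryServer contents.

    object HistoryServerPaths {  // stands in for the real HistoryServer companion
      val UI_PATH_PREFIX = "/history"
      def uiAddressFor(appId: String): String = UI_PATH_PREFIX + s"/$appId"
    }

    // HistoryServerPaths.uiAddressFor("app-20140801120000-0001")
    //   == "/history/app-20140801120000-0001"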
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 56b38ddfc9313..d1a64c1912cb8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -25,9 +25,9 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
+import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.SignalLogger
/**
* A web server that renders SparkUIs of completed applications.
@@ -114,7 +114,7 @@ class HistoryServer(
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
val contextHandler = new ServletContextHandler
- contextHandler.setContextPath("/history")
+ contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
attachHandler(contextHandler)
}
@@ -172,10 +172,12 @@ class HistoryServer(
object HistoryServer extends Logging {
private val conf = new SparkConf
+ val UI_PATH_PREFIX = "/history"
+
def main(argStrings: Array[String]) {
SignalLogger.register(log)
initSecurity()
- val args = new HistoryServerArguments(conf, argStrings)
+ new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)
val providerName = conf.getOption("spark.history.provider")
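The hunk above ends where main() resolves the pluggable history provider from spark.history.provider. A self-contained sketch of that resolve-then-instantiate-reflectively pattern, with hypothetical Provider/FsProvider classes standing in for Spark's ApplicationHistoryProvider hierarchy:

    trait Provider
    class FsProvider(conf: Map[String, String]) extends Provider  // default implementation

    def loadProvider(conf: Map[String, String]): Provider = {
      val name = conf.getOrElse("spark.history.provider", classOf[FsProvider].getName)
      Class.forName(name)                              // look up the configured class
        .getConstructor(classOf[Map[String, String]])  // expect a one-arg constructor
        .newInstance(conf)
        .asInstanceOf[Provider]
    }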
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index be9361b754fc3..25fc76c23e0fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -18,7 +18,6 @@
package org.apache.spark.deploy.history
import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
/**
* Command-line parser for the history server.
@@ -32,6 +31,7 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
args match {
case ("--dir" | "-d") :: value :: tail =>
logDir = value
+ conf.set("spark.history.fs.logDirectory", value)
parse(tail)
case ("--help" | "-h") :: tail =>
@@ -42,9 +42,6 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
case _ =>
printUsageAndExit(1)
}
- if (logDir != null) {
- conf.set("spark.history.fs.logDirectory", logDir)
- }
}
private def printUsageAndExit(exitCode: Int) {
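A runnable sketch of the parsing change above, assuming a SparkConf-like stub: setting the key inside the `--dir` arm makes the write happen as soon as the flag is seen, which is what lets the trailing `if (logDir != null)` block go away.

    import scala.annotation.tailrec

    class Conf {  // stub for SparkConf
      private val settings = scala.collection.mutable.Map[String, String]()
      def set(k: String, v: String): Unit = settings(k) = v
      def get(k: String): Option[String] = settings.get(k)
    }

    @tailrec
    def parse(args: List[String], conf: Conf): Unit = args match {
      case ("--dir" | "-d") :: value :: tail =>
        conf.set("spark.history.fs.logDirectory", value)  // eager, per the new code
        parse(tail, conf)
      case Nil => ()
      case _   => sys.error("Usage: HistoryServer [--dir DIR]")
    }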
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 72d0589689e71..d3674427b1271 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -46,6 +46,11 @@ private[spark] class ApplicationInfo(
init()
+ private def readObject(in: java.io.ObjectInputStream): Unit = {
+ in.defaultReadObject()
+ init()
+ }
+
private def init() {
state = ApplicationState.WAITING
executors = new mutable.HashMap[Int, ExecutorInfo]
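The readObject hook above is the standard Java-serialization idiom for rebuilding transient state: defaultReadObject() restores the serialized fields, then init() recreates the transient ones that deserialization left null. A self-contained round trip with an illustrative field:

    import java.io._
    import scala.collection.mutable

    class AppState extends Serializable {
      @transient var executors: mutable.HashMap[Int, String] = _
      init()

      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()  // restore non-transient fields
        init()                  // rebuild transient fields, which came back null
      }

      private def init(): Unit = {
        executors = new mutable.HashMap[Int, String]
      }
    }

    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(new AppState)
    out.close()
    val copy = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[AppState]
    assert(copy.executors != null)  // would be null without the readObject hook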
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
index c87b66f047dc8..38db02cd2421b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
class ApplicationSource(val application: ApplicationInfo) extends Source {
- val metricRegistry = new MetricRegistry()
- val sourceName = "%s.%s.%s".format("application", application.desc.name,
+ override val metricRegistry = new MetricRegistry()
+ override val sourceName = "%s.%s.%s".format("application", application.desc.name,
System.currentTimeMillis())
metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
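Adding `override` works because Source declares these members abstractly, and it makes the compiler verify the implementation still matches if the trait ever changes. A sketch of the assumed shape of org.apache.spark.metrics.source.Source:

    import com.codahale.metrics.MetricRegistry

    trait Source {  // assumed shape, per the usage above
      def sourceName: String
      def metricRegistry: MetricRegistry
    }

    class ExampleSource extends Source {
      override val metricRegistry = new MetricRegistry()
      override val sourceName = "example"
    }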
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a304102a49086..cfa2c028a807b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -17,6 +17,7 @@
package org.apache.spark.deploy.master
+import java.net.URLEncoder
import java.text.SimpleDateFormat
import java.util.Date
@@ -30,25 +31,25 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
-import org.apache.hadoop.fs.FileSystem
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
private[spark] class Master(
host: String,
port: Int,
webUiPort: Int,
val securityMgr: SecurityManager)
- extends Actor with Logging {
+ extends Actor with ActorLogReceive with Logging {
import context.dispatcher // to use Akka's scheduler.schedule()
@@ -57,6 +58,7 @@ private[spark] class Master(
def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
+ val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
@@ -72,9 +74,7 @@ private[spark] class Master(
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
var nextAppNumber = 0
-
val appIdToUI = new HashMap[String, SparkUI]
- val fileSystemsUsed = new HashSet[FileSystem]
val drivers = new HashSet[DriverInfo]
val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -154,19 +154,20 @@ private[spark] class Master(
}
override def postStop() {
+ masterMetricsSystem.report()
+ applicationMetricsSystem.report()
// prevent the CompleteRecovery message sending to restarted master
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel()
}
webUi.stop()
- fileSystemsUsed.foreach(_.close())
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
context.stop(leaderElectionAgent)
}
- override def receive = {
+ override def receiveWithLogging = {
case ElectedLeader => {
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
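receiveWithLogging comes from the new ActorLogReceive mixin in org.apache.spark.util. A minimal Akka-free sketch of the shape such a mixin can take, assuming only that a Receive handler is a PartialFunction[Any, Unit]:

    trait ActorLogReceive {
      type Receive = PartialFunction[Any, Unit]

      def receiveWithLogging: Receive
      protected def logDebug(msg: => String): Unit = println(msg)  // stub logger

      // Wrap the concrete handler so every handled message is logged first.
      def receive: Receive = new Receive {
        def isDefinedAt(m: Any): Boolean = receiveWithLogging.isDefinedAt(m)
        def apply(m: Any): Unit = {
          logDebug(s"[actor] received message $m")
          receiveWithLogging(m)
        }
      }
    }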
@@ -644,10 +645,7 @@ private[spark] class Master(
waitingApps -= app
// If application events are logged, use them to rebuild the UI
- if (!rebuildSparkUI(app)) {
- // Avoid broken links if the UI is not reconstructed
- app.desc.appUiUrl = ""
- }
+ rebuildSparkUI(app)
for (exec <- app.executors.values) {
exec.worker.removeExecutor(exec)
@@ -669,29 +667,49 @@ private[spark] class Master(
*/
def rebuildSparkUI(app: ApplicationInfo): Boolean = {
val appName = app.desc.name
- val eventLogDir = app.desc.eventLogDir.getOrElse { return false }
+ val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
+ val eventLogDir = app.desc.eventLogDir.getOrElse {
+ // Event logging is not enabled for this application
+ app.desc.appUiUrl = notFoundBasePath
+ return false
+ }
val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec
- if (!eventLogPaths.isEmpty) {
- try {
- val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
- val ui = new SparkUI(
- new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
- replayBus.replay()
- app.desc.appUiUrl = ui.basePath
- appIdToUI(app.id) = ui
- webUi.attachSparkUI(ui)
- return true
- } catch {
- case e: Exception =>
- logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e)
- }
- } else {
- logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))
+
+ if (eventLogPaths.isEmpty) {
+ // Event logging is enabled for this application, but no event logs are found
+ val title = s"Application history not found (${app.id})"
+ var msg = s"No event logs found for application $appName in $eventLogDir."
+ logWarning(msg)
+ msg += " Did you specify the correct logging directory?"
+ msg = URLEncoder.encode(msg, "UTF-8")
+ app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
+ return false
+ }
+
+ try {
+ val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
+ val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)",
+ HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
+ replayBus.replay()
+ appIdToUI(app.id) = ui
+ webUi.attachSparkUI(ui)
+ // Application UI is successfully rebuilt, so link the Master UI to it
+ app.desc.appUiUrl = ui.basePath
+ true
+ } catch {
+ case e: Exception =>
+ // Relay exception message to application UI page
+ val title = s"Application history load error (${app.id})"
+ val exception = URLEncoder.encode(Utils.exceptionString(e), "UTF-8")
+ var msg = s"Exception in replaying log for application $appName!"
+ logError(msg, e)
+ msg = URLEncoder.encode(msg, "UTF-8")
+ app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
+ false
}
- false
}
/** Generate a new app ID given an app's submission date */
@@ -744,11 +762,16 @@ private[spark] class Master(
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
drivers -= driver
+ if (completedDrivers.size >= RETAINED_DRIVERS) {
+ val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
+ completedDrivers.trimStart(toRemove)
+ }
completedDrivers += driver
persistenceEngine.removeDriver(driver)
driver.state = finalState
driver.exception = exception
driver.worker.foreach(w => w.removeDriver(driver))
+ schedule()
case None =>
logWarning(s"Asked to remove unknown driver: $driverId")
}
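Two behavioral notes on the hunk above: completedDrivers is now capped the same way completedApps already is, and schedule() runs after a driver finishes so its freed cores and memory are immediately offered to waiting applications. A standalone sketch of the capping idiom:

    import scala.collection.mutable.ArrayBuffer

    val RETAINED_DRIVERS = 200
    val completedDrivers = ArrayBuffer[String]()

    def retire(driverId: String): Unit = {
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        // Drop the oldest ~10% in one shot rather than one entry per insertion.
        completedDrivers.trimStart(math.max(RETAINED_DRIVERS / 10, 1))
      }
      completedDrivers += driverId
    }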
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index a87781fb93850..4b0dbbe543d3f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -38,8 +38,8 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}
- if (conf.contains("master.ui.port")) {
- webUiPort = conf.get("master.ui.port").toInt
+ if (conf.contains("spark.master.ui.port")) {
+ webUiPort = conf.get("spark.master.ui.port").toInt
}
parse(args.toList)
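The fix restores the documented `spark.`-prefixed key. A hypothetical helper showing the precedence that results: built-in default, then SPARK_MASTER_WEBUI_PORT, then spark.master.ui.port, which wins because it is applied last (the worker gains the same ordering further below).

    def resolveWebUiPort(env: Map[String, String], conf: Map[String, String]): Int = {
      var port = 8080                                                 // built-in default
      env.get("SPARK_MASTER_WEBUI_PORT").foreach(p => port = p.toInt) // env override
      conf.get("spark.master.ui.port").foreach(p => port = p.toInt)   // conf wins
      port
    }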
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
index 36c1b87b7f684..9c3f79f1244b7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
private[spark] class MasterSource(val master: Master) extends Source {
- val metricRegistry = new MetricRegistry()
- val sourceName = "master"
+ override val metricRegistry = new MetricRegistry()
+ override val sourceName = "master"
// Gauge for worker numbers in cluster
metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 34fa1429c86de..4588c130ef439 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -28,7 +28,7 @@ import org.json4s.JValue
import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
-import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils
private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala
new file mode 100644
index 0000000000000..d8daff3e7fb9c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master.ui
+
+import java.net.URLDecoder
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[spark] class HistoryNotFoundPage(parent: MasterWebUI)
+ extends WebUIPage("history/not-found") {
+
+ /**
+ * Render a page that conveys failure in loading application history.
+ *
+ * This accepts 3 HTTP parameters:
+ * msg = message to display to the user
+ * title = title of the page
+ * exception = detailed description of the exception in loading application history (if any)
+ *
+ * Parameters "msg" and "exception" are assumed to be UTF-8 encoded.
+ */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val titleParam = request.getParameter("title")
+ val msgParam = request.getParameter("msg")
+ val exceptionParam = request.getParameter("exception")
+
+ // If no parameters are specified, assume the user did not enable event logging
+ val defaultTitle = "Event logging is not enabled"
+    val defaultContent =
+      <div class="row-fluid">
+        <div class="span12" style="font-size:14px">
+          No event logs were found for this application! To
+          <a href="http://spark.apache.org/docs/latest/monitoring.html">enable event logging</a>,
+          set <span style="font-style:italic">spark.eventLog.enabled</span> to true and
+          <span style="font-style:italic">spark.eventLog.dir</span> to the directory to which your
+          event logs are written.
+        </div>
+      </div>
+
+    val title = Option(titleParam).getOrElse(defaultTitle)
+    val content = Option(msgParam)
+      .map { msg => URLDecoder.decode(msg, "UTF-8") }
+      .map { msg =>
+        <div class="row-fluid">
+          <div class="span12" style="font-size:14px">{msg}</div>
+        </div> ++
+        Option(exceptionParam)
+          .map { e => URLDecoder.decode(e, "UTF-8") }
+          .map { e => <pre>{e}</pre> }
+          .getOrElse(Seq.empty)
+      }.getOrElse(defaultContent)
+
+ UIUtils.basicSparkPage(content, title)
+ }
+}
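How the two ends fit together: rebuildSparkUI in Master URL-encodes msg and exception into the query string, and this page URL-decodes them before rendering. A tiny round-trip check with illustrative values:

    import java.net.{URLDecoder, URLEncoder}

    val msg = "No event logs found for application MyApp in hdfs:///tmp/logs."
    val encoded = URLEncoder.encode(msg, "UTF-8")
    val link = s"/history/not-found?msg=$encoded&title=App+history+not+found"
    assert(URLDecoder.decode(encoded, "UTF-8") == msg)  // the page recovers the text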
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index a18b39fc95d64..d86ec1e03e45c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -21,14 +21,14 @@ import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.AkkaUtils
/**
* Web UI server for the standalone master.
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int)
- extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
+ extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)
@@ -38,6 +38,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
/** Initialize all components of the server. */
def initialize() {
attachPage(new ApplicationPage(this))
+ attachPage(new HistoryNotFoundPage(this))
attachPage(new MasterPage(this))
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 4af5bc3afad6c..687e492a0d6fc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -47,7 +47,6 @@ object CommandUtils extends Logging {
*/
def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
- val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq())
// Exists for backwards compatibility with older Spark versions
val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString)
@@ -62,7 +61,7 @@ object CommandUtils extends Logging {
val joined = command.libraryPathEntries.mkString(File.pathSeparator)
Seq(s"-Djava.library.path=$joined")
} else {
- Seq()
+ Seq()
}
val permGenOpt = Seq("-XX:MaxPermSize=128m")
@@ -71,11 +70,11 @@ object CommandUtils extends Logging {
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
val classPath = Utils.executeAndGetOutput(
Seq(sparkHome + "/bin/compute-classpath" + ext),
- extraEnvironment=command.environment)
+ extraEnvironment = command.environment)
val userClassPath = command.classPathEntries ++ Seq(classPath)
Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
- permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts
+ permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
}
/** Spawn a thread that will redirect a given stream to a file */
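A standalone sketch of the assembly order in buildJavaOpts after this change. For a repeated JVM flag the last occurrence wins, so user-supplied javaOpts override the worker-local SPARK_JAVA_OPTS defaults but not the memory flags the cluster manager controls; the SPARK_JAVA_OPTS splitting is simplified here.

    def assembleOpts(javaOpts: Seq[String], memoryMb: Int): Seq[String] = {
      val permGenOpt = Seq("-XX:MaxPermSize=128m")
      val workerLocalOpts =
        sys.env.get("SPARK_JAVA_OPTS").toSeq.flatMap(_.split("\\s+"))
      val memoryOpts = Seq(s"-Xms${memoryMb}M", s"-Xmx${memoryMb}M")
      permGenOpt ++ workerLocalOpts ++ javaOpts ++ memoryOpts
    }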
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 662d37871e7a6..5caaf6bea3575 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
+ * This is currently only used in standalone cluster deploy mode.
*/
private[spark] class DriverRunner(
val driverId: String,
@@ -81,7 +82,7 @@ private[spark] class DriverRunner(
driverDesc.command.environment,
classPath,
driverDesc.command.libraryPathEntries,
- driverDesc.command.extraJavaOptions)
+ driverDesc.command.javaOpts)
val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
sparkHome.getAbsolutePath)
launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 467317dd9b44c..7be89f9aff0f3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.logging.FileAppender
/**
* Manages the execution of one executor process.
+ * This is currently only used in standalone mode.
*/
private[spark] class ExecutorRunner(
val appId: String,
@@ -72,7 +73,7 @@ private[spark] class ExecutorRunner(
}
/**
- * kill executor process, wait for exit and notify worker to update resource status
+ * Kill executor process, wait for exit and notify worker to update resource status.
*
* @param message the exception message which caused the executor's death
*/
@@ -114,10 +115,13 @@ private[spark] class ExecutorRunner(
}
def getCommandSeq = {
- val command = Command(appDesc.command.mainClass,
- appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment,
- appDesc.command.classPathEntries, appDesc.command.libraryPathEntries,
- appDesc.command.extraJavaOptions)
+ val command = Command(
+ appDesc.command.mainClass,
+ appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
+ appDesc.command.environment,
+ appDesc.command.classPathEntries,
+ appDesc.command.libraryPathEntries,
+ appDesc.command.javaOpts)
CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ce425443051b0..80fde7e4b2624 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
/**
* @param masterUrls Each url should look like spark://host:port.
@@ -51,7 +51,7 @@ private[spark] class Worker(
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
- extends Actor with Logging {
+ extends Actor with ActorLogReceive with Logging {
import context.dispatcher
Utils.checkHost(host, "Expected hostname")
@@ -71,7 +71,7 @@ private[spark] class Worker(
// TTL for app folders/data; after TTL expires it will be cleaned up
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
-
+ val testing: Boolean = sys.props.contains("spark.testing")
val masterLock: Object = new Object()
var master: ActorSelection = null
var masterAddress: Address = null
@@ -81,7 +81,13 @@ private[spark] class Worker(
@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
- val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
+ val sparkHome =
+ if (testing) {
+ assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
+ new File(sys.props("spark.test.home"))
+ } else {
+ new File(sys.env.get("SPARK_HOME").getOrElse("."))
+ }
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -130,7 +136,7 @@ private[spark] class Worker(
logInfo("Spark home: " + sparkHome)
createWorkDir()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
+ webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
registerWithMaster()
@@ -181,7 +187,7 @@ private[spark] class Worker(
}
}
- override def receive = {
+ override def receiveWithLogging = {
case RegisteredWorker(masterUrl, masterWebUiUrl) =>
logInfo("Successfully registered with master " + masterUrl)
registered = true
@@ -233,9 +239,7 @@ private[spark] class Worker(
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host,
- appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
- workDir, akkaUrl, conf, ExecutorState.RUNNING)
+ self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -357,6 +361,7 @@ private[spark] class Worker(
}
override def postStop() {
+ metricsSystem.report()
registrationRetryTimer.foreach(_.cancel())
executors.values.foreach(_.kill())
drivers.values.foreach(_.kill())
@@ -368,7 +373,8 @@ private[spark] class Worker(
private[spark] object Worker extends Logging {
def main(argStrings: Array[String]) {
SignalLogger.register(log)
- val args = new WorkerArguments(argStrings)
+ val conf = new SparkConf
+ val args = new WorkerArguments(argStrings, conf)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir)
actorSystem.awaitTermination()
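A self-contained sketch of the sparkHome resolution introduced above: test runs must point at an explicit spark.test.home, while normal runs fall back to SPARK_HOME or the current directory.

    import java.io.File

    def resolveSparkHome(): File = {
      if (sys.props.contains("spark.testing")) {
        assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
        new File(sys.props("spark.test.home"))
      } else {
        new File(sys.env.getOrElse("SPARK_HOME", "."))
      }
    }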
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index dc5158102054e..1e295aaa48c30 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -20,11 +20,12 @@ package org.apache.spark.deploy.worker
import java.lang.management.ManagementFactory
import org.apache.spark.util.{IntParam, MemoryParam, Utils}
+import org.apache.spark.SparkConf
/**
* Command-line parser for the worker.
*/
-private[spark] class WorkerArguments(args: Array[String]) {
+private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 0
var webUiPort = 8081
@@ -46,6 +47,9 @@ private[spark] class WorkerArguments(args: Array[String]) {
if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
}
+ if (conf.contains("spark.worker.ui.port")) {
+ webUiPort = conf.get("spark.worker.ui.port").toInt
+ }
if (System.getenv("SPARK_WORKER_DIR") != null) {
workDir = System.getenv("SPARK_WORKER_DIR")
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
index b7ddd8c816cbc..df1e01b23b932 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
@@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
private[spark] class WorkerSource(val worker: Worker) extends Source {
- val sourceName = "worker"
- val metricRegistry = new MetricRegistry()
+ override val sourceName = "worker"
+ override val metricRegistry = new MetricRegistry()
metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 530c147000904..6d0d0bbe5ecec 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -22,13 +22,15 @@ import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, Di
import org.apache.spark.Logging
import org.apache.spark.deploy.DeployMessages.SendHeartbeat
+import org.apache.spark.util.ActorLogReceive
/**
* Actor which connects to a worker process and terminates the JVM if the connection is severed.
* Provides fate sharing between a worker and its associated child processes.
*/
-private[spark] class WorkerWatcher(workerUrl: String) extends Actor
- with Logging {
+private[spark] class WorkerWatcher(workerUrl: String)
+ extends Actor with ActorLogReceive with Logging {
+
override def preStart() {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -48,7 +50,7 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor
def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
- override def receive = {
+ override def receiveWithLogging = {
case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
logInfo(s"Successfully connected to $workerUrl")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index b389cb546de6c..ecb358c399819 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -17,7 +17,6 @@
package org.apache.spark.deploy.worker.ui
-import java.io.File
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
@@ -25,7 +24,7 @@ import scala.xml.Node
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
import org.apache.spark.Logging
-import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}
+import org.apache.spark.util.logging.RollingFileAppender
private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
private val worker = parent.worker
@@ -64,11 +63,11 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
- val (logDir, params) = (appId, executorId, driverId) match {
+ val (logDir, params, pageName) = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
- (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
+ (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e", s"$a/$e")
case (None, None, Some(d)) =>
- (s"${workDir.getPath}/$d/", s"driverId=$d")
+ (s"${workDir.getPath}/$d/", s"driverId=$d", d)
case _ =>
throw new Exception("Request must specify either application or driver identifiers")
}
@@ -120,7 +119,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w