21 changes: 11 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -28,6 +28,7 @@ import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils

@@ -123,7 +124,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
set("spark.jars", jars.filter(_ != null).mkString(","))
set(JARS, jars.filter(_ != null))
}

/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
@@ -201,12 +202,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
*/
def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
val allClassNames = new LinkedHashSet[String]()
allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').map(_.trim)
allClassNames ++= get(KRYO_CLASSES_TO_REGISTER).map(_.trim)
.filter(!_.isEmpty)
allClassNames ++= classes.map(_.getName)

set("spark.kryo.classesToRegister", allClassNames.mkString(","))
set("spark.serializer", classOf[KryoSerializer].getName)
set(KRYO_CLASSES_TO_REGISTER, allClassNames.toSeq)
set(SERIALIZER, classOf[KryoSerializer].getName)
this
}

@@ -547,20 +548,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
case "yarn-cluster" =>
logWarning(warning)
set("spark.master", "yarn")
set("spark.submit.deployMode", "cluster")
set(SUBMIT_DEPLOY_MODE, "cluster")
case "yarn-client" =>
logWarning(warning)
set("spark.master", "yarn")
set("spark.submit.deployMode", "client")
set(SUBMIT_DEPLOY_MODE, "client")
case _ => // Any other unexpected master will be checked when creating scheduler backend.
}
}

if (contains("spark.submit.deployMode")) {
get("spark.submit.deployMode") match {
if (contains(SUBMIT_DEPLOY_MODE)) {
get(SUBMIT_DEPLOY_MODE) match {
case "cluster" | "client" =>
case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
"\"client\".")
case e => throw new SparkException(s"${SUBMIT_DEPLOY_MODE.key} can only be " +
"\"cluster\" or \"client\".")
}
}

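For reference, the typed entries this file now uses (JARS, KRYO_CLASSES_TO_REGISTER, SERIALIZER, SUBMIT_DEPLOY_MODE) would be declared with the internal ConfigBuilder DSL in org.apache.spark.internal.config. The following is only a sketch inferred from the old string keys and defaults removed above, not a quote of the actual config objects:

import org.apache.spark.internal.config.ConfigBuilder

// Sketch: keys and defaults are taken from the removed lines in this diff.
private[spark] val JARS = ConfigBuilder("spark.jars")
  .stringConf
  .toSequence                  // typed as Seq[String], so set(JARS, ...) takes a Seq directly
  .createWithDefault(Nil)

private[spark] val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode")
  .stringConf
  .createWithDefault("client")

private[spark] val KRYO_CLASSES_TO_REGISTER = ConfigBuilder("spark.kryo.classesToRegister")
  .stringConf
  .toSequence
  .createWithDefault(Nil)

With sequence-typed entries the comma joining and splitting moves into the config layer, which is why mkString(",") and split(',') disappear from the call sites above.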
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -229,7 +229,7 @@ class SparkContext(config: SparkConf) extends Logging {
def jars: Seq[String] = _jars
def files: Seq[String] = _files
def master: String = _conf.get("spark.master")
def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
def deployMode: String = _conf.get(SUBMIT_DEPLOY_MODE)
def appName: String = _conf.get("spark.app.name")

private[spark] def isEventLogEnabled: Boolean = _conf.get(EVENT_LOG_ENABLED)
@@ -2640,7 +2640,7 @@ object SparkContext extends Logging {
case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case "yarn" =>
if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) {
if (conf != null && conf.get(SUBMIT_DEPLOY_MODE) == "cluster") {
conf.getInt(DRIVER_CORES.key, 0)
} else {
0
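A quick sketch of why the deployMode rewrites in this file are behavior-preserving, assuming SUBMIT_DEPLOY_MODE carries a "client" default as sketched earlier (the assert is illustrative only):

// Old: every call site repeats the default.
val oldMode = conf.getOption("spark.submit.deployMode").getOrElse("client")
// New: the default lives on the entry, so get() always yields a concrete String.
val newMode = conf.get(SUBMIT_DEPLOY_MODE)
assert(oldMode == newMode)
// Likewise, getOption(...).contains("cluster") and get(SUBMIT_DEPLOY_MODE) == "cluster"
// agree once the default is "client".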
9 changes: 4 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -274,14 +274,13 @@ object SparkEnv extends Logging {
}
}

// Create an instance of the class named by the given SparkConf property, or defaultClassName
// Create an instance of the class named by the given SparkConf property
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
instantiateClass[T](conf.get(propertyName, defaultClassName))
def instantiateClassFromConf[T](propertyName: ConfigEntry[String]): T = {
instantiateClass[T](conf.get(propertyName))
}

val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val serializer = instantiateClassFromConf[Serializer](SERIALIZER)
logDebug(s"Using serializer: ${serializer.getClass}")

val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
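Since instantiateClassFromConf no longer takes a defaultClassName, the fallback has to live on the entry itself. A hedged sketch of what the SERIALIZER declaration would need to look like for the old behavior to be preserved (inferred from the removed call-site default above, not quoted from the config object):

private[spark] val SERIALIZER = ConfigBuilder("spark.serializer")
  .stringConf
  .createWithDefault("org.apache.spark.serializer.JavaSerializer")  // former call-site default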
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -23,6 +23,7 @@ import java.util.Arrays
import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.internal.config._

private[spark] object RUtils {
// Local path where R binary packages built from R source code contained in the spark
@@ -63,7 +64,7 @@ private[spark] object RUtils {
(sys.props("spark.master"), sys.props("spark.submit.deployMode"))
} else {
val sparkConf = SparkEnv.get.conf
(sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode", "client"))
(sparkConf.get("spark.master"), sparkConf.get(SUBMIT_DEPLOY_MODE))
}

val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster"
core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -60,7 +60,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
private object FaultToleranceTest extends App with Logging {

private val conf = new SparkConf()
private val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
private val zkDir = conf.get(config.Deploy.ZOOKEEPER_DIRECTORY).getOrElse("/spark")

private val masters = ListBuffer[TestMasterInfo]()
private val workers = ListBuffer[TestWorkerInfo]()
@@ -87,8 +87,8 @@ private object FaultToleranceTest extends App with Logging {
terminateCluster()

// Clear ZK directories in between tests (for speed purposes)
SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
SparkCuratorUtil.deleteRecursive(zk, zkDir + "/spark_leader")
SparkCuratorUtil.deleteRecursive(zk, zkDir + "/master_status")
}

test("sanity-basic") {
core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
@@ -25,6 +25,7 @@ import org.apache.zookeeper.KeeperException

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL

private[spark] object SparkCuratorUtil extends Logging {

@@ -35,7 +36,7 @@

def newClient(
conf: SparkConf,
zkUrlConf: String = "spark.deploy.zookeeper.url"): CuratorFramework = {
zkUrlConf: String = ZOOKEEPER_URL.key): CuratorFramework = {
val ZK_URL = conf.get(zkUrlConf)
val zk = CuratorFrameworkFactory.newClient(ZK_URL,
ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
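The .getOrElse("/spark") in FaultToleranceTest implies ZOOKEEPER_DIRECTORY is an optional entry, while SparkCuratorUtil only needs ZOOKEEPER_URL.key for its default parameter. A sketch of plausible declarations, with keys taken from the removed literals (the exact object layout is assumed):

// Assumed to live in org.apache.spark.internal.config.Deploy
val ZOOKEEPER_URL = ConfigBuilder("spark.deploy.zookeeper.url")
  .stringConf
  .createOptional              // ZOOKEEPER_URL.key == "spark.deploy.zookeeper.url"

val ZOOKEEPER_DIRECTORY = ConfigBuilder("spark.deploy.zookeeper.dir")
  .stringConf
  .createOptional              // conf.get(...) returns Option[String], hence getOrElse("/spark")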
29 changes: 14 additions & 15 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -437,7 +437,7 @@ private[spark] class SparkSubmit extends Logging {
}

if (localPyFiles != null) {
sparkConf.set("spark.submit.pyFiles", localPyFiles)
sparkConf.set(SUBMIT_PYTHON_FILES, localPyFiles.split(",").toSeq)
}

// In YARN mode for an R app, add the SparkR package archive and the R package
@@ -614,11 +614,11 @@
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python and R files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !args.isPython && !args.isR) {
var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
var jars = sparkConf.get(JARS)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sparkConf.set("spark.jars", jars.mkString(","))
sparkConf.set(JARS, jars)
}

// In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
Expand Down Expand Up @@ -681,7 +681,7 @@ private[spark] class SparkSubmit extends Logging {
// Second argument is main class
childArgs += (args.primaryResource, "")
if (args.pyFiles != null) {
sparkConf.set("spark.submit.pyFiles", args.pyFiles)
sparkConf.set(SUBMIT_PYTHON_FILES, args.pyFiles.split(",").toSeq)
}
} else if (args.isR) {
// Second argument is main class
@@ -748,18 +748,17 @@
// Resolve and format python file paths properly before adding them to the PYTHONPATH.
// The resolving part is redundant in the case of --py-files, but necessary if the user
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
// Ignoring formatting python path in yarn and mesos cluster mode, these two modes
// support dealing with remote python files, they could distribute and add python files
// locally.
resolvedPyFiles
}
sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES)
val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(","))
val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
// Ignoring formatting python path in yarn and mesos cluster mode, these two modes
// support dealing with remote python files, they could distribute and add python files
// locally.
resolvedPyFiles
}
sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq)

(childArgs, childClasspath, sparkConf, childMainClass)
}
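SUBMIT_PYTHON_FILES is evidently a Seq[String] entry, so the comma-separated strings that used to be stored verbatim are now split at the boundary and re-joined only where a single string is needed. A sketch of the round trip (the entry declaration is assumed, not quoted):

val SUBMIT_PYTHON_FILES = ConfigBuilder("spark.submit.pyFiles")
  .stringConf
  .toSequence
  .createWithDefault(Nil)

// Writing: comma-separated CLI input is split before it reaches the conf.
sparkConf.set(SUBMIT_PYTHON_FILES, "a.py,deps.zip".split(",").toSeq)
// Reading: consumers re-join only where a plain string is genuinely required.
val resolvedPyFiles = Utils.resolveURIs(sparkConf.get(SUBMIT_PYTHON_FILES).mkString(","))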
48 changes: 25 additions & 23 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -34,7 +34,9 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Deploy._
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.config.Worker._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.serializer.{JavaSerializer, Serializer}
@@ -56,12 +58,12 @@ private[deploy] class Master(
// For application IDs
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)

private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)
private val workerTimeoutMs = conf.get(WORKER_TIMEOUT) * 1000
private val retainedApplications = conf.get(RETAINED_APPLICATIONS)
private val retainedDrivers = conf.get(RETAINED_DRIVERS)
private val reaperIterations = conf.get(REAPER_ITERATIONS)
private val recoveryMode = conf.get(RECOVERY_MODE)
private val maxExecutorRetries = conf.get(MAX_EXECUTOR_RETRIES)

val workers = new HashSet[WorkerInfo]
val idToApp = new HashMap[String, ApplicationInfo]
@@ -113,13 +115,13 @@
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
private val spreadOutApps = conf.get(SPREAD_OUT_APPS)

// Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
private val defaultCores = conf.get(DEFAULT_CORES)
val reverseProxy = conf.get(UI_REVERSE_PROXY)
if (defaultCores < 1) {
throw new SparkException("spark.deploy.defaultCores must be positive")
throw new SparkException(s"${DEFAULT_CORES.key} must be positive")
}

// Alternative application submission gateway that is stable across Spark versions
@@ -151,7 +153,7 @@
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}, 0, workerTimeoutMs, TimeUnit.MILLISECONDS)

if (restServerEnabled) {
val port = conf.get(MASTER_REST_SERVER_PORT)
@@ -168,7 +170,7 @@
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
val (persistenceEngine_, leaderElectionAgent_) = recoveryMode match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
@@ -179,7 +181,7 @@
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val clazz = Utils.classForName(conf.get(RECOVERY_MODE_FACTORY))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
@@ -233,7 +235,7 @@ private[deploy] class Master(
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}, workerTimeoutMs, TimeUnit.MILLISECONDS)
}

case CompleteRecovery => completeRecovery()
@@ -311,8 +313,8 @@
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
if (!normalExit
&& appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
&& MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
&& appInfo.incrementRetryCount() >= maxExecutorRetries
&& maxExecutorRetries >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
@@ -870,8 +872,8 @@
endpointToApp -= app.driver
addressToApp -= app.driver.address

if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
if (completedApps.size >= retainedApplications) {
val toRemove = math.max(retainedApplications / 10, 1)
completedApps.take(toRemove).foreach { a =>
applicationMetricsSystem.removeSource(a.appSource)
}
@@ -989,14 +991,14 @@
private def timeOutDeadWorkers() {
// Copy the workers into an array so we don't modify the hashset while iterating through it
val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
val toRemove = workers.filter(_.lastHeartbeat < currentTime - workerTimeoutMs).toArray
for (worker <- toRemove) {
if (worker.state != WorkerState.DEAD) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT_MS / 1000))
removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds")
worker.id, workerTimeoutMs / 1000))
removeWorker(worker, s"Not receiving heartbeat for ${workerTimeoutMs / 1000} seconds")
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
if (worker.lastHeartbeat < currentTime - ((reaperIterations + 1) * workerTimeoutMs)) {
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
@@ -1031,8 +1033,8 @@
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
drivers -= driver
if (completedDrivers.size >= RETAINED_DRIVERS) {
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
if (completedDrivers.size >= retainedDrivers) {
val toRemove = math.max(retainedDrivers / 10, 1)
completedDrivers.trimStart(toRemove)
}
completedDrivers += driver
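The renamed camelCase vals in Master are backed by typed entries whose defaults should match the old inline ones. A sketch with keys and defaults copied from the removed lines above; the split between config.Worker and config.Deploy is assumed from the imports added at the top of the file:

// Assumed to live in org.apache.spark.internal.config.Worker
val WORKER_TIMEOUT = ConfigBuilder("spark.worker.timeout")
  .longConf
  .createWithDefault(60)       // seconds; Master multiplies by 1000 for millis

// Assumed to live in org.apache.spark.internal.config.Deploy
val RETAINED_APPLICATIONS = ConfigBuilder("spark.deploy.retainedApplications")
  .intConf
  .createWithDefault(200)
val MAX_EXECUTOR_RETRIES = ConfigBuilder("spark.deploy.maxExecutorRetries")
  .intConf
  .createWithDefault(10)
val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode")
  .stringConf
  .createWithDefault("NONE")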
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Deploy.RECOVERY_DIRECTORY
import org.apache.spark.serializer.Serializer

/**
@@ -52,11 +53,11 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serial
private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {

val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
val recoveryDir = conf.get(RECOVERY_DIRECTORY)

def createPersistenceEngine(): PersistenceEngine = {
logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
logInfo("Persisting recovery state to directory: " + recoveryDir)
new FileSystemPersistenceEngine(recoveryDir, serializer)
}

def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -23,11 +23,12 @@ import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchList
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkCuratorUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Deploy.ZOOKEEPER_DIRECTORY

private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {

val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/leader_election"

private var zk: CuratorFramework = _
private var leaderLatch: LeaderLatch = _
@@ -38,7 +39,7 @@ private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderEle
private def start() {
logInfo("Starting ZooKeeper LeaderElection agent")
zk = SparkCuratorUtil.newClient(conf)
leaderLatch = new LeaderLatch(zk, WORKING_DIR)
leaderLatch = new LeaderLatch(zk, workingDir)
leaderLatch.addListener(this)
leaderLatch.start()
}