6 changes: 4 additions & 2 deletions bin/spark-submit
@@ -35,8 +35,10 @@ while (($#)); do
shift
done

if [ ! -z $DRIVER_MEMORY ] && [ ! -z $DEPLOY_MODE ] && [ $DEPLOY_MODE = "client" ]; then
export SPARK_MEM=$DRIVER_MEMORY
DEPLOY_MODE=${DEPLOY_MODE:-"client"}

if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
fi

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
3 changes: 2 additions & 1 deletion core/pom.xml
@@ -140,6 +140,7 @@
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
<classifier>${mesos.classifier}</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
@@ -322,7 +323,7 @@
</configuration>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

/**
* Classes that represent cleaning tasks.
@@ -110,7 +111,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning() {
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -128,7 +129,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}
} catch {
case t: Throwable => logError("Error in cleaning thread", t)
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
@@ -141,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.rddCleaned(rddId))
logInfo("Cleaned RDD " + rddId)
} catch {
case t: Throwable => logError("Error cleaning RDD " + rddId, t)
case e: Exception => logError("Error cleaning RDD " + rddId, e)
}
}

@@ -154,7 +155,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.shuffleCleaned(shuffleId))
logInfo("Cleaned shuffle " + shuffleId)
} catch {
case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)
case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
}
}

@@ -166,7 +167,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
listeners.foreach(_.broadcastCleaned(broadcastId))
logInfo("Cleaned broadcast " + broadcastId)
} catch {
case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t)
case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
}
}

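Note on the ContextCleaner change above: the catch clauses narrow from Throwable to Exception, and the cleaning loop is wrapped in Utils.logUncaughtExceptions so that anything fatal escaping the loop is still logged before it kills the thread. Below is a minimal, self-contained sketch of that pattern; UncaughtLogging is a stand-in for Spark's Utils.logUncaughtExceptions, whose implementation is not shown in this diff.

```scala
// Sketch only: a simplified stand-in for Utils.logUncaughtExceptions.
object UncaughtLogging {
  def logUncaughtExceptions[T](body: => T): T = {
    try {
      body
    } catch {
      case t: Throwable =>
        // Log with the thread name so daemon-thread failures are visible,
        // then rethrow so fatal errors are not silently swallowed.
        System.err.println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
        throw t
    }
  }
}

// Sketch of the cleaning-thread shape used above: catch Exception inside the
// loop so recoverable failures keep the loop alive, and let errors such as
// OutOfMemoryError escape to the logging wrapper.
class CleaningThread extends Thread("example-cleaner") {
  @volatile var stopped = false

  override def run(): Unit = UncaughtLogging.logUncaughtExceptions {
    while (!stopped) {
      try {
        // ... poll a reference queue and clean RDD / shuffle / broadcast state ...
      } catch {
        case e: Exception => System.err.println(s"Error in cleaning thread: $e")
      }
    }
  }
}
```

Catching Exception keeps the loop alive across recoverable failures, while genuine JVM errors now surface through the wrapper instead of disappearing inside the daemon thread.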
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging {
if (SparkHadoopUtil.get.isYarnMode() &&
(master == "yarn-standalone" || master == "yarn-cluster")) {
// In order for this to work in yarn-cluster mode the user must specify the
// --addjars option to the client to upload the file into the distributed cache
// --addJars option to the client to upload the file into the distributed cache
// of the AM to make it show up in the current working directory.
val fileName = new Path(uri.getPath).getName()
try {
@@ -1494,8 +1494,8 @@ object SparkContext extends Logging {
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
@@ -1510,8 +1510,8 @@ object SparkContext extends Logging {
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}

@@ -1521,8 +1521,8 @@ object SparkContext extends Logging {
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}

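The SparkContext hunks above make the same Throwable-to-Exception narrowing around the reflective construction of the YARN scheduler and backend classes. A hedged sketch of that reflect-and-wrap shape, using placeholder names rather than Spark's YARN classes:

```scala
// Illustrative only: generic reflective construction with Exception wrapping.
object ReflectiveSetup {
  class ComponentInitException(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause)

  def instantiate[T](className: String, paramClass: Class[_], arg: AnyRef): T = {
    try {
      val clazz = Class.forName(className)
      val cons = clazz.getConstructor(paramClass)
      cons.newInstance(arg).asInstanceOf[T]
    } catch {
      // ClassNotFoundException, NoSuchMethodException and friends are wrapped
      // with a clearer message; JVM errors (subclasses of Error) now propagate
      // instead of being masked by this catch.
      case e: Exception => throw new ComponentInitException(s"$className not available?", e)
    }
  }
}
```

Any caller that loads an optional module reflectively (as the YARN-mode branches above do) then fails with a descriptive, wrapped exception when the module is missing from the classpath, rather than a bare reflection error.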
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -281,8 +281,7 @@ object SparkEnv extends Logging {
val jvmInformation = Seq(
("Java Version", "%s (%s)".format(Properties.javaVersion, Properties.javaVendor)),
("Java Home", Properties.javaHome),
("Scala Version", Properties.versionString),
("Scala Home", Properties.scalaHome)
("Scala Version", Properties.versionString)
).sorted

// Spark properties
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -94,6 +94,7 @@ private[spark] class PythonRDD[T: ClassTag](
val obj = new Array[Byte](length)
stream.readFully(obj)
obj
case 0 => Array.empty[Byte]
case SpecialLengths.TIMING_DATA =>
// Timing data from worker
val bootTime = stream.readLong()
@@ -123,7 +124,7 @@ private[spark] class PythonRDD[T: ClassTag](
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
Array.empty[Byte]
null
}
} catch {

@@ -143,7 +144,7 @@

var _nextObj = read()

def hasNext = _nextObj.length != 0
def hasNext = _nextObj != null
}
new InterruptibleIterator(context, stdoutIterator)
}
@@ -170,7 +171,7 @@ private[spark] class PythonRDD[T: ClassTag](
this.interrupt()
}

override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
SparkEnv.set(env)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
@@ -281,7 +282,6 @@ private[spark] object PythonRDD {
}
} catch {
case eof: EOFException => {}
case e: Throwable => throw e
}
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
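In the PythonRDD reader above, a zero-length frame (case 0) becomes a legitimate empty record, so the end-of-stream sentinel switches from an empty array to null and hasNext now tests for null. A self-contained sketch of that iterator shape, assuming a hypothetical length-prefixed framing; readFrame is not Spark's actual Python worker protocol.

```scala
import java.io.{DataInputStream, EOFException}

// Sketch: length-prefixed frames where an empty frame is valid data and
// null marks end-of-stream, mirroring the hasNext change above.
class FramedIterator(stream: DataInputStream) extends Iterator[Array[Byte]] {

  private def readFrame(): Array[Byte] = {
    try {
      stream.readInt() match {
        case length if length > 0 =>
          val obj = new Array[Byte](length)
          stream.readFully(obj)
          obj
        case 0 => Array.empty[Byte] // legitimate empty record, not termination
        case _ => null              // negative length: treat as end of data here
      }
    } catch {
      case _: EOFException => null
    }
  }

  private var nextObj = readFrame()

  override def hasNext: Boolean = nextObj != null

  override def next(): Array[Byte] = {
    if (!hasNext) throw new NoSuchElementException("end of stream")
    val obj = nextObj
    nextObj = readFrame()
    obj
  }
}
```

Using null rather than Array.empty keeps empty payloads distinguishable from termination without introducing a wrapper type such as Option.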
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -71,7 +71,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
stopDaemon()
startDaemon()
new Socket(daemonHost, daemonPort)
case e: Throwable => throw e
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -157,7 +157,7 @@ object Client {
// TODO: See if we can initialize akka so return messages are sent back using the same TCP
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -103,7 +103,7 @@ object SparkHadoopUtil {
.newInstance()
.asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
} else {
new SparkHadoopUtil
39 changes: 23 additions & 16 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -67,8 +67,7 @@ object SparkSubmit {
private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)

/**
* @return
* a tuple containing the arguments for the child, a list of classpath
* @return a tuple containing the arguments for the child, a list of classpath
* entries for the child, a list of system propertes, a list of env vars
* and the main class for the child
*/
@@ -115,13 +114,16 @@ object SparkSubmit {
val sysProps = new HashMap[String, String]()
var childMainClass = ""

val isPython = args.isPython
val isYarnCluster = clusterManager == YARN && deployOnCluster

if (clusterManager == MESOS && deployOnCluster) {
printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}

// If we're running a Python app, set the Java class to run to be our PythonRunner, add
// Python files to deployment list, and pass the main file and Python path to PythonRunner
if (args.isPython) {
if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cannot currently run Python driver programs on cluster")
}
@@ -161,35 +163,35 @@ object SparkSubmit {
val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
sysProp = "spark.executor.memory"),
OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.files, YARN, true, clOption = "--files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars")
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
)

// For client mode make any added jars immediately visible on the classpath
@@ -212,21 +214,22 @@ object SparkSubmit {
}
}

// For standalone mode, add the application jar automatically so the user doesn't have to
// call sc.addJar. TODO: Standalone mode in the cluster
if (clusterManager == STANDALONE) {
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
if (args.primaryResource != RESERVED_JAR_NAME) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
}

// Standalone cluster specific configurations
if (deployOnCluster && clusterManager == STANDALONE) {
if (args.supervise) {
childArgs += "--supervise"
}

childMainClass = "org.apache.spark.deploy.Client"
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
@@ -243,16 +246,20 @@ object SparkSubmit {
}
}

// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
if (!sysProps.contains(k)) sysProps(k) = v
}

(childArgs, childClasspath, sysProps, childMainClass)
}

private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false)
{
private def launch(
childArgs: ArrayBuffer[String],
childClasspath: ArrayBuffer[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean = false) {
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
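The reordered OptionAssigner table above routes each spark-submit argument to a child command-line flag and/or a system property, keyed by cluster manager and deploy mode. The sketch below shows how such a table-driven mapping can be applied; the case-class fields mirror the call sites visible in the diff, but the flag values and the apply loop are assumptions, not SparkSubmit's exact code.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

object SubmitOptionSketch {
  // Cluster-manager bit flags, as suggested by expressions like STANDALONE | YARN above.
  val YARN = 1
  val STANDALONE = 2
  val MESOS = 4
  val LOCAL = 8
  val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL

  case class OptionAssigner(
      value: String,
      clusterManager: Int,
      deployOnCluster: Boolean,
      clOption: String = null,
      sysProp: String = null)

  def applyOptions(
      options: Seq[OptionAssigner],
      clusterManager: Int,
      deployOnCluster: Boolean,
      childArgs: ArrayBuffer[String],
      sysProps: HashMap[String, String]): Unit = {
    for (opt <- options if opt.value != null) {
      // An entry fires only when the active cluster manager matches its bit mask
      // and the deploy mode (client vs. cluster) matches its flag.
      if (deployOnCluster == opt.deployOnCluster &&
          (clusterManager & opt.clusterManager) != 0) {
        if (opt.clOption != null) childArgs ++= Seq(opt.clOption, opt.value)
        if (opt.sysProp != null) sysProps.put(opt.sysProp, opt.value)
      }
    }
  }
}
```

Keeping the wiring in a declarative table is what lets the diff regroup and reorder entries (for example, moving the spark.jars entry to ALL_CLUSTER_MGRS) without touching the loop that applies them.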
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -70,7 +70,7 @@ class HistoryServer(
* TODO: Add a mechanism to update manually.
*/
private val logCheckingThread = new Thread {
override def run() {
override def run(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
val now = System.currentTimeMillis
if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
@@ -154,7 +154,7 @@ class HistoryServer(
numCompletedApplications = logInfos.size

} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
case e: Exception => logError("Exception in checking for event log updates", e)
}
} else {
logWarning("Attempted to check for event log updates before binding the server.")
@@ -231,8 +231,8 @@ class HistoryServer(
dir.getModificationTime
}
} catch {
case t: Throwable =>
logError("Exception in accessing modification time of %s".format(dir.getPath), t)
case e: Exception =>
logError("Exception in accessing modification time of %s".format(dir.getPath), e)
-1L
}
}
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -684,8 +684,8 @@ private[spark] class Master(
webUi.attachSparkUI(ui)
return true
} catch {
case t: Throwable =>
logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t)
case e: Exception =>
logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e)
}
} else {
logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))
core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -31,7 +31,7 @@ object DriverWrapper {
case workerUrl :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")

// Delegate to supplied main class
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -105,7 +105,7 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a
// SparkEnv / Executor before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
indestructible = true, conf = conf, new SecurityManager(conf))
conf, new SecurityManager(conf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(