3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -128,6 +128,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
*/
object Client {
def main(args: Array[String]) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")

val conf = new SparkConf()
val driverArgs = new ClientArguments(args)

67 changes: 43 additions & 24 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

- import java.io.File
+ import java.io.{PrintStream, File}
import java.net.URL

import org.apache.spark.executor.ExecutorURLClassLoader
@@ -32,38 +32,51 @@ import scala.collection.mutable.Map
* modes that Spark supports.
*/
object SparkSubmit {
- val YARN = 1
- val STANDALONE = 2
- val MESOS = 4
- val LOCAL = 8
- val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
+ private val YARN = 1
+ private val STANDALONE = 2
+ private val MESOS = 4
+ private val LOCAL = 8
+ private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
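The constants above are distinct powers of two, so a set of cluster managers can be collapsed into a single Int with bitwise OR (as ALL_CLUSTER_MGRS does) and queried with bitwise AND. A minimal, self-contained sketch of the idiom; the object and method names here are illustrative, not part of the patch:

// Illustrative sketch of the bit-flag idiom used by SparkSubmit above.
object ClusterMgrFlags {
  val YARN = 1
  val STANDALONE = 2
  val MESOS = 4
  val LOCAL = 8

  // A "set" of managers is just the OR of its members.
  val REMOTE_ONLY = YARN | STANDALONE | MESOS

  // Membership test: the AND is nonzero iff the flag is in the set.
  def allows(set: Int, mgr: Int): Boolean = (set & mgr) != 0

  def main(args: Array[String]) {
    println(allows(REMOTE_ONLY, YARN))   // true
    println(allows(REMOTE_ONLY, LOCAL))  // false
  }
}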

- var clusterManager: Int = LOCAL
+ private var clusterManager: Int = LOCAL

def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
+ if (appArgs.verbose) {
+ printStream.println(appArgs)
+ }
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
- launch(childArgs, classpath, sysProps, mainClass)
+ launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
}

+ // Exposed for testing
+ private[spark] var printStream: PrintStream = System.err
+ private[spark] var exitFn: () => Unit = () => System.exit(-1)
+
+ private[spark] def printErrorAndExit(str: String) = {
+ printStream.println("error: " + str)
+ printStream.println("run with --help for more information or --verbose for debugging output")
+ exitFn()
+ }
+ private[spark] def printWarning(str: String) = printStream.println("warning: " + str)
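Making printStream and exitFn mutable vars gives tests a seam: they can capture output and observe the exit request without killing the JVM, which is exactly what SparkSubmitSuite below does. A minimal sketch of swapping the hooks in a test (the exception message is illustrative):

// Sketch: a test can redirect output and neutralize the exit before calling main().
val captured = new java.io.ByteArrayOutputStream
SparkSubmit.printStream = new java.io.PrintStream(captured)
SparkSubmit.exitFn = () => throw new RuntimeException("exit requested")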

/**
* @return
* a tuple containing the arguments for the child, a list of classpath
* entries for the child, a map of system properties, and the main class
* for the child
*/
- def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
+ private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
ArrayBuffer[String], Map[String, String], String) = {
if (appArgs.master.startsWith("yarn")) {
if (appArgs.master.startsWith("local")) {
clusterManager = LOCAL
} else if (appArgs.master.startsWith("yarn")) {
clusterManager = YARN
} else if (appArgs.master.startsWith("spark")) {
clusterManager = STANDALONE
} else if (appArgs.master.startsWith("mesos")) {
clusterManager = MESOS
} else if (appArgs.master.startsWith("local")) {
clusterManager = LOCAL
} else {
System.err.println("master must start with yarn, mesos, spark, or local")
System.exit(1)
printErrorAndExit("master must start with yarn, mesos, spark, or local")
}

// Because "yarn-standalone" and "yarn-client" encapsulate both the master
@@ -73,12 +86,10 @@ object SparkSubmit {
appArgs.deployMode = "cluster"
}
if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
System.err.println("Deploy mode \"cluster\" and master \"yarn-client\" are at odds")
System.exit(1)
printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
}
if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
System.err.println("Deploy mode \"client\" and master \"yarn-standalone\" are at odds")
System.exit(1)
printErrorAndExit("Deploy mode \"client\" and master \"yarn-standalone\" are not compatible")
}
if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
appArgs.master = "yarn-standalone"
@@ -95,8 +106,7 @@ object SparkSubmit {
var childMainClass = ""

if (clusterManager == MESOS && deployOnCluster) {
System.err.println("Mesos does not support running the driver on the cluster")
System.exit(1)
printErrorAndExit("Mesos does not support running the driver on the cluster")
}

if (!deployOnCluster) {
@@ -174,8 +184,17 @@ object SparkSubmit {
(childArgs, childClasspath, sysProps, childMainClass)
}

- def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
- sysProps: Map[String, String], childMainClass: String) {
+ private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
+ sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
+
+ if (verbose) {
+ System.err.println(s"Main class:\n$childMainClass")
+ System.err.println(s"Arguments:\n${childArgs.mkString("\n")}")
+ System.err.println(s"System properties:\n${sysProps.mkString("\n")}")
+ System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
+ System.err.println("\n")
+ }

val loader = new ExecutorURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
Thread.currentThread.setContextClassLoader(loader)
@@ -193,10 +212,10 @@ object SparkSubmit {
mainMethod.invoke(null, childArgs.toArray)
}

- def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
+ private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
val localJarFile = new File(localJar)
if (!localJarFile.exists()) {
System.err.println("Jar does not exist: " + localJar + ". Skipping.")
printWarning(s"Jar $localJar does not exist, skipping.")
}

val url = localJarFile.getAbsoluteFile.toURI.toURL
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -40,25 +40,45 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
var name: String = null
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
+ var verbose: Boolean = false

loadEnvVars()
- parseArgs(args.toList)
-
- def loadEnvVars() {
- master = System.getenv("MASTER")
- deployMode = System.getenv("DEPLOY_MODE")
+ parseOpts(args.toList)
+
+ // Sanity checks
+ if (args.length == 0) printUsageAndExit(-1)
+ if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
+ if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")

+ override def toString = {
+ s"""Parsed arguments:
+ | master $master
+ | deployMode $deployMode
+ | executorMemory $executorMemory
+ | executorCores $executorCores
+ | totalExecutorCores $totalExecutorCores
+ | driverMemory $driverMemory
+ | driverCores $driverCores
+ | supervise $supervise
+ | queue $queue
+ | numExecutors $numExecutors
+ | files $files
+ | archives $archives
+ | mainClass $mainClass
+ | primaryResource $primaryResource
+ | name $name
+ | childArgs [${childArgs.mkString(" ")}]
+ | jars $jars
+ | verbose $verbose
+ """.stripMargin
+ }

- def parseArgs(args: List[String]) {
- if (args.size == 0) {
- printUsageAndExit(1)
- System.exit(1)
- }
- primaryResource = args(0)
- parseOpts(args.tail)
+ private def loadEnvVars() {
+ Option(System.getenv("MASTER")).map(master = _)
+ Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
}
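System.getenv returns null when a variable is unset; wrapping the call in Option(...) converts that null to None, so the assignment inside map only runs when the variable is present and the field otherwise keeps its default. The same idiom, sketched standalone with foreach, which signals the side effect more plainly (the variable name and default are illustrative):

// Sketch: null-safe environment read that keeps a default when the variable is unset.
var sparkHome: String = "/opt/spark"  // illustrative default
Option(System.getenv("SPARK_HOME")).foreach(value => sparkHome = value)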

- def parseOpts(opts: List[String]): Unit = opts match {
+ private def parseOpts(opts: List[String]): Unit = opts match {
case ("--name") :: value :: tail =>
name = value
parseOpts(tail)
@@ -73,8 +93,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {

case ("--deploy-mode") :: value :: tail =>
if (value != "client" && value != "cluster") {
System.err.println("--deploy-mode must be either \"client\" or \"cluster\"")
System.exit(1)
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
parseOpts(tail)
@@ -130,17 +149,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
case ("--help" | "-h") :: tail =>
printUsageAndExit(0)

- case Nil =>
+ case ("--verbose" | "-v") :: tail =>
+ verbose = true
+ parseOpts(tail)

- case _ =>
- printUsageAndExit(1, opts)
+ case value :: tail =>
+ if (primaryResource != null) {
+ val error = s"Found two conflicting resources, $value and $primaryResource." +
+ " Expecting only one resource."
+ SparkSubmit.printErrorAndExit(error)
+ }
+ primaryResource = value
+ parseOpts(tail)

+ case Nil =>
}
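parseOpts is a recursive-descent parser over the argument list: each case uses the :: cons pattern to peel off one option (plus its value when it takes one) and recurses on the tail, the bare value :: tail case picks up the positional primary resource, and Nil ends the recursion. A self-contained sketch of the same shape, with illustrative option names:

// Sketch of the cons-pattern option parsing used by parseOpts above.
object MiniParser {
  var master: String = "local"
  var verbose: Boolean = false
  var resource: String = null

  def parse(opts: List[String]): Unit = opts match {
    case "--master" :: value :: tail =>  // option that consumes a value
      master = value
      parse(tail)
    case "--verbose" :: tail =>          // boolean flag
      verbose = true
      parse(tail)
    case value :: tail =>                // positional argument
      resource = value
      parse(tail)
    case Nil =>                          // end of input: recursion stops
  }

  def main(args: Array[String]) {
    parse(List("--master", "spark://host:7077", "--verbose", "app.jar"))
    println(s"master=$master verbose=$verbose resource=$resource")
  }
}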

- def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+ private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+ val outStream = SparkSubmit.printStream
if (unknownParam != null) {
System.err.println("Unknown/unsupported param " + unknownParam)
outStream.println("Unknown/unsupported param " + unknownParam)
}
- System.err.println(
+ outStream.println(
"""Usage: spark-submit <primary binary> [options]
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
@@ -171,6 +201,6 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working dir of each executor.""".stripMargin
)
- System.exit(exitCode)
+ SparkSubmit.exitFn()
}
}
61 changes: 59 additions & 2 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -17,14 +17,71 @@

package org.apache.spark.deploy

+ import java.io.{OutputStream, PrintStream}
+
+ import scala.collection.mutable.ArrayBuffer

import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers

+ import org.apache.spark.deploy.SparkSubmit._


class SparkSubmitSuite extends FunSuite with ShouldMatchers {

+ val noOpOutputStream = new OutputStream {
+ def write(b: Int) = {}
+ }
+
+ /** Simple PrintStream that records printed lines in a buffer */
+ class BufferPrintStream extends PrintStream(noOpOutputStream) {
+ var lineBuffer = ArrayBuffer[String]()
+ override def println(line: String) {
+ lineBuffer += line
+ }
+ }
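Note the trade-off here: the superclass is constructed over noOpOutputStream, so anything written via print, printf, or write is discarded; only calls routed through the overridden println(String) land in lineBuffer, which is all testPrematureExit needs. A capture-everything variant would back the stream with a real buffer, roughly as follows (a sketch, not part of this patch):

// Sketch: capture all bytes written to the stream, not just println(String) calls.
val sink = new java.io.ByteArrayOutputStream
val captureAll = new java.io.PrintStream(sink)
// After exercising the code under test, sink.toString holds everything written.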

+ /** Returns true if the given search string gets printed before SparkSubmit exits. */
+ def testPrematureExit(input: Array[String], searchString: String): Boolean = {
+ val printStream = new BufferPrintStream()
+ SparkSubmit.printStream = printStream
+
+ @volatile var exitedCleanly = false
+ SparkSubmit.exitFn = () => exitedCleanly = true
+
+ val thread = new Thread {
+ override def run() = try {
+ SparkSubmit.main(input)
+ } catch {
+ // If exceptions occur after the "exit" has happened, fine to ignore them.
+ // These represent code paths not reachable during normal execution.
+ case e: Exception => if (!exitedCleanly) throw e
+ }
+ }
+ thread.start()
+ thread.join()
+ printStream.lineBuffer.exists(s => s.contains(searchString))
+ }

test("prints usage on empty input") {
val clArgs = Array[String]()
// val appArgs = new SparkSubmitArguments(clArgs)
testPrematureExit(Array[String](), "Usage: spark-submit") should be (true)
}

test("prints usage with only --help") {
testPrematureExit(Array("--help"), "Usage: spark-submit") should be (true)
}

test("handles multiple binary definitions") {
val adjacentJars = Array("foo.jar", "bar.jar")
testPrematureExit(adjacentJars, "error: Found two conflicting resources") should be (true)

val nonAdjacentJars =
Array("foo.jar", "--master", "123", "--class", "abc", "bar.jar")
testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources") should be (true)
}

test("handle binary specified but not class") {
testPrematureExit(Array("foo.jar"), "must specify a main class")
}

test("handles YARN cluster mode") {
@@ -167,6 +167,9 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
object Client {

def main(argStrings: Array[String]) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
println("Use ./bin/spark-submit with \"--master yarn\"")

// Set an env variable indicating we are running in YARN mode.
// Note that anything with the SPARK prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
@@ -173,6 +173,9 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
object Client {

def main(argStrings: Array[String]) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
println("Use ./bin/spark-submit with \"--master yarn\"")

// Set an env variable indicating we are running in YARN mode.
// Note: any env variable with the SPARK_ prefix gets propagated to all (remote) processes -
// see Client#setupLaunchEnv().