diff --git a/bin/beeline b/bin/beeline
index 09fe366c609f..f91aaa1bf52d 100755
--- a/bin/beeline
+++ b/bin/beeline
@@ -20,26 +20,5 @@
 # Figure out where Spark is installed
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
-# Find the java binary
-if [ -n "${JAVA_HOME}" ]; then
-  RUNNER="${JAVA_HOME}/bin/java"
-else
-  if [ `command -v java` ]; then
-    RUNNER="java"
-  else
-    echo "JAVA_HOME is not set" >&2
-    exit 1
-  fi
-fi
-
-# Compute classpath using external script
-classpath_output=$($FWDIR/bin/compute-classpath.sh)
-if [[ "$?" != "0" ]]; then
-  echo "$classpath_output"
-  exit 1
-else
-  CLASSPATH=$classpath_output
-fi
-
 CLASS="org.apache.hive.beeline.BeeLine"
-exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"
+exec "$FWDIR/bin/spark-class" $CLASS "$@"
diff --git a/bin/run-example b/bin/run-example
index 942706d73312..fe2536d91904 100755
--- a/bin/run-example
+++ b/bin/run-example
@@ -55,4 +55,4 @@ fi
   --master $EXAMPLE_MASTER \
   --class $EXAMPLE_CLASS \
   "$SPARK_EXAMPLES_JAR" \
-  "$@"
+  -- "$@"
diff --git a/bin/run-example.cmd b/bin/run-example.cmd
index 5b2d048d6ed5..cd63dbeb0bac 100644
--- a/bin/run-example.cmd
+++ b/bin/run-example.cmd
@@ -20,4 +20,4 @@ rem
 rem This is the entry point for running a Spark example. To avoid polluting
 rem the environment, it just launches a new cmd to do the real work.
 
-cmd /V /E /C %~dp0run-example2.cmd %*
+cmd /V /E /C %~dp0run-example2.cmd -- %*
diff --git a/bin/run-example2.cmd b/bin/run-example2.cmd
index eadedd7fa61f..cd6567a05efe 100644
--- a/bin/run-example2.cmd
+++ b/bin/run-example2.cmd
@@ -83,6 +83,6 @@ if defined ARGS set ARGS=%ARGS:~1%
 call "%FWDIR%bin\spark-submit.cmd" ^
   --master %EXAMPLE_MASTER% ^
   --class %EXAMPLE_CLASS% ^
-  "%SPARK_EXAMPLES_JAR%" %ARGS%
+  "%SPARK_EXAMPLES_JAR%" -- %ARGS%
 
 :exit
diff --git a/bin/spark-shell b/bin/spark-shell
index 756c8179d12b..d8fbc3dc7975 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -46,11 +46,11 @@ function main(){
     # (see https://github.com/sbt/sbt/issues/562).
     stty -icanon min 1 -echo > /dev/null 2>&1
     export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
-    $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
+    $FWDIR/bin/spark-submit spark-shell --class org.apache.spark.repl.Main "$@"
     stty icanon echo > /dev/null 2>&1
   else
     export SPARK_SUBMIT_OPTS
-    $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
+    $FWDIR/bin/spark-submit spark-shell --class org.apache.spark.repl.Main "$@"
   fi
 }
diff --git a/bin/spark-sql b/bin/spark-sql
index bba7f897b19b..81249e80a14c 100755
--- a/bin/spark-sql
+++ b/bin/spark-sql
@@ -26,11 +26,15 @@ set -o posix
 # Figure out where Spark is installed
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/spark-sql [options]"
-  $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
+
+if [[ "$@" = --help ]] || [[ "$@" = -H ]]; then
+  echo "Usage: ./sbin/spark-sql [options] [-- application options]"
+  exec "$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  echo
+  echo "Application options:"
+  exec "$FWDIR"/bin/spark-submit spark-internal --class $CLASS -- -H 2>&1 | tail -n +3
   exit 0
 fi
 
-CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
+exec "$FWDIR"/bin/spark-submit spark-internal --class $CLASS $@
diff --git a/bin/spark-submit b/bin/spark-submit
index 9e7cecedd032..8d696107edc4 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -42,4 +42,3 @@ if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
 fi
 
 exec $SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
-
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 318509a67a36..fdc197fe14da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -259,7 +259,7 @@ object SparkSubmit {
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (clusterManager == YARN && deployMode == CLUSTER) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
-      if (args.primaryResource != SPARK_INTERNAL) {
+      if (isUserJar(args.primaryResource)) {
         childArgs += ("--jar", args.primaryResource)
       }
       childArgs += ("--class", args.mainClass)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index dd044e629876..fde4aec8419b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -208,9 +208,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
 
   /** Fill in values by parsing user options. */
   private def parseOpts(opts: Seq[String]): Unit = {
-    var inSparkOpts = true
+    val EQ_SEPARATED_OPT = """(--[^=]+)=(.+)""".r
+    val WILDCARD_OPT = """(-.*)""".r
 
-    // Delineates parsing of Spark options from parsing of user options.
     parse(opts)
 
     def parse(opts: Seq[String]): Unit = opts match {
@@ -311,33 +311,29 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         verbose = true
         parse(tail)
 
-      case value :: tail =>
-        if (inSparkOpts) {
-          value match {
-            // convert --foo=bar to --foo bar
-            case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
-              val parts = v.split("=")
-              parse(Seq(parts(0), parts(1)) ++ tail)
-            case v if v.startsWith("-") =>
-              val errMessage = s"Unrecognized option '$value'."
-              SparkSubmit.printErrorAndExit(errMessage)
-            case v =>
-              primaryResource =
-                if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
-                  Utils.resolveURI(v).toString
-                } else {
-                  v
-                }
-              inSparkOpts = false
-              isPython = SparkSubmit.isPython(v)
-              parse(tail)
-          }
+      case EQ_SEPARATED_OPT(opt, value) :: tail =>
+        // convert --foo=bar to --foo bar
+        parse(opt :: value :: tail)
+
+      case "--" :: tail =>
+        if (primaryResource eq null) {
+          SparkSubmit.printErrorAndExit(
+            "User application option separator \"--\" must be after the primary resource " +
+            "(i.e., application jar file or Python file).")
+        }
+        childArgs ++= tail.filter(_.nonEmpty)
+
+      case WILDCARD_OPT(opt) :: tail =>
+        SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
+
+      case resource :: tail =>
+        primaryResource = if (!SparkSubmit.isShell(resource) && !SparkSubmit.isInternal(resource)) {
+          Utils.resolveURI(resource).toString
        } else {
-          if (!value.isEmpty) {
-            childArgs += value
-          }
-          parse(tail)
+          resource
        }
+        isPython = SparkSubmit.isPython(resource)
+        parse(tail)
 
       case Nil =>
     }
@@ -349,7 +345,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       outStream.println("Unknown/unsupported param " + unknownParam)
     }
     outStream.println(
-      """Usage: spark-submit [options] [app options]
+      """Usage: spark-submit [options] [-- application options]
+        |
+        |NOTE:
+        |  Starting from Spark 1.1, the option separator "--" is required to separate spark-submit
+        |  options from application options.
+        |
         |Options:
         |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
         |  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 9190b05e2dba..312e66d0c531 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -100,12 +100,29 @@ class SparkSubmitSuite extends FunSuite with Matchers {
       "--name", "myApp",
       "--class", "Foo",
       "userjar.jar",
+      "--",
       "some",
       "--weird", "args")
     val appArgs = new SparkSubmitArguments(clArgs)
     appArgs.childArgs should be (Seq("some", "--weird", "args"))
   }
 
+  test("handles extra spark options after user program options") {
+    val clArgs = Seq(
+      "--name", "myApp",
+      "--master", "local",
+      "userjar.jar",
+      "--class", "Foo",
+      "--",
+      "some",
+      "--weird", "args")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    appArgs.master should be ("local")
+    appArgs.name should be ("myApp")
+    appArgs.mainClass should be ("Foo")
+    appArgs.childArgs should be (Seq("some", "--weird", "args"))
+  }
+
   test("handles YARN cluster mode") {
     val clArgs = Seq(
       "--deploy-mode", "cluster",
@@ -122,6 +139,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
       "--name", "beauty",
       "--conf", "spark.shuffle.spill=false",
       "thejar.jar",
+      "--",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
@@ -160,6 +178,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
       "--name", "trill",
       "--conf", "spark.shuffle.spill=false",
       "thejar.jar",
+      "--",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
@@ -192,6 +211,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
       "--driver-cores", "5",
       "--conf", "spark.shuffle.spill=false",
       "thejar.jar",
+      "--",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
@@ -219,6 +239,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
       "--driver-memory", "4g",
       "--conf", "spark.shuffle.spill=false",
       "thejar.jar",
+      "--",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
@@ -241,6 +262,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
       "--driver-memory", "4g",
       "--conf", "spark.shuffle.spill=false",
       "thejar.jar",
+      "--",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
diff --git a/sbin/start-thriftserver.sh b/sbin/start-thriftserver.sh
index 8398e6f19b51..b55193d6492b 100755
--- a/sbin/start-thriftserver.sh
+++ b/sbin/start-thriftserver.sh
@@ -26,11 +26,15 @@ set -o posix
 # Figure out where Spark is installed
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/start-thriftserver [options]"
-  $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
+
+if [[ "$1" = --help ]] || [[ "$1" = -h ]]; then
+  echo "Usage: ./sbin/start-thriftserver.sh [options] [-- application options]"
+  exec "$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  echo
+  echo "Application options:"
+  exec "$FWDIR"/bin/spark-submit spark-internal --class $CLASS -- -H 2>&1 | tail -n +3
   exit 0
 fi
-CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2" -exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@ +exec "$FWDIR"/bin/spark-submit spark-internal --class $CLASS $@ diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index ddbc2a79fb51..2361ec51ed4a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -40,7 +40,6 @@ private[hive] object HiveThriftServer2 extends Logging { val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2") if (!optionsProcessor.process(args)) { - logger.warn("Error starting HiveThriftServer2 with given arguments") System.exit(-1) } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 69f19f826a80..ce2aa9f96124 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -18,24 +18,30 @@ package org.apache.spark.sql.hive.thriftserver +import scala.collection.JavaConversions._ + import java.io.{BufferedReader, InputStreamReader, PrintWriter} +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.scalatest.{BeforeAndAfterAll, FunSuite} +import org.apache.spark.sql.catalyst.util.getTempFilePath + class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils { - val WAREHOUSE_PATH = TestUtils.getWarehousePath("cli") - val METASTORE_PATH = TestUtils.getMetastorePath("cli") + val METASTORE_PATH = getTempFilePath("metastore") + val WAREHOUSE_PATH = getTempFilePath("warehouse") override def beforeAll() { - val pb = new ProcessBuilder( - "../../bin/spark-sql", - "--master", - "local", - "--hiveconf", - s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true", - "--hiveconf", - "hive.metastore.warehouse.dir=" + WAREHOUSE_PATH) - + val connectionUrl = s"jdbc:derby:;databaseName=$METASTORE_PATH;create=true" + val args = + s"""../../bin/spark-sql + | --master local + | -- + | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$connectionUrl + | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$WAREHOUSE_PATH + """.stripMargin.split("\\s").filter(_.nonEmpty) + + val pb = new ProcessBuilder(seqAsJavaList(args)) process = pb.start() outputWriter = new PrintWriter(process.getOutputStream, true) inputReader = new BufferedReader(new InputStreamReader(process.getInputStream)) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index fe3403b3292e..8c765f9bf1b7 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -25,6 +25,7 @@ import java.io.{BufferedReader, InputStreamReader} import java.net.ServerSocket import java.sql.{Connection, DriverManager, Statement} +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.scalatest.{BeforeAndAfterAll, 
FunSuite} import org.apache.spark.sql.Logging @@ -63,19 +64,19 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUt // Forking a new process to start the Hive Thrift server. The reason to do this is it is // hard to clean up Hive resources entirely, so we just start a new process and kill // that process for cleanup. - val defaultArgs = Seq( - "../../sbin/start-thriftserver.sh", - "--master local", - "--hiveconf", - "hive.root.logger=INFO,console", - "--hiveconf", - s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true", - "--hiveconf", - s"hive.metastore.warehouse.dir=$WAREHOUSE_PATH") - val pb = new ProcessBuilder(defaultArgs ++ args) - val environment = pb.environment() - environment.put("HIVE_SERVER2_THRIFT_PORT", PORT.toString) - environment.put("HIVE_SERVER2_THRIFT_BIND_HOST", HOST) + val connectionUrl = s"jdbc:derby:;databaseName=$METASTORE_PATH;create=true" + val defaultArgs = + s"""../../sbin/start-thriftserver.sh + | --master local + | -- + | --hiveconf hive.root.logger=INFO,console + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=$HOST + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$PORT + | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$connectionUrl + | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$WAREHOUSE_PATH + """.stripMargin.split("\\s").filter(_.nonEmpty) + + val pb = new ProcessBuilder(seqAsJavaList(defaultArgs) ++ args) process = pb.start() inputReader = new BufferedReader(new InputStreamReader(process.getInputStream)) errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
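
Usage sketch of the separator introduced by this patch (the master URL, jar name, class name, and trailing flags below are illustrative placeholders, not part of the change): options before "--" are consumed by spark-submit itself, while everything after "--" is passed verbatim to the application's main method.

    # Hypothetical invocation under the new argument convention; "--" must come
    # after the primary resource (the application jar or Python file).
    ./bin/spark-submit \
      --master local[4] \
      --class com.example.MyApp \
      my-app.jar \
      -- --input /data/events --verbose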