Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions launcher/src/main/java/org/apache/spark/launcher/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.launcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -54,10 +55,12 @@ public static void main(String[] argsArray) throws Exception {
String className = args.remove(0);

boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
AbstractCommandBuilder builder;
Map<String, String> env = new HashMap<>();
List<String> cmd;
if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
try {
builder = new SparkSubmitCommandBuilder(args);
AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
cmd = buildCommand(builder, env, printLaunchCommand);
} catch (IllegalArgumentException e) {
printLaunchCommand = false;
System.err.println("Error: " + e.getMessage());
Expand All @@ -76,17 +79,12 @@ public static void main(String[] argsArray) throws Exception {
help.add(parser.className);
}
help.add(parser.USAGE_ERROR);
builder = new SparkSubmitCommandBuilder(help);
AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(help);
cmd = buildCommand(builder, env, printLaunchCommand);
}
} else {
builder = new SparkClassCommandBuilder(className, args);
}

Map<String, String> env = new HashMap<>();
List<String> cmd = builder.buildCommand(env);
if (printLaunchCommand) {
System.err.println("Spark Command: " + join(" ", cmd));
System.err.println("========================================");
AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
cmd = buildCommand(builder, env, printLaunchCommand);
}

if (isWindows()) {
Expand All @@ -101,6 +99,22 @@ public static void main(String[] argsArray) throws Exception {
}
}

/**
* Prepare spark commands with the appropriate command builder.
* If printLaunchCommand is set then the commands will be printed to the stderr.
*/
private static List<String> buildCommand(
AbstractCommandBuilder builder,
Map<String, String> env,
boolean printLaunchCommand) throws IOException, IllegalArgumentException {
List<String> cmd = builder.buildCommand(env);
if (printLaunchCommand) {
System.err.println("Spark Command: " + join(" ", cmd));
System.err.println("========================================");
}
return cmd;
}

/**
* Prepare a command line for execution from a Windows batch script.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {

final List<String> userArgs;
private final List<String> parsedArgs;
private final boolean requiresAppResource;
// Special command means no appResource and no mainClass required
private final boolean isSpecialCommand;
private final boolean isExample;

/**
Expand All @@ -105,7 +106,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
* spark-submit argument list to be modified after creation.
*/
SparkSubmitCommandBuilder() {
this.requiresAppResource = true;
this.isSpecialCommand = false;
this.isExample = false;
this.parsedArgs = new ArrayList<>();
this.userArgs = new ArrayList<>();
Expand Down Expand Up @@ -138,25 +139,26 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {

case RUN_EXAMPLE:
isExample = true;
appResource = SparkLauncher.NO_RESOURCE;
submitArgs = args.subList(1, args.size());
}

this.isExample = isExample;
OptionParser parser = new OptionParser(true);
parser.parse(submitArgs);
this.requiresAppResource = parser.requiresAppResource;
this.isSpecialCommand = parser.isSpecialCommand;
} else {
this.isExample = isExample;
this.requiresAppResource = false;
this.isSpecialCommand = true;
}
}

@Override
public List<String> buildCommand(Map<String, String> env)
throws IOException, IllegalArgumentException {
if (PYSPARK_SHELL.equals(appResource) && requiresAppResource) {
if (PYSPARK_SHELL.equals(appResource) && !isSpecialCommand) {
return buildPySparkShellCommand(env);
} else if (SPARKR_SHELL.equals(appResource) && requiresAppResource) {
} else if (SPARKR_SHELL.equals(appResource) && !isSpecialCommand) {
return buildSparkRCommand(env);
} else {
return buildSparkSubmitCommand(env);
Expand All @@ -166,18 +168,18 @@ public List<String> buildCommand(Map<String, String> env)
List<String> buildSparkSubmitArgs() {
List<String> args = new ArrayList<>();
OptionParser parser = new OptionParser(false);
final boolean requiresAppResource;
final boolean isSpecialCommand;

// If the user args array is not empty, we need to parse it to detect exactly what
// the user is trying to run, so that checks below are correct.
if (!userArgs.isEmpty()) {
parser.parse(userArgs);
requiresAppResource = parser.requiresAppResource;
isSpecialCommand = parser.isSpecialCommand;
} else {
requiresAppResource = this.requiresAppResource;
isSpecialCommand = this.isSpecialCommand;
}

if (!allowsMixedArguments && requiresAppResource) {
if (!allowsMixedArguments && !isSpecialCommand) {
checkArgument(appResource != null, "Missing application resource.");
}

Expand Down Expand Up @@ -229,7 +231,7 @@ List<String> buildSparkSubmitArgs() {
args.add(join(",", pyFiles));
}

if (isExample) {
if (isExample && !isSpecialCommand) {
checkArgument(mainClass != null, "Missing example class name.");
}

Expand Down Expand Up @@ -421,7 +423,7 @@ private List<String> findExamplesJars() {

private class OptionParser extends SparkSubmitOptionParser {

boolean requiresAppResource = true;
boolean isSpecialCommand = false;
private final boolean errorOnUnknownArgs;

OptionParser(boolean errorOnUnknownArgs) {
Expand Down Expand Up @@ -470,17 +472,14 @@ protected boolean handle(String opt, String value) {
break;
case KILL_SUBMISSION:
case STATUS:
requiresAppResource = false;
isSpecialCommand = true;
parsedArgs.add(opt);
parsedArgs.add(value);
break;
case HELP:
case USAGE_ERROR:
requiresAppResource = false;
parsedArgs.add(opt);
break;
case VERSION:
requiresAppResource = false;
isSpecialCommand = true;
parsedArgs.add(opt);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.launcher;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -27,14 +28,20 @@

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import static org.junit.Assert.*;

public class SparkSubmitCommandBuilderSuite extends BaseSuite {

private static File dummyPropsFile;
private static SparkSubmitOptionParser parser;

@Rule
public ExpectedException expectedException = ExpectedException.none();

@BeforeClass
public static void setUp() throws Exception {
dummyPropsFile = File.createTempFile("spark", "properties");
Expand Down Expand Up @@ -74,8 +81,11 @@ public void testCliHelpAndNoArg() throws Exception {

@Test
public void testCliKillAndStatus() throws Exception {
testCLIOpts(parser.STATUS);
testCLIOpts(parser.KILL_SUBMISSION);
List<String> params = Arrays.asList("driver-20160531171222-0000");
testCLIOpts(null, parser.STATUS, params);
testCLIOpts(null, parser.KILL_SUBMISSION, params);
testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.STATUS, params);
testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.KILL_SUBMISSION, params);
}

@Test
Expand Down Expand Up @@ -190,6 +200,33 @@ public void testSparkRShell() throws Exception {
env.get("SPARKR_SUBMIT_ARGS"));
}

@Test(expected = IllegalArgumentException.class)
public void testExamplesRunnerNoArg() throws Exception {
List<String> sparkSubmitArgs = Arrays.asList(SparkSubmitCommandBuilder.RUN_EXAMPLE);
Map<String, String> env = new HashMap<>();
buildCommand(sparkSubmitArgs, env);
}

@Test
public void testExamplesRunnerNoMainClass() throws Exception {
testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.HELP, null);
testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.USAGE_ERROR, null);
testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.VERSION, null);
}

@Test
public void testExamplesRunnerWithMasterNoMainClass() throws Exception {
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Missing example class name.");

List<String> sparkSubmitArgs = Arrays.asList(
SparkSubmitCommandBuilder.RUN_EXAMPLE,
parser.MASTER + "=foo"
);
Map<String, String> env = new HashMap<>();
buildCommand(sparkSubmitArgs, env);
}

@Test
public void testExamplesRunner() throws Exception {
List<String> sparkSubmitArgs = Arrays.asList(
Expand Down Expand Up @@ -344,10 +381,17 @@ private List<String> buildCommand(List<String> args, Map<String, String> env) th
return newCommandBuilder(args).buildCommand(env);
}

private void testCLIOpts(String opt) throws Exception {
List<String> helpArgs = Arrays.asList(opt, "driver-20160531171222-0000");
private void testCLIOpts(String appResource, String opt, List<String> params) throws Exception {
List<String> args = new ArrayList<>();
if (appResource != null) {
args.add(appResource);
}
args.add(opt);
if (params != null) {
args.addAll(params);
}
Map<String, String> env = new HashMap<>();
List<String> cmd = buildCommand(helpArgs, env);
List<String> cmd = buildCommand(args, env);
assertTrue(opt + " should be contained in the final cmd.",
cmd.contains(opt));
}
Expand Down