@@ -21,12 +21,9 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;

 import static org.apache.spark.launcher.CommandBuilderUtils.*;

@@ -89,10 +86,44 @@ public List<String> buildCommand(Map<String, String> env) throws IOException { |
     if (PYSPARK_SHELL.equals(appResource)) {
       return buildPySparkShellCommand(env);
     } else {
-      return super.buildSparkSubmitCommand(env);
+      return buildSparkSubmitCommand(env);
     }
   }

+  private List<String> buildSparkSubmitCommand(Map<String, String> env) throws IOException {
+    // Load the properties file and check whether spark-submit will be running the app's driver
+    // or just launching a cluster app. When running the driver, the JVM's arguments will be
+    // modified to cover the driver's configuration.
+    Properties props = loadPropertiesFile();
+    boolean isClientMode = isClientMode(props);
+    String extraClassPath = isClientMode ? find(DRIVER_EXTRA_CLASSPATH, conf, props) : null;
+
+    List<String> cmd = buildJavaCommand(extraClassPath);
+    addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
+    addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));
+
+    if (isClientMode) {
+      // Figuring out where the memory value comes from is a little tricky due to precedence.
+      // Precedence is observed in the following order:
+      // - explicit configuration (setConf()), which also covers the --driver-memory CLI argument.
+      // - properties file.
+      // - SPARK_DRIVER_MEMORY env variable.
+      // - SPARK_MEM env variable.
+      // - default value (512m).
+      String memory = firstNonEmpty(find(DRIVER_MEMORY, conf, props),
+        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
+      cmd.add("-Xms" + memory);
+      cmd.add("-Xmx" + memory);
+      addOptionString(cmd, find(DRIVER_EXTRA_JAVA_OPTIONS, conf, props));
+      mergeEnvPathList(env, getLibPathEnvName(), find(DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+    }
+
+    addPermGenSizeOpt(cmd);
+    cmd.add("org.apache.spark.deploy.SparkSubmit");
+    cmd.addAll(buildSparkSubmitArgs());
+    return cmd;
+  }
+
   private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
     // For backwards compatibility, if a script is specified in
     // the pyspark command line, then run it using spark-submit.
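
The driver-memory resolution in the new buildSparkSubmitCommand() is the subtle part of this hunk. Below is a self-contained sketch of the same precedence chain; the class, main(), and the local firstNonEmpty() helper are illustrative scaffolding, not the launcher's actual code (the real method uses firstNonEmpty() and find() statically imported from CommandBuilderUtils against the builder's conf map):

import java.util.Properties;

// Standalone sketch of the driver-memory precedence implemented above.
// Assumes firstNonEmpty() returns the first non-null, non-empty candidate.
public class MemoryPrecedenceSketch {

  static String firstNonEmpty(String... candidates) {
    for (String c : candidates) {
      if (c != null && !c.isEmpty()) {
        return c;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    String explicitConf = null;                       // setConf() / --driver-memory, highest precedence
    Properties props = new Properties();              // stands in for loadPropertiesFile()
    props.setProperty("spark.driver.memory", "2g");

    String memory = firstNonEmpty(
      explicitConf,
      props.getProperty("spark.driver.memory"),
      System.getenv("SPARK_DRIVER_MEMORY"),
      System.getenv("SPARK_MEM"),
      "512m");                                        // DEFAULT_MEM

    System.out.println("-Xms" + memory + " -Xmx" + memory);  // -Xms2g -Xmx2g
  }
}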
@@ -139,6 +170,14 @@ private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
     return pyargs;
   }

+  private boolean isClientMode(Properties userProps) {
+    String userMaster = firstNonEmpty(master, (String) userProps.get(SPARK_MASTER));
+    return userMaster == null ||
+      "client".equals(deployMode) ||
+      "yarn-client".equals(userMaster) ||
+      (deployMode == null && !userMaster.startsWith("yarn-"));
+  }
+
   /**
    * Quotes a string so that it can be used in a command string and be parsed back into a single
    * argument by python's "shlex.split()" function.
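
A quick way to see what the new isClientMode() predicate accepts: client mode is assumed when no master is set, when deploy mode is "client", for the yarn-client master, and for any non-YARN master with no explicit deploy mode. Here is a standalone copy for illustration; the sketch takes master and deployMode as parameters, whereas in the builder they are fields:

// Standalone copy of the isClientMode() predicate above. In client mode,
// spark-submit's own JVM runs the application's driver, so the driver's
// memory and JVM options must be applied to the command being built.
public class ClientModeSketch {

  static boolean isClientMode(String master, String deployMode) {
    return master == null ||
      "client".equals(deployMode) ||
      "yarn-client".equals(master) ||
      (deployMode == null && !master.startsWith("yarn-"));
  }

  public static void main(String[] args) {
    System.out.println(isClientMode(null, null));                      // true  (local run)
    System.out.println(isClientMode("yarn-client", null));             // true
    System.out.println(isClientMode("yarn-cluster", null));            // false
    System.out.println(isClientMode("spark://host:7077", null));       // true  (defaults to client)
    System.out.println(isClientMode("spark://host:7077", "cluster"));  // false
  }
}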
@@ -172,19 +211,25 @@ protected boolean handle(String opt, String value) { |
         driverArgs.add(opt);
         driverArgs.add(value);
       } else if (opt.equals(DRIVER_MEMORY)) {
-        setConf(DRIVER_MEMORY, value);
+        setConf(SparkLauncher.DRIVER_MEMORY, value);
         driverArgs.add(opt);
         driverArgs.add(value);
       } else if (opt.equals(DRIVER_JAVA_OPTIONS)) {
-        setConf(DRIVER_EXTRA_JAVA_OPTIONS, value);
+        setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
         driverArgs.add(opt);
         driverArgs.add(value);
       } else if (opt.equals(DRIVER_LIBRARY_PATH)) {
-        setConf(DRIVER_EXTRA_LIBRARY_PATH, value);
+        setConf(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
         driverArgs.add(opt);
         driverArgs.add(value);
       } else if (opt.equals(DRIVER_CLASS_PATH)) {
-        setConf(DRIVER_EXTRA_CLASSPATH, value);
+        setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
+        driverArgs.add(opt);
+        driverArgs.add(value);
+      } else if (opt.equals(CONF)) {
+        String[] conf = value.split("=", 2);
+        checkArgument(conf.length == 2, "Invalid argument to %s: %s", CONF, value);
+        handleConf(conf[0], conf[1]);
         driverArgs.add(opt);
         driverArgs.add(value);
       } else if (opt.equals(CLASS)) {
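
The new CONF branch relies on split("=", 2) so that only the first '=' separates the key from the value; anything after it, including further '=' characters, stays in the value. A minimal demonstration:

// Demonstrates the "--conf key=value" tokenization used above: split on the
// first '=' only (limit 2), so values containing '=' survive intact.
public class ConfSplitSketch {
  public static void main(String[] args) {
    String value = "spark.driver.extraJavaOptions=-Dfoo=bar";
    String[] conf = value.split("=", 2);
    if (conf.length != 2) {
      throw new IllegalArgumentException("Invalid argument to --conf: " + value);
    }
    System.out.println(conf[0]);  // spark.driver.extraJavaOptions
    System.out.println(conf[1]);  // -Dfoo=bar
  }
}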
@@ -227,6 +272,18 @@ protected void handleExtraArgs(List<String> extra) { |
       }
     }

+    private void handleConf(String key, String value) {
+      List<String> driverJvmKeys = Arrays.asList(
+        SparkLauncher.DRIVER_EXTRA_CLASSPATH,
+        SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+        SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH,
+        SparkLauncher.DRIVER_MEMORY);
+
+      if (driverJvmKeys.contains(key)) {
+        setConf(key, value);
+      }
+    }
+
   }

 }
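
Putting the pieces together: handleConf() whitelists only the four driver-JVM keys, so a --conf for, say, spark.app.name is forwarded to spark-submit untouched, while spark.driver.memory is also captured into the builder's config, where buildSparkSubmitCommand() gives it top precedence. Below is an end-to-end sketch of that flow; the plain map is a stand-in for the builder's internal state, not the actual SparkSubmitCommandBuilder, and the string literals are the standard Spark config keys behind the SparkLauncher constants:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// End-to-end sketch of the flow added in this commit: "--conf key=value"
// pairs for driver-JVM keys are recorded (mirroring handleConf()/setConf())
// and later win the driver-memory precedence when the JVM command is built.
public class HandleConfSketch {

  static final Map<String, String> conf = new HashMap<>();
  static final List<String> driverJvmKeys = Arrays.asList(
    "spark.driver.extraClassPath",    // SparkLauncher.DRIVER_EXTRA_CLASSPATH
    "spark.driver.extraJavaOptions",  // SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS
    "spark.driver.extraLibraryPath",  // SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH
    "spark.driver.memory");           // SparkLauncher.DRIVER_MEMORY

  // Mirrors handleConf(): only driver-JVM keys are recorded.
  static void handleConf(String key, String value) {
    if (driverJvmKeys.contains(key)) {
      conf.put(key, value);
    }
  }

  public static void main(String[] args) {
    handleConf("spark.driver.memory", "4g");  // captured: affects the launcher JVM
    handleConf("spark.app.name", "demo");     // ignored: forwarded to spark-submit only

    String memory = conf.getOrDefault("spark.driver.memory", "512m");
    System.out.println("-Xms" + memory + " -Xmx" + memory);  // -Xms4g -Xmx4g
  }
}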