diff --git a/bin/spark-submit.cmd b/bin/spark-submit.cmd
index f301606933a9..e2c69616d0a8 100644
--- a/bin/spark-submit.cmd
+++ b/bin/spark-submit.cmd
@@ -20,4 +20,4 @@ rem
 rem This is the entry point for running Spark submit. To avoid polluting the
 rem environment, it just launches a new cmd to do the real work.
-cmd /V /E /C "%~dp0spark-submit2.cmd" %*
+cmd /V /E /C ""%~dp0spark-submit2.cmd" %*"
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index f4376dedea72..19b12fbfaca7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -181,6 +181,11 @@ private[deploy] class DriverRunner(
       Files.append(header, stderr, StandardCharsets.UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)
     }
+
+    if (Utils.isWindows) {
+      Utils.shortenClasspath(builder)
+    }
+
     runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 06066248ea5d..82983f8bc39d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -145,6 +145,11 @@ private[deploy] class ExecutorRunner(
     val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
       memory, sparkHome.getAbsolutePath, substituteVariables)
     val command = builder.command()
+
+    if (Utils.isWindows) {
+      Utils.shortenClasspath(builder)
+    }
+
     val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
     logInfo(s"Launch command: $formattedCommand")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ea49991493fd..4a4f7f3d6cac 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer
 import java.nio.channels.Channels
 import java.nio.charset.StandardCharsets
 import java.nio.file.Files
+import java.util
 import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
 import javax.net.ssl.HttpsURLConnection
@@ -109,7 +110,7 @@ private[spark] object Utils extends Logging {
   }

   /** Deserialize a Long value (used for [[org.apache.spark.api.python.PythonPartitioner]]) */
-  def deserializeLongValue(bytes: Array[Byte]) : Long = {
+  def deserializeLongValue(bytes: Array[Byte]): Long = {
     // Note: we assume that we are given a Long value encoded in network (big-endian) byte order
     var result = bytes(7) & 0xFFL
     result = result + ((bytes(6) & 0xFFL) << 8)
@@ -1090,6 +1091,50 @@ private[spark] object Utils extends Logging {
     bytesToString(megabytes * 1024L * 1024L)
   }

+  /**
+   * On Windows, create a jar file under `tempDir` containing a manifest whose
+   * Class-Path references all entries of `classPath`, and return a short classpath
+   * that points at that jar. On other platforms, return `classPath` unchanged.
+   */
+  def createShortClassPath(tempDir: File, classPath: String): String = {
+    if (isWindows) {
+      val env = new util.HashMap[String, String](System.getenv())
+      val javaCps = FileUtil
+        .createJarWithClassPath(classPath, new Path(tempDir.getAbsolutePath), env)
+      val javaCpStr = javaCps(0) + javaCps(1)
+      logInfo("Shortened the classpath to: " + javaCpStr)
+      javaCpStr
+    } else {
+      classPath
+    }
+  }
+
+  def createShortClassPath(classPath: String): String = {
+    val tempDir = createTempDir("classpaths")
+    createShortClassPath(tempDir, classPath)
+  }
+
+  /**
+   * If the command held by `builder` exceeds the Windows command-line length limit
+   * (8191 characters), replace its `-cp` value with a short classpath that points
+   * at a manifest jar referencing the original entries.
+   */
+  def shortenClasspath(builder: ProcessBuilder): Unit = {
+    if (builder.command.asScala.mkString("\"", "\" \"", "\"").length > 8190) {
+      logWarning("Command too long; trying to shorten the classpath")
+      // Look for the classpath argument. Note that the environment set in the
+      // ProcessBuilder is process-local, so it won't pollute the parent environment.
+      val command = builder.command()
+      val idxCp = command.indexOf("-cp")
+      if (idxCp > 0 && idxCp + 1 < command.size()) {
+        val classPath = command.get(idxCp + 1)
+        val shortPath = createShortClassPath(classPath)
+        command.set(idxCp + 1, shortPath)
+      }
+    }
+  }
+
   /**
    * Execute a command and return the process running the command.
    */
@@ -1103,6 +1148,11 @@ private[spark] object Utils extends Logging {
     for ((key, value) <- extraEnvironment) {
       environment.put(key, value)
     }
+
+    if (Utils.isWindows) {
+      Utils.shortenClasspath(builder)
+    }
+
     val process = builder.start()
     if (redirectStderr) {
       val threadName = "redirect stderr for command " + command(0)
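Utils.createShortClassPath delegates the actual jar writing to Hadoop's FileUtil.createJarWithClassPath. For readers who do not want to chase the Hadoop source, here is a minimal Scala sketch of the manifest trick it relies on; writeClassPathJar is a hypothetical stand-in, not part of this patch:

import java.io.{BufferedOutputStream, File, FileOutputStream}
import java.util.jar.{Attributes, JarOutputStream, Manifest}

// Hypothetical stand-in for Hadoop's FileUtil.createJarWithClassPath: pack an
// arbitrarily long classpath into a small jar whose manifest carries it, so the
// -cp argument stays well under the Windows 8191-character limit.
def writeClassPathJar(workingDir: File, entries: Seq[String]): File = {
  val manifest = new Manifest()
  val attrs = manifest.getMainAttributes
  attrs.put(Attributes.Name.MANIFEST_VERSION, "1.0")
  // Manifest Class-Path entries are space-separated URLs, not pathSeparator-joined paths.
  attrs.put(Attributes.Name.CLASS_PATH,
    entries.map(e => new File(e).toURI.toURL.toExternalForm).mkString(" "))
  val jar = File.createTempFile("classpath-", ".jar", workingDir)
  // The JarOutputStream constructor writes the manifest; no further entries are needed.
  new JarOutputStream(new BufferedOutputStream(new FileOutputStream(jar)), manifest).close()
  jar
}

// Usage: pass the resulting jar as the whole -cp value, e.g.
//   writeClassPathJar(new File(sys.props("java.io.tmpdir")),
//     sys.props("java.class.path").split(File.pathSeparator))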
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 3e47bfc274cb..02543407b6f0 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -17,16 +17,20 @@
 package org.apache.spark.launcher;

+import java.io.File;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;

+import org.apache.commons.lang.SystemUtils;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 import static org.junit.Assert.*;

+import org.apache.spark.util.Utils;
+
 /**
  * These tests require the Spark assembly to be built before they can be run.
  */
@@ -99,15 +103,28 @@ public void testChildProcLauncher() throws Exception {
         String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
       .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
         "-Dfoo=bar -Dtest.appender=childproc")
-      .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
       .addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
       .setMainClass(SparkLauncherTestApp.class.getName())
       .addAppArgs("proc");
-    final Process app = launcher.launch();
-    new OutputRedirector(app.getInputStream(), TF);
-    new OutputRedirector(app.getErrorStream(), TF);
-    assertEquals(0, app.waitFor());
+
+    File tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
+    try {
+      if (SystemUtils.IS_OS_WINDOWS) {
+        launcher.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH,
+          Utils.createShortClassPath(tempDir, System.getProperty("java.class.path")));
+      } else {
+        launcher.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH,
+          System.getProperty("java.class.path"));
+      }
+      final Process app = launcher.launch();
+
+      new OutputRedirector(app.getInputStream(), TF);
+      new OutputRedirector(app.getErrorStream(), TF);
+      assertEquals(0, app.waitFor());
+    } finally {
+      if (tempDir.exists()) {
+        Utils.deleteRecursively(tempDir);
+      }
+    }
   }

   public static class SparkLauncherTestApp {
diff --git a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
index 713560d3ddfa..ffca8c58f6dc 100644
--- a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._

 import org.apache.spark._
+import org.apache.spark.util.Utils

 class LauncherBackendSuite extends SparkFunSuite with Matchers {

@@ -33,6 +34,8 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers {
     "local" -> "local",
     "standalone/client" -> "local-cluster[1,1,1024]")

+  val tempDir = Utils.createTempDir()
+
   tests.foreach { case (name, master) =>
     test(s"$name: launcher handle") {
       testWithMaster(master)
@@ -42,16 +45,22 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers {
   private def testWithMaster(master: String): Unit = {
     val env = new java.util.HashMap[String, String]()
     env.put("SPARK_PRINT_LAUNCH_COMMAND", "1")
-    val handle = new SparkLauncher(env)
+
+    val launcher = new SparkLauncher(env)
       .setSparkHome(sys.props("spark.test.home"))
-      .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
       .setConf("spark.ui.enabled", "false")
      .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, s"-Dtest.appender=console")
       .setMaster(master)
       .setAppResource("spark-internal")
       .setMainClass(TestApp.getClass.getName().stripSuffix("$"))
-      .startApplication()
+
+    if (Utils.isWindows) {
+      launcher.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH,
+        Utils.createShortClassPath(tempDir, System.getProperty("java.class.path")))
+    } else {
+      launcher.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH,
+        System.getProperty("java.class.path"))
+    }
+    val handle = launcher.startApplication()
     try {
       eventually(timeout(30 seconds), interval(100 millis)) {
         handle.getAppId() should not be (null)
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index c7488082ca89..ea46c2da9ba5 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -18,8 +18,10 @@
 package org.apache.spark.launcher;

+import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.InputStreamReader;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -28,8 +30,12 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.jar.Attributes;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;

 import static org.apache.spark.launcher.CommandBuilderUtils.*;

@@ -53,6 +59,9 @@ abstract class AbstractCommandBuilder {
   final Map<String, String> childEnv;
   final Map<String, String> conf;

+  private static final File classPathShortDir =
+    new File(System.getProperty("java.io.tmpdir"), "spark-classpath");
+
   // The merged configuration for the application. Cached to avoid having to read / parse
   // properties files multiple times.
   private Map<String, String> effectiveConfig;
@@ -115,10 +123,159 @@ List<String> buildJavaCommand(String extraClassPath) throws IOException {
     }

     cmd.add("-cp");
-    cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath)));
+    List<String> classPathEntries = buildClassPath(extraClassPath);
+    String classPath;
+    if (isWindows()) {
+      String[] jarCp = createJarWithClassPath(
+        classPathEntries.toArray(new String[classPathEntries.size()]),
+        classPathShortDir,
+        new HashMap<>(System.getenv()));
+      classPath = jarCp[0] + File.pathSeparator + jarCp[1];
+    } else {
+      classPath = join(File.pathSeparator, classPathEntries);
+    }
+    cmd.add(classPath);
     return cmd;
   }

+  /**
+   * Create a jar file at the given path, containing a manifest with a classpath
+   * that references all specified entries.
+   *
+   * Some platforms may have an upper limit on command line length. For example,
+   * the maximum command line length on Windows is 8191 characters, but the
+   * length of the classpath may exceed this. To work around this limitation,
+   * use this method to create a small intermediate jar with a manifest that
+   * contains the full classpath. It returns the absolute path to the new jar,
+   * which the caller may set as the classpath for a new process.
+   *
+   * Environment variable evaluation is not supported within a jar manifest, so
+   * this method expands environment variables before inserting classpath entries
+   * into the manifest. The method parses environment variables according to
+   * platform-specific syntax (%VAR% on Windows, or $VAR otherwise). On Windows,
+   * environment variables are case-insensitive. For example, %VAR% and %var%
+   * evaluate to the same value.
+   *
+   * Specifying the classpath in a jar manifest does not support wildcards, so
+   * this method expands wildcards internally. Any classpath entry that ends
+   * with * is translated to all files at that path with extension .jar or .JAR.
+   *
+   * This method is adapted from Hadoop's FileUtil#createJarWithClassPath. It is
+   * reimplemented here to avoid a heavy dependency on hadoop-common.
+   *
+   * @param classPathEntries String[] input classpath entries to bundle into the
+   *                         jar manifest
+   * @param workingDir File working directory used to save the jar
+   * @param callerEnv Map<String, String> caller's environment variables to use
+   *                  for expansion
+   * @return String[] with the absolute path to the new jar in position 0 and the
+   *         unexpanded wildcard entry path in position 1
+   * @throws IOException if there is an I/O error while writing the jar file
+   */
+  public static String[] createJarWithClassPath(String[] classPathEntries, File workingDir,
+      Map<String, String> callerEnv) throws IOException {
+    // Environment variables are case-insensitive on Windows, so index them there
+    // by lower-cased name.
+    Map<String, String> env = new HashMap<>();
+    if (isWindows()) {
+      for (Map.Entry<String, String> entry : callerEnv.entrySet()) {
+        env.put(entry.getKey().toLowerCase(), entry.getValue());
+      }
+    } else {
+      env = callerEnv;
+    }
+
+    // Expand the environment variables
+    Pattern envVarPattern = isWindows() ? Pattern.compile("%(.*?)%") :
+      Pattern.compile("\\$([A-Za-z_]{1}[A-Za-z0-9_]*)");
+    for (int i = 0; i < classPathEntries.length; ++i) {
+      String template = classPathEntries[i];
+      StringBuffer sb = new StringBuffer();
+      Matcher matcher = envVarPattern.matcher(template);
+      while (matcher.find()) {
+        String name = isWindows() ? matcher.group(1).toLowerCase() : matcher.group(1);
+        String replacement = env.get(name);
+        if (replacement == null) {
+          replacement = "";
+        }
+        matcher.appendReplacement(sb, Matcher.quoteReplacement(replacement));
+      }
+      matcher.appendTail(sb);
+      classPathEntries[i] = sb.toString();
+    }
+
+    if (!workingDir.exists()) {
+      workingDir.mkdirs();
+    }
+
+    StringBuilder unexpandedWildcardClasspath = new StringBuilder();
+    // Append all entries
+    List<String> classPathEntryList = new ArrayList<>(classPathEntries.length);
+    for (String classPathEntry : classPathEntries) {
+      if (classPathEntry.length() == 0) {
+        continue;
+      }
+      if (classPathEntry.endsWith("*")) {
+        boolean foundWildCardJar = false;
+        // Append all jars that match the wildcard; strip the trailing separator and '*'
+        File[] files =
+          new File(classPathEntry.substring(0, classPathEntry.length() - 2)).listFiles();
+        if (files != null) {
+          for (File f : files) {
+            if (f.getName().toLowerCase().endsWith(".jar")) {
+              foundWildCardJar = true;
+              classPathEntryList.add(f.getAbsoluteFile().toURI().toURL().toExternalForm());
+            }
+          }
+        }
+
+        if (!foundWildCardJar) {
+          unexpandedWildcardClasspath.append(File.pathSeparator);
+          unexpandedWildcardClasspath.append(classPathEntry);
+        }
+      } else {
+        // Append just this entry
+        File fileCpEntry = new File(classPathEntry);
+        String classPathEntryUrl = fileCpEntry.toURI().toURL().toExternalForm();
+
+        // File.toURI only appends a trailing '/' if it can determine that the path is a
+        // directory that already exists. (See the JavaDocs.) If this entry had a
+        // trailing '/' specified by the caller, then guarantee that the
+        // classpath entry in the manifest has a trailing '/', and thus refers to
+        // a directory instead of a file. This can happen if the caller is
+        // creating a classpath jar referencing a directory that hasn't been
+        // created yet, but will definitely be created before running.
+        if (classPathEntry.endsWith("/") && !classPathEntryUrl.endsWith("/")) {
+          classPathEntryUrl = classPathEntryUrl + "/";
+        }
+        classPathEntryList.add(classPathEntryUrl);
+      }
+    }
+    String jarClassPath = join(" ", classPathEntryList);
+
+    // Create the manifest
+    Manifest jarManifest = new Manifest();
+    jarManifest.getMainAttributes().putValue(
+      Attributes.Name.MANIFEST_VERSION.toString(), "1.0");
+    jarManifest.getMainAttributes().putValue(
+      Attributes.Name.CLASS_PATH.toString(), jarClassPath);
+
+    // Write the manifest to the output jar file. The JarOutputStream constructor
+    // writes the manifest itself, so the stream can be closed immediately; the
+    // try-with-resources ensures a failed constructor does not leak the streams.
+    File classPathJar = File.createTempFile("classpath-", ".jar", workingDir);
+    try (JarOutputStream jos = new JarOutputStream(
+        new BufferedOutputStream(new FileOutputStream(classPathJar)), jarManifest)) {
+      // Nothing to add beyond the manifest itself.
+    }
+
+    String[] jarCp = {classPathJar.getCanonicalPath(),
+      unexpandedWildcardClasspath.toString()};
+    return jarCp;
+  }
+
   void addOptionString(List<String> cmd, String options) {
     if (!isEmpty(options)) {
       for (String opt : parseOptionString(options)) {
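The environment-variable expansion described in the javadoc above is easy to get wrong, so here is a standalone Scala sketch of it (hypothetical code, not part of the patch; it assumes the env map keys have already been lower-cased on Windows, as createJarWithClassPath does):

import java.util.regex.{Matcher, Pattern}

// Hypothetical standalone version of the expansion step in createJarWithClassPath:
// %VAR% on Windows (case-insensitive, hence the lower-cased lookup), $VAR elsewhere.
def expandEnvVars(entry: String, env: Map[String, String], windows: Boolean): String = {
  val pattern =
    if (windows) Pattern.compile("%(.*?)%")
    else Pattern.compile("\\$([A-Za-z_][A-Za-z0-9_]*)")
  val matcher = pattern.matcher(entry)
  val sb = new StringBuffer()
  while (matcher.find()) {
    val name = if (windows) matcher.group(1).toLowerCase else matcher.group(1)
    // Unknown variables expand to the empty string, matching the patch's behavior.
    matcher.appendReplacement(sb, Matcher.quoteReplacement(env.getOrElse(name, "")))
  }
  matcher.appendTail(sb)
  sb.toString
}

// expandEnvVars("""%TEMP%\libs\*""", Map("temp" -> """C:\Users\me\tmp"""), windows = true)
//   == """C:\Users\me\tmp\libs\*"""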
"D:/driver": "/driver"); launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver -XX:MaxPermSize=256m"); launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native"); } else { @@ -220,10 +241,30 @@ private void testCmdBuilder(boolean isDriver, boolean useDefaultPropertyFile) th } String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator)); - if (isDriver) { - assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp)); + if (!CommandBuilderUtils.isWindows()) { + if (isDriver) { + assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp)); + } else { + assertFalse("Driver classpath should not be in command.", contains("/driver", cp)); + } } else { - assertFalse("Driver classpath should not be in command.", contains("/driver", cp)); + JarFile jarFile = null; + try { + jarFile = new JarFile(cp[0]); + Manifest jarManifest = jarFile.getManifest(); + Attributes attrs = jarManifest.getMainAttributes(); + String classPath = attrs.getValue(Attributes.Name.CLASS_PATH); + if (isDriver) { + assertTrue(contains("file:/D:/driver", classPath.split(" "))); + } else { + assertFalse(contains("file:/D:/driver", classPath.split(" "))); + } + } finally { + if (jarFile != null) { + jarFile.close(); + } + + } } String libPath = env.get(CommandBuilderUtils.getLibPathEnvName());