diff --git a/core/src/main/scala/org/apache/spark/SparkApp.scala b/core/src/main/scala/org/apache/spark/SparkApp.scala new file mode 100644 index 000000000000..9d8bb01070fe --- /dev/null +++ b/core/src/main/scala/org/apache/spark/SparkApp.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** + * An interface that can be implemented by applications launched by SparkSubmit + * which exposes the Spark job configuration explicitly. + */ +private[spark] trait SparkApp { + this: Singleton => + + /** + * Method executed by SparkSubmit to run the application. + * + * @param args - all arguments for SparkApp. + * @param conf - Spark Configuration. + */ + def sparkMain(args: Array[String], conf: Map[String, String]): Unit + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 77005aa9040b..fd4fdff5cb3e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -691,10 +691,6 @@ object SparkSubmit extends CommandLineUtils { addJarToClasspath(jar, loader) } - for ((key, value) <- sysProps) { - System.setProperty(key, value) - } - var mainClass: Class[_] = null try { @@ -725,9 +721,15 @@ object SparkSubmit extends CommandLineUtils { printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.") } - val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) - if (!Modifier.isStatic(mainMethod.getModifiers)) { - throw new IllegalStateException("The main method in the given main class must be static") + val sparkAppMainMethod = mainClass.getMethods().find(_.getName == "sparkMain") + val childSparkConf = sysProps.filter { p => p._1.startsWith("spark.") }.toMap + + // If running a SparkApp we can explicitly pass in the confs separately. + // If we aren't running a SparkApp they get passed via the system properties. 
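For orientation before the dispatch logic below: a minimal sketch of what an implementation of the new trait can look like (illustrative only, not part of this patch; because SparkApp is private[spark], implementations must live under the org.apache.spark package, as the YARN Client object modified later in this diff does):

    package org.apache.spark.example // hypothetical package; kept under org.apache.spark for private[spark] access

    import org.apache.spark.SparkApp

    // A top-level object satisfies the `this: Singleton =>` self-type, and the Scala
    // compiler emits a static sparkMain forwarder that SparkSubmit can look up via
    // getMethods() and invoke with a null receiver.
    object ExampleSparkApp extends SparkApp {
      override def sparkMain(args: Array[String], conf: Map[String, String]): Unit = {
        // The spark.* configuration arrives explicitly instead of via System.setProperty.
        val master = conf.getOrElse("spark.master", "local[*]")
        println(s"master=$master args=${args.mkString(" ")}")
      }
    }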
+ if (sparkAppMainMethod.isEmpty) { + sysProps.foreach { case (key, value) => + System.setProperty(key, value) + } } @tailrec @@ -741,7 +743,17 @@ object SparkSubmit extends CommandLineUtils { } try { - mainMethod.invoke(null, childArgs.toArray) + if (sparkAppMainMethod.isDefined) { + sparkAppMainMethod.get.invoke(null, childArgs.toArray, childSparkConf) + } else { + val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) + + if (!Modifier.isStatic(mainMethod.getModifiers)) { + throw new IllegalStateException("The main method in the given main class must be static") + } + + mainMethod.invoke(null, childArgs.toArray) + } } catch { case t: Throwable => findCause(t) match { diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala index a5d41a1eeb47..f6a141e3ce07 100644 --- a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala +++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala @@ -17,11 +17,13 @@ package org.apache.spark.launcher +import java.io.IOException import java.net.{InetAddress, Socket} import org.apache.spark.SPARK_VERSION +import org.apache.spark.internal.Logging import org.apache.spark.launcher.LauncherProtocol._ -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils} /** * A class that can be used to talk to a launcher server. Users should extend this class to @@ -29,23 +31,45 @@ import org.apache.spark.util.{ThreadUtils, Utils} * * See `LauncherServer` for an explanation of how launcher communication works. */ -private[spark] abstract class LauncherBackend { +private[spark] abstract class LauncherBackend extends Logging { private var clientThread: Thread = _ private var connection: BackendConnection = _ private var lastState: SparkAppHandle.State = _ + private var stopOnShutdown: Boolean = false @volatile private var _isConnected = false def connect(): Unit = { val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt) val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET) + val stopFlag = sys.env.get(LauncherProtocol.ENV_LAUNCHER_STOP_IF_SHUTDOWN).map(_.toBoolean) if (port != None && secret != None) { - val s = new Socket(InetAddress.getLoopbackAddress(), port.get) - connection = new BackendConnection(s) - connection.send(new Hello(secret.get, SPARK_VERSION)) - clientThread = LauncherBackend.threadFactory.newThread(connection) - clientThread.start() - _isConnected = true + connect(port.get, secret.get, stopFlag.getOrElse(false)) + } + } + + def connect(port: Int, secret: String, stopFlag: Boolean): Unit = { + this.stopOnShutdown = stopFlag + val s = new Socket(InetAddress.getLoopbackAddress(), port) + connection = new BackendConnection(s) + connection.send(new Hello(secret, SPARK_VERSION)) + clientThread = LauncherBackend.threadFactory.newThread(connection) + clientThread.start() + _isConnected = true + if (stopOnShutdown) { + logDebug("Adding shutdown hook") // force eager creation of logger + ShutdownHookManager.addShutdownHook( + ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => + logInfo("Invoking onStopRequest() from shutdown hook") + try { + if (_isConnected) { + onStopRequest() + } + } catch { + case ioException: IOException => + logError("Error while running LauncherBackend shutdownHook...", ioException) + } + } } } @@ -110,12 +134,14 @@ private[spark] abstract class LauncherBackend { override def close(): Unit = { try { 
super.close() + if (stopOnShutdown) { + fireStopRequest() + } } finally { onDisconnected() _isConnected = false } } - } } diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java index 0c7712374085..8ed3742ee704 100644 --- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java +++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java @@ -26,6 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.bridge.SLF4JBridgeHandler; + +import static java.lang.Thread.sleep; import static org.junit.Assert.*; import static org.junit.Assume.*; @@ -183,12 +185,38 @@ public void testChildProcLauncher() throws Exception { assertEquals(0, app.waitFor()); } + @Test + public void testThreadLauncher() throws Exception { + // This test is failed on Windows due to the failure of initiating executors + // by the path length limitation. See SPARK-18718. + assumeTrue(!Utils.isWindows()); + + launcher = new SparkLauncher(); + launcher + .setMaster("local") + .setAppResource(SparkLauncher.NO_RESOURCE) + .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, + "-Dfoo=bar -Dtest.appender=childproc") + .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path")) + .setMainClass(SparkLauncherTestApp.class.getName()) + .launchAsThread(true) + .addAppArgs("thread"); + final SparkAppHandle app = launcher.startApplication(); + sleep(3000); + AbstractSparkAppHandle handle = (AbstractSparkAppHandle)app; + handle.waitFor(); + } + public static class SparkLauncherTestApp { public static void main(String[] args) throws Exception { assertEquals(1, args.length); - assertEquals("proc", args[0]); - assertEquals("bar", System.getProperty("foo")); + if ("proc".equalsIgnoreCase(args[0])) { + assertEquals("proc", args[0]); + assertEquals("bar", System.getProperty("foo")); + } else { + assertEquals(null, System.getProperty("foo")); + } assertEquals("local", System.getProperty(SparkLauncher.SPARK_MASTER)); } diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractSparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractSparkAppHandle.java new file mode 100644 index 000000000000..5e25d7553ec4 --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractSparkAppHandle.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.launcher; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +abstract class AbstractSparkAppHandle implements SparkAppHandle { + private static final Logger LOG = Logger.getLogger(AbstractSparkAppHandle.class.getName()); + + protected final String secret; + protected final LauncherServer server; + protected boolean disposed; + protected List listeners; + protected State state; + private LauncherConnection connection; + private String appId; + OutputRedirector redirector; + + public AbstractSparkAppHandle(LauncherServer server, String secret) { + this.server = server; + this.secret = secret; + this.state = State.UNKNOWN; + } + + @Override + public synchronized void addListener(Listener l) { + if (listeners == null) { + listeners = new ArrayList<>(); + } + listeners.add(l); + } + + @Override + public State getState() { + return state; + } + + @Override + public String getAppId() { + return appId; + } + + @Override + public void stop() { + CommandBuilderUtils.checkState(connection != null, "Application is still not connected."); + try { + connection.send(new LauncherProtocol.Stop()); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @Override + public synchronized void disconnect() { + if (!disposed) { + disposed = true; + if (connection != null) { + try { + connection.close(); + } catch (IOException ioe) { + // no-op. + } + } + server.unregister(this); + if (redirector != null) { + redirector.stop(); + } + } + } + + String getSecret() { + return secret; + } + + void setConnection(LauncherConnection connection) { + this.connection = connection; + } + + LauncherServer getServer() { + return server; + } + + LauncherConnection getConnection() { + return connection; + } + + void setState(State s) { + if (!state.isFinal()) { + state = s; + fireEvent(false); + } else { + LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.", + new Object[]{state, s}); + } + } + + void setAppId(String appId) { + this.appId = appId; + fireEvent(true); + } + + private synchronized void fireEvent(boolean isInfoChanged) { + if (listeners != null) { + for (Listener l : listeners) { + if (isInfoChanged) { + l.infoChanged(this); + } else { + l.stateChanged(this); + } + } + } + } + + abstract void waitFor() throws InterruptedException; +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index 12bf29d3b1aa..5f2fd946815b 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -26,71 +26,14 @@ /** * Handle implementation for monitoring apps started as a child process. 
*/ -class ChildProcAppHandle implements SparkAppHandle { +class ChildProcAppHandle extends AbstractSparkAppHandle { private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName()); - private final String secret; - private final LauncherServer server; - - private Process childProc; - private boolean disposed; - private LauncherConnection connection; - private List listeners; - private State state; - private String appId; - private OutputRedirector redirector; + protected Process childProc; ChildProcAppHandle(String secret, LauncherServer server) { - this.secret = secret; - this.server = server; - this.state = State.UNKNOWN; - } - - @Override - public synchronized void addListener(Listener l) { - if (listeners == null) { - listeners = new ArrayList<>(); - } - listeners.add(l); - } - - @Override - public State getState() { - return state; - } - - @Override - public String getAppId() { - return appId; - } - - @Override - public void stop() { - CommandBuilderUtils.checkState(connection != null, "Application is still not connected."); - try { - connection.send(new LauncherProtocol.Stop()); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - - @Override - public synchronized void disconnect() { - if (!disposed) { - disposed = true; - if (connection != null) { - try { - connection.close(); - } catch (IOException ioe) { - // no-op. - } - } - server.unregister(this); - if (redirector != null) { - redirector.stop(); - } - } + super(server, secret); } @Override @@ -109,53 +52,13 @@ public synchronized void kill() { } } - String getSecret() { - return secret; - } - void setChildProc(Process childProc, String loggerName) { this.childProc = childProc; this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName, SparkLauncher.REDIRECTOR_FACTORY); } - void setConnection(LauncherConnection connection) { - this.connection = connection; + void waitFor() throws InterruptedException { + this.childProc.waitFor(); } - - LauncherServer getServer() { - return server; - } - - LauncherConnection getConnection() { - return connection; - } - - void setState(State s) { - if (!state.isFinal()) { - state = s; - fireEvent(false); - } else { - LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.", - new Object[] { state, s }); - } - } - - void setAppId(String appId) { - this.appId = appId; - fireEvent(true); - } - - private synchronized void fireEvent(boolean isInfoChanged) { - if (listeners != null) { - for (Listener l : listeners) { - if (isInfoChanged) { - l.infoChanged(this); - } else { - l.stateChanged(this); - } - } - } - } - } diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildThreadAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildThreadAppHandle.java new file mode 100644 index 000000000000..79d61e67ab04 --- /dev/null +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildThreadAppHandle.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.logging.Logger;
+
+public class ChildThreadAppHandle extends AbstractSparkAppHandle {
+  private static final Logger LOG = Logger.getLogger(ChildThreadAppHandle.class.getName());
+
+  private Thread childThread;
+
+  public ChildThreadAppHandle(String secret, LauncherServer server) {
+    super(server, secret);
+  }
+
+  @Override
+  public synchronized void kill() {
+    if (!disposed) {
+      disconnect();
+    }
+    if (childThread != null) {
+      try {
+        childThread.join(3000);
+      } catch (InterruptedException e) {
+        try {
+          childThread.interrupt();
+        } catch (Exception inner) {
+          LOG.info("Failed to stop thread: " + inner.getMessage());
+        }
+      } finally {
+        childThread = null;
+      }
+    }
+  }
+
+  void setChildThread(Thread childThread) {
+    this.childThread = childThread;
+  }
+
+  @Override
+  void waitFor() throws InterruptedException {
+    if (this.childThread.isAlive()) {
+      this.childThread.join();
+    }
+  }
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java
index 042f11cd9e43..84980b6cc3f1 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java
@@ -32,6 +32,9 @@ final class LauncherProtocol {
  /** Environment variable where the secret for connecting back to the server is stored. */
  static final String ENV_LAUNCHER_SECRET = "_SPARK_LAUNCHER_SECRET";
+  /** Environment variable where the flag telling the app to stop on launcher shutdown is stored. */
+  static final String ENV_LAUNCHER_STOP_IF_SHUTDOWN = "_SPARK_LAUNCHER_STOP_IF_SHUTDOWN";
+
  static class Message implements Serializable {
  }
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index 865d4926da6a..242b8985ca4e 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -88,12 +88,32 @@ class LauncherServer implements Closeable {
  private static volatile LauncherServer serverInstance;
+  /**
+   * Creates a handle for an app to be launched as a child process.
+   * This method will start a server if one hasn't been started yet.
+   * The server is shared for multiple handles, and once all handles
+   * are disposed of, the server is shut down.
+   */
+  static synchronized ChildProcAppHandle newAppHandle() throws IOException {
+    return (ChildProcAppHandle) LauncherServer.newAppHandle(false);
+  }
+
+  /**
+   * Creates a handle for an app to be launched from a thread within the current JVM.
+   * This method will start a server if one hasn't been
+   * started yet. The server is shared for multiple handles, and once all handles are disposed of,
+   * the server is shut down.
+   */
+  static synchronized ChildThreadAppHandle newAppThreadHandle() throws IOException {
+    return (ChildThreadAppHandle) LauncherServer.newAppHandle(true);
+  }
+
  /**
   * Creates a handle for an app to be launched.
This method will start a server if one hasn't been * started yet. The server is shared for multiple handles, and once all handles are disposed of, * the server is shut down. */ - static synchronized ChildProcAppHandle newAppHandle() throws IOException { + static synchronized AbstractSparkAppHandle newAppHandle(boolean isThreadHandle) throws IOException { LauncherServer server = serverInstance != null ? serverInstance : new LauncherServer(); server.ref(); serverInstance = server; @@ -102,8 +122,7 @@ static synchronized ChildProcAppHandle newAppHandle() throws IOException { while (server.pending.containsKey(secret)) { secret = server.createSecret(); } - - return server.newAppHandle(secret); + return server.newAppHandle(secret, isThreadHandle); } static LauncherServer getServerInstance() { @@ -112,7 +131,7 @@ static LauncherServer getServerInstance() { private final AtomicLong refCount; private final AtomicLong threadIds; - private final ConcurrentMap pending; + private final ConcurrentMap pending; private final List clients; private final ServerSocket server; private final Thread serverThread; @@ -152,9 +171,14 @@ private LauncherServer() throws IOException { * Creates a new app handle. The handle will wait for an incoming connection for a configurable * amount of time, and if one doesn't arrive, it will transition to an error state. */ - ChildProcAppHandle newAppHandle(String secret) { - ChildProcAppHandle handle = new ChildProcAppHandle(secret, this); - ChildProcAppHandle existing = pending.putIfAbsent(secret, handle); + AbstractSparkAppHandle newAppHandle(String secret, boolean isThreadHandle) { + AbstractSparkAppHandle handle; + if (isThreadHandle) { + handle = new ChildThreadAppHandle(secret, this); + } else { + handle = new ChildProcAppHandle(secret, this); + } + AbstractSparkAppHandle existing = pending.putIfAbsent(secret, handle); CommandBuilderUtils.checkState(existing == null, "Multiple handles with the same secret."); return handle; } @@ -175,6 +199,7 @@ public void close() throws IOException { } } } + if (serverThread != null) { try { serverThread.join(); @@ -210,7 +235,7 @@ int getPort() { * Removes the client handle from the pending list (in case it's still there), and unrefs * the server. */ - void unregister(ChildProcAppHandle handle) { + void unregister(AbstractSparkAppHandle handle) { pending.remove(handle.getSecret()); unref(); } @@ -277,7 +302,7 @@ private String createSecret() { private class ServerConnection extends LauncherConnection { private TimerTask timeout; - private ChildProcAppHandle handle; + private AbstractSparkAppHandle handle; ServerConnection(Socket socket, TimerTask timeout) throws IOException { super(socket); @@ -291,7 +316,7 @@ protected void handle(Message msg) throws IOException { timeout.cancel(); timeout = null; Hello hello = (Hello) msg; - ChildProcAppHandle handle = pending.remove(hello.secret); + AbstractSparkAppHandle handle = pending.remove(hello.secret); if (handle != null) { handle.setConnection(this); handle.setState(SparkAppHandle.State.CONNECTED); diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java index cefb4d1a95fb..1ba12078d211 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java @@ -95,7 +95,9 @@ public boolean isFinal() { void kill(); /** - * Disconnects the handle from the application, without stopping it. 
After this method is called,
+ * Disconnects the handle from the application.
+ * If the {@link SparkLauncher#autoShutdown(boolean)} option is enabled, this method will
+ * also stop the child Spark application. After this method is called,
  * the handle will not be able to communicate with the application anymore.
  */
 void disconnect();
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index ea56214d2390..071af53c6fb4 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -19,6 +19,7 @@
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -73,6 +74,12 @@ public class SparkLauncher {
  /** Logger name to use when launching a child process. */
  public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
+  static final String LAUNCHER_INTERNAL_PORT = "spark.launcher.internal.port";
+
+  static final String LAUNCHER_INTERNAL_CHILD_PROCESS_SECRET = "spark.launcher.internal.secret";
+
+  static final String LAUNCHER_INTERNAL_STOP_ON_SHUTDOWN = "spark.launcher.internal.stop.on.shutdown";
+
  /**
   * A special value for the resource that tells Spark to not try to process the app resource as a
   * file. This is useful when the class being executed is added to the application using other
@@ -94,6 +101,11 @@ public class SparkLauncher {
  static final Map<String, String> launcherConfig = new HashMap<>();
+  private boolean autoShutdown = false;
+
+  /** Flag deciding whether spark-submit is launched as a child process or as a thread. */
+  private boolean launchAsThread = false;
+
  /**
   * Set a configuration value for the launcher library. These config values do not affect the
   * launched application, but rather the behavior of the launcher library itself when managing
 * @@ -107,6 +119,34 @@
 public static void setConfig(String name, String value) {
   launcherConfig.put(name, value);
 }
+
+  /**
+   * Specifies that the Spark application should be stopped if the launcher process goes away.
+   * When enabled, the launcher tries to stop or kill the application as it shuts down.
+   *
+   * @since 2.3.0
+   * @param autoShutdown Whether to shut down the Spark application if the launcher process goes away.
+   * @return This launcher.
+   */
+  public SparkLauncher autoShutdown(boolean autoShutdown) {
+    this.autoShutdown = autoShutdown;
+    return this;
+  }
+
+  /**
+   * Specifies that spark-submit should be run as a daemon thread inside the current JVM
+   * rather than as a child process.
+   *
+   * @since 2.3.0
+   * @param launchAsThread Whether to launch the Spark application in a new thread in
+   * the same process.
+   * @return This launcher.
+   */
+  public SparkLauncher launchAsThread(boolean launchAsThread) {
+    this.launchAsThread = launchAsThread;
+    return this;
+  }
+
  // Visible for testing.
  final SparkSubmitCommandBuilder builder;
  File workingDir;
@@ -459,6 +499,8 @@ public SparkLauncher redirectToLog(String loggerName) {
  * @return A process handle for the Spark app.
*/ public Process launch() throws IOException { + checkArgument(!launchAsThread, + "Use startApplication method to launch application in a thread"); Process childProc = createBuilder().start(); if (redirectToLog) { String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); @@ -467,6 +509,28 @@ public Process launch() throws IOException { return childProc; } + private String getAppName() throws IOException { + String appName = launchAsThread ? null + : builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); + if (appName == null) { + if (builder.appName != null) { + appName = builder.appName; + } else if (builder.mainClass != null) { + int dot = builder.mainClass.lastIndexOf("."); + if (dot >= 0 && dot < builder.mainClass.length() - 1) { + appName = builder.mainClass.substring(dot + 1, builder.mainClass.length()); + } else { + appName = builder.mainClass; + } + } else if (builder.appResource != null) { + appName = new File(builder.appResource).getName(); + } else { + appName = String.valueOf(COUNTER.incrementAndGet()); + } + } + return appName; + } + /** * Starts a Spark application. *

@@ -479,7 +543,7 @@ public Process launch() throws IOException { * that happens after that cannot be monitored. If the underlying application is launched as * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process. *

- * Currently, all applications are launched as child processes. The child's stdout and stderr + * If the application is launched as child process, the child's stdout and stderr * are merged and written to a logger (see java.util.logging) only if redirection * has not otherwise been configured on this SparkLauncher. The logger's name can be * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that @@ -488,11 +552,24 @@ public Process launch() throws IOException { * In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit more * easily into the configuration of commonly-used logging systems. * + * If the application is launched as a thread, the log redirection methods are not supported, + * and the parent process's output and log configuration will be used. + * * @since 1.6.0 * @param listeners Listeners to add to the handle before the app is launched. * @return A handle for the launched application. */ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException { + if (launchAsThread) { + checkArgument(builder.childEnv.isEmpty(), + "Custom environment variables are not supported while launching in a thread"); + return startApplicationAsThread(listeners); + } + return startApplicationAsChildProc(listeners); + } + + private SparkAppHandle startApplicationAsChildProc(SparkAppHandle.Listener[] listeners) + throws IOException { ChildProcAppHandle handle = LauncherServer.newAppHandle(); for (SparkAppHandle.Listener l : listeners) { handle.addListener(l); @@ -503,23 +580,7 @@ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) thr // Only setup stderr + stdout to logger redirection if user has not otherwise configured output // redirection. if (loggerName == null) { - String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); - if (appName == null) { - if (builder.appName != null) { - appName = builder.appName; - } else if (builder.mainClass != null) { - int dot = builder.mainClass.lastIndexOf("."); - if (dot >= 0 && dot < builder.mainClass.length() - 1) { - appName = builder.mainClass.substring(dot + 1, builder.mainClass.length()); - } else { - appName = builder.mainClass; - } - } else if (builder.appResource != null) { - appName = new File(builder.appResource).getName(); - } else { - appName = String.valueOf(COUNTER.incrementAndGet()); - } - } + String appName = getAppName(); String loggerPrefix = getClass().getPackage().getName(); loggerName = String.format("%s.app.%s", loggerPrefix, appName); pb.redirectErrorStream(true); @@ -528,13 +589,46 @@ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) thr pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT, String.valueOf(LauncherServer.getServerInstance().getPort())); pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret()); + pb.environment().put(LauncherProtocol.ENV_LAUNCHER_STOP_IF_SHUTDOWN, + String.valueOf(autoShutdown)); + setConf(LAUNCHER_INTERNAL_STOP_ON_SHUTDOWN, String.valueOf(autoShutdown)); try { handle.setChildProc(pb.start(), loggerName); } catch (IOException ioe) { handle.kill(); throw ioe; } + return handle; + } + + private SparkAppHandle startApplicationAsThread(SparkAppHandle.Listener... 
listeners)
+      throws IOException {
+    ChildThreadAppHandle handle = LauncherServer.newAppThreadHandle();
+    for (SparkAppHandle.Listener l : listeners) {
+      handle.addListener(l);
+    }
+    String appName = getAppName();
+    setConf(LAUNCHER_INTERNAL_PORT, String.valueOf(LauncherServer.getServerInstance().getPort()));
+    setConf(LAUNCHER_INTERNAL_CHILD_PROCESS_SECRET, handle.getSecret());
+    setConf(LAUNCHER_INTERNAL_STOP_ON_SHUTDOWN, String.valueOf(autoShutdown));
+    try {
+      // The SparkSubmit class must be available in the classpath; resolve its main
+      // method eagerly so that a missing or incompatible Spark jar fails fast here.
+      Method main = SparkSubmitRunner.getSparkSubmitMain();
+      Thread submitJobThread = new Thread(new SparkSubmitRunner(main,
+        builder.buildSparkSubmitArgs()));
+      submitJobThread.setName(appName);
+      submitJobThread.setDaemon(true);
+      handle.setChildThread(submitJobThread);
+      submitJobThread.start();
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Please make sure the spark jar " +
+        "containing SparkSubmit is in the classpath.", cnfe);
+    } catch (NoSuchMethodException nsme) {
+      throw new IOException("Please make sure the version of the Spark jar containing " +
+        "SparkSubmit is correct.", nsme);
+    }
    return handle;
  }
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitRunner.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitRunner.java
new file mode 100644
index 000000000000..d7ec8a4001a0
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitRunner.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * This class is used by {@link SparkLauncher#startApplicationAsThread(SparkAppHandle.Listener...)}.
+ *

+ * It starts Spark applications programmatically in a thread. It uses reflection, and relies
+ * on the SparkSubmit main class being available in the classpath.
+ *

+ */
+class SparkSubmitRunner implements Runnable {
+  private Method main;
+  private final List<String> args;
+
+  SparkSubmitRunner(Method main, List<String> args) {
+    this.main = main;
+    this.args = args;
+  }
+
+  /**
+   * Looks up the "org.apache.spark.deploy.SparkSubmit#main" method, verifying that it is
+   * available in the classpath.
+   *
+   * @return the Method to invoke to start the Spark application
+   * @throws ClassNotFoundException
+   * @throws NoSuchMethodException
+   */
+  protected static Method getSparkSubmitMain() throws ClassNotFoundException, NoSuchMethodException {
+    Class<?> cls = Class.forName("org.apache.spark.deploy.SparkSubmit");
+    return cls.getDeclaredMethod("main", String[].class);
+  }
+
+  @Override
+  public void run() {
+    try {
+      if (main == null) {
+        main = getSparkSubmitMain();
+      }
+      Object argsObj = args.toArray(new String[args.size()]);
+      main.invoke(null, argsObj);
+    } catch (RuntimeException runtimeException) {
+      throw runtimeException;
+    } catch (Exception exception) {
+      throw new RuntimeException(exception);
+    }
+  }
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/package-info.java b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
index d1ac39bdc76a..05abba136037 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/package-info.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
@@ -25,8 +25,8 @@
 *
 *

 * The {@link org.apache.spark.launcher.SparkLauncher#startApplication(
- * org.apache.spark.launcher.SparkAppHandle.Listener...)} can be used to start Spark and provide
- * a handle to monitor and control the running application:
+ * org.apache.spark.launcher.SparkAppHandle.Listener...)} can be used to start a Spark application
+ * as a child process and provide a handle to monitor and control the running application:
 *

* *
@@ -49,6 +49,35 @@
  * 
* *

+ * Here is an example of launching an application in thread mode, with YARN cluster deploy mode
+ * and the auto-shutdown option enabled. It shows the use of both the
+ * {@link org.apache.spark.launcher.SparkLauncher#autoShutdown(boolean)} and
+ * {@link org.apache.spark.launcher.SparkLauncher#launchAsThread(boolean)} methods:
+ *

+ *
+ * <pre>
+ * {@code
+ *   import org.apache.spark.launcher.SparkAppHandle;
+ *   import org.apache.spark.launcher.SparkLauncher;
+ *
+ *   public class MyLauncher {
+ *     public static void main(String[] args) throws Exception {
+ *       SparkAppHandle handle = new SparkLauncher()
+ *         .setAppResource("/my/app.jar")
+ *         .setMainClass("my.spark.app.Main")
+ *         .setMaster("yarn")
+ *         .setDeployMode("cluster")
+ *         .autoShutdown(true)
+ *         .launchAsThread(true)
+ *         .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
+ *         .startApplication();
+ *       // Use handle API to monitor / control application.
+ *     }
+ *   }
+ * }
+ * </pre>
+ * + *

* It's also possible to launch a raw child process, using the * {@link org.apache.spark.launcher.SparkLauncher#launch()} method: *

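The reflective lookup-and-invoke pattern that SparkSubmitRunner encapsulates can be exercised standalone. A minimal Scala sketch, assuming the Spark jar is on the classpath (the object and thread names are illustrative):

    import java.lang.reflect.Method

    object SparkSubmitReflectionSketch {
      def main(argv: Array[String]): Unit = {
        // Resolve SparkSubmit.main(String[]) eagerly so a missing or incompatible
        // Spark jar fails fast, mirroring getSparkSubmitMain().
        val cls = Class.forName("org.apache.spark.deploy.SparkSubmit")
        val mainMethod: Method = cls.getDeclaredMethod("main", classOf[Array[String]])
        // Run the submission on a named daemon thread, as startApplicationAsThread does;
        // the launcher hands the thread to a ChildThreadAppHandle instead of joining it.
        val runner = new Thread(new Runnable {
          override def run(): Unit = mainMethod.invoke(null, argv)
        }, "spark-submit-runner-sketch")
        runner.setDaemon(true)
        runner.start()
        runner.join()
      }
    }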
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 864c834d110f..22fe4d66743a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -41,6 +41,7 @@ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.rpc._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -174,6 +175,11 @@ private[spark] class ApplicationMaster( sys.props.remove(e.key) } + // Clean up the spark launcher configuration so it doesn't show up in the Web UI for security. + YarnCommandBuilderUtils.LAUNCHER_CONFIGS.foreach { e => + sparkConf.remove(e) + sys.props.remove(e) + } resources.toMap } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b817570c0abf..b58cc639394d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records -import org.apache.spark.{SecurityManager, SparkConf, SparkException} +import org.apache.spark.{SecurityManager, SparkApp, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.yarn.config._ import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager @@ -144,7 +144,7 @@ private[spark] class Client( def submitApplication(): ApplicationId = { var appId: ApplicationId = null try { - launcherBackend.connect() + YarnCommandBuilderUtils.launcherBackendConnect(launcherBackend, sparkConf) // Setup the credentials before doing anything else, // so we have don't have issues at any point. setupCredentials() @@ -634,10 +634,11 @@ private[spark] class Client( remoteFs, hadoopConf, remoteConfArchivePath, localResources, LocalResourceType.ARCHIVE, LOCALIZED_CONF_DIR, statCache, appMasterOnly = false) - // Clear the cache-related entries from the configuration to avoid them polluting the - // UI's environment page. This works for client mode; for cluster mode, this is handled - // by the AM. + // Clear the cache-related and spark-launcher entries from the configuration to avoid them + // polluting the UI's environment page. This works for client mode; for cluster mode, this + // is handled by the AM. 
CACHE_CONFIGS.foreach(sparkConf.remove) + YarnCommandBuilderUtils.LAUNCHER_CONFIGS.foreach(sparkConf.remove) localResources } @@ -1129,9 +1130,11 @@ private[spark] class Client( } -private object Client extends Logging { +private object Client extends SparkApp with Logging { - def main(argStrings: Array[String]) { + override def sparkMain( + args: Array[String], + conf: scala.collection.immutable.Map[String, String]): Unit = { if (!sys.props.contains("SPARK_SUBMIT")) { logWarning("WARNING: This client is deprecated and will be removed in a " + "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"") @@ -1140,13 +1143,18 @@ private object Client extends Logging { // Set an env variable indicating we are running in YARN mode. // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes System.setProperty("SPARK_YARN_MODE", "true") + val sparkConf = new SparkConf + for ((key, value) <- conf if key.startsWith("spark.")) { + sparkConf.set(key, value, true) + } + // SparkSubmit would use yarn cache to distribute files & jars in yarn mode, // so remove them from sparkConf here for yarn mode. sparkConf.remove("spark.jars") sparkConf.remove("spark.files") - val args = new ClientArguments(argStrings) - new Client(args, sparkConf).run() + val argsForClient = new ClientArguments(args) + new Client(argsForClient, sparkConf).run() } // Alias for the user jar @@ -1436,5 +1444,4 @@ private object Client extends Logging { def isLocalUri(uri: String): Boolean = { uri.startsWith(s"$LOCAL_SCHEME:") } - } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala index 0c3d080cca25..89757ae1fa04 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala @@ -17,10 +17,9 @@ package org.apache.spark.launcher -import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer import scala.util.Properties +import org.apache.spark.SparkConf /** * Exposes methods from the launcher library that are used by the YARN backend. 
*/ @@ -38,4 +37,25 @@ private[spark] object YarnCommandBuilderUtils { CommandBuilderUtils.findJarsDir(sparkHome, scalaVer, true) } + def launcherBackendConnect(launcherBackend: LauncherBackend, sparkConf: SparkConf): Unit = { + val launcherServerPort: Int = sparkConf.get(SparkLauncher.LAUNCHER_INTERNAL_PORT, "0").toInt + val launcherServerSecret: String = + sparkConf.get(SparkLauncher.LAUNCHER_INTERNAL_CHILD_PROCESS_SECRET, "") + val launcherServerStopIfShutdown: Boolean = + sparkConf.get(SparkLauncher.LAUNCHER_INTERNAL_STOP_ON_SHUTDOWN, "false").toBoolean + if (launcherServerSecret != null && launcherServerSecret != "" && launcherServerPort != 0) { + launcherBackend.connect( + launcherServerPort, + launcherServerSecret, + launcherServerStopIfShutdown) + } else { + launcherBackend.connect() + } + } + + private[spark] val LAUNCHER_CONFIGS = Seq( + SparkLauncher.LAUNCHER_INTERNAL_CHILD_PROCESS_SECRET, + SparkLauncher.LAUNCHER_INTERNAL_PORT) + } + diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 9c3b18e4ec5f..3b6cf19a3b01 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -56,7 +56,7 @@ abstract class BaseYarnClusterSuite |log4j.logger.org.spark_project.jetty=WARN """.stripMargin - private var yarnCluster: MiniYARNCluster = _ + protected var yarnCluster: MiniYARNCluster = _ protected var tempDir: File = _ private var fakeSparkJar: File = _ protected var hadoopConfDir: File = _ diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 99fb58a28934..e01c039f7472 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -27,7 +27,10 @@ import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState} +import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.ConverterUtils import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ @@ -35,7 +38,7 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging -import org.apache.spark.launcher._ +import org.apache.spark.launcher.{SparkAppHandle, _} import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerExecutorAdded} import org.apache.spark.scheduler.cluster.ExecutorInfo @@ -79,6 +82,13 @@ class YarnClusterSuite extends BaseYarnClusterSuite { | return 42 """.stripMargin + private def getYarnClient = { + var yarnClient = YarnClient.createYarnClient() + yarnClient.init(yarnCluster.getConfig) + yarnClient.start() + yarnClient + } + test("run Spark in yarn-client mode") { testBasicYarnApp(true) } @@ -162,29 +172,9 @@ class YarnClusterSuite extends BaseYarnClusterSuite { } test("monitor app using launcher library") { - val env = new JHashMap[String, String]() - env.put("YARN_CONF_DIR", 
hadoopConfDir.getAbsolutePath()) - - val propsFile = createConfFile() - val handle = new SparkLauncher(env) - .setSparkHome(sys.props("spark.test.home")) - .setConf("spark.ui.enabled", "false") - .setPropertiesFile(propsFile) - .setMaster("yarn") - .setDeployMode("client") - .setAppResource(SparkLauncher.NO_RESOURCE) - .setMainClass(mainClassName(YarnLauncherTestApp.getClass)) - .startApplication() - + val handle = launchSparkAppWithConf(false, false, "client") try { - eventually(timeout(30 seconds), interval(100 millis)) { - handle.getState() should be (SparkAppHandle.State.RUNNING) - } - - handle.getAppId() should not be (null) - handle.getAppId() should startWith ("application_") handle.stop() - eventually(timeout(30 seconds), interval(100 millis)) { handle.getState() should be (SparkAppHandle.State.KILLED) } @@ -201,6 +191,66 @@ class YarnClusterSuite extends BaseYarnClusterSuite { finalState should be (SparkAppHandle.State.FAILED) } + test("monitor app running in thread using launcher library") { + val handle = launchSparkAppWithConf(true, false, "cluster") + try { + handle.stop() + eventually(timeout(30 seconds), interval(100 millis)) { + handle.getState() should be (SparkAppHandle.State.KILLED) + } + } finally { + handle.kill() + } + } + + test("monitor app using launcher library for proc with auto shutdown") { + val handle = launchSparkAppWithConf(false, true, "cluster") + try { + handle.disconnect() + val applicationId = ConverterUtils.toApplicationId(handle.getAppId) + val yarnClient: YarnClient = getYarnClient + eventually(timeout(30 seconds), interval(100 millis)) { + handle.getState() should be (SparkAppHandle.State.LOST) + var status = yarnClient.getApplicationReport(applicationId).getFinalApplicationStatus() + status should be (FinalApplicationStatus.KILLED) + } + } finally { + handle.kill() + } + } + + test("monitor app using launcher library for thread with auto shutdown") { + val handle = launchSparkAppWithConf(true, true, "cluster") + try { + handle.disconnect() + val applicationId = ConverterUtils.toApplicationId(handle.getAppId) + val yarnClient: YarnClient = getYarnClient + eventually(timeout(30 seconds), interval(100 millis)) { + handle.getState() should be (SparkAppHandle.State.LOST) + var status = yarnClient.getApplicationReport(applicationId).getFinalApplicationStatus + status should be (FinalApplicationStatus.KILLED) + } + } finally { + handle.kill() + } + } + + test("monitor app using launcher library for thread without auto shutdown") { + val handle = launchSparkAppWithConf(true, false, "cluster") + try { + handle.disconnect() + val applicationId = ConverterUtils.toApplicationId(handle.getAppId) + val yarnClient: YarnClient = getYarnClient + eventually(timeout(30 seconds), interval(100 millis)) { + handle.getState() should be (SparkAppHandle.State.LOST) + var status = yarnClient.getApplicationReport(applicationId).getYarnApplicationState + status should not be (YarnApplicationState.KILLED) + } + } finally { + handle.kill() + } + } + private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = { val result = File.createTempFile("result", null, tempDir) val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass), @@ -290,6 +340,36 @@ class YarnClusterSuite extends BaseYarnClusterSuite { checkResult(finalState, executorResult, "OVERRIDDEN") } + private def launchSparkAppWithConf( + launchAsThread: Boolean, + autoShutdown: Boolean, + deployMode: String): SparkAppHandle = { + val env = new JHashMap[String, 
String]()
+    env.put("YARN_CONF_DIR", hadoopConfDir.getAbsolutePath())
+
+    val propsFile = createConfFile()
+    val launcher = (if (launchAsThread) new SparkLauncher() else new SparkLauncher(env))
+      .setSparkHome(sys.props("spark.test.home"))
+
+    val handle = launcher.setConf("spark.ui.enabled", "false")
+      .setPropertiesFile(propsFile)
+      .setMaster("yarn")
+      .setDeployMode(deployMode)
+      .launchAsThread(launchAsThread)
+      .autoShutdown(autoShutdown)
+      .setAppResource(SparkLauncher.NO_RESOURCE)
+      .setMainClass(mainClassName(YarnLauncherTestApp.getClass))
+      .startApplication()
+
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      handle.getState() should be(SparkAppHandle.State.RUNNING)
+    }
+
+    handle.getAppId() should not be (null)
+    handle.getAppId() should startWith("application_")
+    handle
+  }
+
 }

 private[spark] class SaveExecutorInfo extends SparkListener {
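Taken together, the patch gives LauncherBackend two handshake paths to the launcher server: the pre-existing environment variables for child processes, and the new spark.launcher.internal.* configuration entries for thread-mode launches (a thread cannot be handed its own environment). A condensed Scala sketch of the selection logic in launcherBackendConnect above — the wrapper object, method name, and returned strings are illustrative, not part of the patch:

    object LauncherHandshakeSketch {
      // Mirrors YarnCommandBuilderUtils.launcherBackendConnect: prefer the explicit
      // conf-based settings; otherwise fall back to the environment-variable handshake.
      def describeConnection(conf: Map[String, String], env: Map[String, String]): String = {
        val port = conf.getOrElse("spark.launcher.internal.port", "0").toInt
        val secret = conf.getOrElse("spark.launcher.internal.secret", "")
        if (secret.nonEmpty && port != 0) {
          s"thread mode: connect(port=$port, secret, stopOnShutdown)"
        } else if (env.contains("_SPARK_LAUNCHER_PORT") && env.contains("_SPARK_LAUNCHER_SECRET")) {
          "child-process mode: connect() reads the environment variables"
        } else {
          "no launcher server: run unmonitored"
        }
      }

      def main(args: Array[String]): Unit = {
        val threadModeConf = Map(
          "spark.launcher.internal.port" -> "50505",
          "spark.launcher.internal.secret" -> "not-a-real-secret")
        println(describeConnection(threadModeConf, sys.env))
        println(describeConnection(Map.empty, sys.env))
      }
    }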