diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java index 6a1a38c1a54f4..773c390175b6d 100644 --- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java +++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java @@ -41,6 +41,8 @@ public class SparkLauncherSuite extends BaseSuite { private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d"); + private static final String EXCEPTION_MESSAGE = "dummy-exception"; + private static final RuntimeException DUMMY_EXCEPTION = new RuntimeException(EXCEPTION_MESSAGE); private final SparkLauncher launcher = new SparkLauncher(); @@ -130,17 +132,8 @@ public void testInProcessLauncher() throws Exception { try { inProcessLauncherTestImpl(); } finally { - Properties p = new Properties(); - for (Map.Entry e : properties.entrySet()) { - p.put(e.getKey(), e.getValue()); - } - System.setProperties(p); - // Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet. - // Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM. - // See SPARK-23019 and SparkContext.stop() for details. - eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> { - assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty()); - }); + restoreSystemProperties(properties); + waitForSparkContextShutdown(); } } @@ -227,6 +220,82 @@ public void testInProcessLauncherDoesNotKillJvm() throws Exception { assertEquals(SparkAppHandle.State.LOST, handle.getState()); } + @Test + public void testInProcessLauncherGetError() throws Exception { + // Because this test runs SparkLauncher in process and in client mode, it pollutes the system + // properties, and that can cause test failures down the test pipeline. So restore the original + // system properties after this test runs. + Map properties = new HashMap<>(System.getProperties()); + + SparkAppHandle handle = null; + try { + handle = new InProcessLauncher() + .setMaster("local") + .setAppResource(SparkLauncher.NO_RESOURCE) + .setMainClass(ErrorInProcessTestApp.class.getName()) + .addAppArgs("hello") + .startApplication(); + + final SparkAppHandle _handle = handle; + eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> { + assertEquals(SparkAppHandle.State.FAILED, _handle.getState()); + }); + + assertNotNull(handle.getError()); + assertTrue(handle.getError().isPresent()); + assertSame(handle.getError().get(), DUMMY_EXCEPTION); + } finally { + if (handle != null) { + handle.kill(); + } + restoreSystemProperties(properties); + waitForSparkContextShutdown(); + } + } + + @Test + public void testSparkLauncherGetError() throws Exception { + SparkAppHandle handle = null; + try { + handle = new SparkLauncher() + .setMaster("local") + .setAppResource(SparkLauncher.NO_RESOURCE) + .setMainClass(ErrorInProcessTestApp.class.getName()) + .addAppArgs("hello") + .startApplication(); + + final SparkAppHandle _handle = handle; + eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> { + assertEquals(SparkAppHandle.State.FAILED, _handle.getState()); + }); + + assertNotNull(handle.getError()); + assertTrue(handle.getError().isPresent()); + assertTrue(handle.getError().get().getMessage().contains(EXCEPTION_MESSAGE)); + } finally { + if (handle != null) { + handle.kill(); + } + } + } + + private void restoreSystemProperties(Map properties) { + Properties p = new Properties(); + for (Map.Entry e : properties.entrySet()) { + p.put(e.getKey(), e.getValue()); + } + System.setProperties(p); + } + + private void waitForSparkContextShutdown() throws Exception { + // Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet. + // Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM. + // See SPARK-23019 and SparkContext.stop() for details. + eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> { + assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty()); + }); + } + public static class SparkLauncherTestApp { public static void main(String[] args) throws Exception { @@ -264,4 +333,15 @@ public static void main(String[] args) throws Exception { } + /** + * Similar to {@link InProcessTestApp} except it throws an exception + */ + public static class ErrorInProcessTestApp { + + public static void main(String[] args) { + assertNotEquals(0, args.length); + assertEquals(args[0], "hello"); + throw DUMMY_EXCEPTION; + } + } } diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index 5609f8492f4f4..41742da7faf27 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -18,6 +18,7 @@ package org.apache.spark.launcher; import java.io.InputStream; +import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; @@ -29,7 +30,7 @@ class ChildProcAppHandle extends AbstractAppHandle { private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName()); private volatile Process childProc; - private OutputRedirector redirector; + private volatile OutputRedirector redirector; ChildProcAppHandle(LauncherServer server) { super(server); @@ -46,6 +47,25 @@ public synchronized void disconnect() { } } + /** + * Parses the logs of {@code spark-submit} and returns the last exception thrown. + * + *

+ * Since {@link SparkLauncher} runs {@code spark-submit} in a sub-process, its difficult to + * accurately retrieve the full {@link Throwable} from the {@code spark-submit} process. + * This method parses the logs of the sub-process and provides a best-effort attempt at + * returning the last exception thrown by the {@code spark-submit} process. Only the exception + * message is parsed, the associated stacktrace is meaningless. + *

+ * + * @return an {@link Optional} containing a {@link RuntimeException} with the parsed + * exception, otherwise returns a {@link Optional#EMPTY} + */ + @Override + public Optional getError() { + return redirector != null ? Optional.ofNullable(redirector.getError()) : Optional.empty(); + } + @Override public synchronized void kill() { if (!isDisposed()) { diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java index 15fbca0facef2..ba09050c756d2 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java @@ -17,7 +17,9 @@ package org.apache.spark.launcher; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -31,6 +33,8 @@ class InProcessAppHandle extends AbstractAppHandle { // Avoid really long thread names. private static final int MAX_APP_NAME_LEN = 16; + private volatile Throwable error; + private Thread app; InProcessAppHandle(LauncherServer server) { @@ -51,6 +55,11 @@ public synchronized void kill() { } } + @Override + public Optional getError() { + return Optional.ofNullable(error); + } + synchronized void start(String appName, Method main, String[] args) { CommandBuilderUtils.checkState(app == null, "Handle already started."); @@ -62,7 +71,11 @@ synchronized void start(String appName, Method main, String[] args) { try { main.invoke(null, (Object) args); } catch (Throwable t) { + if (t instanceof InvocationTargetException) { + t = t.getCause(); + } LOG.log(Level.WARNING, "Application failed with exception.", t); + error = t; setState(State.FAILED); } diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java index 6f4b0bb38e031..5e7664b635cd5 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java +++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java @@ -37,6 +37,7 @@ class OutputRedirector { private final ChildProcAppHandle callback; private volatile boolean active; + private volatile Throwable error; OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) { this(in, loggerName, tf, null); @@ -61,6 +62,10 @@ private void redirect() { while ((line = reader.readLine()) != null) { if (active) { sink.info(line.replaceFirst("\\s*$", "")); + if (error == null && containsIgnoreCase(line, "Error") || containsIgnoreCase(line, + "Exception")) { + error = new RuntimeException(line); + } } } } catch (IOException e) { @@ -85,4 +90,24 @@ boolean isAlive() { return thread.isAlive(); } + Throwable getError() { + return error; + } + + /** + * Copied from Apache Commons Lang {@code StringUtils#containsIgnoreCase(String, String)} + */ + private static boolean containsIgnoreCase(String str, String searchStr) { + if (str == null || searchStr == null) { + return false; + } + int len = searchStr.length(); + int max = str.length() - len; + for (int i = 0; i <= max; i++) { + if (str.regionMatches(true, i, searchStr, 0, len)) { + return true; + } + } + return false; + } } diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java index cefb4d1a95fb6..afec270e2b11c 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java @@ -17,6 +17,8 @@ package org.apache.spark.launcher; +import java.util.Optional; + /** * A handle to a running Spark application. *

@@ -100,6 +102,12 @@ public boolean isFinal() { */ void disconnect(); + /** + * If the application failed due to an error, return the underlying error. If the app + * succeeded, this method returns an empty {@link Optional}. + */ + Optional getError(); + /** * Listener for updates to a handle's state. The callbacks do not receive information about * what exactly has changed, just that an update has occurred. diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 4f250c9943edb..46a42bc0e531e 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -106,7 +106,10 @@ object MimaExcludes { ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"), // [SPARK-23042] Use OneHotEncoderModel to encode labels in MultilayerPerceptronClassifier - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter") + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter"), + + // [SPARK-24243][CORE] Expose exceptions from InProcessAppHandle + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.launcher.SparkAppHandle.getError") ) // Exclude rules for 2.3.x