From 9240b77078936dceaaa4a68f6a54c5c0c16aab73 Mon Sep 17 00:00:00 2001 From: Sahil Takiar Date: Mon, 23 Jul 2018 12:31:24 -0500 Subject: [PATCH 1/3] [SPARK-24243][CORE] Expose exceptions from InProcessAppHandle Adds a new method to `SparkAppHandle` called `getError` which returns the exception (if present) that caused the underlying Spark app to fail. New tests added to `SparkLauncherSuite` for the new method. --- .../spark/launcher/SparkLauncherSuite.java | 102 ++++++++++++++++-- .../spark/launcher/ChildProcAppHandle.java | 22 +++- .../spark/launcher/InProcessAppHandle.java | 13 +++ .../spark/launcher/OutputRedirector.java | 25 +++++ .../apache/spark/launcher/SparkAppHandle.java | 8 ++ project/MimaExcludes.scala | 5 +- 6 files changed, 162 insertions(+), 13 deletions(-) diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java index 6a1a38c1a54f4..773c390175b6d 100644 --- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java +++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java @@ -41,6 +41,8 @@ public class SparkLauncherSuite extends BaseSuite { private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d"); + private static final String EXCEPTION_MESSAGE = "dummy-exception"; + private static final RuntimeException DUMMY_EXCEPTION = new RuntimeException(EXCEPTION_MESSAGE); private final SparkLauncher launcher = new SparkLauncher(); @@ -130,17 +132,8 @@ public void testInProcessLauncher() throws Exception { try { inProcessLauncherTestImpl(); } finally { - Properties p = new Properties(); - for (Map.Entry e : properties.entrySet()) { - p.put(e.getKey(), e.getValue()); - } - System.setProperties(p); - // Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet. - // Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM. - // See SPARK-23019 and SparkContext.stop() for details. - eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> { - assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty()); - }); + restoreSystemProperties(properties); + waitForSparkContextShutdown(); } } @@ -227,6 +220,82 @@ public void testInProcessLauncherDoesNotKillJvm() throws Exception { assertEquals(SparkAppHandle.State.LOST, handle.getState()); } + @Test + public void testInProcessLauncherGetError() throws Exception { + // Because this test runs SparkLauncher in process and in client mode, it pollutes the system + // properties, and that can cause test failures down the test pipeline. So restore the original + // system properties after this test runs. + Map properties = new HashMap<>(System.getProperties()); + + SparkAppHandle handle = null; + try { + handle = new InProcessLauncher() + .setMaster("local") + .setAppResource(SparkLauncher.NO_RESOURCE) + .setMainClass(ErrorInProcessTestApp.class.getName()) + .addAppArgs("hello") + .startApplication(); + + final SparkAppHandle _handle = handle; + eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> { + assertEquals(SparkAppHandle.State.FAILED, _handle.getState()); + }); + + assertNotNull(handle.getError()); + assertTrue(handle.getError().isPresent()); + assertSame(handle.getError().get(), DUMMY_EXCEPTION); + } finally { + if (handle != null) { + handle.kill(); + } + restoreSystemProperties(properties); + waitForSparkContextShutdown(); + } + } + + @Test + public void testSparkLauncherGetError() throws Exception { + SparkAppHandle handle = null; + try { + handle = new SparkLauncher() + .setMaster("local") + .setAppResource(SparkLauncher.NO_RESOURCE) + .setMainClass(ErrorInProcessTestApp.class.getName()) + .addAppArgs("hello") + .startApplication(); + + final SparkAppHandle _handle = handle; + eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> { + assertEquals(SparkAppHandle.State.FAILED, _handle.getState()); + }); + + assertNotNull(handle.getError()); + assertTrue(handle.getError().isPresent()); + assertTrue(handle.getError().get().getMessage().contains(EXCEPTION_MESSAGE)); + } finally { + if (handle != null) { + handle.kill(); + } + } + } + + private void restoreSystemProperties(Map properties) { + Properties p = new Properties(); + for (Map.Entry e : properties.entrySet()) { + p.put(e.getKey(), e.getValue()); + } + System.setProperties(p); + } + + private void waitForSparkContextShutdown() throws Exception { + // Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet. + // Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM. + // See SPARK-23019 and SparkContext.stop() for details. + eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> { + assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty()); + }); + } + public static class SparkLauncherTestApp { public static void main(String[] args) throws Exception { @@ -264,4 +333,15 @@ public static void main(String[] args) throws Exception { } + /** + * Similar to {@link InProcessTestApp} except it throws an exception + */ + public static class ErrorInProcessTestApp { + + public static void main(String[] args) { + assertNotEquals(0, args.length); + assertEquals(args[0], "hello"); + throw DUMMY_EXCEPTION; + } + } } diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index 5609f8492f4f4..41742da7faf27 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -18,6 +18,7 @@ package org.apache.spark.launcher; import java.io.InputStream; +import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; @@ -29,7 +30,7 @@ class ChildProcAppHandle extends AbstractAppHandle { private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName()); private volatile Process childProc; - private OutputRedirector redirector; + private volatile OutputRedirector redirector; ChildProcAppHandle(LauncherServer server) { super(server); @@ -46,6 +47,25 @@ public synchronized void disconnect() { } } + /** + * Parses the logs of {@code spark-submit} and returns the last exception thrown. + * + *

+ * Since {@link SparkLauncher} runs {@code spark-submit} in a sub-process, its difficult to + * accurately retrieve the full {@link Throwable} from the {@code spark-submit} process. + * This method parses the logs of the sub-process and provides a best-effort attempt at + * returning the last exception thrown by the {@code spark-submit} process. Only the exception + * message is parsed, the associated stacktrace is meaningless. + *

+ * + * @return an {@link Optional} containing a {@link RuntimeException} with the parsed + * exception, otherwise returns a {@link Optional#EMPTY} + */ + @Override + public Optional getError() { + return redirector != null ? Optional.ofNullable(redirector.getError()) : Optional.empty(); + } + @Override public synchronized void kill() { if (!isDisposed()) { diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java index 15fbca0facef2..ba09050c756d2 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java @@ -17,7 +17,9 @@ package org.apache.spark.launcher; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -31,6 +33,8 @@ class InProcessAppHandle extends AbstractAppHandle { // Avoid really long thread names. private static final int MAX_APP_NAME_LEN = 16; + private volatile Throwable error; + private Thread app; InProcessAppHandle(LauncherServer server) { @@ -51,6 +55,11 @@ public synchronized void kill() { } } + @Override + public Optional getError() { + return Optional.ofNullable(error); + } + synchronized void start(String appName, Method main, String[] args) { CommandBuilderUtils.checkState(app == null, "Handle already started."); @@ -62,7 +71,11 @@ synchronized void start(String appName, Method main, String[] args) { try { main.invoke(null, (Object) args); } catch (Throwable t) { + if (t instanceof InvocationTargetException) { + t = t.getCause(); + } LOG.log(Level.WARNING, "Application failed with exception.", t); + error = t; setState(State.FAILED); } diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java index 6f4b0bb38e031..5e7664b635cd5 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java +++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java @@ -37,6 +37,7 @@ class OutputRedirector { private final ChildProcAppHandle callback; private volatile boolean active; + private volatile Throwable error; OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) { this(in, loggerName, tf, null); @@ -61,6 +62,10 @@ private void redirect() { while ((line = reader.readLine()) != null) { if (active) { sink.info(line.replaceFirst("\\s*$", "")); + if (error == null && containsIgnoreCase(line, "Error") || containsIgnoreCase(line, + "Exception")) { + error = new RuntimeException(line); + } } } } catch (IOException e) { @@ -85,4 +90,24 @@ boolean isAlive() { return thread.isAlive(); } + Throwable getError() { + return error; + } + + /** + * Copied from Apache Commons Lang {@code StringUtils#containsIgnoreCase(String, String)} + */ + private static boolean containsIgnoreCase(String str, String searchStr) { + if (str == null || searchStr == null) { + return false; + } + int len = searchStr.length(); + int max = str.length() - len; + for (int i = 0; i <= max; i++) { + if (str.regionMatches(true, i, searchStr, 0, len)) { + return true; + } + } + return false; + } } diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java index cefb4d1a95fb6..afec270e2b11c 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java @@ -17,6 +17,8 @@ package org.apache.spark.launcher; +import java.util.Optional; + /** * A handle to a running Spark application. *

@@ -100,6 +102,12 @@ public boolean isFinal() { */ void disconnect(); + /** + * If the application failed due to an error, return the underlying error. If the app + * succeeded, this method returns an empty {@link Optional}. + */ + Optional getError(); + /** * Listener for updates to a handle's state. The callbacks do not receive information about * what exactly has changed, just that an update has occurred. diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 4f250c9943edb..46a42bc0e531e 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -106,7 +106,10 @@ object MimaExcludes { ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"), // [SPARK-23042] Use OneHotEncoderModel to encode labels in MultilayerPerceptronClassifier - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter") + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter"), + + // [SPARK-24243][CORE] Expose exceptions from InProcessAppHandle + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.launcher.SparkAppHandle.getError") ) // Exclude rules for 2.3.x From e58fc919355c48d2d3b1cacb4d0ee18036cacbc6 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 4 Dec 2018 16:41:44 -0800 Subject: [PATCH 2/3] Feedback. --- .../apache/spark/launcher/ChildProcAppHandle.java | 12 +++++------- .../org/apache/spark/launcher/OutputRedirector.java | 3 +-- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index 41742da7faf27..7dfcf0e66734a 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -49,14 +49,12 @@ public synchronized void disconnect() { /** * Parses the logs of {@code spark-submit} and returns the last exception thrown. - * *

- * Since {@link SparkLauncher} runs {@code spark-submit} in a sub-process, its difficult to - * accurately retrieve the full {@link Throwable} from the {@code spark-submit} process. - * This method parses the logs of the sub-process and provides a best-effort attempt at - * returning the last exception thrown by the {@code spark-submit} process. Only the exception - * message is parsed, the associated stacktrace is meaningless. - *

+ * Since {@link SparkLauncher} runs {@code spark-submit} in a sub-process, it's difficult to + * accurately retrieve the full {@link Throwable} from the {@code spark-submit} process. + * This method parses the logs of the sub-process and provides a best-effort attempt at + * returning the last exception thrown by the {@code spark-submit} process. Only the exception + * message is parsed, the associated stacktrace is meaningless. * * @return an {@link Optional} containing a {@link RuntimeException} with the parsed * exception, otherwise returns a {@link Optional#EMPTY} diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java index 5e7664b635cd5..d8cfcfeb48e20 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java +++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java @@ -62,8 +62,7 @@ private void redirect() { while ((line = reader.readLine()) != null) { if (active) { sink.info(line.replaceFirst("\\s*$", "")); - if (error == null && containsIgnoreCase(line, "Error") || containsIgnoreCase(line, - "Exception")) { + if (containsIgnoreCase(line, "Error") || containsIgnoreCase(line, "Exception")) { error = new RuntimeException(line); } } From c9ab9bcc378168ff3430d8885899ccd74afe7b32 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 6 Dec 2018 10:27:30 -0800 Subject: [PATCH 3/3] Fix exception parsing. --- .../main/java/org/apache/spark/launcher/OutputRedirector.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java index d8cfcfeb48e20..0f097f8313925 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java +++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java @@ -62,7 +62,8 @@ private void redirect() { while ((line = reader.readLine()) != null) { if (active) { sink.info(line.replaceFirst("\\s*$", "")); - if (containsIgnoreCase(line, "Error") || containsIgnoreCase(line, "Exception")) { + if ((containsIgnoreCase(line, "Error") || containsIgnoreCase(line, "Exception")) && + !line.contains("at ")) { error = new RuntimeException(line); } }