Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 91 additions & 11 deletions core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
public class SparkLauncherSuite extends BaseSuite {

private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
private static final String EXCEPTION_MESSAGE = "dummy-exception";
private static final RuntimeException DUMMY_EXCEPTION = new RuntimeException(EXCEPTION_MESSAGE);

private final SparkLauncher launcher = new SparkLauncher();

Expand Down Expand Up @@ -130,17 +132,8 @@ public void testInProcessLauncher() throws Exception {
try {
inProcessLauncherTestImpl();
} finally {
Properties p = new Properties();
for (Map.Entry<Object, Object> e : properties.entrySet()) {
p.put(e.getKey(), e.getValue());
}
System.setProperties(p);
// Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
// Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
// See SPARK-23019 and SparkContext.stop() for details.
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty());
});
restoreSystemProperties(properties);
waitForSparkContextShutdown();
}
}

Expand Down Expand Up @@ -227,6 +220,82 @@ public void testInProcessLauncherDoesNotKillJvm() throws Exception {
assertEquals(SparkAppHandle.State.LOST, handle.getState());
}

@Test
public void testInProcessLauncherGetError() throws Exception {
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
// properties, and that can cause test failures down the test pipeline. So restore the original
// system properties after this test runs.
Map<Object, Object> properties = new HashMap<>(System.getProperties());

SparkAppHandle handle = null;
try {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(ErrorInProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication();

final SparkAppHandle _handle = handle;
eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> {
assertEquals(SparkAppHandle.State.FAILED, _handle.getState());
});

assertNotNull(handle.getError());
assertTrue(handle.getError().isPresent());
assertSame(handle.getError().get(), DUMMY_EXCEPTION);
} finally {
if (handle != null) {
handle.kill();
}
restoreSystemProperties(properties);
waitForSparkContextShutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't really necessary since ErrorInProcessTestApp does not create a context; but not a big deal either.

}
}

/**
 * Verifies that a failure in a child-process application is surfaced through
 * {@code SparkAppHandle#getError()} once the handle transitions to FAILED.
 */
@Test
public void testSparkLauncherGetError() throws Exception {
  SparkAppHandle handle = null;
  try {
    handle = new SparkLauncher()
      .setMaster("local")
      .setAppResource(SparkLauncher.NO_RESOURCE)
      .setMainClass(ErrorInProcessTestApp.class.getName())
      .addAppArgs("hello")
      .startApplication();

    // The handle reference used inside the lambda must be effectively final.
    final SparkAppHandle startedHandle = handle;
    eventually(Duration.ofSeconds(60), Duration.ofMillis(1000),
      () -> assertEquals(SparkAppHandle.State.FAILED, startedHandle.getState()));

    assertNotNull(handle.getError());
    assertTrue(handle.getError().isPresent());
    // Child-process errors are parsed from logs, so only the message survives.
    assertTrue(handle.getError().get().getMessage().contains(EXCEPTION_MESSAGE));
  } finally {
    if (handle != null) {
      handle.kill();
    }
  }
}

/**
 * Replaces the JVM's system properties with a snapshot captured before a test ran, undoing
 * any pollution caused by launching Spark in client mode within this process.
 *
 * @param properties snapshot of {@code System.getProperties()} taken before the test.
 */
private void restoreSystemProperties(Map<Object, Object> properties) {
  Properties restored = new Properties();
  restored.putAll(properties);
  System.setProperties(restored);
}

/**
 * Polls until no active SparkContext remains in this JVM, failing after five seconds.
 */
private void waitForSparkContextShutdown() throws Exception {
  // At this point the DAGScheduler is stopped, but SparkContext.clearActiveContext may not have
  // run yet. Wait a reasonable amount of time so we never end up with two active SparkContexts
  // in one JVM. See SPARK-23019 and SparkContext.stop() for details.
  eventually(Duration.ofSeconds(5), Duration.ofMillis(10),
    () -> assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty()));
}

public static class SparkLauncherTestApp {

public static void main(String[] args) throws Exception {
Expand Down Expand Up @@ -264,4 +333,15 @@ public static void main(String[] args) throws Exception {

}

/**
 * Similar to {@link InProcessTestApp} except it throws an exception immediately, so tests can
 * verify that the failure is exposed through {@code SparkAppHandle#getError()}.
 */
public static class ErrorInProcessTestApp {

  public static void main(String[] args) {
    assertNotEquals(0, args.length);
    // JUnit convention: expected value first, actual value second.
    assertEquals("hello", args[0]);
    throw DUMMY_EXCEPTION;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.launcher;

import java.io.InputStream;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -29,7 +30,7 @@ class ChildProcAppHandle extends AbstractAppHandle {
private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());

private volatile Process childProc;
private OutputRedirector redirector;
private volatile OutputRedirector redirector;

ChildProcAppHandle(LauncherServer server) {
super(server);
Expand All @@ -46,6 +47,25 @@ public synchronized void disconnect() {
}
}

/**
 * Parses the logs of {@code spark-submit} and returns the last exception thrown.
 *
 * <p>
 * Since {@link SparkLauncher} runs {@code spark-submit} in a sub-process, it's difficult to
 * accurately retrieve the full {@link Throwable} from the {@code spark-submit} process.
 * This method parses the logs of the sub-process and provides a best-effort attempt at
 * returning the last exception thrown by the {@code spark-submit} process. Only the exception
 * message is parsed, the associated stacktrace is meaningless.
 * </p>
 *
 * @return an {@link Optional} containing a {@link RuntimeException} with the parsed
 *         exception, otherwise returns an empty {@link Optional}
 */
@Override
public Optional<Throwable> getError() {
  return redirector != null ? Optional.ofNullable(redirector.getError()) : Optional.empty();
}

@Override
public synchronized void kill() {
if (!isDisposed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.spark.launcher;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -31,6 +33,8 @@ class InProcessAppHandle extends AbstractAppHandle {
// Avoid really long thread names.
private static final int MAX_APP_NAME_LEN = 16;

private volatile Throwable error;

private Thread app;

InProcessAppHandle(LauncherServer server) {
Expand All @@ -51,6 +55,11 @@ public synchronized void kill() {
}
}

/**
 * Returns the error recorded when the in-process application's main method failed.
 *
 * @return an {@link Optional} holding the {@link Throwable} captured on failure, or an empty
 *         {@link Optional} if no error has been recorded.
 */
@Override
public Optional<Throwable> getError() {
  return Optional.ofNullable(error);
}

synchronized void start(String appName, Method main, String[] args) {
CommandBuilderUtils.checkState(app == null, "Handle already started.");

Expand All @@ -62,7 +71,11 @@ synchronized void start(String appName, Method main, String[] args) {
try {
main.invoke(null, (Object) args);
} catch (Throwable t) {
if (t instanceof InvocationTargetException) {
t = t.getCause();
}
LOG.log(Level.WARNING, "Application failed with exception.", t);
error = t;
setState(State.FAILED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class OutputRedirector {
private final ChildProcAppHandle callback;

private volatile boolean active;
private volatile Throwable error;

OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
this(in, loggerName, tf, null);
Expand All @@ -61,6 +62,10 @@ private void redirect() {
while ((line = reader.readLine()) != null) {
if (active) {
sink.info(line.replaceFirst("\\s*$", ""));
if (error == null && containsIgnoreCase(line, "Error") || containsIgnoreCase(line,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: group the conditions so people don't have to remember how precedence work. Also it looks better if you keep the whole call to containsIgnoreCase in the same line.

Wouldn't you want to keep the last error, though, instead of the first one?

"Exception")) {
error = new RuntimeException(line);
}
}
}
} catch (IOException e) {
Expand All @@ -85,4 +90,24 @@ boolean isAlive() {
return thread.isAlive();
}

/**
 * Returns the exception parsed from the redirected output, or {@code null} if no line
 * containing "Error" or "Exception" has been seen. Best effort: only the log line's text is
 * captured, as a {@link RuntimeException} message.
 */
Throwable getError() {
  return error;
}

/**
 * Case-insensitive substring test; copied from Apache Commons Lang
 * {@code StringUtils#containsIgnoreCase(String, String)}.
 *
 * @return true if {@code searchStr} occurs in {@code str} ignoring case; false if either
 *         argument is null.
 */
private static boolean containsIgnoreCase(String str, String searchStr) {
  if (str == null || searchStr == null) {
    return false;
  }
  final int len = searchStr.length();
  // Scan every viable start offset; direction is irrelevant for a boolean containment check.
  for (int i = str.length() - len; i >= 0; i--) {
    if (str.regionMatches(true, i, searchStr, 0, len)) {
      return true;
    }
  }
  return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.launcher;

import java.util.Optional;

/**
* A handle to a running Spark application.
* <p>
Expand Down Expand Up @@ -100,6 +102,12 @@ public boolean isFinal() {
*/
void disconnect();

/**
 * If the application failed due to an error, return the underlying error. If the app
 * succeeded, this method returns an empty {@link Optional}.
 *
 * <p>NOTE(review): presumably this is also empty while the application is still running or
 * before a failure has been detected — confirm against the implementations.</p>
 */
Optional<Throwable> getError();

/**
* Listener for updates to a handle's state. The callbacks do not receive information about
* what exactly has changed, just that an update has occurred.
Expand Down
5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ object MimaExcludes {
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"),

// [SPARK-23042] Use OneHotEncoderModel to encode labels in MultilayerPerceptronClassifier
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter")
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter"),

// [SPARK-24243][CORE] Expose exceptions from InProcessAppHandle
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.launcher.SparkAppHandle.getError")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add at the top of the list (then you don't have to touch existing entries).

)

// Exclude rules for 2.3.x
Expand Down