diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java index 4c85a8b56141a..2225591a4ff75 100644 --- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java +++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Properties; -import org.junit.Ignore; import org.junit.Test; import static org.junit.Assert.*; import static org.junit.Assume.*; @@ -122,8 +121,7 @@ public void testChildProcLauncher() throws Exception { assertEquals(0, app.waitFor()); } - // TODO: [SPARK-23020] Re-enable this - @Ignore + @Test public void testInProcessLauncher() throws Exception { // Because this test runs SparkLauncher in process and in client mode, it pollutes the system // properties, and that can cause test failures down the test pipeline. So restore the original @@ -159,12 +157,24 @@ private void inProcessLauncherTestImpl() throws Exception { SparkAppHandle handle = null; try { - handle = new InProcessLauncher() - .setMaster("local") - .setAppResource(SparkLauncher.NO_RESOURCE) - .setMainClass(InProcessTestApp.class.getName()) - .addAppArgs("hello") - .startApplication(listener); + synchronized (InProcessTestApp.LOCK) { + handle = new InProcessLauncher() + .setMaster("local") + .setAppResource(SparkLauncher.NO_RESOURCE) + .setMainClass(InProcessTestApp.class.getName()) + .addAppArgs("hello") + .startApplication(listener); + + // SPARK-23020: see doc for InProcessTestApp.LOCK for a description of the race. Here + // we wait until we know that the connection between the app and the launcher has been + // established before allowing the app to finish. + final SparkAppHandle _handle = handle; + eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> { + assertNotEquals(SparkAppHandle.State.UNKNOWN, _handle.getState()); + }); + + InProcessTestApp.LOCK.wait(5000); + } waitFor(handle); assertEquals(SparkAppHandle.State.FINISHED, handle.getState()); @@ -195,10 +205,26 @@ public static void main(String[] args) throws Exception { public static class InProcessTestApp { + /** + * SPARK-23020: there's a race caused by a child app finishing too quickly. This would cause + * the InProcessAppHandle to dispose of itself even before the child connection was properly + * established, so no state changes would be detected for the application and its final + * state would be LOST. + * + * It's not really possible to fix that race safely in the handle code itself without changing + * the way in-process apps talk to the launcher library, so we work around that in the test by + * synchronizing on this object. + */ + public static final Object LOCK = new Object(); + public static void main(String[] args) throws Exception { assertNotEquals(0, args.length); assertEquals(args[0], "hello"); new SparkContext().stop(); + + synchronized (LOCK) { + LOCK.notifyAll(); + } } } diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java index 84a25a5254151..9cbebdaeb33d3 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java @@ -18,22 +18,22 @@ package org.apache.spark.launcher; import java.io.IOException; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; abstract class AbstractAppHandle implements SparkAppHandle { - private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName()); + private static final Logger LOG = Logger.getLogger(AbstractAppHandle.class.getName()); private final LauncherServer server; private LauncherServer.ServerConnection connection; private List listeners; private AtomicReference state; - private String appId; + private volatile String appId; private volatile boolean disposed; protected AbstractAppHandle(LauncherServer server) { @@ -44,7 +44,7 @@ protected AbstractAppHandle(LauncherServer server) { @Override public synchronized void addListener(Listener l) { if (listeners == null) { - listeners = new ArrayList<>(); + listeners = new CopyOnWriteArrayList<>(); } listeners.add(l); } @@ -71,16 +71,14 @@ public void stop() { @Override public synchronized void disconnect() { - if (!isDisposed()) { - if (connection != null) { - try { - connection.closeAndWait(); - } catch (IOException ioe) { - // no-op. - } + if (connection != null && connection.isOpen()) { + try { + connection.close(); + } catch (IOException ioe) { + // no-op. } - dispose(); } + dispose(); } void setConnection(LauncherServer.ServerConnection connection) { @@ -97,10 +95,25 @@ boolean isDisposed() { /** * Mark the handle as disposed, and set it as LOST in case the current state is not final. + * + * This method should be called only when there's a reasonable expectation that the communication + * with the child application is not needed anymore, either because the code managing the handle + * has said so, or because the child application is finished. */ synchronized void dispose() { if (!isDisposed()) { + // First wait for all data from the connection to be read. Then unregister the handle. + // Otherwise, unregistering might cause the server to be stopped and all child connections + // to be closed. + if (connection != null) { + try { + connection.waitForClose(); + } catch (IOException ioe) { + // no-op. + } + } server.unregister(this); + // Set state to LOST if not yet final. setState(State.LOST, false); this.disposed = true; @@ -127,11 +140,13 @@ void setState(State s, boolean force) { current = state.get(); } - LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.", - new Object[] { current, s }); + if (s != State.LOST) { + LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.", + new Object[] { current, s }); + } } - synchronized void setAppId(String appId) { + void setAppId(String appId) { this.appId = appId; fireEvent(true); } diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index 5e3c95676ecbe..5609f8492f4f4 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -112,7 +112,7 @@ void monitorChild() { } } - disconnect(); + dispose(); } } diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java index b8030e0063a37..4b740d3fad20e 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java @@ -66,7 +66,7 @@ synchronized void start(String appName, Method main, String[] args) { setState(State.FAILED); } - disconnect(); + dispose(); }); app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName)); diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java index f4ecd52fdeab8..607879fd02ea9 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java @@ -238,6 +238,7 @@ public void run() { }; ServerConnection clientConnection = new ServerConnection(client, timeout); Thread clientThread = factory.newThread(clientConnection); + clientConnection.setConnectionThread(clientThread); synchronized (clients) { clients.add(clientConnection); } @@ -290,17 +291,15 @@ class ServerConnection extends LauncherConnection { private TimerTask timeout; private volatile Thread connectionThread; - volatile AbstractAppHandle handle; + private volatile AbstractAppHandle handle; ServerConnection(Socket socket, TimerTask timeout) throws IOException { super(socket); this.timeout = timeout; } - @Override - public void run() { - this.connectionThread = Thread.currentThread(); - super.run(); + void setConnectionThread(Thread t) { + this.connectionThread = t; } @Override @@ -361,19 +360,30 @@ public void close() throws IOException { } /** - * Close the connection and wait for any buffered data to be processed before returning. + * Wait for the remote side to close the connection so that any pending data is processed. * This ensures any changes reported by the child application take effect. + * + * This method allows a short period for the above to happen (same amount of time as the + * connection timeout, which is configurable). This should be fine for well-behaved + * applications, where they close the connection arond the same time the app handle detects the + * app has finished. + * + * In case the connection is not closed within the grace period, this method forcefully closes + * it and any subsequent data that may arrive will be ignored. */ - public void closeAndWait() throws IOException { - close(); - + public void waitForClose() throws IOException { Thread connThread = this.connectionThread; if (Thread.currentThread() != connThread) { try { - connThread.join(); + connThread.join(getConnectionTimeout()); } catch (InterruptedException ie) { // Ignore. } + + if (connThread.isAlive()) { + LOG.log(Level.WARNING, "Timed out waiting for child connection to close."); + close(); + } } } diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java index 024efac33c391..d16337a319be3 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java @@ -94,8 +94,8 @@ public void infoChanged(SparkAppHandle handle) { Message stopMsg = client.inbound.poll(30, TimeUnit.SECONDS); assertTrue(stopMsg instanceof Stop); } finally { - handle.kill(); close(client); + handle.kill(); client.clientThread.join(); } }