Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Map;
import java.util.Properties;

import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
Expand Down Expand Up @@ -122,8 +121,7 @@ public void testChildProcLauncher() throws Exception {
assertEquals(0, app.waitFor());
}

// TODO: [SPARK-23020] Re-enable this
@Ignore
@Test
public void testInProcessLauncher() throws Exception {
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
// properties, and that can cause test failures down the test pipeline. So restore the original
Expand Down Expand Up @@ -159,12 +157,24 @@ private void inProcessLauncherTestImpl() throws Exception {

SparkAppHandle handle = null;
try {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);
synchronized (InProcessTestApp.LOCK) {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

// SPARK-23020: see doc for InProcessTestApp.LOCK for a description of the race. Here
// we wait until we know that the connection between the app and the launcher has been
// established before allowing the app to finish.
final SparkAppHandle _handle = handle;
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
assertNotEquals(SparkAppHandle.State.UNKNOWN, _handle.getState());
});

InProcessTestApp.LOCK.wait(5000);
}

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
Expand Down Expand Up @@ -195,10 +205,26 @@ public static void main(String[] args) throws Exception {

public static class InProcessTestApp {

  /**
   * SPARK-23020: there's a race caused by a child app finishing too quickly. This would cause
   * the InProcessAppHandle to dispose of itself even before the child connection was properly
   * established, so no state changes would be detected for the application and its final
   * state would be LOST.
   *
   * It's not really possible to fix that race safely in the handle code itself without changing
   * the way in-process apps talk to the launcher library, so we work around that in the test by
   * synchronizing on this object.
   */
  public static final Object LOCK = new Object();

  /**
   * Entry point of the test application. Verifies that "hello" was passed as the first
   * argument, creates and immediately stops a SparkContext, and then notifies every thread
   * waiting on {@link #LOCK}.
   *
   * @param args application arguments; must be non-empty, with "hello" as the first element.
   * @throws Exception declared so that any failure propagates to the caller.
   */
  public static void main(String[] args) throws Exception {
    assertNotEquals(0, args.length);
    // JUnit's assertEquals takes (expected, actual); keeping that order makes the failure
    // message report the offending argument as the actual value.
    assertEquals("hello", args[0]);
    new SparkContext().stop();

    // Acquiring the monitor blocks here while another thread holds LOCK, so the app cannot
    // signal completion until that thread releases it (e.g. by calling LOCK.wait()).
    synchronized (LOCK) {
      LOCK.notifyAll();
    }
  }

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@
package org.apache.spark.launcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

abstract class AbstractAppHandle implements SparkAppHandle {

private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
private static final Logger LOG = Logger.getLogger(AbstractAppHandle.class.getName());

private final LauncherServer server;

private LauncherServer.ServerConnection connection;
private List<Listener> listeners;
private AtomicReference<State> state;
private String appId;
private volatile String appId;
private volatile boolean disposed;

protected AbstractAppHandle(LauncherServer server) {
Expand All @@ -44,7 +44,7 @@ protected AbstractAppHandle(LauncherServer server) {
@Override
public synchronized void addListener(Listener l) {
if (listeners == null) {
listeners = new ArrayList<>();
listeners = new CopyOnWriteArrayList<>();
}
listeners.add(l);
}
Expand All @@ -71,16 +71,14 @@ public void stop() {

@Override
public synchronized void disconnect() {
if (!isDisposed()) {
if (connection != null) {
try {
connection.closeAndWait();
} catch (IOException ioe) {
// no-op.
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
dispose();
}
dispose();
}

void setConnection(LauncherServer.ServerConnection connection) {
Expand All @@ -97,10 +95,25 @@ boolean isDisposed() {

/**
* Mark the handle as disposed, and set it as LOST in case the current state is not final.
*
* This method should be called only when there's a reasonable expectation that the communication
* with the child application is not needed anymore, either because the code managing the handle
* has said so, or because the child application is finished.
*/
synchronized void dispose() {
if (!isDisposed()) {
// First wait for all data from the connection to be read. Then unregister the handle.
// Otherwise, unregistering might cause the server to be stopped and all child connections
// to be closed.
if (connection != null) {
try {
connection.waitForClose();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);

// Set state to LOST if not yet final.
setState(State.LOST, false);
this.disposed = true;
Expand All @@ -127,11 +140,13 @@ void setState(State s, boolean force) {
current = state.get();
}

LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { current, s });
if (s != State.LOST) {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { current, s });
}
}

synchronized void setAppId(String appId) {
void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void monitorChild() {
}
}

disconnect();
dispose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ synchronized void start(String appName, Method main, String[] args) {
setState(State.FAILED);
}

disconnect();
dispose();
});

app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void run() {
};
ServerConnection clientConnection = new ServerConnection(client, timeout);
Thread clientThread = factory.newThread(clientConnection);
clientConnection.setConnectionThread(clientThread);
synchronized (clients) {
clients.add(clientConnection);
}
Expand Down Expand Up @@ -290,17 +291,15 @@ class ServerConnection extends LauncherConnection {

private TimerTask timeout;
private volatile Thread connectionThread;
volatile AbstractAppHandle handle;
private volatile AbstractAppHandle handle;

ServerConnection(Socket socket, TimerTask timeout) throws IOException {
super(socket);
this.timeout = timeout;
}

@Override
public void run() {
this.connectionThread = Thread.currentThread();
super.run();
void setConnectionThread(Thread t) {
this.connectionThread = t;
}

@Override
Expand Down Expand Up @@ -361,19 +360,30 @@ public void close() throws IOException {
}

/**
* Close the connection and wait for any buffered data to be processed before returning.
* Wait for the remote side to close the connection so that any pending data is processed.
* This ensures any changes reported by the child application take effect.
*
* This method allows a short period for the above to happen (same amount of time as the
* connection timeout, which is configurable). This should be fine for well-behaved
* applications, where they close the connection around the same time the app handle detects the
* app has finished.
*
* In case the connection is not closed within the grace period, this method forcefully closes
* it and any subsequent data that may arrive will be ignored.
*/
public void closeAndWait() throws IOException {
close();

public void waitForClose() throws IOException {
Thread connThread = this.connectionThread;
if (Thread.currentThread() != connThread) {
try {
connThread.join();
connThread.join(getConnectionTimeout());
} catch (InterruptedException ie) {
// Ignore.
}

if (connThread.isAlive()) {
LOG.log(Level.WARNING, "Timed out waiting for child connection to close.");
close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ public void infoChanged(SparkAppHandle handle) {
Message stopMsg = client.inbound.poll(30, TimeUnit.SECONDS);
assertTrue(stopMsg instanceof Stop);
} finally {
handle.kill();
close(client);
handle.kill();
client.clientThread.join();
}
}
Expand Down