Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,24 @@ private void inProcessLauncherTestImpl() throws Exception {

SparkAppHandle handle = null;
try {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);
synchronized (InProcessTestApp.LOCK) {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

// SPARK-23020: see doc for InProcessTestApp.LOCK for a description of the race. Here
// we wait until we know that the connection between the app and the launcher has been
// established before allowing the app to finish.
final SparkAppHandle _handle = handle;
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
assertNotEquals(SparkAppHandle.State.UNKNOWN, _handle.getState());
});

InProcessTestApp.LOCK.wait(5000);
}

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
Expand Down Expand Up @@ -193,10 +205,26 @@ public static void main(String[] args) throws Exception {

/**
 * Minimal application used as the main class of the in-process launcher test. It validates its
 * arguments, creates and immediately stops a SparkContext, and then notifies every thread
 * waiting on {@link #LOCK}.
 */
public static class InProcessTestApp {

  /**
   * SPARK-23020: there's a race caused by a child app finishing too quickly. This would cause
   * the InProcessAppHandle to dispose of itself even before the child connection was properly
   * established, so no state changes would be detected for the application and its final
   * state would be LOST.
   *
   * It's not really possible to fix that race safely in the handle code itself without changing
   * the way in-process apps talk to the launcher library, so we work around that in the test by
   * synchronizing on this object.
   */
  public static final Object LOCK = new Object();

  /**
   * Entry point of the test application.
   *
   * @param args application arguments; must be non-empty and the first one must be "hello".
   * @throws Exception if an assertion fails or the SparkContext cannot be created or stopped.
   */
  public static void main(String[] args) throws Exception {
    assertNotEquals(0, args.length);
    // Expected value goes first so a failure message reports expected/actual the right way
    // around (assumes the JUnit-style assertEquals(expected, actual) signature).
    assertEquals("hello", args[0]);
    new SparkContext().stop();

    // This block can only be entered once no other thread holds LOCK; it then wakes up every
    // thread currently waiting on it.
    synchronized (LOCK) {
      LOCK.notifyAll();
    }
  }

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@
package org.apache.spark.launcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

abstract class AbstractAppHandle implements SparkAppHandle {

private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
private static final Logger LOG = Logger.getLogger(AbstractAppHandle.class.getName());

private final LauncherServer server;

private LauncherServer.ServerConnection connection;
private List<Listener> listeners;
private AtomicReference<State> state;
private String appId;
private volatile String appId;
private volatile boolean disposed;

protected AbstractAppHandle(LauncherServer server) {
Expand All @@ -44,7 +44,7 @@ protected AbstractAppHandle(LauncherServer server) {
@Override
public synchronized void addListener(Listener l) {
if (listeners == null) {
listeners = new ArrayList<>();
listeners = new CopyOnWriteArrayList<>();
}
listeners.add(l);
}
Expand All @@ -71,16 +71,14 @@ public void stop() {

@Override
public synchronized void disconnect() {
if (!isDisposed()) {
if (connection != null) {
try {
connection.closeAndWait();
} catch (IOException ioe) {
// no-op.
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
dispose();
}
dispose();
}

void setConnection(LauncherServer.ServerConnection connection) {
Expand All @@ -97,10 +95,25 @@ boolean isDisposed() {

/**
* Mark the handle as disposed, and set it as LOST in case the current state is not final.
*
* This method should be called only when there's a reasonable expectation that the communication
* with the child application is not needed anymore, either because the code managing the handle
* has said so, or because the child application is finished.
*/
synchronized void dispose() {
if (!isDisposed()) {
// First wait for all data from the connection to be read. Then unregister the handle.
// Otherwise, unregistering might cause the server to be stopped and all child connections
// to be closed.
if (connection != null) {
try {
connection.waitForClose();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);

// Set state to LOST if not yet final.
setState(State.LOST, false);
this.disposed = true;
Expand All @@ -127,11 +140,13 @@ void setState(State s, boolean force) {
current = state.get();
}

LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { current, s });
if (s != State.LOST) {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { current, s });
}
}

synchronized void setAppId(String appId) {
void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void monitorChild() {
}
}

disconnect();
dispose();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add more documentation to disconnect and dispose, so that people can clearly understand the difference between them and better understand changes like this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

disconnect() is actually a public method and already documented in the SparkAppHandle interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm still not able to figure out what the difference between them is after reading the doc. Do you mind leaving a short description here? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the documentation for dispose.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ synchronized void start(String appName, Method main, String[] args) {
setState(State.FAILED);
}

disconnect();
dispose();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we call disconnect here, we would close the connection and then wait for the close to finish in dispose. If we call dispose directly, we also close the connection and wait for it (in waitForClose). What is the actual difference here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order in which the connection is closed. waitForClose will wait for the connection to be closed by the remote side (the finished app) before closing it itself, like disconnect does.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see

});

app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void run() {
};
ServerConnection clientConnection = new ServerConnection(client, timeout);
Thread clientThread = factory.newThread(clientConnection);
clientConnection.setConnectionThread(clientThread);
synchronized (clients) {
clients.add(clientConnection);
}
Expand Down Expand Up @@ -290,17 +291,15 @@ class ServerConnection extends LauncherConnection {

private TimerTask timeout;
private volatile Thread connectionThread;
volatile AbstractAppHandle handle;
private volatile AbstractAppHandle handle;

ServerConnection(Socket socket, TimerTask timeout) throws IOException {
super(socket);
this.timeout = timeout;
}

@Override
public void run() {
this.connectionThread = Thread.currentThread();
super.run();
void setConnectionThread(Thread t) {
this.connectionThread = t;
}

@Override
Expand Down Expand Up @@ -361,19 +360,30 @@ public void close() throws IOException {
}

/**
* Close the connection and wait for any buffered data to be processed before returning.
* Wait for the remote side to close the connection so that any pending data is processed.
* This ensures any changes reported by the child application take effect.
*
* This method allows a short period for the above to happen (same amount of time as the
* connection timeout, which is configurable). This should be fine for well-behaved
* applications, where they close the connection around the same time the app handle detects the
* app has finished.
*
* In case the connection is not closed within the grace period, this method forcefully closes
* it and any subsequent data that may arrive will be ignored.
*/
public void closeAndWait() throws IOException {
close();

public void waitForClose() throws IOException {
Thread connThread = this.connectionThread;
if (Thread.currentThread() != connThread) {
try {
connThread.join();
connThread.join(getConnectionTimeout());
} catch (InterruptedException ie) {
// Ignore.
}

if (connThread.isAlive()) {
LOG.log(Level.WARNING, "Timed out waiting for child connection to close.");
close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ public void infoChanged(SparkAppHandle handle) {
Message stopMsg = client.inbound.poll(30, TimeUnit.SECONDS);
assertTrue(stopMsg instanceof Stop);
} finally {
handle.kill();
close(client);
handle.kill();
client.clientThread.join();
}
}
Expand Down