Skip to content

Commit 23c8131

Browse files
author
Marcelo Vanzin
committed
Add config key to enable the "stop NM on failure" behavior.
1 parent 61a40b1 commit 23c8131

File tree

2 files changed

+46
-30
lines changed

2 files changed

+46
-30
lines changed

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ public class YarnShuffleService extends AuxiliaryService {
7070

7171
private static final String RECOVERY_FILE_NAME = "registeredExecutor.ldb";
7272

73+
// Whether failure during service initialization should stop the NM.
74+
@VisibleForTesting
75+
static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
76+
private static final boolean DEFAULT_STOP_ON_FAILURE = false;
77+
7378
// An entity that manages the shuffle secret per application
7479
// This is used only if authentication is enabled
7580
private ShuffleSecretManager secretManager;
@@ -122,37 +127,47 @@ private boolean isAuthenticationEnabled() {
122127
protected void serviceInit(Configuration conf) throws Exception {
123128
_conf = conf;
124129

125-
// In case this NM was killed while there were running spark applications, we need to restore
126-
// lost state for the existing executors. We look for an existing file in the NM's local dirs.
127-
// If we don't find one, then we choose a file to use to save the state next time. Even if
128-
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
129-
// when it comes back
130-
registeredExecutorFile =
131-
new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
132-
133-
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
134-
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
135-
136-
// If authentication is enabled, set up the shuffle server to use a
137-
// special RPC handler that filters out unauthenticated fetch requests
138-
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
139-
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
140-
if (authEnabled) {
141-
secretManager = new ShuffleSecretManager();
142-
bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
143-
}
130+
boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
144131

145-
int port = conf.getInt(
146-
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
147-
TransportContext transportContext = new TransportContext(transportConf, blockHandler);
148-
shuffleServer = transportContext.createServer(port, bootstraps);
149-
// the port should normally be fixed, but for tests it's useful to find an open port
150-
port = shuffleServer.getPort();
151-
boundPort = port;
152-
String authEnabledString = authEnabled ? "enabled" : "not enabled";
153-
logger.info("Started YARN shuffle service for Spark on port {}. " +
154-
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
155-
registeredExecutorFile);
132+
try {
133+
// In case this NM was killed while there were running spark applications, we need to restore
134+
// lost state for the existing executors. We look for an existing file in the NM's local dirs.
135+
// If we don't find one, then we choose a file to use to save the state next time. Even if
136+
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
137+
// when it comes back
138+
registeredExecutorFile =
139+
new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
140+
141+
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
142+
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
143+
144+
// If authentication is enabled, set up the shuffle server to use a
145+
// special RPC handler that filters out unauthenticated fetch requests
146+
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
147+
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
148+
if (authEnabled) {
149+
secretManager = new ShuffleSecretManager();
150+
bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
151+
}
152+
153+
int port = conf.getInt(
154+
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
155+
TransportContext transportContext = new TransportContext(transportConf, blockHandler);
156+
shuffleServer = transportContext.createServer(port, bootstraps);
157+
// the port should normally be fixed, but for tests it's useful to find an open port
158+
port = shuffleServer.getPort();
159+
boundPort = port;
160+
String authEnabledString = authEnabled ? "enabled" : "not enabled";
161+
logger.info("Started YARN shuffle service for Spark on port {}. " +
162+
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
163+
registeredExecutorFile);
164+
} catch (Exception e) {
165+
if (stopOnFailure) {
166+
throw e;
167+
} else {
168+
noteFailure(e);
169+
}
170+
}
156171
}
157172

158173
@Override

yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
327327
val roDir = Utils.createTempDir()
328328
Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
329329
roConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
330+
roConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
330331

331332
// Try to start the shuffle service, it should fail.
332333
val service = new YarnShuffleService()

0 commit comments

Comments
 (0)