YarnShuffleService.java

@@ -70,6 +70,11 @@ public class YarnShuffleService extends AuxiliaryService {
 
   private static final String RECOVERY_FILE_NAME = "registeredExecutor.ldb";
 
+  // Whether failure during service initialization should stop the NM.
+  @VisibleForTesting
+  static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";

Contributor: it would be nice to document this so others know about it. There is a section in the job scheduling doc about the external shuffle service; otherwise maybe just put it in the yarn section. We probably should have a pointer from the yarn section to the job scheduling section, or just a standalone section on the shuffle service, but that is a different issue.

Contributor Author: I looked around but couldn't find any documentation specific to the YARN shuffle service, but let me look again.

+  private static final boolean DEFAULT_STOP_ON_FAILURE = false;
+
   // An entity that manages the shuffle secret per application
   // This is used only if authentication is enabled
   private ShuffleSecretManager secretManager;

@@ -119,44 +124,50 @@ private boolean isAuthenticationEnabled() {
    * Start the shuffle server with the given configuration.
    */
   @Override
-  protected void serviceInit(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
     _conf = conf;
 
-    // In case this NM was killed while there were running spark applications, we need to restore
-    // lost state for the existing executors. We look for an existing file in the NM's local dirs.
-    // If we don't find one, then we choose a file to use to save the state next time. Even if
-    // an application was stopped while the NM was down, we expect yarn to call stopApplication()
-    // when it comes back
-    registeredExecutorFile =
-      new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
-
-    TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
-    // If authentication is enabled, set up the shuffle server to use a
-    // special RPC handler that filters out unauthenticated fetch requests
-    boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+    boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
+
     try {
+      // In case this NM was killed while there were running spark applications, we need to restore
+      // lost state for the existing executors. We look for an existing file in the NM's local dirs.
+      // If we don't find one, then we choose a file to use to save the state next time. Even if
+      // an application was stopped while the NM was down, we expect yarn to call stopApplication()
+      // when it comes back
+      registeredExecutorFile =
+        new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
+
+      TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
       blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
-    } catch (Exception e) {
-      logger.error("Failed to initialize external shuffle service", e);
-    }
 
-    List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
-    if (authEnabled) {
-      secretManager = new ShuffleSecretManager();
-      bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
-    }
+      // If authentication is enabled, set up the shuffle server to use a
+      // special RPC handler that filters out unauthenticated fetch requests
+      boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+      List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
+      if (authEnabled) {
+        secretManager = new ShuffleSecretManager();
+        bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
+      }
 
-    int port = conf.getInt(
-      SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
-    TransportContext transportContext = new TransportContext(transportConf, blockHandler);
-    shuffleServer = transportContext.createServer(port, bootstraps);
-    // the port should normally be fixed, but for tests its useful to find an open port
-    port = shuffleServer.getPort();
-    boundPort = port;
-    String authEnabledString = authEnabled ? "enabled" : "not enabled";
-    logger.info("Started YARN shuffle service for Spark on port {}. " +
-      "Authentication is {}. Registered executor file is {}", port, authEnabledString,
-      registeredExecutorFile);
+      int port = conf.getInt(
+        SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
+      TransportContext transportContext = new TransportContext(transportConf, blockHandler);
+      shuffleServer = transportContext.createServer(port, bootstraps);
+      // the port should normally be fixed, but for tests its useful to find an open port
+      port = shuffleServer.getPort();
+      boundPort = port;
+      String authEnabledString = authEnabled ? "enabled" : "not enabled";
+      logger.info("Started YARN shuffle service for Spark on port {}. " +
+        "Authentication is {}. Registered executor file is {}", port, authEnabledString,
+        registeredExecutorFile);
+    } catch (Exception e) {
+      if (stopOnFailure) {
+        throw e;
+      } else {
+        noteFailure(e);
+      }
+    }
   }
 
   @Override
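
The new `serviceInit` logic above reads `spark.yarn.shuffle.stopOnFailure` from the Hadoop `Configuration` that YARN hands to the auxiliary service, so an operator would typically opt in on the NodeManager side, e.g. in `yarn-site.xml`. A minimal sketch only; the `true` value and the choice of `yarn-site.xml` are illustrative assumptions, and the code's default is `false`:

```xml
<!-- Illustrative sketch: make NodeManager startup fail fast if the Spark shuffle
     service cannot initialize, instead of merely noting the failure. -->
<property>
  <name>spark.yarn.shuffle.stopOnFailure</name>
  <value>true</value>
</property>
```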

docs/job-scheduling.md (1 addition, 12 deletions)

@@ -83,18 +83,7 @@
 In Mesos coarse-grained mode, run `$SPARK_HOME/sbin/start-mesos-shuffle-service.sh` on all
 slave nodes with `spark.shuffle.service.enabled` set to `true`. For instance, you may do so
 through Marathon.
 
-In YARN mode, start the shuffle service on each `NodeManager` as follows:
-
-1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
-pre-packaged distribution.
-2. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
-`$SPARK_HOME/common/network-yarn/target/scala-<version>` if you are building Spark yourself, and under
-`lib` if you are using a distribution.
-2. Add this jar to the classpath of all `NodeManager`s in your cluster.
-3. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
-then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
-`org.apache.spark.network.yarn.YarnShuffleService`.
-4. Restart all `NodeManager`s in your cluster.
+In YARN mode, follow the instructions [here](running-on-yarn.html#configuring-the-external-shuffle-service).
 
 All other relevant configurations are optional and under the `spark.dynamicAllocation.*` and
 `spark.shuffle.service.*` namespaces. For more detail, see the

docs/running-on-yarn.md (31 additions, 0 deletions)

@@ -539,6 +539,37 @@ launch time. This is done by listing them in the `spark.yarn.access.namenodes` p
 spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/
 ```
 
+## Configuring the External Shuffle Service
+
+To start the Spark Shuffle Service on each `NodeManager` in your YARN cluster, follow these
+instructions:
+
+1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
+pre-packaged distribution.
+1. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
+`$SPARK_HOME/common/network-yarn/target/scala-<version>` if you are building Spark yourself, and under
+`lib` if you are using a distribution.
+1. Add this jar to the classpath of all `NodeManager`s in your cluster.
+1. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
+then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
+`org.apache.spark.network.yarn.YarnShuffleService`.
+1. Restart all `NodeManager`s in your cluster.
+
+The following extra configuration options are available when the shuffle service is running on YARN:
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.yarn.shuffle.stopOnFailure</code></td>
+  <td><code>false</code></td>
+  <td>
+    Whether to stop the NodeManager when there's a failure in the Spark Shuffle Service's
+    initialization. This prevents application failures caused by running containers on
+    NodeManagers where the Spark Shuffle Service is not running.
+  </td>
+</tr>
+</table>
+
 ## Launching your application with Apache Oozie
 
 Apache Oozie can launch Spark applications as part of a workflow.
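
As a worked example of the `yarn-site.xml` step in the list above, the NodeManager-side entries might look like the sketch below. The `mapreduce_shuffle` entry in `yarn.nodemanager.aux-services` is an assumption about a typical cluster; keep whatever services your configuration already lists and append `spark_shuffle`.

```xml
<!-- Sketch only: register the Spark shuffle service as a NodeManager auxiliary service. -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <!-- mapreduce_shuffle is assumed to be pre-existing; spark_shuffle is the new entry. -->
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```

The optional `spark.yarn.shuffle.stopOnFailure` property from the table above can be added alongside these entries, as sketched earlier.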

YarnShuffleServiceSuite.scala

@@ -16,13 +16,17 @@
  */
 package org.apache.spark.network.yarn
 
-import java.io.{DataOutputStream, File, FileOutputStream}
+import java.io.{DataOutputStream, File, FileOutputStream, IOException}
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission._
+import java.util.EnumSet
 
 import scala.annotation.tailrec
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.service.ServiceStateException
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}

@@ -45,7 +49,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
       classOf[YarnShuffleService].getCanonicalName)
     yarnConfig.setInt("spark.shuffle.service.port", 0)
     val localDir = Utils.createTempDir()
-    yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath)
+    yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
   }
 
   var s1: YarnShuffleService = null

@@ -316,4 +320,28 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
     s2.stop()
   }
-}
+
+  test("service throws error if cannot start") {
+    // Create a different config with a read-only local dir.
+    val roConfig = new YarnConfiguration(yarnConfig)
+    val roDir = Utils.createTempDir()
+    Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
+    roConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
+    roConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
+
+    // Try to start the shuffle service, it should fail.
+    val service = new YarnShuffleService()
+
+    try {
+      val error = intercept[ServiceStateException] {
+        service.init(roConfig)
+      }
+      assert(error.getCause().isInstanceOf[IOException])
+    } finally {
+      service.stop()
+      Files.setPosixFilePermissions(roDir.toPath(),
+        EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE))
+    }
+  }
+
+}