Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()

private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0,
disablePortRetry = true)
private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
private val transportContext: TransportContext = {
val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ object SparkTransportConf {
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
* use the given number of cores, rather than all of the machine's cores.
* This restriction will only occur if these properties are not already set.
* @param disablePortRetry if true, the server will not retry binding on a different port.
* This is better for long-running servers, since the server and its clients have agreed on
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: alignment is off by 1

* the specific port.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if true, the server will not retry to bind to a different port than the one configured.
This is better for long-running servers since they may have agreed upon specific
ports with the clients.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, since the only place we ever set this to true is in StandaloneWorkerShuffleService, does it make sense to just move the conf setting there along with this comment? (similar to what you did in YARN)

*/
def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: style here is to indent next line by 2 spaces, or have one parameter per line indented by 4 spaces each.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this change broke the MiMA checks. It doesn't feel like these classes should be public (so maybe an exclusion should be fine here), but you can also work around it by declaring an overloaded method instead.

@aarondav any comments about whether these classes are really meant to be public?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely not meant to be public, could make it private[spark] and add exclusion.

disablePortRetry: Boolean = false): TransportConf = {
val conf = _conf.clone

// Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
Expand All @@ -55,6 +59,10 @@ object SparkTransportConf {
conf.get("spark.shuffle.io.serverThreads", numThreads.toString))
conf.set("spark.shuffle.io.clientThreads",
conf.get("spark.shuffle.io.clientThreads", numThreads.toString))

if (disablePortRetry) {
conf.set("spark.port.maxRetries", "0")
}

new TransportConf(new ConfigProvider {
override def get(name: String): String = conf.get(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public void close() {
}

/**
* Attempt to bind to the specified port up to a fixed number of retries.
* Attempt to bind on the given port, or fail after a number of attempts.
* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
* If all attempts fail after the max number of retries, exit.
*/
private void bindRightPort(int portToBind) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,26 @@ private boolean isAuthenticationEnabled() {
*/
@Override
protected void serviceInit(Configuration conf) {
TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
Configuration newConf = new Configuration(conf);

// If `spark.shuffle.service.port` is already in use when the Spark YARN shuffle
// server starts, it is better to let the NodeManager fail than to retry on another
// port: a retry would leave the server listening on a port that differs from the
// configured one, so clients would be unable to find it.
newConf.setInt("spark.port.maxRetries", 0);

TransportConf transportConf = new TransportConf(new HadoopConfigProvider(newConf));
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
boolean authEnabled = newConf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
blockHandler = new ExternalShuffleBlockHandler(transportConf);
RpcHandler rpcHandler = blockHandler;
if (authEnabled) {
secretManager = new ShuffleSecretManager();
rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
}

int port = conf.getInt(
int port = newConf.getInt(
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
shuffleServer = transportContext.createServer(port);
Expand Down