Skip to content

Commit 56caa50

Browse files
committed
Address comments
1 parent c8d1ac3 commit 56caa50

File tree

13 files changed

+89
-57
lines changed

13 files changed

+89
-57
lines changed

core/src/main/scala/org/apache/spark/storage/BlockId.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
5353
def name = "rdd_" + rddId + "_" + splitIndex
5454
}
5555

56+
// Format of the shuffle block ids (including data and index) should be kept in sync with
57+
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData().
5658
@DeveloperApi
5759
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
5860
def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ import org.apache.spark.io.CompressionCodec
3535
import org.apache.spark.network._
3636
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
3737
import org.apache.spark.network.netty.{SparkTransportConf, NettyBlockTransferService}
38-
import org.apache.spark.network.shuffle.{ExecutorShuffleConfig, StandaloneShuffleClient}
38+
import org.apache.spark.network.shuffle.{ExecutorShuffleInfo, StandaloneShuffleClient}
3939
import org.apache.spark.network.util.{ConfigProvider, TransportConf}
4040
import org.apache.spark.serializer.Serializer
4141
import org.apache.spark.shuffle.ShuffleManager
42+
import org.apache.spark.shuffle.hash.HashShuffleManager
43+
import org.apache.spark.shuffle.sort.SortShuffleManager
4244
import org.apache.spark.util._
4345

4446
private[spark] sealed trait BlockValues
@@ -91,6 +93,14 @@ private[spark] class BlockManager(
9193
private[spark]
9294
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
9395
private val externalShuffleServicePort = conf.getInt("spark.shuffle.service.port", 7337)
96+
// Check that we're not using external shuffle service with consolidated shuffle files.
97+
if (externalShuffleServiceEnabled
98+
&& conf.getBoolean("spark.shuffle.consolidateFiles", false)
99+
&& shuffleManager.isInstanceOf[HashShuffleManager]) {
100+
throw new UnsupportedOperationException("Cannot use external shuffle service with consolidated"
101+
+ " shuffle files in hash-based shuffle. Please disable spark.shuffle.consolidateFiles or "
102+
+ " switch to sort-based shuffle.")
103+
}
94104

95105
val blockManagerId = BlockManagerId(
96106
executorId, blockTransferService.hostName, blockTransferService.port)
@@ -174,14 +184,33 @@ private[spark] class BlockManager(
174184

175185
// Register Executors' configuration with the local shuffle service, if one should exist.
176186
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
177-
logInfo("Registering executor with local external shuffle service.")
178-
// Synchronous and will throw an exception if we cannot connect.
179-
val shuffleConfig = new ExecutorShuffleConfig(
180-
diskBlockManager.localDirs.map(_.toString),
181-
diskBlockManager.subDirsPerLocalDir,
182-
shuffleManager.getClass.getName)
183-
shuffleClient.asInstanceOf[StandaloneShuffleClient].registerWithStandaloneShuffleService(
184-
shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
187+
registerWithExternalShuffleServer()
188+
}
189+
}
190+
191+
private def registerWithExternalShuffleServer() {
192+
logInfo("Registering executor with local external shuffle service.")
193+
val shuffleConfig = new ExecutorShuffleInfo(
194+
diskBlockManager.localDirs.map(_.toString),
195+
diskBlockManager.subDirsPerLocalDir,
196+
shuffleManager.getClass.getName)
197+
198+
val MAX_ATTEMPTS = 3
199+
val SLEEP_TIME_SECS = 5
200+
201+
for (i <- 1 to MAX_ATTEMPTS) {
202+
try {
203+
// Synchronous and will throw an exception if we cannot connect.
204+
shuffleClient.asInstanceOf[StandaloneShuffleClient].registerWithShuffleServer(
205+
shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
206+
return
207+
} catch {
208+
case e: Exception if i < MAX_ATTEMPTS =>
209+
val attemptsRemaining =
210+
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}}"
211+
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
212+
Thread.sleep(SLEEP_TIME_SECS * 1000)
213+
}
185214
}
186215
}
187216

network/common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.spark.network.protocol.ChunkFetchRequest;
3535
import org.apache.spark.network.protocol.RpcRequest;
3636
import org.apache.spark.network.protocol.StreamChunkId;
37-
import org.apache.spark.network.util.JavaUtils;
3837
import org.apache.spark.network.util.NettyUtils;
3938

4039
/**
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,16 @@
2222

2323
import com.google.common.base.Objects;
2424

25-
/** Contains all configuration necessary for a single Executor to find its shuffle files. */
26-
public class ExecutorShuffleConfig implements Serializable {
25+
/** Contains all configuration necessary for locating the shuffle files of an executor. */
26+
public class ExecutorShuffleInfo implements Serializable {
2727
/** The base set of local directories that the executor stores its shuffle files in. */
2828
final String[] localDirs;
2929
/** Number of subdirectories created within each localDir. */
3030
final int subDirsPerLocalDir;
3131
/** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
3232
final String shuffleManager;
3333

34-
public ExecutorShuffleConfig(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) {
34+
public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) {
3535
this.localDirs = localDirs;
3636
this.subDirsPerLocalDir = subDirsPerLocalDir;
3737
this.shuffleManager = shuffleManager;
@@ -53,8 +53,8 @@ public String toString() {
5353

5454
@Override
5555
public boolean equals(Object other) {
56-
if (other != null && other instanceof ExecutorShuffleConfig) {
57-
ExecutorShuffleConfig o = (ExecutorShuffleConfig) other;
56+
if (other != null && other instanceof ExecutorShuffleInfo) {
57+
ExecutorShuffleInfo o = (ExecutorShuffleInfo) other;
5858
return Arrays.equals(localDirs, o.localDirs)
5959
&& Objects.equal(subDirsPerLocalDir, o.subDirsPerLocalDir)
6060
&& Objects.equal(shuffleManager, o.shuffleManager);

network/shuffle/src/main/java/org/apache/spark/network/shuffle/StandaloneShuffleBlockHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void receive(TransportClient client, byte[] message, RpcResponseCallback
7979

8080
} else if (msgObj instanceof RegisterExecutor) {
8181
RegisterExecutor msg = (RegisterExecutor) msgObj;
82-
blockManager.registerExecutor(msg.appId, msg.execId, msg.executorConfig);
82+
blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
8383
callback.onSuccess(new byte[0]);
8484

8585
} else {

network/shuffle/src/main/java/org/apache/spark/network/shuffle/StandaloneShuffleBlockManager.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ public class StandaloneShuffleBlockManager {
4444
private final Logger logger = LoggerFactory.getLogger(StandaloneShuffleBlockManager.class);
4545

4646
// Map from "appId-execId" to the executor's configuration.
47-
private final ConcurrentHashMap<String, ExecutorShuffleConfig> executors =
48-
new ConcurrentHashMap<String, ExecutorShuffleConfig>();
47+
private final ConcurrentHashMap<String, ExecutorShuffleInfo> executors =
48+
new ConcurrentHashMap<String, ExecutorShuffleInfo>();
4949

5050
// Returns an id suitable for a single executor within a single application.
5151
private String getAppExecId(String appId, String execId) {
@@ -56,15 +56,16 @@ private String getAppExecId(String appId, String execId) {
5656
public void registerExecutor(
5757
String appId,
5858
String execId,
59-
ExecutorShuffleConfig executorConfig) {
59+
ExecutorShuffleInfo executorInfo) {
6060
String fullId = getAppExecId(appId, execId);
61-
executors.put(fullId, executorConfig);
61+
logger.info("Registered executor {} with {}", fullId, executorInfo);
62+
executors.put(fullId, executorInfo);
6263
}
6364

6465
/**
6566
* Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId has the
66-
* format "shuffle_ShuffleId_MapId_ReduceId", and additionally make assumptions about how the
67-
* hash and sort based shuffles store their data.
67+
* format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make
68+
* assumptions about how the hash and sort based shuffles store their data.
6869
*/
6970
public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
7071
String[] blockIdParts = blockId.split("_");
@@ -77,7 +78,7 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
7778
int mapId = Integer.parseInt(blockIdParts[2]);
7879
int reduceId = Integer.parseInt(blockIdParts[3]);
7980

80-
ExecutorShuffleConfig executor = executors.get(getAppExecId(appId, execId));
81+
ExecutorShuffleInfo executor = executors.get(getAppExecId(appId, execId));
8182
if (executor == null) {
8283
throw new RuntimeException(
8384
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
@@ -98,17 +99,18 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
9899
* This logic is from FileShuffleBlockManager.
99100
*/
100101
// TODO: Support consolidated hash shuffle files
101-
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleConfig executor, String blockId) {
102+
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
102103
File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
103104
return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length());
104105
}
105106

106107
/**
107108
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
108-
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockManager.
109+
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockManager,
110+
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
109111
*/
110112
private ManagedBuffer getSortBasedShuffleBlockData(
111-
ExecutorShuffleConfig executor, int shuffleId, int mapId, int reduceId) {
113+
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
112114
File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
113115
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
114116

network/shuffle/src/main/java/org/apache/spark/network/shuffle/StandaloneShuffleClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,16 @@ public void fetchBlocks(
6767
* @param host Host of standalone shuffle server.
6868
* @param port Port of standalone shuffle server.
6969
* @param execId This Executor's id.
70-
* @param executorConfig Contains all config necessary for the service to find our shuffle files.
70+
* @param executorInfo Contains all info necessary for the service to find our shuffle files.
7171
*/
72-
public void registerWithStandaloneShuffleService(
72+
public void registerWithShuffleServer(
7373
String host,
7474
int port,
7575
String execId,
76-
ExecutorShuffleConfig executorConfig) {
76+
ExecutorShuffleInfo executorInfo) {
7777
TransportClient client = clientFactory.createClient(host, port);
7878
byte[] registerExecutorMessage =
79-
JavaUtils.serialize(new RegisterExecutor(appId, execId, executorConfig));
79+
JavaUtils.serialize(new RegisterExecutor(appId, execId, executorInfo));
8080
client.sendRpcSync(registerExecutorMessage, 3000);
8181
}
8282
}

network/shuffle/src/main/java/org/apache/spark/network/shuffle/StandaloneShuffleMessages.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,28 +67,28 @@ public boolean equals(Object other) {
6767
public static class RegisterExecutor implements Serializable {
6868
public final String appId;
6969
public final String execId;
70-
public final ExecutorShuffleConfig executorConfig;
70+
public final ExecutorShuffleInfo executorInfo;
7171

7272
public RegisterExecutor(
7373
String appId,
7474
String execId,
75-
ExecutorShuffleConfig executorConfig) {
75+
ExecutorShuffleInfo executorInfo) {
7676
this.appId = appId;
7777
this.execId = execId;
78-
this.executorConfig = executorConfig;
78+
this.executorInfo = executorInfo;
7979
}
8080

8181
@Override
8282
public int hashCode() {
83-
return Objects.hashCode(appId, execId, executorConfig);
83+
return Objects.hashCode(appId, execId, executorInfo);
8484
}
8585

8686
@Override
8787
public String toString() {
8888
return Objects.toStringHelper(this)
8989
.add("appId", appId)
9090
.add("execId", execId)
91-
.add("executorConfig", executorConfig)
91+
.add("executorInfo", executorInfo)
9292
.toString();
9393
}
9494

@@ -98,7 +98,7 @@ public boolean equals(Object other) {
9898
RegisterExecutor o = (RegisterExecutor) other;
9999
return Objects.equal(appId, o.appId)
100100
&& Objects.equal(execId, o.execId)
101-
&& Objects.equal(executorConfig, o.executorConfig);
101+
&& Objects.equal(executorInfo, o.executorInfo);
102102
}
103103
return false;
104104
}

network/shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleMessagesSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void serializeOpenShuffleBlocks() {
3636

3737
@Test
3838
public void serializeRegisterExecutor() {
39-
RegisterExecutor msg = new RegisterExecutor("app-1", "exec-2", new ExecutorShuffleConfig(
39+
RegisterExecutor msg = new RegisterExecutor("app-1", "exec-2", new ExecutorShuffleInfo(
4040
new String[] { "/local1", "/local2" }, 32, "MyShuffleManager"));
4141
RegisterExecutor msg2 = JavaUtils.deserialize(JavaUtils.serialize(msg));
4242
assertEquals(msg, msg2);

network/shuffle/src/test/java/org/apache/spark/network/shuffle/StandaloneShuffleBlockHandlerSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void beforeEach() {
5656
public void testRegisterExecutor() {
5757
RpcResponseCallback callback = mock(RpcResponseCallback.class);
5858

59-
ExecutorShuffleConfig config = new ExecutorShuffleConfig(new String[] {"/a", "/b"}, 16, "sort");
59+
ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
6060
byte[] registerMessage = JavaUtils.serialize(
6161
new RegisterExecutor("app0", "exec1", config));
6262
handler.receive(client, registerMessage, callback);
@@ -109,7 +109,7 @@ public void testBadMessages() {
109109
}
110110

111111
byte[] unexpectedMessage = JavaUtils.serialize(
112-
new ExecutorShuffleConfig(new String[] {"/a", "/b"}, 16, "sort"));
112+
new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort"));
113113
try {
114114
handler.receive(client, unexpectedMessage, callback);
115115
fail("Should have thrown");

0 commit comments

Comments
 (0)