Skip to content

Commit 37ac799

Browse files
committed
ExternalShuffleBlockResolver can handle blockIds w/out stageAttemptId
1 parent a38d760 commit 37ac799

File tree

3 files changed

+99
-9
lines changed

3 files changed

+99
-9
lines changed

network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,15 +168,14 @@ public void registerExecutor(
168168
*/
169169
public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
170170
String[] blockIdParts = blockId.split("_");
171-
if (blockIdParts.length != 5) {
171+
if (blockIdParts.length < 4 || blockIdParts.length > 5) {
172172
throw new IllegalArgumentException("Unexpected block id format: " + blockId);
173173
} else if (!blockIdParts[0].equals("shuffle")) {
174174
throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
175175
}
176176
int shuffleId = Integer.parseInt(blockIdParts[1]);
177177
int mapId = Integer.parseInt(blockIdParts[2]);
178178
int reduceId = Integer.parseInt(blockIdParts[3]);
179-
int stageAttemptId = Integer.parseInt(blockIdParts[4]);
180179

181180
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
182181
if (executor == null) {
@@ -188,7 +187,14 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
188187
return getHashBasedShuffleBlockData(executor, blockId);
189188
} else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)
190189
|| "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager".equals(executor.shuffleManager)) {
191-
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, stageAttemptId);
190+
// for backwards compatibility, we also handle legacy shuffle block ids which don't have
191+
// a stageAttemptId
192+
String baseFileName = "shuffle_" + shuffleId + "_" + mapId + "_0";
193+
if (blockIdParts.length == 5) {
194+
int stageAttemptId = Integer.parseInt(blockIdParts[4]);
195+
baseFileName = baseFileName + "_" + stageAttemptId;
196+
}
197+
return getSortBasedShuffleBlockData(executor, baseFileName, reduceId);
192198
} else {
193199
throw new UnsupportedOperationException(
194200
"Unsupported shuffle manager: " + executor.shuffleManager);
@@ -267,8 +273,9 @@ private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor,
267273
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
268274
*/
269275
private ManagedBuffer getSortBasedShuffleBlockData(
270-
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId, int stageAttemptId) {
271-
String baseFileName = "shuffle_" + shuffleId + "_" + mapId + "_0_" + stageAttemptId;
276+
ExecutorShuffleInfo executor,
277+
String baseFileName,
278+
int reduceId) {
272279
File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
273280
baseFileName + ".index");
274281

network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,16 @@ public void testBadRequests() throws IOException {
102102
// pass
103103
}
104104

105-
// no stageAttemptId
105+
// wrong number of parts (note that we allow a missing stageAttemptId)
106106
try {
107-
resolver.getBlockData("app0", "exec1", "shuffle_1_1_0");
107+
resolver.getBlockData("app0", "exec1", "shuffle_1_1_0_0_0");
108+
fail("Should have failed");
109+
} catch (RuntimeException e) {
110+
assertTrue("Bad error message: " + e, e.getMessage().contains("Unexpected block id format"));
111+
}
112+
113+
try {
114+
resolver.getBlockData("app0", "exec1", "shuffle_1_1");
108115
fail("Should have failed");
109116
} catch (RuntimeException e) {
110117
assertTrue("Bad error message: " + e, e.getMessage().contains("Unexpected block id format"));
@@ -145,6 +152,54 @@ private void testReadBlockData(ExternalShuffleBlockResolver resolver, String blo
145152
assertEquals(expected, block0);
146153
}
147154

155+
@Test
156+
public void supportLegacySortShuffleBlockIds() throws IOException {
157+
// In Spark 1.6, the stage attempt ID was added to shuffle block ids (SPARK-8029). However,
158+
// during a rolling upgrade, the shuffle service may be restarted with new code but still
159+
// need to serve old apps. So we make sure we can still handle old blocks
160+
161+
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
162+
resolver.registerExecutor("app0", "exec0",
163+
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
164+
165+
dataContext.insertLegacySortShuffleData(2, 1,
166+
new byte[][]{"legacy".getBytes(), "block".getBytes()});
167+
168+
testReadBlockData(resolver, "shuffle_2_1_0", "legacy");
169+
testReadBlockData(resolver, "shuffle_2_1_1", "block");
170+
171+
// verify everything still works when we also register some blocks which do have a
172+
// stageAttemptId
173+
testSortShuffleBlocks();
174+
175+
testReadBlockData(resolver, "shuffle_2_1_0", "legacy");
176+
testReadBlockData(resolver, "shuffle_2_1_1", "block");
177+
}
178+
179+
@Test
180+
public void supportLegacyHashShuffleBlockIds() throws IOException {
181+
// In Spark 1.6, the stage attempt ID was added to shuffle block ids (SPARK-8029). However,
182+
// during a rolling upgrade, the shuffle service may be restarted with new code but still
183+
// need to serve old apps. So we make sure we can still handle old blocks
184+
185+
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
186+
resolver.registerExecutor("app0", "exec0",
187+
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
188+
189+
dataContext.insertLegacyHashShuffleData(2, 0,
190+
new byte[][] { "more legacy".getBytes(), "hash".getBytes() } );
191+
192+
testReadBlockData(resolver, "shuffle_2_0_0", "more legacy");
193+
testReadBlockData(resolver, "shuffle_2_0_1", "hash");
194+
195+
// verify everything still works when we also register some blocks which do have a
196+
// stageAttemptId
197+
testHashShuffleBlocks();
198+
199+
testReadBlockData(resolver, "shuffle_2_0_0", "more legacy");
200+
testReadBlockData(resolver, "shuffle_2_0_1", "hash");
201+
}
202+
148203
@Test
149204
public void jsonSerializationOfExecutorRegistration() throws IOException {
150205
ObjectMapper mapper = new ObjectMapper();

network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,24 @@ public void cleanup() {
5757
}
5858

5959
/** Creates reducer blocks in a sort-based data format within our local dirs. */
60-
public void insertSortShuffleData(int shuffleId, int mapId, int stageAttemptId,
60+
public void insertSortShuffleData(
61+
int shuffleId,
62+
int mapId,
63+
int stageAttemptId,
6164
byte[][] blocks) throws IOException {
6265
String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0_" + stageAttemptId;
66+
insertSortShuffleData(blockId, blocks);
67+
}
68+
69+
public void insertLegacySortShuffleData(
70+
int shuffleId,
71+
int mapId,
72+
byte[][] blocks) throws IOException {
73+
String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
74+
insertSortShuffleData(blockId, blocks);
75+
}
6376

77+
private void insertSortShuffleData(String blockId, byte[][] blocks) throws IOException {
6478
OutputStream dataStream = new FileOutputStream(
6579
ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
6680
DataOutputStream indexStream = new DataOutputStream(new FileOutputStream(
@@ -78,8 +92,22 @@ public void insertSortShuffleData(int shuffleId, int mapId, int stageAttemptId,
7892
indexStream.close();
7993
}
8094

95+
public void insertLegacyHashShuffleData(
96+
int shuffleId,
97+
int mapId,
98+
byte[][] blocks) throws IOException {
99+
for (int i = 0; i < blocks.length; i ++) {
100+
String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i;
101+
Files.write(blocks[i],
102+
ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId));
103+
}
104+
}
105+
81106
/** Creates reducer blocks in a hash-based data format within our local dirs. */
82-
public void insertHashShuffleData(int shuffleId, int mapId, int stageAttemptId,
107+
public void insertHashShuffleData(
108+
int shuffleId,
109+
int mapId,
110+
int stageAttemptId,
83111
byte[][] blocks) throws IOException {
84112
for (int i = 0; i < blocks.length; i ++) {
85113
String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i + "_" + stageAttemptId;

0 commit comments

Comments
 (0)