Skip to content

Commit 2e1efcf

Browse files
prabhjyotsinghszilard-nemeth
authored andcommitted
ODP-2637: MAPREDUCE-7434: Fix ShuffleHandler tests. Contributed by Tamas Domok (#49)
(cherry picked from commit 8f6be36) (cherry picked from commit 14a608b) Co-authored-by: Szilard Nemeth <[email protected]>
1 parent 266cce5 commit 2e1efcf

File tree

3 files changed

+47
-28
lines changed

3 files changed

+47
-28
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ public void testInvalidMapNoDataFile() {
225225
final ShuffleTest t = createShuffleTest();
226226
final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
227227

228-
String dataFile = getDataFile(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2);
228+
String dataFile = getDataFile(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2);
229229
assertTrue("should delete", new File(dataFile).delete());
230230

231231
FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
3030
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
3131
import static org.junit.Assert.assertNotEquals;
32+
import static org.junit.Assert.assertNotNull;
3233
import static org.junit.Assert.assertTrue;
3334
import static org.junit.Assert.assertEquals;
3435
import static org.junit.Assert.fail;
@@ -42,6 +43,7 @@
4243
import java.io.File;
4344
import java.io.FileInputStream;
4445
import java.io.IOException;
46+
import java.io.InputStream;
4547
import java.io.InputStreamReader;
4648
import java.net.HttpURLConnection;
4749
import java.net.MalformedURLException;
@@ -160,7 +162,7 @@ public void testMaxConnections() throws Exception {
160162
shuffleHandler.init(conf);
161163
shuffleHandler.start();
162164
final String port = shuffleHandler.getConfig().get(SHUFFLE_PORT_CONFIG_KEY);
163-
final SecretKey secretKey = shuffleHandler.addTestApp();
165+
final SecretKey secretKey = shuffleHandler.addTestApp(TEST_USER);
164166

165167
// setup connections
166168
HttpURLConnection[] conns = new HttpURLConnection[connAttempts];
@@ -238,7 +240,7 @@ public void testKeepAlive() throws Exception {
238240
shuffleHandler.init(conf);
239241
shuffleHandler.start();
240242
final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
241-
final SecretKey secretKey = shuffleHandler.addTestApp();
243+
final SecretKey secretKey = shuffleHandler.addTestApp(TEST_USER);
242244

243245
HttpURLConnection conn1 = createRequest(
244246
geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true),
@@ -279,18 +281,34 @@ public void testMapFileAccess() throws IOException {
279281
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
280282
UserGroupInformation.setConfiguration(conf);
281283

284+
final String randomUser = "randomUser";
285+
final String attempt = "attempt_1111111111111_0004_m_000004_0";
286+
generateMapOutput(randomUser, tempDir.toAbsolutePath().toString(), attempt,
287+
Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A));
288+
282289
ShuffleHandlerMock shuffleHandler = new ShuffleHandlerMock();
283290
shuffleHandler.init(conf);
284291
try {
285292
shuffleHandler.start();
286293
final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
287-
final SecretKey secretKey = shuffleHandler.addTestApp();
294+
final SecretKey secretKey = shuffleHandler.addTestApp(randomUser);
288295

289296
HttpURLConnection conn = createRequest(
290-
geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), false),
297+
geURL(port, TEST_JOB_ID, 0, Collections.singletonList(attempt), false),
291298
secretKey);
292299
conn.connect();
293-
BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
300+
301+
InputStream is = null;
302+
try {
303+
is = conn.getInputStream();
304+
} catch (IOException ioe) {
305+
if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
306+
is = conn.getErrorStream();
307+
}
308+
}
309+
310+
assertNotNull(is);
311+
BufferedReader in = new BufferedReader(new InputStreamReader(is));
294312
StringBuilder builder = new StringBuilder();
295313
String inputLine;
296314
while ((inputLine = in.readLine()) != null) {
@@ -300,19 +318,19 @@ public void testMapFileAccess() throws IOException {
300318
String receivedString = builder.toString();
301319

302320
//Retrieve file owner name
303-
String indexFilePath = getIndexFile(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1);
321+
String indexFilePath = getIndexFile(randomUser, tempDir.toAbsolutePath().toString(), attempt);
304322
String owner;
305323
try (FileInputStream fis = new FileInputStream(indexFilePath)) {
306324
owner = NativeIO.POSIX.getFstat(fis.getFD()).getOwner();
307325
}
308326

309327
String message =
310328
"Owner '" + owner + "' for path " + indexFilePath
311-
+ " did not match expected owner '" + TEST_USER + "'";
329+
+ " did not match expected owner '" + randomUser + "'";
312330
assertTrue(String.format("Received string '%s' should contain " +
313331
"message '%s'", receivedString, message),
314332
receivedString.contains(message));
315-
assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
333+
assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, conn.getResponseCode());
316334
LOG.info("received: " + receivedString);
317335
assertNotEquals("", receivedString);
318336
} finally {
@@ -335,7 +353,7 @@ public void testRecovery() throws IOException {
335353
shuffle.init(conf);
336354
shuffle.start();
337355
final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
338-
final SecretKey secretKey = shuffle.addTestApp();
356+
final SecretKey secretKey = shuffle.addTestApp(TEST_USER);
339357

340358
// verify we are authorized to shuffle
341359
int rc = getShuffleResponseCode(port, secretKey);
@@ -388,7 +406,7 @@ public void testRecoveryFromOtherVersions() throws IOException {
388406
shuffle.init(conf);
389407
shuffle.start();
390408
final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
391-
final SecretKey secretKey = shuffle.addTestApp();
409+
final SecretKey secretKey = shuffle.addTestApp(TEST_USER);
392410

393411
// verify we are authorized to shuffle
394412
int rc = getShuffleResponseCode(port, secretKey);
@@ -490,14 +508,14 @@ private static HttpURLConnection createRequest(URL url, SecretKey secretKey) thr
490508

491509
class ShuffleHandlerMock extends ShuffleHandler {
492510

493-
public SecretKey addTestApp() throws IOException {
511+
public SecretKey addTestApp(String user) throws IOException {
494512
DataOutputBuffer outputBuffer = new DataOutputBuffer();
495513
outputBuffer.reset();
496514
Token<JobTokenIdentifier> jt = new Token<>(
497-
"identifier".getBytes(), "password".getBytes(), new Text(TEST_USER),
515+
"identifier".getBytes(), "password".getBytes(), new Text(user),
498516
new Text("shuffleService"));
499517
jt.write(outputBuffer);
500-
initializeApplication(new ApplicationInitializationContext(TEST_USER, TEST_APP_ID,
518+
initializeApplication(new ApplicationInitializationContext(user, TEST_APP_ID,
501519
ByteBuffer.wrap(outputBuffer.getData(), 0,
502520
outputBuffer.getLength())));
503521

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class TestShuffleHandlerBase {
5555
public static final String TEST_ATTEMPT_2 = "attempt_1111111111111_0002_m_000002_0";
5656
public static final String TEST_ATTEMPT_3 = "attempt_1111111111111_0003_m_000003_0";
5757
public static final String TEST_JOB_ID = "job_1111111111111_0001";
58-
public static final String TEST_USER = "testUser";
58+
public static final String TEST_USER = System.getProperty("user.name");
5959
public static final String TEST_DATA_A = "aaaaa";
6060
public static final String TEST_DATA_B = "bbbbb";
6161
public static final String TEST_DATA_C = "ccccc";
@@ -70,11 +70,11 @@ public void setup() throws IOException {
7070
tempDir = Files.createTempDirectory("test-shuffle-channel-handler");
7171
tempDir.toFile().deleteOnExit();
7272

73-
generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1,
73+
generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1,
7474
Arrays.asList(TEST_DATA_A, TEST_DATA_B, TEST_DATA_C));
75-
generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2,
75+
generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2,
7676
Arrays.asList(TEST_DATA_B, TEST_DATA_A, TEST_DATA_C));
77-
generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_3,
77+
generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_3,
7878
Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A));
7979

8080
outputStreamCaptor.reset();
@@ -101,12 +101,13 @@ public List<String> matchLogs(String pattern) {
101101
return allMatches;
102102
}
103103

104-
public static void generateMapOutput(String tempDir, String attempt, List<String> maps)
104+
public static void generateMapOutput(String user, String tempDir,
105+
String attempt, List<String> maps)
105106
throws IOException {
106107
SpillRecord record = new SpillRecord(maps.size());
107108

108-
assertTrue(new File(getBasePath(tempDir, attempt)).mkdirs());
109-
try (PrintWriter writer = new PrintWriter(getDataFile(tempDir, attempt), "UTF-8")) {
109+
assertTrue(new File(getBasePath(user, tempDir, attempt)).mkdirs());
110+
try (PrintWriter writer = new PrintWriter(getDataFile(user, tempDir, attempt), "UTF-8")) {
110111
long startOffset = 0;
111112
int partition = 0;
112113
for (String map : maps) {
@@ -119,21 +120,21 @@ public static void generateMapOutput(String tempDir, String attempt, List<String
119120
partition++;
120121
writer.write(map);
121122
}
122-
record.writeToFile(new Path(getIndexFile(tempDir, attempt)),
123+
record.writeToFile(new Path(getIndexFile(user, tempDir, attempt)),
123124
new JobConf(new Configuration()));
124125
}
125126
}
126127

127-
public static String getIndexFile(String tempDir, String attempt) {
128-
return String.format("%s/%s", getBasePath(tempDir, attempt), INDEX_FILE_NAME);
128+
public static String getIndexFile(String user, String tempDir, String attempt) {
129+
return String.format("%s/%s", getBasePath(user, tempDir, attempt), INDEX_FILE_NAME);
129130
}
130131

131-
public static String getDataFile(String tempDir, String attempt) {
132-
return String.format("%s/%s", getBasePath(tempDir, attempt), DATA_FILE_NAME);
132+
public static String getDataFile(String user, String tempDir, String attempt) {
133+
return String.format("%s/%s", getBasePath(user, tempDir, attempt), DATA_FILE_NAME);
133134
}
134135

135-
private static String getBasePath(String tempDir, String attempt) {
136-
return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, TEST_USER, attempt);
136+
private static String getBasePath(String user, String tempDir, String attempt) {
137+
return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, user, attempt);
137138
}
138139

139140
public static String getUri(String jobId, int reduce, List<String> maps, boolean keepAlive) {

0 commit comments

Comments
 (0)