Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public void testInvalidMapNoDataFile() {
final ShuffleTest t = createShuffleTest();
final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();

String dataFile = getDataFile(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2);
String dataFile = getDataFile(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2);
assertTrue("should delete", new File(dataFile).delete());

FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
Expand All @@ -42,6 +43,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -160,7 +162,7 @@ public void testMaxConnections() throws Exception {
shuffleHandler.init(conf);
shuffleHandler.start();
final String port = shuffleHandler.getConfig().get(SHUFFLE_PORT_CONFIG_KEY);
final SecretKey secretKey = shuffleHandler.addTestApp();
final SecretKey secretKey = shuffleHandler.addTestApp(TEST_USER);

// setup connections
HttpURLConnection[] conns = new HttpURLConnection[connAttempts];
Expand Down Expand Up @@ -238,7 +240,7 @@ public void testKeepAlive() throws Exception {
shuffleHandler.init(conf);
shuffleHandler.start();
final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
final SecretKey secretKey = shuffleHandler.addTestApp();
final SecretKey secretKey = shuffleHandler.addTestApp(TEST_USER);

HttpURLConnection conn1 = createRequest(
geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true),
Expand Down Expand Up @@ -279,18 +281,34 @@ public void testMapFileAccess() throws IOException {
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);

final String randomUser = "randomUser";
final String attempt = "attempt_1111111111111_0004_m_000004_0";
generateMapOutput(randomUser, tempDir.toAbsolutePath().toString(), attempt,
Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A));

ShuffleHandlerMock shuffleHandler = new ShuffleHandlerMock();
shuffleHandler.init(conf);
try {
shuffleHandler.start();
final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
final SecretKey secretKey = shuffleHandler.addTestApp();
final SecretKey secretKey = shuffleHandler.addTestApp(randomUser);

HttpURLConnection conn = createRequest(
geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), false),
geURL(port, TEST_JOB_ID, 0, Collections.singletonList(attempt), false),
secretKey);
conn.connect();
BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));

InputStream is = null;
try {
is = conn.getInputStream();
} catch (IOException ioe) {
if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
is = conn.getErrorStream();
}
}

assertNotNull(is);
BufferedReader in = new BufferedReader(new InputStreamReader(is));
StringBuilder builder = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
Expand All @@ -300,19 +318,19 @@ public void testMapFileAccess() throws IOException {
String receivedString = builder.toString();

//Retrieve file owner name
String indexFilePath = getIndexFile(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1);
String indexFilePath = getIndexFile(randomUser, tempDir.toAbsolutePath().toString(), attempt);
String owner;
try (FileInputStream fis = new FileInputStream(indexFilePath)) {
owner = NativeIO.POSIX.getFstat(fis.getFD()).getOwner();
}

String message =
"Owner '" + owner + "' for path " + indexFilePath
+ " did not match expected owner '" + TEST_USER + "'";
+ " did not match expected owner '" + randomUser + "'";
assertTrue(String.format("Received string '%s' should contain " +
"message '%s'", receivedString, message),
receivedString.contains(message));
assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, conn.getResponseCode());
LOG.info("received: " + receivedString);
assertNotEquals("", receivedString);
} finally {
Expand All @@ -335,7 +353,7 @@ public void testRecovery() throws IOException {
shuffle.init(conf);
shuffle.start();
final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
final SecretKey secretKey = shuffle.addTestApp();
final SecretKey secretKey = shuffle.addTestApp(TEST_USER);

// verify we are authorized to shuffle
int rc = getShuffleResponseCode(port, secretKey);
Expand Down Expand Up @@ -388,7 +406,7 @@ public void testRecoveryFromOtherVersions() throws IOException {
shuffle.init(conf);
shuffle.start();
final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
final SecretKey secretKey = shuffle.addTestApp();
final SecretKey secretKey = shuffle.addTestApp(TEST_USER);

// verify we are authorized to shuffle
int rc = getShuffleResponseCode(port, secretKey);
Expand Down Expand Up @@ -490,14 +508,14 @@ private static HttpURLConnection createRequest(URL url, SecretKey secretKey) thr

class ShuffleHandlerMock extends ShuffleHandler {

public SecretKey addTestApp() throws IOException {
public SecretKey addTestApp(String user) throws IOException {
DataOutputBuffer outputBuffer = new DataOutputBuffer();
outputBuffer.reset();
Token<JobTokenIdentifier> jt = new Token<>(
"identifier".getBytes(), "password".getBytes(), new Text(TEST_USER),
"identifier".getBytes(), "password".getBytes(), new Text(user),
new Text("shuffleService"));
jt.write(outputBuffer);
initializeApplication(new ApplicationInitializationContext(TEST_USER, TEST_APP_ID,
initializeApplication(new ApplicationInitializationContext(user, TEST_APP_ID,
ByteBuffer.wrap(outputBuffer.getData(), 0,
outputBuffer.getLength())));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class TestShuffleHandlerBase {
public static final String TEST_ATTEMPT_2 = "attempt_1111111111111_0002_m_000002_0";
public static final String TEST_ATTEMPT_3 = "attempt_1111111111111_0003_m_000003_0";
public static final String TEST_JOB_ID = "job_1111111111111_0001";
public static final String TEST_USER = "testUser";
public static final String TEST_USER = System.getProperty("user.name");
public static final String TEST_DATA_A = "aaaaa";
public static final String TEST_DATA_B = "bbbbb";
public static final String TEST_DATA_C = "ccccc";
Expand All @@ -70,11 +70,11 @@ public void setup() throws IOException {
tempDir = Files.createTempDirectory("test-shuffle-channel-handler");
tempDir.toFile().deleteOnExit();

generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1,
generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1,
Arrays.asList(TEST_DATA_A, TEST_DATA_B, TEST_DATA_C));
generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2,
generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2,
Arrays.asList(TEST_DATA_B, TEST_DATA_A, TEST_DATA_C));
generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_3,
generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_3,
Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A));

outputStreamCaptor.reset();
Expand All @@ -101,12 +101,13 @@ public List<String> matchLogs(String pattern) {
return allMatches;
}

public static void generateMapOutput(String tempDir, String attempt, List<String> maps)
public static void generateMapOutput(String user, String tempDir,
String attempt, List<String> maps)
throws IOException {
SpillRecord record = new SpillRecord(maps.size());

assertTrue(new File(getBasePath(tempDir, attempt)).mkdirs());
try (PrintWriter writer = new PrintWriter(getDataFile(tempDir, attempt), "UTF-8")) {
assertTrue(new File(getBasePath(user, tempDir, attempt)).mkdirs());
try (PrintWriter writer = new PrintWriter(getDataFile(user, tempDir, attempt), "UTF-8")) {
long startOffset = 0;
int partition = 0;
for (String map : maps) {
Expand All @@ -119,21 +120,21 @@ public static void generateMapOutput(String tempDir, String attempt, List<String
partition++;
writer.write(map);
}
record.writeToFile(new Path(getIndexFile(tempDir, attempt)),
record.writeToFile(new Path(getIndexFile(user, tempDir, attempt)),
new JobConf(new Configuration()));
}
}

public static String getIndexFile(String tempDir, String attempt) {
return String.format("%s/%s", getBasePath(tempDir, attempt), INDEX_FILE_NAME);
public static String getIndexFile(String user, String tempDir, String attempt) {
return String.format("%s/%s", getBasePath(user, tempDir, attempt), INDEX_FILE_NAME);
}

public static String getDataFile(String tempDir, String attempt) {
return String.format("%s/%s", getBasePath(tempDir, attempt), DATA_FILE_NAME);
public static String getDataFile(String user, String tempDir, String attempt) {
return String.format("%s/%s", getBasePath(user, tempDir, attempt), DATA_FILE_NAME);
}

private static String getBasePath(String tempDir, String attempt) {
return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, TEST_USER, attempt);
private static String getBasePath(String user, String tempDir, String attempt) {
return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, user, attempt);
}

public static String getUri(String jobId, int reduce, List<String> maps, boolean keepAlive) {
Expand Down