Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@

package org.apache.hadoop.yarn.server.resourcemanager.security;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
Expand All @@ -31,6 +37,8 @@
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -69,24 +77,18 @@
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.jupiter.api.Timeout;

@RunWith(Parameterized.class)
public class TestAMRMTokens {

private static final Logger LOG =
LoggerFactory.getLogger(TestAMRMTokens.class);

private final Configuration conf;
private Configuration conf;
private static final int maxWaitAttempts = 50;
private static final int rolling_interval_sec = 13;
private static final long am_expire_ms = 4000;

@Parameters
public static Collection<Object[]> configs() {
Configuration conf = new Configuration();
Configuration confWithSecurity = new Configuration();
Expand All @@ -95,8 +97,8 @@ public static Collection<Object[]> configs() {
return Arrays.asList(new Object[][] {{ conf }, { confWithSecurity } });
}

public TestAMRMTokens(Configuration conf) {
this.conf = conf;
public void initTestAMRMTokens(Configuration pConf) {
this.conf = pConf;
UserGroupInformation.setConfiguration(conf);
}

Expand All @@ -107,8 +109,10 @@ public TestAMRMTokens(Configuration conf) {
* @throws Exception
*/
@SuppressWarnings("unchecked")
@Test
public void testTokenExpiry() throws Exception {
@ParameterizedTest
@MethodSource("configs")
public void testTokenExpiry(Configuration pConf) throws Exception {
initTestAMRMTokens(pConf);
conf.setLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
YarnConfiguration.
Expand Down Expand Up @@ -139,7 +143,7 @@ public void testTokenExpiry() throws Exception {
LOG.info("Waiting for AM Launch to happen..");
Thread.sleep(1000);
}
Assert.assertNotNull(containerManager.containerTokens);
assertNotNull(containerManager.containerTokens);

RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
Expand Down Expand Up @@ -192,7 +196,7 @@ public void testTokenExpiry() throws Exception {
Thread.sleep(100);
count++;
}
Assert.assertTrue(attempt.getState() == RMAppAttemptState.FINISHED);
assertTrue(attempt.getState() == RMAppAttemptState.FINISHED);

// Now simulate trying to allocate. RPC call itself should throw auth
// exception.
Expand All @@ -202,13 +206,13 @@ public void testTokenExpiry() throws Exception {
Records.newRecord(AllocateRequest.class);
try {
rmClient.allocate(allocateRequest);
Assert.fail("You got to be kidding me! "
fail("You got to be kidding me! "
+ "Using App tokens after app-finish should fail!");
} catch (Throwable t) {
LOG.info("Exception found is ", t);
// The exception will still have the earlier appAttemptId as it picks it
// up from the token.
Assert.assertTrue(t.getCause().getMessage().contains(
assertTrue(t.getCause().getMessage().contains(
applicationAttemptId.toString()
+ " not found in AMRMTokenSecretManager."));
}
Expand All @@ -227,9 +231,10 @@ public void testTokenExpiry() throws Exception {
*
* @throws Exception
*/
@Test
public void testMasterKeyRollOver() throws Exception {

@ParameterizedTest
@MethodSource("configs")
public void testMasterKeyRollOver(Configuration pConf) throws Exception {
initTestAMRMTokens(pConf);
conf.setLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
rolling_interval_sec);
Expand All @@ -247,7 +252,7 @@ public void testMasterKeyRollOver() throws Exception {
AMRMTokenSecretManager appTokenSecretManager =
rm.getRMContext().getAMRMTokenSecretManager();
MasterKeyData oldKey = appTokenSecretManager.getMasterKey();
Assert.assertNotNull(oldKey);
assertNotNull(oldKey);
try {
MockNM nm1 = rm.registerNode("localhost:1234", 5120);

Expand All @@ -260,7 +265,7 @@ public void testMasterKeyRollOver() throws Exception {
LOG.info("Waiting for AM Launch to happen..");
Thread.sleep(1000);
}
Assert.assertNotNull(containerManager.containerTokens);
assertNotNull(containerManager.containerTokens);

RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
Expand All @@ -285,7 +290,7 @@ public void testMasterKeyRollOver() throws Exception {
// One allocate call.
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
Assert.assertTrue(
assertTrue(
rmClient.allocate(allocateRequest).getAMCommand() == null);

// Wait for enough time and make sure the roll_over happens
Expand All @@ -296,15 +301,13 @@ public void testMasterKeyRollOver() throws Exception {
}

MasterKeyData newKey = appTokenSecretManager.getMasterKey();
Assert.assertNotNull(newKey);
Assert.assertFalse("Master key should have changed!",
oldKey.equals(newKey));
assertNotNull(newKey);
assertFalse(oldKey.equals(newKey), "Master key should have changed!");

// Another allocate call with old AMRMToken. Should continue to work.
rpc.stopProxy(rmClient, conf); // To avoid using cached client
rmClient = createRMClient(rm, conf, rpc, currentUser);
Assert
.assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);

waitCount = 0;
while(waitCount++ <= maxWaitAttempts) {
Expand All @@ -319,9 +322,9 @@ public void testMasterKeyRollOver() throws Exception {
Thread.sleep(200);
}
// active the nextMasterKey, and replace the currentMasterKey
Assert.assertTrue(appTokenSecretManager.getCurrnetMasterKeyData().equals(newKey));
Assert.assertTrue(appTokenSecretManager.getMasterKey().equals(newKey));
Assert.assertTrue(appTokenSecretManager.getNextMasterKeyData() == null);
assertTrue(appTokenSecretManager.getCurrnetMasterKeyData().equals(newKey));
assertTrue(appTokenSecretManager.getMasterKey().equals(newKey));
assertTrue(appTokenSecretManager.getNextMasterKeyData() == null);

// Create a new Token
Token<AMRMTokenIdentifier> newToken =
Expand All @@ -332,18 +335,16 @@ public void testMasterKeyRollOver() throws Exception {
rpc.stopProxy(rmClient, conf); // To avoid using cached client
rmClient = createRMClient(rm, conf, rpc, currentUser);
allocateRequest = Records.newRecord(AllocateRequest.class);
Assert
.assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);

// Should not work by using the old AMRMToken.
rpc.stopProxy(rmClient, conf); // To avoid using cached client
try {
currentUser.addToken(amRMToken);
rmClient = createRMClient(rm, conf, rpc, currentUser);
allocateRequest = Records.newRecord(AllocateRequest.class);
Assert
.assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
Assert.fail("The old Token should not work");
assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
fail("The old Token should not work");
} catch (Exception ex) {
// expect exception
}
Expand All @@ -355,8 +356,11 @@ public void testMasterKeyRollOver() throws Exception {
}
}

@Test (timeout = 20000)
public void testAMRMMasterKeysUpdate() throws Exception {
@ParameterizedTest
@MethodSource("configs")
@Timeout(value = 20)
public void testAMRMMasterKeysUpdate(Configuration pConf) throws Exception {
initTestAMRMTokens(pConf);
final AtomicReference<AMRMTokenSecretManager> spySecretMgrRef =
new AtomicReference<AMRMTokenSecretManager>();
MockRM rm = new MockRM(conf) {
Expand Down Expand Up @@ -387,7 +391,7 @@ protected AMRMTokenSecretManager createAMRMTokenSecretManager(
// Do allocate. Should not update AMRMToken
AllocateResponse response =
am.allocate(Records.newRecord(AllocateRequest.class));
Assert.assertNull(response.getAMRMToken());
assertNull(response.getAMRMToken());
Token<AMRMTokenIdentifier> oldToken = rm.getRMContext().getRMApps()
.get(app.getApplicationId())
.getRMAppAttempt(am.getApplicationAttemptId()).getAMRMToken();
Expand All @@ -396,13 +400,13 @@ protected AMRMTokenSecretManager createAMRMTokenSecretManager(
// Do allocate again. the AM should get the latest AMRMToken
rm.getRMContext().getAMRMTokenSecretManager().rollMasterKey();
response = am.allocate(Records.newRecord(AllocateRequest.class));
Assert.assertNotNull(response.getAMRMToken());
assertNotNull(response.getAMRMToken());

Token<AMRMTokenIdentifier> amrmToken =
ConverterUtils.convertFromYarn(response.getAMRMToken(), new Text(
response.getAMRMToken().getService()));

Assert.assertEquals(amrmToken.decodeIdentifier().getKeyId(), rm
assertEquals(amrmToken.decodeIdentifier().getKeyId(), rm
.getRMContext().getAMRMTokenSecretManager().getMasterKey().getMasterKey()
.getKeyId());

Expand All @@ -413,19 +417,19 @@ protected AMRMTokenSecretManager createAMRMTokenSecretManager(
am.getApplicationAttemptId().toString(), new String[0]);
ugi.addTokenIdentifier(oldToken.decodeIdentifier());
response = am.doAllocateAs(ugi, Records.newRecord(AllocateRequest.class));
Assert.assertNotNull(response.getAMRMToken());
assertNotNull(response.getAMRMToken());
verify(spySecretMgr, never()).createAndGetAMRMToken(isA(ApplicationAttemptId.class));

// Do allocate again with the updated token and verify we do not
// receive a new token to use.
response = am.allocate(Records.newRecord(AllocateRequest.class));
Assert.assertNull(response.getAMRMToken());
assertNull(response.getAMRMToken());

// Activate the next master key. Since there is new master key generated
// in AMRMTokenSecretManager. The AMRMToken will not get updated for AM
rm.getRMContext().getAMRMTokenSecretManager().activateNextMasterKey();
response = am.allocate(Records.newRecord(AllocateRequest.class));
Assert.assertNull(response.getAMRMToken());
assertNull(response.getAMRMToken());
rm.stop();
}

Expand Down
Loading