diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationDelegationTokenStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationDelegationTokenStateStore.java index 384295fd09dcc..5645fc67c586a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationDelegationTokenStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationDelegationTokenStateStore.java @@ -112,4 +112,11 @@ RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request) */ RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request) throws YarnException, IOException; + + /** + * The Router Supports incrementDelegationTokenSeqNum. + * + * @return DelegationTokenSeqNum. + */ + int incrementDelegationTokenSeqNum(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 2d210ab37496c..0aa0d5163d81d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.Comparator; @@ -110,6 +111,7 @@ public class MemoryFederationStateStore implements FederationStateStore { private Map policies; private RouterRMDTSecretManagerState routerRMSecretManagerState; private int maxAppsInStateStore; + private AtomicInteger sequenceNum; private final MonotonicClock clock = new MonotonicClock(); @@ -126,6 +128,7 @@ public void init(Configuration conf) { maxAppsInStateStore = conf.getInt( YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); + sequenceNum = new AtomicInteger(); } @Override @@ -534,6 +537,11 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req return RouterRMTokenResponse.newInstance(resultToken); } + @Override + public synchronized int incrementDelegationTokenSeqNum() { + return sequenceNum.incrementAndGet(); + } + private void storeOrUpdateRouterRMDT(RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, boolean isUpdate) throws IOException { Map rmDTState = routerRMSecretManagerState.getTokenState(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index 2bf2658944e5f..3aaf1bb878b54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -1394,4 +1394,9 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req throws YarnException, IOException { throw new NotImplementedException("Code is not implemented"); } + + @Override + public int incrementDelegationTokenSeqNum() { + return 0; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZKFederationStateStoreOpDurations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZKFederationStateStoreOpDurations.java index 113e4850a5709..54b8b5f4ddae1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZKFederationStateStoreOpDurations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZKFederationStateStoreOpDurations.java @@ -89,6 +89,27 @@ public final class ZKFederationStateStoreOpDurations implements MetricsSource { @Metric("Duration for a update reservation homeSubCluster call") private MutableRate updateReservationHomeSubCluster; + @Metric("Duration for a store new master key call") + private MutableRate storeNewMasterKey; + + @Metric("Duration for a remove new master key call") + private MutableRate removeStoredMasterKey; + + @Metric("Duration for a get master key by delegation key call") + private MutableRate getMasterKeyByDelegationKey; + + @Metric("Duration for a store new token call") + private MutableRate storeNewToken; + + @Metric("Duration for a update stored token call") + private MutableRate updateStoredToken; + + @Metric("Duration for a remove stored token call") + private MutableRate removeStoredToken; + + @Metric("Duration for a get token by router store token call") + private MutableRate getTokenByRouterStoreToken; + protected static final MetricsInfo RECORD_INFO = info("ZKFederationStateStoreOpDurations", "Durations of ZKFederationStateStore calls"); @@ -187,4 +208,32 @@ public void addDeleteReservationHomeSubClusterDuration(long startTime, long endT public void addUpdateReservationHomeSubClusterDuration(long startTime, long endTime) { updateReservationHomeSubCluster.add(endTime - startTime); } + + public void addStoreNewMasterKeyDuration(long startTime, long endTime) { + storeNewMasterKey.add(endTime - startTime); + } + + public void removeStoredMasterKeyDuration(long startTime, long endTime) { + removeStoredMasterKey.add(endTime - startTime); + } + + public void getMasterKeyByDelegationKeyDuration(long startTime, long endTime) { + getMasterKeyByDelegationKey.add(endTime - startTime); + } + + public void getStoreNewTokenDuration(long startTime, long endTime) { + storeNewToken.add(endTime - startTime); + } + + public void updateStoredTokenDuration(long startTime, long endTime) { + updateStoredToken.add(endTime - startTime); + } + + public void removeStoredTokenDuration(long startTime, long endTime) { + removeStoredToken.add(endTime - startTime); + } + + public void getTokenByRouterStoreTokenDuration(long startTime, long endTime) { + getTokenByRouterStoreToken.add(endTime - startTime); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java index f4d45f5a723c1..1ed9e410e78e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -17,9 +17,12 @@ package org.apache.hadoop.yarn.server.federation.store.impl; -import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath; - import java.io.IOException; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Calendar; import java.util.List; @@ -27,9 +30,11 @@ import java.util.Comparator; import java.util.stream.Collectors; -import org.apache.commons.lang3.NotImplementedException; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.VersionedValue; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.curator.ZKCuratorManager; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -37,6 +42,7 @@ import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto; +import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; @@ -87,14 +93,18 @@ import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl; import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl; +import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationRouterRMTokenInputValidator; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; @@ -103,11 +113,14 @@ import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException; import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster; +import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE; +import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT; +import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath; /** * ZooKeeper implementation of {@link FederationStateStore}. - * * The znode structure is as follows: + * * ROOT_DIR_PATH * |--- MEMBERSHIP * | |----- SC1 @@ -121,6 +134,14 @@ * |--- RESERVATION * | |----- RESERVATION1 * | |----- RESERVATION2 + * |--- ROUTER_RM_DT_SECRET_MANAGER_ROOT + * | |----- ROUTER_RM_DELEGATION_TOKENS_ROOT + * | | |----- RM_DELEGATION_TOKEN_1 + * | | |----- RM_DELEGATION_TOKEN_2 + * | | |----- RM_DELEGATION_TOKEN_3 + * | |----- ROUTER_RM_DT_MASTER_KEYS_ROOT + * | | |----- DELEGATION_KEY_1 + * |--- |----- ROUTER_RM_DT_SEQUENTIAL_NUMBER */ public class ZookeeperFederationStateStore implements FederationStateStore { @@ -132,9 +153,26 @@ public class ZookeeperFederationStateStore implements FederationStateStore { private final static String ROOT_ZNODE_NAME_POLICY = "policies"; private final static String ROOT_ZNODE_NAME_RESERVATION = "reservation"; + /** Store Delegation Token Node. */ + private final static String ROUTER_RM_DT_SECRET_MANAGER_ROOT = "router_rm_dt_secret_manager_root"; + private static final String ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME = + "router_rm_dt_master_keys_root"; + private static final String ROUTER_RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = + "router_rm_delegation_tokens_root"; + private static final String ROUTER_RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = + "router_rm_dt_sequential_number"; + private static final String ROUTER_RM_DELEGATION_KEY_PREFIX = "delegation_key_"; + private static final String ROUTER_RM_DELEGATION_TOKEN_PREFIX = "rm_delegation_token_"; + /** Interface to Zookeeper. */ private ZKCuratorManager zkManager; + /** Store sequenceNum. **/ + private int seqNumBatchSize; + private int currentSeqNum; + private int currentMaxSeqNum; + private SharedCount delTokSeqCounter; + /** Directory to store the state store data. */ private String baseZNode; @@ -144,6 +182,12 @@ public class ZookeeperFederationStateStore implements FederationStateStore { private String reservationsZNode; private int maxAppsInStateStore; + /** Directory to store the delegation token data. **/ + private String routerRMDTSecretManagerRoot; + private String routerRMDTMasterKeysRootPath; + private String routerRMDelegationTokensRootPath; + private String routerRMSequenceNumberPath; + private volatile Clock clock = SystemClock.getInstance(); @VisibleForTesting @@ -152,6 +196,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore { @Override public void init(Configuration conf) throws YarnException { + LOG.info("Initializing ZooKeeper connection"); maxAppsInStateStore = conf.getInt( @@ -174,6 +219,15 @@ public void init(Configuration conf) throws YarnException { policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY); reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION); + // delegation token znodes + routerRMDTSecretManagerRoot = getNodePath(baseZNode, ROUTER_RM_DT_SECRET_MANAGER_ROOT); + routerRMDTMasterKeysRootPath = getNodePath(routerRMDTSecretManagerRoot, + ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME); + routerRMDelegationTokensRootPath = getNodePath(routerRMDTSecretManagerRoot, + ROUTER_RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME); + routerRMSequenceNumberPath = getNodePath(routerRMDTSecretManagerRoot, + ROUTER_RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME); + // Create base znode for each entity try { List zkAcl = ZKCuratorManager.getZKAcls(conf); @@ -181,14 +235,48 @@ public void init(Configuration conf) throws YarnException { zkManager.createRootDirRecursively(appsZNode, zkAcl); zkManager.createRootDirRecursively(policiesZNode, zkAcl); zkManager.createRootDirRecursively(reservationsZNode, zkAcl); + zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, zkAcl); + zkManager.createRootDirRecursively(routerRMDTMasterKeysRootPath, zkAcl); + zkManager.createRootDirRecursively(routerRMDelegationTokensRootPath, zkAcl); } catch (Exception e) { String errMsg = "Cannot create base directories: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } + + // Distributed sequenceNum. + try { + seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE, + ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT); + + delTokSeqCounter = new SharedCount(zkManager.getCurator(), routerRMSequenceNumberPath, 0); + + if (delTokSeqCounter != null) { + delTokSeqCounter.start(); + } + + // the first batch range should be allocated during this starting window + // by calling the incrSharedCount + currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); + currentMaxSeqNum = currentSeqNum + seqNumBatchSize; + + LOG.info("Fetched initial range of seq num, from {} to {} ", + currentSeqNum + 1, currentMaxSeqNum); + } catch (Exception e) { + throw new YarnException("Could not start Sequence Counter.", e); + } } @Override public void close() throws Exception { + + try { + if (delTokSeqCounter != null) { + delTokSeqCounter.close(); + } + } catch (Exception e) { + LOG.error("Could not Stop Delegation Token Counter", e); + } + if (zkManager != null) { zkManager.close(); } @@ -886,45 +974,557 @@ public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster( return UpdateReservationHomeSubClusterResponse.newInstance(); } + /** + * ZookeeperFederationStateStore Supports Store NewMasterKey. + * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override - public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) + public synchronized RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // For the verification of the request, after passing the verification, + // the request and the internal objects will not be empty and can be used directly. + FederationRouterRMTokenInputValidator.validate(request); + + // Parse the delegationKey from the request and get the ZK storage path. + DelegationKey delegationKey = convertMasterKeyToDelegationKey(request); + String nodeCreatePath = getMasterKeyZNodePathByDelegationKey(delegationKey); + LOG.debug("Storing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(), + nodeCreatePath); + + // Write master key data to zk. + try(ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os)) { + delegationKey.write(fsOut); + put(nodeCreatePath, os.toByteArray(), false); + } + + // Get the stored masterKey from zk. + RouterMasterKey masterKeyFromZK = getRouterMasterKeyFromZK(nodeCreatePath, false); + long end = clock.getTime(); + opDurations.addStoreNewMasterKeyDuration(start, end); + return RouterMasterKeyResponse.newInstance(masterKeyFromZK); } + /** + * ZookeeperFederationStateStore Supports Remove MasterKey. + * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override - public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) + public synchronized RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // For the verification of the request, after passing the verification, + // the request and the internal objects will not be empty and can be used directly. + FederationRouterRMTokenInputValidator.validate(request); + + try { + // Parse the delegationKey from the request and get the ZK storage path. + RouterMasterKey masterKey = request.getRouterMasterKey(); + DelegationKey delegationKey = convertMasterKeyToDelegationKey(request); + String nodeRemovePath = getMasterKeyZNodePathByDelegationKey(delegationKey); + LOG.debug("Removing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(), + nodeRemovePath); + + // Check if the path exists, Throws an exception if the path does not exist. + if (!exists(nodeRemovePath)) { + throw new YarnException("ZkNodePath = " + nodeRemovePath + " not exists!"); + } + + // try to remove masterKey. + zkManager.delete(nodeRemovePath); + long end = clock.getTime(); + opDurations.removeStoredMasterKeyDuration(start, end); + return RouterMasterKeyResponse.newInstance(masterKey); + } catch (Exception e) { + throw new YarnException(e); + } } + /** + * ZookeeperFederationStateStore Supports Remove MasterKey. + * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // For the verification of the request, after passing the verification, + // the request and the internal objects will not be empty and can be used directly. + FederationRouterRMTokenInputValidator.validate(request); + + try { + + // Parse the delegationKey from the request and get the ZK storage path. + DelegationKey delegationKey = convertMasterKeyToDelegationKey(request); + String nodePath = getMasterKeyZNodePathByDelegationKey(delegationKey); + + // Check if the path exists, Throws an exception if the path does not exist. + if (!exists(nodePath)) { + throw new YarnException("ZkNodePath = " + nodePath + " not exists!"); + } + + // Get the stored masterKey from zk. + RouterMasterKey routerMasterKey = getRouterMasterKeyFromZK(nodePath, false); + long end = clock.getTime(); + opDurations.getMasterKeyByDelegationKeyDuration(start, end); + return RouterMasterKeyResponse.newInstance(routerMasterKey); + } catch (Exception e) { + throw new YarnException(e); + } + } + + /** + * Get MasterKeyZNodePath based on DelegationKey. + * + * @param delegationKey delegationKey. + * @return masterKey ZNodePath. + */ + private String getMasterKeyZNodePathByDelegationKey(DelegationKey delegationKey) { + return getMasterKeyZNodePathByKeyId(delegationKey.getKeyId()); + } + + /** + * Get MasterKeyZNodePath based on KeyId. + * + * @param keyId master key id. + * @return masterKey ZNodePath. + */ + private String getMasterKeyZNodePathByKeyId(int keyId) { + String nodeName = ROUTER_RM_DELEGATION_KEY_PREFIX + keyId; + return getNodePath(routerRMDTMasterKeysRootPath, nodeName); } + /** + * Get RouterMasterKey from ZK. + * + * @param nodePath The path where masterKey is stored in zk. + * @param quiet If true is silent mode, no error message is printed at this time, + * if false is non-silent mode, error message is printed at this time. + * + * @return + * @throws IOException + */ + private RouterMasterKey getRouterMasterKeyFromZK(String nodePath, boolean quiet) + throws IOException { + try { + byte[] data = get(nodePath); + if ((data == null) || (data.length == 0)) { + return null; + } + + ByteArrayInputStream bin = new ByteArrayInputStream(data); + DataInputStream din = new DataInputStream(bin); + DelegationKey key = new DelegationKey(); + key.readFields(din); + + return RouterMasterKey.newInstance(key.getKeyId(), + ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate()); + } catch (Exception ex) { + if (!quiet) { + LOG.error("No node in path [" + nodePath + "]"); + } + throw new IOException(ex); + } + } + + /** + * ZookeeperFederationStateStore Supports Store RMDelegationTokenIdentifier. + * + * The stored token method is a synchronized method + * used to ensure that storeNewToken is a thread-safe method. + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return routerRMTokenResponse, the response contains the RouterStoreToken. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override - public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) + public synchronized RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // We verify the RouterRMTokenRequest to ensure that the request is not empty, + // and that the internal RouterStoreToken is not empty. + FederationRouterRMTokenInputValidator.validate(request); + + try { + + // add delegationToken + storeOrUpdateRouterRMDT(request, false); + + // Get the stored delegationToken from ZK and return. + RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false); + long end = clock.getTime(); + opDurations.getStoreNewTokenDuration(start, end); + return RouterRMTokenResponse.newInstance(resultStoreToken); + } catch (YarnException | IOException e) { + throw e; + } catch (Exception e) { + throw new YarnException(e); + } } + /** + * ZookeeperFederationStateStore Supports Update RMDelegationTokenIdentifier. + * + * The update stored token method is a synchronized method + * used to ensure that storeNewToken is a thread-safe method. + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return routerRMTokenResponse, the response contains the RouterStoreToken. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override - public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request) + public synchronized RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // We verify the RouterRMTokenRequest to ensure that the request is not empty, + // and that the internal RouterStoreToken is not empty. + FederationRouterRMTokenInputValidator.validate(request); + + try { + + // get the Token storage path + String nodePath = getStoreTokenZNodePathByTokenRequest(request); + + // updateStoredToken needs to determine whether the zkNode exists. + // If it exists, update the token data. + // If it does not exist, write the new token data directly. + boolean pathExists = true; + if (!exists(nodePath)) { + pathExists = false; + } + + if (pathExists) { + // update delegationToken + storeOrUpdateRouterRMDT(request, true); + } else { + // add new delegationToken + storeNewToken(request); + } + + // Get the stored delegationToken from ZK and return. + RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false); + long end = clock.getTime(); + opDurations.updateStoredTokenDuration(start, end); + return RouterRMTokenResponse.newInstance(resultStoreToken); + } catch (YarnException | IOException e) { + throw e; + } catch (Exception e) { + throw new YarnException(e); + } } + /** + * ZookeeperFederationStateStore Supports Remove RMDelegationTokenIdentifier. + * + * The remove stored token method is a synchronized method + * used to ensure that storeNewToken is a thread-safe method. + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return routerRMTokenResponse, the response contains the RouterStoreToken. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override - public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request) + public synchronized RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // We verify the RouterRMTokenRequest to ensure that the request is not empty, + // and that the internal RouterStoreToken is not empty. + FederationRouterRMTokenInputValidator.validate(request); + + try { + + // get the Token storage path + String nodePath = getStoreTokenZNodePathByTokenRequest(request); + + // If the path to be deleted does not exist, throw an exception directly. + if (!exists(nodePath)) { + throw new YarnException("ZkNodePath = " + nodePath + " not exists!"); + } + + // Check again, first get the data from ZK, + // if the data is not empty, then delete it + RouterStoreToken storeToken = getStoreTokenFromZK(request, false); + if (storeToken != null) { + zkManager.delete(nodePath); + } + + // return deleted token data. + long end = clock.getTime(); + opDurations.removeStoredTokenDuration(start, end); + return RouterRMTokenResponse.newInstance(storeToken); + } catch (YarnException | IOException e) { + throw e; + } catch (Exception e) { + throw new YarnException(e); + } } + /** + * The Router Supports GetTokenByRouterStoreToken. + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return RouterRMTokenResponse. + * @throws YarnException if the call to the state store is unsuccessful + * @throws IOException An IO Error occurred + */ @Override public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // We verify the RouterRMTokenRequest to ensure that the request is not empty, + // and that the internal RouterStoreToken is not empty. + FederationRouterRMTokenInputValidator.validate(request); + + try { + + // Before get the token, + // we need to determine whether the path where the token is stored exists. + // If it doesn't exist, we will throw an exception. + String nodePath = getStoreTokenZNodePathByTokenRequest(request); + if (!exists(nodePath)) { + throw new YarnException("ZkNodePath = " + nodePath + " not exists!"); + } + + // Get the stored delegationToken from ZK and return. + RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false); + // return deleted token data. + long end = clock.getTime(); + opDurations.getTokenByRouterStoreTokenDuration(start, end); + return RouterRMTokenResponse.newInstance(resultStoreToken); + } catch (YarnException | IOException e) { + throw e; + } catch (Exception e) { + throw new YarnException(e); + } + } + + /** + * Convert MasterKey to DelegationKey. + * + * Before using this function, + * please use FederationRouterRMTokenInputValidator to verify the request. + * By default, the request is not empty, and the internal object is not empty. + * + * @param request RouterMasterKeyRequest + * @return DelegationKey. + */ + private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKeyRequest request) { + RouterMasterKey masterKey = request.getRouterMasterKey(); + return convertMasterKeyToDelegationKey(masterKey); + } + + /** + * Convert MasterKey to DelegationKey. + * + * @param masterKey masterKey. + * @return DelegationKey. + */ + private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKey masterKey) { + ByteBuffer keyByteBuf = masterKey.getKeyBytes(); + byte[] keyBytes = new byte[keyByteBuf.remaining()]; + keyByteBuf.get(keyBytes); + return new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes); + } + + /** + * Check if a path exists in zk. + * + * @param path Path to be checked. + * @return Returns true if the path exists, false if the path does not exist. + * @throws Exception When an exception to access zk occurs. + */ + @VisibleForTesting + boolean exists(final String path) throws Exception { + return zkManager.exists(path); + } + + /** + * Add or update delegationToken. + * + * Before using this function, + * please use FederationRouterRMTokenInputValidator to verify the request. + * By default, the request is not empty, and the internal object is not empty. + * + * @param request storeToken + * @param isUpdate true, update the token; false, create a new token. + * @throws Exception exception occurs. + */ + private void storeOrUpdateRouterRMDT(RouterRMTokenRequest request, boolean isUpdate) + throws Exception { + + RouterStoreToken routerStoreToken = request.getRouterStoreToken(); + String nodeCreatePath = getStoreTokenZNodePathByTokenRequest(request); + LOG.debug("nodeCreatePath = {}, isUpdate = {}", nodeCreatePath, isUpdate); + put(nodeCreatePath, routerStoreToken.toByteArray(), isUpdate); + } + + /** + * Get ZNode Path of StoreToken. + * + * Before using this method, we should use FederationRouterRMTokenInputValidator + * to verify the request,ensure that the request is not empty, + * and ensure that the object in the request is not empty. + * + * @param request RouterMasterKeyRequest. + * @return RouterRMToken ZNode Path. + * @throws IOException io exception occurs. + */ + private String getStoreTokenZNodePathByTokenRequest(RouterRMTokenRequest request) + throws IOException { + RouterStoreToken routerStoreToken = request.getRouterStoreToken(); + YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier(); + return getStoreTokenZNodePathByIdentifier(identifier); + } + + /** + * Get ZNode Path of StoreToken. + * + * @param identifier YARNDelegationTokenIdentifier + * @return RouterRMToken ZNode Path. + */ + private String getStoreTokenZNodePathByIdentifier(YARNDelegationTokenIdentifier identifier) { + String nodePath = getNodePath(routerRMDelegationTokensRootPath, + ROUTER_RM_DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); + return nodePath; + } + + /** + * Get RouterStoreToken from ZK. + * + * @param request RouterMasterKeyRequest. + * @param quiet If true is silent mode, no error message is printed at this time, + * if false is non-silent mode, error message is printed at this time. + * @return RouterStoreToken. + * @throws IOException io exception occurs. + */ + private RouterStoreToken getStoreTokenFromZK(RouterRMTokenRequest request, + boolean quiet) throws IOException { + RouterStoreToken routerStoreToken = request.getRouterStoreToken(); + YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier(); + return getStoreTokenFromZK(identifier, quiet); + } + + /** + * Get RouterStoreToken from ZK. + * + * @param identifier YARN DelegationToken Identifier + * @param quiet Whether it is in quiet mode, + * if it is in quiet mode, no exception information will be output. + * @return RouterStoreToken. + * @throws IOException io exception occurs. + */ + private RouterStoreToken getStoreTokenFromZK(YARNDelegationTokenIdentifier identifier, + boolean quiet) throws IOException { + // get the Token storage path + String nodePath = getStoreTokenZNodePathByIdentifier(identifier); + return getStoreTokenFromZK(nodePath, quiet); + } + + /** + * Get RouterStoreToken from ZK. + * + * @param nodePath Znode location where data is stored. + * @param quiet Whether it is in quiet mode, + * if it is in quiet mode, no exception information will be output. + * @return RouterStoreToken. + * @throws IOException io exception occurs. + */ + private RouterStoreToken getStoreTokenFromZK(String nodePath, boolean quiet) + throws IOException { + try { + byte[] data = get(nodePath); + if ((data == null) || (data.length == 0)) { + return null; + } + ByteArrayInputStream bin = new ByteArrayInputStream(data); + DataInputStream din = new DataInputStream(bin); + RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class); + storeToken.readFields(din); + return storeToken; + } catch (Exception ex) { + if (!quiet) { + LOG.error("No node in path [" + nodePath + "]"); + } + throw new IOException(ex); + } + } + + /** + * Increase SequenceNum. For zk, this is a distributed value. + * To ensure data consistency, we will use the synchronized keyword. + * + * For ZookeeperFederationStateStore, in order to reduce the interaction with ZK, + * we will apply for SequenceNum from ZK in batches(Apply when currentSeqNum >= currentMaxSeqNum), + * and assign this value to the variable currentMaxSeqNum. + * + * When calling the method incrementDelegationTokenSeqNum, + * if currentSeqNum < currentMaxSeqNum, we return ++currentMaxSeqNum, + * When currentSeqNum >= currentMaxSeqNum, we re-apply SequenceNum from zk. + * + * @return SequenceNum. + */ + @Override + public synchronized int incrementDelegationTokenSeqNum() { + // The secret manager will keep a local range of seq num which won't be + // seen by peers, so only when the range is exhausted it will ask zk for + // another range again + if (currentSeqNum >= currentMaxSeqNum) { + try { + // after a successful batch request, we can get the range starting point + currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); + currentMaxSeqNum = currentSeqNum + seqNumBatchSize; + LOG.info("Fetched new range of seq num, from {} to {} ", + currentSeqNum + 1, currentMaxSeqNum); + } catch (InterruptedException e) { + // The ExpirationThread is just finishing.. so dont do anything.. + LOG.debug("Thread interrupted while performing token counter increment", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new RuntimeException("Could not increment shared counter !!", e); + } + } + return ++currentSeqNum; + } + + /** + * Increment the value of the shared variable. + * + * @param sharedCount zk SharedCount. + * @param batchSize batch size. + * @return new SequenceNum. + * @throws Exception exception occurs. + */ + private int incrSharedCount(SharedCount sharedCount, int batchSize) + throws Exception { + while (true) { + // Loop until we successfully increment the counter + VersionedValue versionedValue = sharedCount.getVersionedValue(); + if (sharedCount.trySetCount(versionedValue, versionedValue.getValue() + batchSize)) { + return versionedValue.getValue(); + } + } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java index d6eade878020f..a210ef8f00862 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/RouterStoreToken.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; import org.apache.hadoop.yarn.util.Records; +import java.io.DataInput; import java.io.IOException; @Private @@ -53,4 +54,8 @@ public static RouterStoreToken newInstance(YARNDelegationTokenIdentifier identif @Private @Unstable public abstract void setRenewDate(Long renewDate); + + public abstract byte[] toByteArray() throws IOException; + + public abstract void readFields(DataInput in) throws IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java index 32e148cb5b7ba..df6030a3f0d68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/RouterStoreTokenPBImpl.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.RouterStoreTokenProtoOrBuilder; import java.io.ByteArrayInputStream; +import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; @@ -168,4 +169,12 @@ private YARNDelegationTokenIdentifierProto convertToProtoFormat( YARNDelegationTokenIdentifier delegationTokenIdentifier) { return delegationTokenIdentifier.getProto(); } + + public byte[] toByteArray() throws IOException { + return builder.build().toByteArray(); + } + + public void readFields(DataInput in) throws IOException { + builder.mergeFrom((DataInputStream) in); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationRouterRMTokenInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationRouterRMTokenInputValidator.java new file mode 100644 index 0000000000000..4ec78db9deafa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationRouterRMTokenInputValidator.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.utils; + +import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; +import org.apache.hadoop.yarn.server.federation.store.records.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class FederationRouterRMTokenInputValidator { + + private static final Logger LOG = + LoggerFactory.getLogger(FederationRouterRMTokenInputValidator.class); + + private FederationRouterRMTokenInputValidator() { + } + + /** + * We will check with the RouterRMTokenRequest{@link RouterRMTokenRequest} + * to ensure that the request object is not empty and that the RouterStoreToken is not empty. + * + * @param request RouterRMTokenRequest Request. + * @throws FederationStateStoreInvalidInputException if the request is invalid. + */ + public static void validate(RouterRMTokenRequest request) + throws FederationStateStoreInvalidInputException { + + if (request == null) { + String message = "Missing RouterRMToken Request." + + " Please try again by specifying a router rm token information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + RouterStoreToken storeToken = request.getRouterStoreToken(); + if (storeToken == null) { + String message = "Missing RouterStoreToken." + + " Please try again by specifying a router rm token information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + try { + YARNDelegationTokenIdentifier identifier = storeToken.getTokenIdentifier(); + if (identifier == null) { + String message = "Missing YARNDelegationTokenIdentifier." + + " Please try again by specifying a router rm token information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + } catch (Exception e) { + throw new FederationStateStoreInvalidInputException(e); + } + } + + /** + * We will check with the RouterMasterKeyRequest{@link RouterMasterKeyRequest} + * to ensure that the request object is not empty and that the RouterMasterKey is not empty. + * + * @param request RouterMasterKey Request. + * @throws FederationStateStoreInvalidInputException if the request is invalid. + */ + public static void validate(RouterMasterKeyRequest request) + throws FederationStateStoreInvalidInputException { + + // Verify the request to ensure that the request is not empty, + // if the request is found to be empty, an exception will be thrown. + if (request == null) { + String message = "Missing RouterMasterKey Request." + + " Please try again by specifying a router master key request information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + + // Check whether the masterKey is empty, + // if the masterKey is empty, throw an exception message. + RouterMasterKey masterKey = request.getRouterMasterKey(); + if (masterKey == null) { + String message = "Missing RouterMasterKey." + + " Please try again by specifying a router master key information."; + LOG.warn(message); + throw new FederationStateStoreInvalidInputException(message); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 47cb9e9e35cc7..42cac4e1958be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -849,4 +849,13 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RMDelegationTokenIdentif RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); return stateStore.getTokenByRouterStoreToken(request); } + + /** + * stateStore provides DelegationTokenSeqNum increase. + * + * @return delegationTokenSequenceNumber + */ + public int incrementDelegationTokenSeqNum() { + return stateStore.incrementDelegationTokenSeqNum(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 3786f7cccc0c8..21dc2031c4201 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -879,7 +879,7 @@ public void testStoreNewMasterKey() throws Exception { } @Test - public void testGetMasterKeyByDelegationKey() throws YarnException, IOException { + public void testGetMasterKeyByDelegationKey() throws Exception { // store delegation key; DelegationKey key = new DelegationKey(5678, 8765, "keyBytes".getBytes()); Set keySet = new HashSet<>(); @@ -905,7 +905,7 @@ public void testGetMasterKeyByDelegationKey() throws YarnException, IOException } @Test - public void testRemoveStoredMasterKey() throws YarnException, IOException { + public void testRemoveStoredMasterKey() throws Exception { // store delegation key; DelegationKey key = new DelegationKey(1234, 4321, "keyBytes".getBytes()); Set keySet = new HashSet<>(); @@ -929,7 +929,7 @@ public void testRemoveStoredMasterKey() throws YarnException, IOException { } @Test - public void testStoreNewToken() throws IOException, YarnException { + public void testStoreNewToken() throws Exception { // prepare parameters RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( new Text("owner1"), new Text("renewer1"), new Text("realuser1")); @@ -952,7 +952,7 @@ public void testStoreNewToken() throws IOException, YarnException { } @Test - public void testUpdateStoredToken() throws IOException, YarnException { + public void testUpdateStoredToken() throws Exception { // prepare saveToken parameters RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( new Text("owner2"), new Text("renewer2"), new Text("realuser2")); @@ -984,7 +984,7 @@ public void testUpdateStoredToken() throws IOException, YarnException { } @Test - public void testRemoveStoredToken() throws IOException, YarnException { + public void testRemoveStoredToken() throws Exception { // prepare saveToken parameters RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( new Text("owner3"), new Text("renewer3"), new Text("realuser3")); @@ -1008,7 +1008,7 @@ public void testRemoveStoredToken() throws IOException, YarnException { } @Test - public void testGetTokenByRouterStoreToken() throws IOException, YarnException { + public void testGetTokenByRouterStoreToken() throws Exception { // prepare saveToken parameters RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( new Text("owner4"), new Text("renewer4"), new Text("realuser4")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java index cddcf29ffb079..48bb2ff628f43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java @@ -564,32 +564,32 @@ public void testStoreNewMasterKey() throws Exception { } @Test(expected = NotImplementedException.class) - public void testGetMasterKeyByDelegationKey() throws YarnException, IOException { + public void testGetMasterKeyByDelegationKey() throws Exception { super.testGetMasterKeyByDelegationKey(); } @Test(expected = NotImplementedException.class) - public void testRemoveStoredMasterKey() throws YarnException, IOException { + public void testRemoveStoredMasterKey() throws Exception { super.testRemoveStoredMasterKey(); } @Test(expected = NotImplementedException.class) - public void testStoreNewToken() throws IOException, YarnException { + public void testStoreNewToken() throws Exception { super.testStoreNewToken(); } @Test(expected = NotImplementedException.class) - public void testUpdateStoredToken() throws IOException, YarnException { + public void testUpdateStoredToken() throws Exception { super.testUpdateStoredToken(); } @Test(expected = NotImplementedException.class) - public void testRemoveStoredToken() throws IOException, YarnException { + public void testRemoveStoredToken() throws Exception { super.testRemoveStoredToken(); } @Test(expected = NotImplementedException.class) - public void testGetTokenByRouterStoreToken() throws IOException, YarnException { + public void testGetTokenByRouterStoreToken() throws Exception { super.testGetTokenByRouterStoreToken(); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java index 18396cb821566..96633659a7eb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java @@ -17,39 +17,70 @@ package org.apache.hadoop.yarn.server.federation.store.impl; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; +import java.nio.ByteBuffer; -import org.apache.commons.lang3.NotImplementedException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.test.TestingServer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.Text; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.impl.MetricsRecords; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey; +import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; +import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; +import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse; +import org.apache.hadoop.yarn.util.Records; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertEquals; /** * Unit tests for ZookeeperFederationStateStore. */ -public class TestZookeeperFederationStateStore - extends FederationStateStoreBaseTest { +public class TestZookeeperFederationStateStore extends FederationStateStoreBaseTest { private static final Logger LOG = LoggerFactory.getLogger(TestZookeeperFederationStateStore.class); + private static final String ZNODE_FEDERATIONSTORE = + "/federationstore"; + private static final String ZNODE_ROUTER_RM_DT_SECRET_MANAGER_ROOT = + "/router_rm_dt_secret_manager_root"; + private static final String ZNODE_ROUTER_RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = + "/router_rm_delegation_tokens_root"; + private static final String ZNODE_ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME = + "/router_rm_dt_master_keys_root/"; + private static final String ROUTER_RM_DELEGATION_TOKEN_PREFIX = "rm_delegation_token_"; + private static final String ROUTER_RM_DELEGATION_KEY_PREFIX = "delegation_key_"; + + private static final String ZNODE_DT_PREFIX = ZNODE_FEDERATIONSTORE + + ZNODE_ROUTER_RM_DT_SECRET_MANAGER_ROOT + ZNODE_ROUTER_RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME; + private static final String ZNODE_MASTER_KEY_PREFIX = ZNODE_FEDERATIONSTORE + + ZNODE_ROUTER_RM_DT_SECRET_MANAGER_ROOT + ZNODE_ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME; + /** Zookeeper test server. */ private static TestingServer curatorTestingServer; private static CuratorFramework curatorFramework; @@ -171,38 +202,270 @@ public void testMetricsInited() throws Exception { MetricsRecords.assertMetric(record, "UpdateReservationHomeSubClusterNumOps", expectOps); } - @Test(expected = NotImplementedException.class) public void testStoreNewMasterKey() throws Exception { - super.testStoreNewMasterKey(); + + // Manually create a DelegationKey, + // and call the interface storeNewMasterKey to write the data to zk. + DelegationKey key = new DelegationKey(1234, Time.now() + 60 * 60, "keyBytes".getBytes()); + RouterMasterKey paramRouterMasterKey = RouterMasterKey.newInstance(key.getKeyId(), + ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate()); + FederationStateStore stateStore = this.getStateStore(); + + assertTrue(stateStore instanceof ZookeeperFederationStateStore); + + // Compare the data returned by the storeNewMasterKey + // interface with the data queried by zk, and ensure that the data is consistent. + RouterMasterKeyRequest routerMasterKeyRequest = + RouterMasterKeyRequest.newInstance(paramRouterMasterKey); + RouterMasterKeyResponse response = stateStore.storeNewMasterKey(routerMasterKeyRequest); + assertNotNull(response); + RouterMasterKey respRouterMasterKey = response.getRouterMasterKey(); + assertNotNull(respRouterMasterKey); + + // Get Data From zk. + String nodeName = ROUTER_RM_DELEGATION_KEY_PREFIX + key.getKeyId(); + String nodePath = ZNODE_MASTER_KEY_PREFIX + nodeName; + RouterMasterKey zkRouterMasterKey = getRouterMasterKeyFromZK(nodePath); + + assertNotNull(zkRouterMasterKey); + assertEquals(paramRouterMasterKey, respRouterMasterKey); + assertEquals(paramRouterMasterKey, zkRouterMasterKey); + assertEquals(zkRouterMasterKey, respRouterMasterKey); } - @Test(expected = NotImplementedException.class) - public void testGetMasterKeyByDelegationKey() throws YarnException, IOException { - super.testGetMasterKeyByDelegationKey(); + public void testGetMasterKeyByDelegationKey() throws Exception { + + // Manually create a DelegationKey, + // and call the interface storeNewMasterKey to write the data to zk. + DelegationKey key = new DelegationKey(5678, Time.now() + 60 * 60, "keyBytes".getBytes()); + RouterMasterKey paramRouterMasterKey = RouterMasterKey.newInstance(key.getKeyId(), + ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate()); + FederationStateStore stateStore = this.getStateStore(); + + assertTrue(stateStore instanceof ZookeeperFederationStateStore); + + // Call the getMasterKeyByDelegationKey interface of stateStore to get the MasterKey data. + RouterMasterKeyRequest routerMasterKeyRequest = + RouterMasterKeyRequest.newInstance(paramRouterMasterKey); + RouterMasterKeyResponse response = stateStore.storeNewMasterKey(routerMasterKeyRequest); + assertNotNull(response); + + // Get Data From zk. + String nodeName = ROUTER_RM_DELEGATION_KEY_PREFIX + key.getKeyId(); + String nodePath = ZNODE_MASTER_KEY_PREFIX + nodeName; + RouterMasterKey zkRouterMasterKey = getRouterMasterKeyFromZK(nodePath); + + // Call the getMasterKeyByDelegationKey interface to get the returned result. + // The zk data should be consistent with the returned data. + RouterMasterKeyResponse response1 = + stateStore.getMasterKeyByDelegationKey(routerMasterKeyRequest); + assertNotNull(response1); + RouterMasterKey respRouterMasterKey = response1.getRouterMasterKey(); + assertEquals(paramRouterMasterKey, respRouterMasterKey); + assertEquals(paramRouterMasterKey, zkRouterMasterKey); + assertEquals(zkRouterMasterKey, respRouterMasterKey); + } + + public void testRemoveStoredMasterKey() throws Exception { + + // Manually create a DelegationKey, + // and call the interface storeNewMasterKey to write the data to zk. + DelegationKey key = new DelegationKey(2345, Time.now() + 60 * 60, "keyBytes".getBytes()); + RouterMasterKey paramRouterMasterKey = RouterMasterKey.newInstance(key.getKeyId(), + ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate()); + FederationStateStore stateStore = this.getStateStore(); + + assertTrue(stateStore instanceof ZookeeperFederationStateStore); + + // We need to ensure that the returned result is not empty. + RouterMasterKeyRequest routerMasterKeyRequest = + RouterMasterKeyRequest.newInstance(paramRouterMasterKey); + RouterMasterKeyResponse response = stateStore.storeNewMasterKey(routerMasterKeyRequest); + assertNotNull(response); + + // We will check if delegationToken exists in zk. + String nodeName = ROUTER_RM_DELEGATION_KEY_PREFIX + key.getKeyId(); + String nodePath = ZNODE_MASTER_KEY_PREFIX + nodeName; + assertTrue(curatorFramework.checkExists().forPath(nodePath) != null); + + // Call removeStoredMasterKey to remove the MasterKey data in zk. + RouterMasterKeyResponse response1 = stateStore.removeStoredMasterKey(routerMasterKeyRequest); + assertNotNull(response1); + RouterMasterKey respRouterMasterKey = response1.getRouterMasterKey(); + assertNotNull(respRouterMasterKey); + assertEquals(paramRouterMasterKey, respRouterMasterKey); + + // We have removed the RouterMasterKey data from zk, + // the path should be empty at this point. + assertTrue(curatorFramework.checkExists().forPath(nodePath) == null); + } + + public void testStoreNewToken() throws Exception { + + // We manually generate the DelegationToken, + // and then call the StoreNewToken method to store the Token in zk. + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( + new Text("owner2"), new Text("renewer2"), new Text("realuser2")); + FederationStateStore stateStore = this.getStateStore(); + int seqNum = stateStore.incrementDelegationTokenSeqNum(); + identifier.setSequenceNumber(seqNum); + Long renewDate = Time.now(); + + // Store new rm-token + RouterStoreToken paramStoreToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(paramStoreToken); + RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); + assertNotNull(routerRMTokenResponse); + RouterStoreToken respStoreToken = routerRMTokenResponse.getRouterStoreToken(); + assertNotNull(respStoreToken); + + // Get delegationToken Path + String nodeName = ROUTER_RM_DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber(); + String nodePath = getNodePath(ZNODE_DT_PREFIX, nodeName); + + // Check if the path exists, we expect the result to exist. + assertTrue(curatorFramework.checkExists().forPath(nodePath) != null); + + // Check whether the token (paramStoreToken) + // We generated is consistent with the data stored in zk. + // We expect data to be consistent. + RouterStoreToken zkRouterStoreToken = getStoreTokenFromZK(nodePath); + assertNotNull(zkRouterStoreToken); + assertEquals(paramStoreToken, zkRouterStoreToken); + assertEquals(respStoreToken, zkRouterStoreToken); } - @Test(expected = NotImplementedException.class) - public void testRemoveStoredMasterKey() throws YarnException, IOException { - super.testRemoveStoredMasterKey(); + public void testUpdateStoredToken() throws Exception { + + // We manually generate the DelegationToken, + // and then call the StoreNewToken method to store the Token in zk. + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( + new Text("owner2"), new Text("renewer2"), new Text("realuser2")); + FederationStateStore stateStore = this.getStateStore(); + int seqNum = stateStore.incrementDelegationTokenSeqNum(); + identifier.setSequenceNumber(seqNum); + Long renewDate = Time.now(); + + // Store new rm-token() + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); + Assert.assertNotNull(routerRMTokenResponse); + + // We are ready to update some data renewDate2 & sequenceNumber2 + Long renewDate2 = Time.now(); + int sequenceNumber2 = stateStore.incrementDelegationTokenSeqNum(); + identifier.setSequenceNumber(sequenceNumber2); + + // Update rm-token + RouterStoreToken paramStoreToken = RouterStoreToken.newInstance(identifier, renewDate2); + RouterRMTokenRequest updateTokenRequest = RouterRMTokenRequest.newInstance(paramStoreToken); + RouterRMTokenResponse updateTokenResponse = stateStore.updateStoredToken(updateTokenRequest); + Assert.assertNotNull(updateTokenResponse); + RouterStoreToken updateTokenResp = updateTokenResponse.getRouterStoreToken(); + + // Get delegationToken Path + String nodeName = ROUTER_RM_DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber(); + String nodePath = getNodePath(ZNODE_DT_PREFIX, nodeName); + + // Check if the path exists, we expect the result to exist. + assertTrue(curatorFramework.checkExists().forPath(nodePath) != null); + + // Check whether the token (paramStoreToken) + // We generated is consistent with the data stored in zk. + // We expect data to be consistent. + RouterStoreToken zkRouterStoreToken = getStoreTokenFromZK(nodePath); + assertNotNull(zkRouterStoreToken); + assertEquals(paramStoreToken, zkRouterStoreToken); + assertEquals(updateTokenResp, zkRouterStoreToken); } - @Test(expected = NotImplementedException.class) - public void testStoreNewToken() throws IOException, YarnException { - super.testStoreNewToken(); + public void testRemoveStoredToken() throws Exception { + + // We manually generate the DelegationToken, + // and then call the StoreNewToken method to store the Token in zk. + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( + new Text("owner2"), new Text("renewer2"), new Text("realuser2")); + FederationStateStore stateStore = this.getStateStore(); + int seqNum = stateStore.incrementDelegationTokenSeqNum(); + identifier.setSequenceNumber(seqNum); + Long renewDate = Time.now(); + + // Store new rm-token + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); + Assert.assertNotNull(routerRMTokenResponse); + + // Get delegationToken Path + String nodeName = ROUTER_RM_DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber(); + String nodePath = getNodePath(ZNODE_DT_PREFIX, nodeName); + + // Check if the path exists, we expect the result to exist. + assertTrue(curatorFramework.checkExists().forPath(nodePath) != null); + + // Remove stored-token + stateStore.removeStoredToken(request); + + // After the data is deleted, the path should not exist in zk + assertTrue(curatorFramework.checkExists().forPath(nodePath) == null); } - @Test(expected = NotImplementedException.class) - public void testUpdateStoredToken() throws IOException, YarnException { - super.testUpdateStoredToken(); + public void testGetTokenByRouterStoreToken() throws Exception { + + // We manually generate the DelegationToken, + // and then call the StoreNewToken method to store the Token in zk. + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier( + new Text("owner2"), new Text("renewer2"), new Text("realuser2")); + FederationStateStore stateStore = this.getStateStore(); + int seqNum = stateStore.incrementDelegationTokenSeqNum(); + identifier.setSequenceNumber(seqNum); + Long renewDate = Time.now(); + + // Store new rm-token + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); + RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); + RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); + Assert.assertNotNull(routerRMTokenResponse); + RouterStoreToken getTokenResp = routerRMTokenResponse.getRouterStoreToken(); + + // Call getTokenByRouterStoreToken And Get Result + RouterRMTokenResponse routerGetRMTokenResponse = stateStore.getTokenByRouterStoreToken(request); + Assert.assertNotNull(routerGetRMTokenResponse); + + // Get delegationToken Path + String nodeName = ROUTER_RM_DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber(); + String nodePath = getNodePath(ZNODE_DT_PREFIX, nodeName); + + // Check whether the token (paramStoreToken) + // We generated is consistent with the data stored in zk. + // We expect data to be consistent. + RouterStoreToken zkRouterStoreToken = getStoreTokenFromZK(nodePath); + assertNotNull(zkRouterStoreToken); + assertEquals(getTokenResp, zkRouterStoreToken); } - @Test(expected = NotImplementedException.class) - public void testRemoveStoredToken() throws IOException, YarnException { - super.testRemoveStoredToken(); + private RouterStoreToken getStoreTokenFromZK(String nodePath) + throws Exception { + byte[] data = curatorFramework.getData().forPath(nodePath); + if ((data == null) || (data.length == 0)) { + return null; + } + ByteArrayInputStream bin = new ByteArrayInputStream(data); + DataInputStream din = new DataInputStream(bin); + RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class); + storeToken.readFields(din); + return storeToken; } - @Test(expected = NotImplementedException.class) - public void testGetTokenByRouterStoreToken() throws IOException, YarnException { - super.testGetTokenByRouterStoreToken(); + private RouterMasterKey getRouterMasterKeyFromZK(String nodePath) throws Exception { + byte[] data = curatorFramework.getData().forPath(nodePath); + ByteArrayInputStream bin = new ByteArrayInputStream(data); + DataInputStream din = new DataInputStream(bin); + DelegationKey zkDT = new DelegationKey(); + zkDT.readFields(din); + RouterMasterKey zkRouterMasterKey = RouterMasterKey.newInstance( + zkDT.getKeyId(), ByteBuffer.wrap(zkDT.getEncodedKey()), zkDT.getExpiryDate()); + return zkRouterMasterKey; } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 3cc4dab238618..985196bf75e07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -423,6 +423,11 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req return stateStoreClient.getTokenByRouterStoreToken(request); } + @Override + public int incrementDelegationTokenSeqNum() { + return stateStoreClient.incrementDelegationTokenSeqNum(); + } + /** * Create a thread that cleans up the app. * @param stage rm-start/rm-stop. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java index de74c41fe7928..74dbfa1c79e5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java @@ -251,4 +251,9 @@ public synchronized Map getAllTokens() { } return allTokens; } + + @Override + protected synchronized int incrementDelegationTokenSeqNum() { + return federationFacade.incrementDelegationTokenSeqNum(); + } }