Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,11 @@ RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
*/
RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
throws YarnException, IOException;

/**
* The Router Supports incrementDelegationTokenSeqNum.
*
* @return DelegationTokenSeqNum.
*/
int incrementDelegationTokenSeqNum();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.Comparator;

Expand Down Expand Up @@ -110,6 +111,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
private Map<String, SubClusterPolicyConfiguration> policies;
private RouterRMDTSecretManagerState routerRMSecretManagerState;
private int maxAppsInStateStore;
private AtomicInteger sequenceNum;

private final MonotonicClock clock = new MonotonicClock();

Expand All @@ -126,6 +128,7 @@ public void init(Configuration conf) {
maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
sequenceNum = new AtomicInteger();
}

@Override
Expand Down Expand Up @@ -534,6 +537,11 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
return RouterRMTokenResponse.newInstance(resultToken);
}

@Override
public synchronized int incrementDelegationTokenSeqNum() {
return sequenceNum.incrementAndGet();
}

private void storeOrUpdateRouterRMDT(RMDelegationTokenIdentifier rmDTIdentifier,
Long renewDate, boolean isUpdate) throws IOException {
Map<RMDelegationTokenIdentifier, Long> rmDTState = routerRMSecretManagerState.getTokenState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1394,4 +1394,9 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}

@Override
public int incrementDelegationTokenSeqNum() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,27 @@ public final class ZKFederationStateStoreOpDurations implements MetricsSource {
@Metric("Duration for a update reservation homeSubCluster call")
private MutableRate updateReservationHomeSubCluster;

@Metric("Duration for a store new master key call")
private MutableRate storeNewMasterKey;

@Metric("Duration for a remove new master key call")
private MutableRate removeStoredMasterKey;

@Metric("Duration for a get master key by delegation key call")
private MutableRate getMasterKeyByDelegationKey;

@Metric("Duration for a store new token call")
private MutableRate storeNewToken;

@Metric("Duration for a update stored token call")
private MutableRate updateStoredToken;

@Metric("Duration for a remove stored token call")
private MutableRate removeStoredToken;

@Metric("Duration for a get token by router store token call")
private MutableRate getTokenByRouterStoreToken;

protected static final MetricsInfo RECORD_INFO =
info("ZKFederationStateStoreOpDurations", "Durations of ZKFederationStateStore calls");

Expand Down Expand Up @@ -187,4 +208,32 @@ public void addDeleteReservationHomeSubClusterDuration(long startTime, long endT
public void addUpdateReservationHomeSubClusterDuration(long startTime, long endTime) {
updateReservationHomeSubCluster.add(endTime - startTime);
}

public void addStoreNewMasterKeyDuration(long startTime, long endTime) {
storeNewMasterKey.add(endTime - startTime);
}

public void removeStoredMasterKeyDuration(long startTime, long endTime) {
removeStoredMasterKey.add(endTime - startTime);
}

public void getMasterKeyByDelegationKeyDuration(long startTime, long endTime) {
getMasterKeyByDelegationKey.add(endTime - startTime);
}

public void getStoreNewTokenDuration(long startTime, long endTime) {
storeNewToken.add(endTime - startTime);
}

public void updateStoredTokenDuration(long startTime, long endTime) {
updateStoredToken.add(endTime - startTime);
}

public void removeStoredTokenDuration(long startTime, long endTime) {
removeStoredToken.add(endTime - startTime);
}

public void getTokenByRouterStoreTokenDuration(long startTime, long endTime) {
getTokenByRouterStoreToken.add(endTime - startTime);
}
}
Loading