diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index b7283adcada56..2dec2df4b9d6c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -36,7 +36,17 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.io.Text; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair; import org.apache.hadoop.metrics2.util.Metrics2Util.TopN; import org.apache.hadoop.security.AccessControlException; @@ -47,6 +57,7 @@ import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.util.functional.InvocationRaisingIOE; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,6 +107,10 @@ private String formatTokenId(TokenIdent id) { * Access to currentKey is protected by this object lock */ private DelegationKey currentKey; + /** + * Metrics to track token management operations. + */ + private DelegationTokenSecretManagerMetrics metrics; private long keyUpdateInterval; private long tokenMaxLifetime; @@ -134,6 +149,7 @@ public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval, this.tokenRenewInterval = delegationTokenRenewInterval; this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval; this.storeTokenTrackingId = false; + this.metrics = DelegationTokenSecretManagerMetrics.create(); } /** should be called before this object is used */ @@ -430,14 +446,14 @@ protected synchronized byte[] createPassword(TokenIdent identifier) { DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)); try { - storeToken(identifier, tokenInfo); + metrics.trackStoreToken(() -> storeToken(identifier, tokenInfo)); } catch (IOException ioe) { LOG.error("Could not store token " + formatTokenId(identifier) + "!!", ioe); } return password; } - + /** @@ -555,7 +571,7 @@ public synchronized long renewToken(Token token, throw new InvalidToken("Renewal request for unknown token " + formatTokenId(id)); } - updateToken(id, info); + metrics.trackUpdateToken(() -> updateToken(id, info)); return renewTime; } @@ -591,8 +607,10 @@ public synchronized TokenIdent cancelToken(Token token, if (info == null) { throw new InvalidToken("Token not found " + formatTokenId(id)); } - removeTokenForOwnerStats(id); - removeStoredToken(id); + metrics.trackRemoveToken(() -> { + removeTokenForOwnerStats(id); + removeStoredToken(id); + }); return id; } @@ -825,4 +843,97 @@ protected void syncTokenOwnerStats() { addTokenForOwnerStats(id); } } + + protected DelegationTokenSecretManagerMetrics getMetrics() { + return metrics; + } + + /** + * DelegationTokenSecretManagerMetrics tracks token management operations + * and publishes them through the metrics interfaces. + */ + @Metrics(about="Delegation token secret manager metrics", context="token") + static class DelegationTokenSecretManagerMetrics implements DurationTrackerFactory { + private static final Logger LOG = LoggerFactory.getLogger( + DelegationTokenSecretManagerMetrics.class); + + final static String STORE_TOKEN_STAT = "storeToken"; + final static String UPDATE_TOKEN_STAT = "updateToken"; + final static String REMOVE_TOKEN_STAT = "removeToken"; + final static String TOKEN_FAILURE_STAT = "tokenFailure"; + + private final MetricsRegistry registry; + private final IOStatisticsStore ioStatistics; + + @Metric("Rate of storage of delegation tokens and latency (milliseconds)") + private MutableRate storeToken; + @Metric("Rate of update of delegation tokens and latency (milliseconds)") + private MutableRate updateToken; + @Metric("Rate of removal of delegation tokens and latency (milliseconds)") + private MutableRate removeToken; + @Metric("Counter of delegation tokens operation failures") + private MutableCounterLong tokenFailure; + + static DelegationTokenSecretManagerMetrics create() { + return DefaultMetricsSystem.instance().register(new DelegationTokenSecretManagerMetrics()); + } + + DelegationTokenSecretManagerMetrics() { + ioStatistics = IOStatisticsBinding.iostatisticsStore() + .withDurationTracking(STORE_TOKEN_STAT, UPDATE_TOKEN_STAT, REMOVE_TOKEN_STAT) + .withCounters(TOKEN_FAILURE_STAT) + .build(); + registry = new MetricsRegistry("DelegationTokenSecretManagerMetrics"); + LOG.debug("Initialized {}", registry); + } + + public void trackStoreToken(InvocationRaisingIOE invocation) throws IOException { + trackInvocation(invocation, STORE_TOKEN_STAT, storeToken); + } + + public void trackUpdateToken(InvocationRaisingIOE invocation) throws IOException { + trackInvocation(invocation, UPDATE_TOKEN_STAT, updateToken); + } + + public void trackRemoveToken(InvocationRaisingIOE invocation) throws IOException { + trackInvocation(invocation, REMOVE_TOKEN_STAT, removeToken); + } + + public void trackInvocation(InvocationRaisingIOE invocation, String statistic, + MutableRate metric) throws IOException { + try { + long start = Time.monotonicNow(); + IOStatisticsBinding.trackDurationOfInvocation(this, statistic, invocation); + metric.add(Time.monotonicNow() - start); + } catch (Exception ex) { + tokenFailure.incr(); + throw ex; + } + } + + @Override + public DurationTracker trackDuration(String key, long count) { + return ioStatistics.trackDuration(key, count); + } + + protected MutableRate getStoreToken() { + return storeToken; + } + + protected MutableRate getUpdateToken() { + return updateToken; + } + + protected MutableRate getRemoveToken() { + return removeToken; + } + + protected MutableCounterLong getTokenFailure() { + return tokenFailure; + } + + protected IOStatisticsStore getIoStatistics() { + return ioStatistics; + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java index 8bc881ae5d1da..ca0b18bb1df28 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java @@ -30,6 +30,12 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import org.apache.hadoop.fs.statistics.IOStatisticAssertions; +import org.apache.hadoop.fs.statistics.MeanStatistic; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Assert; import org.apache.hadoop.io.DataInputBuffer; @@ -155,6 +161,55 @@ public DelegationKey getKey(TestDelegationTokenIdentifier id) { return allKeys.get(id.getMasterKeyId()); } } + + public static class TestFailureDelegationTokenSecretManager + extends TestDelegationTokenSecretManager { + private boolean throwError = false; + private long errorSleepMillis; + + public TestFailureDelegationTokenSecretManager(long errorSleepMillis) { + super(24*60*60*1000, 10*1000, 1*1000, 60*60*1000); + this.errorSleepMillis = errorSleepMillis; + } + + public void setThrowError(boolean throwError) { + this.throwError = throwError; + } + + private void sleepAndThrow() throws IOException { + try { + Thread.sleep(errorSleepMillis); + throw new IOException("Test exception"); + } catch (InterruptedException e) { + } + } + + @Override + protected void storeNewToken(TestDelegationTokenIdentifier ident, long renewDate) + throws IOException { + if (throwError) { + sleepAndThrow(); + } + super.storeNewToken(ident, renewDate); + } + + @Override + protected void removeStoredToken(TestDelegationTokenIdentifier ident) throws IOException { + if (throwError) { + sleepAndThrow(); + } + super.removeStoredToken(ident); + } + + @Override + protected void updateStoredToken(TestDelegationTokenIdentifier ident, long renewDate) + throws IOException { + if (throwError) { + sleepAndThrow(); + } + super.updateStoredToken(ident, renewDate); + } + } public static class TokenSelector extends AbstractDelegationTokenSelector{ @@ -579,4 +634,85 @@ public void testEmptyToken() throws IOException { assertEquals(token1, token2); assertEquals(token1.encodeToUrlString(), token2.encodeToUrlString()); } + + @Test + public void testDelegationTokenSecretManagerMetrics() throws Exception { + TestDelegationTokenSecretManager dtSecretManager = + new TestDelegationTokenSecretManager(24*60*60*1000, + 10*1000, 1*1000, 60*60*1000); + try { + dtSecretManager.startThreads(); + + final Token token = callAndValidateMetrics( + dtSecretManager, dtSecretManager.getMetrics().getStoreToken(), "storeToken", + () -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"), 1); + + callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getUpdateToken(), + "updateToken", () -> dtSecretManager.renewToken(token, "JobTracker"), 1); + + callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getRemoveToken(), + "removeToken", () -> dtSecretManager.cancelToken(token, "JobTracker"), 1); + } finally { + dtSecretManager.stopThreads(); + } + } + + @Test + public void testDelegationTokenSecretManagerMetricsFailures() throws Exception { + int errorSleepMillis = 200; + TestFailureDelegationTokenSecretManager dtSecretManager = + new TestFailureDelegationTokenSecretManager(errorSleepMillis); + + try { + dtSecretManager.startThreads(); + + final Token token = + generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"); + + dtSecretManager.setThrowError(true); + + callAndValidateFailureMetrics(dtSecretManager, "storeToken", 1, 1, false, + errorSleepMillis, + () -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker")); + + callAndValidateFailureMetrics(dtSecretManager, "updateToken", 1, 2, true, + errorSleepMillis, () -> dtSecretManager.renewToken(token, "JobTracker")); + + callAndValidateFailureMetrics(dtSecretManager, "removeToken", 1, 3, true, + errorSleepMillis, () -> dtSecretManager.cancelToken(token, "JobTracker")); + } finally { + dtSecretManager.stopThreads(); + } + } + + private T callAndValidateMetrics(TestDelegationTokenSecretManager dtSecretManager, + MutableRate metric, String statName, Callable callable, int expectedCount) + throws Exception { + MeanStatistic stat = IOStatisticAssertions.lookupMeanStatistic( + dtSecretManager.getMetrics().getIoStatistics(), statName + ".mean"); + assertEquals(expectedCount - 1, metric.lastStat().numSamples()); + assertEquals(expectedCount - 1, stat.getSamples()); + T returnedObject = callable.call(); + assertEquals(expectedCount, metric.lastStat().numSamples()); + assertEquals(expectedCount, stat.getSamples()); + return returnedObject; + } + + private void callAndValidateFailureMetrics(TestDelegationTokenSecretManager dtSecretManager, + String statName, int expectedStatCount, int expectedMetricCount, boolean expectError, + int errorSleepMillis, Callable callable) throws Exception { + MutableCounterLong counter = dtSecretManager.getMetrics().getTokenFailure(); + MeanStatistic failureStat = IOStatisticAssertions.lookupMeanStatistic( + dtSecretManager.getMetrics().getIoStatistics(), statName + ".failures.mean"); + assertEquals(expectedMetricCount - 1, counter.value()); + assertEquals(expectedStatCount - 1, failureStat.getSamples()); + if (expectError) { + LambdaTestUtils.intercept(IOException.class, callable); + } else { + callable.call(); + } + assertEquals(expectedMetricCount, counter.value()); + assertEquals(expectedStatCount, failureStat.getSamples()); + assertTrue(failureStat.getSum() >= errorSleepMillis); + } }