|
30 | 30 | import java.util.List; |
31 | 31 | import java.util.Map; |
32 | 32 |
|
| 33 | +import java.util.concurrent.Callable; |
| 34 | +import org.apache.hadoop.fs.statistics.IOStatisticAssertions; |
| 35 | +import org.apache.hadoop.fs.statistics.MeanStatistic; |
| 36 | +import org.apache.hadoop.metrics2.lib.MutableCounterLong; |
| 37 | +import org.apache.hadoop.metrics2.lib.MutableRate; |
| 38 | +import org.apache.hadoop.test.LambdaTestUtils; |
33 | 39 | import org.junit.Assert; |
34 | 40 |
|
35 | 41 | import org.apache.hadoop.io.DataInputBuffer; |
@@ -155,6 +161,55 @@ public DelegationKey getKey(TestDelegationTokenIdentifier id) { |
155 | 161 | return allKeys.get(id.getMasterKeyId()); |
156 | 162 | } |
157 | 163 | } |
| 164 | + |
| 165 | + public static class TestFailureDelegationTokenSecretManager |
| 166 | + extends TestDelegationTokenSecretManager { |
| 167 | + private boolean throwError = false; |
| 168 | + private long errorSleepMillis; |
| 169 | + |
| 170 | + public TestFailureDelegationTokenSecretManager(long errorSleepMillis) { |
| 171 | + super(24*60*60*1000, 10*1000, 1*1000, 60*60*1000); |
| 172 | + this.errorSleepMillis = errorSleepMillis; |
| 173 | + } |
| 174 | + |
| 175 | + public void setThrowError(boolean throwError) { |
| 176 | + this.throwError = throwError; |
| 177 | + } |
| 178 | + |
| 179 | + private void sleepAndThrow() throws IOException { |
| 180 | + try { |
| 181 | + Thread.sleep(errorSleepMillis); |
| 182 | + throw new IOException("Test exception"); |
| 183 | + } catch (InterruptedException e) { |
| 184 | + } |
| 185 | + } |
| 186 | + |
| 187 | + @Override |
| 188 | + protected void storeNewToken(TestDelegationTokenIdentifier ident, long renewDate) |
| 189 | + throws IOException { |
| 190 | + if (throwError) { |
| 191 | + sleepAndThrow(); |
| 192 | + } |
| 193 | + super.storeNewToken(ident, renewDate); |
| 194 | + } |
| 195 | + |
| 196 | + @Override |
| 197 | + protected void removeStoredToken(TestDelegationTokenIdentifier ident) throws IOException { |
| 198 | + if (throwError) { |
| 199 | + sleepAndThrow(); |
| 200 | + } |
| 201 | + super.removeStoredToken(ident); |
| 202 | + } |
| 203 | + |
| 204 | + @Override |
| 205 | + protected void updateStoredToken(TestDelegationTokenIdentifier ident, long renewDate) |
| 206 | + throws IOException { |
| 207 | + if (throwError) { |
| 208 | + sleepAndThrow(); |
| 209 | + } |
| 210 | + super.updateStoredToken(ident, renewDate); |
| 211 | + } |
| 212 | + } |
158 | 213 |
|
159 | 214 | public static class TokenSelector extends |
160 | 215 | AbstractDelegationTokenSelector<TestDelegationTokenIdentifier>{ |
@@ -579,4 +634,85 @@ public void testEmptyToken() throws IOException { |
579 | 634 | assertEquals(token1, token2); |
580 | 635 | assertEquals(token1.encodeToUrlString(), token2.encodeToUrlString()); |
581 | 636 | } |
| 637 | + |
| 638 | + @Test |
| 639 | + public void testDelegationTokenSecretManagerMetrics() throws Exception { |
| 640 | + TestDelegationTokenSecretManager dtSecretManager = |
| 641 | + new TestDelegationTokenSecretManager(24*60*60*1000, |
| 642 | + 10*1000, 1*1000, 60*60*1000); |
| 643 | + try { |
| 644 | + dtSecretManager.startThreads(); |
| 645 | + |
| 646 | + final Token<TestDelegationTokenIdentifier> token = callAndValidateMetrics( |
| 647 | + dtSecretManager, dtSecretManager.getMetrics().getStoreToken(), "storeToken", |
| 648 | + () -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"), 1); |
| 649 | + |
| 650 | + callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getUpdateToken(), |
| 651 | + "updateToken", () -> dtSecretManager.renewToken(token, "JobTracker"), 1); |
| 652 | + |
| 653 | + callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getRemoveToken(), |
| 654 | + "removeToken", () -> dtSecretManager.cancelToken(token, "JobTracker"), 1); |
| 655 | + } finally { |
| 656 | + dtSecretManager.stopThreads(); |
| 657 | + } |
| 658 | + } |
| 659 | + |
| 660 | + @Test |
| 661 | + public void testDelegationTokenSecretManagerMetricsFailures() throws Exception { |
| 662 | + int errorSleepMillis = 200; |
| 663 | + TestFailureDelegationTokenSecretManager dtSecretManager = |
| 664 | + new TestFailureDelegationTokenSecretManager(errorSleepMillis); |
| 665 | + |
| 666 | + try { |
| 667 | + dtSecretManager.startThreads(); |
| 668 | + |
| 669 | + final Token<TestDelegationTokenIdentifier> token = |
| 670 | + generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"); |
| 671 | + |
| 672 | + dtSecretManager.setThrowError(true); |
| 673 | + |
| 674 | + callAndValidateFailureMetrics(dtSecretManager, "storeToken", 1, 1, false, |
| 675 | + errorSleepMillis, |
| 676 | + () -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker")); |
| 677 | + |
| 678 | + callAndValidateFailureMetrics(dtSecretManager, "updateToken", 1, 2, true, |
| 679 | + errorSleepMillis, () -> dtSecretManager.renewToken(token, "JobTracker")); |
| 680 | + |
| 681 | + callAndValidateFailureMetrics(dtSecretManager, "removeToken", 1, 3, true, |
| 682 | + errorSleepMillis, () -> dtSecretManager.cancelToken(token, "JobTracker")); |
| 683 | + } finally { |
| 684 | + dtSecretManager.stopThreads(); |
| 685 | + } |
| 686 | + } |
| 687 | + |
| 688 | + private <T> T callAndValidateMetrics(TestDelegationTokenSecretManager dtSecretManager, |
| 689 | + MutableRate metric, String statName, Callable<T> callable, int expectedCount) |
| 690 | + throws Exception { |
| 691 | + MeanStatistic stat = IOStatisticAssertions.lookupMeanStatistic( |
| 692 | + dtSecretManager.getMetrics().getIoStatistics(), statName + ".mean"); |
| 693 | + assertEquals(expectedCount - 1, metric.lastStat().numSamples()); |
| 694 | + assertEquals(expectedCount - 1, stat.getSamples()); |
| 695 | + T returnedObject = callable.call(); |
| 696 | + assertEquals(expectedCount, metric.lastStat().numSamples()); |
| 697 | + assertEquals(expectedCount, stat.getSamples()); |
| 698 | + return returnedObject; |
| 699 | + } |
| 700 | + |
| 701 | + private <T> void callAndValidateFailureMetrics(TestDelegationTokenSecretManager dtSecretManager, |
| 702 | + String statName, int expectedStatCount, int expectedMetricCount, boolean expectError, |
| 703 | + int errorSleepMillis, Callable<T> callable) throws Exception { |
| 704 | + MutableCounterLong counter = dtSecretManager.getMetrics().getTokenFailure(); |
| 705 | + MeanStatistic failureStat = IOStatisticAssertions.lookupMeanStatistic( |
| 706 | + dtSecretManager.getMetrics().getIoStatistics(), statName + ".failures.mean"); |
| 707 | + assertEquals(expectedMetricCount - 1, counter.value()); |
| 708 | + assertEquals(expectedStatCount - 1, failureStat.getSamples()); |
| 709 | + if (expectError) { |
| 710 | + LambdaTestUtils.intercept(IOException.class, callable); |
| 711 | + } else { |
| 712 | + callable.call(); |
| 713 | + } |
| 714 | + assertEquals(expectedMetricCount, counter.value()); |
| 715 | + assertEquals(expectedStatCount, failureStat.getSamples()); |
| 716 | + assertTrue(failureStat.getSum() >= errorSleepMillis); |
| 717 | + } |
582 | 718 | } |
0 commit comments