From c26d149462fdf2387b76de4455aafffb1cf8ec58 Mon Sep 17 00:00:00 2001 From: jaymode Date: Thu, 20 Dec 2018 09:31:17 -0700 Subject: [PATCH 1/2] Security: propagate auth result to listeners After #30794, our caching realms limit each principal to a single auth attempt at a time. This prevents hammering of external servers but can cause a significant performance hit when requests need to go through a realm that takes a long time to attempt to authenticate in order to get to the realm that actually authenticates. In order to address this, this change will propagate failed results to listeners if they use the same set of credentials that the authentication attempt used. This does prevent these stalled requests from retrying the authentication attempt but the implementation does allow for new requests to retry the attempt. --- .../support/CachingUsernamePasswordRealm.java | 117 +++++++++--------- .../CachingUsernamePasswordRealmTests.java | 103 ++++++++++----- 2 files changed, 136 insertions(+), 84 deletions(-) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java index 3924370fb33ec..0c27eb68302ea 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.authc.support; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.cache.Cache; import org.elasticsearch.common.cache.CacheBuilder; import org.elasticsearch.common.settings.SecureString; @@ -29,7 +30,7 @@ public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm implements CachingRealm { - private final Cache> cache; + private final Cache> cache; private final ThreadPool threadPool; private final boolean authenticationEnabled; final Hasher cacheHasher; @@ -40,7 +41,7 @@ protected CachingUsernamePasswordRealm(RealmConfig config, ThreadPool threadPool this.threadPool = threadPool; final TimeValue ttl = this.config.getSetting(CachingUsernamePasswordRealmSettings.CACHE_TTL_SETTING); if (ttl.getNanos() > 0) { - cache = CacheBuilder.>builder() + cache = CacheBuilder.>builder() .setExpireAfterWrite(ttl) .setMaximumWeight(this.config.getSetting(CachingUsernamePasswordRealmSettings.CACHE_MAX_USERS_SETTING)) .build(); @@ -121,58 +122,61 @@ public final void authenticate(AuthenticationToken authToken, ActionListener listener) { try { final AtomicBoolean authenticationInCache = new AtomicBoolean(true); - final ListenableFuture listenableCacheEntry = cache.computeIfAbsent(token.principal(), k -> { + final ListenableFuture listenableCacheEntry = cache.computeIfAbsent(token.principal(), k -> { authenticationInCache.set(false); return new ListenableFuture<>(); }); if (authenticationInCache.get()) { // there is a cached or an inflight authenticate request - listenableCacheEntry.addListener(ActionListener.wrap(authenticatedUserWithHash -> { - if (authenticatedUserWithHash != null && authenticatedUserWithHash.verify(token.credentials())) { - // cached credential hash matches the credential hash for this forestalled request - handleCachedAuthentication(authenticatedUserWithHash.user, ActionListener.wrap(cacheResult -> { - if (cacheResult.isAuthenticated()) { - logger.debug("realm [{}] authenticated user [{}], with roles [{}]", - name(), token.principal(), cacheResult.getUser().roles()); - } else { - logger.debug("realm [{}] authenticated user [{}] from cache, but then failed [{}]", - name(), token.principal(), cacheResult.getMessage()); - } - listener.onResponse(cacheResult); - }, listener::onFailure)); + listenableCacheEntry.addListener(ActionListener.wrap(cachedResult -> { + final boolean credsMatch = cachedResult.verify(token.credentials()); + if (cachedResult.authenticationResult.isAuthenticated()) { + if (credsMatch) { + // cached credential hash matches the credential hash for this forestalled request + handleCachedAuthentication(cachedResult.user, ActionListener.wrap(cacheResult -> { + if (cacheResult.isAuthenticated()) { + logger.debug("realm [{}] authenticated user [{}], with roles [{}]", + name(), token.principal(), cacheResult.getUser().roles()); + } else { + logger.debug("realm [{}] authenticated user [{}] from cache, but then failed [{}]", + name(), token.principal(), cacheResult.getMessage()); + } + listener.onResponse(cacheResult); + }, listener::onFailure)); + } else { + // its credential hash does not match the + // hash of the credential for this forestalled request. + // clear cache and try to reach the authentication source again because password + // might have changed there and the local cached hash got stale + cache.invalidate(token.principal(), listenableCacheEntry); + authenticateWithCache(token, listener); + } + } else if (credsMatch) { + // not authenticated but instead of hammering reuse the result. a new + // request will trigger a retried auth + listener.onResponse(cachedResult.authenticationResult); } else { - // The inflight request has failed or its credential hash does not match the - // hash of the credential for this forestalled request. - // clear cache and try to reach the authentication source again because password - // might have changed there and the local cached hash got stale cache.invalidate(token.principal(), listenableCacheEntry); authenticateWithCache(token, listener); } - }, e -> { - // the inflight request failed, so try again, but first (always) make sure cache - // is cleared of the failed authentication - cache.invalidate(token.principal(), listenableCacheEntry); - authenticateWithCache(token, listener); - }), threadPool.executor(ThreadPool.Names.GENERIC), threadPool.getThreadContext()); + }, listener::onFailure), threadPool.executor(ThreadPool.Names.GENERIC), threadPool.getThreadContext()); } else { // attempt authentication against the authentication source doAuthenticate(token, ActionListener.wrap(authResult -> { - if (authResult.isAuthenticated() && authResult.getUser().enabled()) { - // compute the credential hash of this successful authentication request - final UserWithHash userWithHash = new UserWithHash(authResult.getUser(), token.credentials(), cacheHasher); - // notify any forestalled request listeners; they will not reach to the - // authentication request and instead will use this hash for comparison - listenableCacheEntry.onResponse(userWithHash); - } else { - // notify any forestalled request listeners; they will retry the request - listenableCacheEntry.onResponse(null); + if (authResult.isAuthenticated() == false || authResult.getUser().enabled() == false) { + // a new request should trigger a new authentication + cache.invalidate(token.principal(), listenableCacheEntry); } - // notify the listener of the inflight authentication request; this request is not retried + // notify any forestalled request listeners; they will not reach to the + // authentication request and instead will use this result if they contain + // the same credentials + listenableCacheEntry.onResponse(new CachedResult(authResult, cacheHasher, authResult.getUser(), token.credentials())); listener.onResponse(authResult); }, e -> { - // notify any staved off listeners; they will retry the request + cache.invalidate(token.principal(), listenableCacheEntry); + // notify any staved off listeners; they will propagate this error listenableCacheEntry.onFailure(e); - // notify the listener of the inflight authentication request; this request is not retried + // notify the listener of the inflight authentication request listener.onFailure(e); })); } @@ -223,25 +227,21 @@ public final void lookupUser(String username, ActionListener listener) { private void lookupWithCache(String username, ActionListener listener) { try { final AtomicBoolean lookupInCache = new AtomicBoolean(true); - final ListenableFuture listenableCacheEntry = cache.computeIfAbsent(username, key -> { + final ListenableFuture listenableCacheEntry = cache.computeIfAbsent(username, key -> { lookupInCache.set(false); return new ListenableFuture<>(); }); if (false == lookupInCache.get()) { // attempt lookup against the user directory doLookupUser(username, ActionListener.wrap(user -> { - if (user != null) { - // user found - final UserWithHash userWithHash = new UserWithHash(user, null, null); - // notify forestalled request listeners - listenableCacheEntry.onResponse(userWithHash); - } else { + final CachedResult result = new CachedResult(AuthenticationResult.notHandled(), cacheHasher, user, null); + if (user == null) { // user not found, invalidate cache so that subsequent requests are forwarded to // the user directory cache.invalidate(username, listenableCacheEntry); - // notify forestalled request listeners - listenableCacheEntry.onResponse(null); } + // notify forestalled request listeners + listenableCacheEntry.onResponse(result); }, e -> { // the next request should be forwarded, not halted by a failed lookup attempt cache.invalidate(username, listenableCacheEntry); @@ -249,9 +249,9 @@ private void lookupWithCache(String username, ActionListener listener) { listenableCacheEntry.onFailure(e); })); } - listenableCacheEntry.addListener(ActionListener.wrap(userWithHash -> { - if (userWithHash != null) { - listener.onResponse(userWithHash.user); + listenableCacheEntry.addListener(ActionListener.wrap(cachedResult -> { + if (cachedResult.user != null) { + listener.onResponse(cachedResult.user); } else { listener.onResponse(null); } @@ -263,16 +263,21 @@ private void lookupWithCache(String username, ActionListener listener) { protected abstract void doLookupUser(String username, ActionListener listener); - private static class UserWithHash { - final User user; - final char[] hash; + private static class CachedResult { + private final AuthenticationResult authenticationResult; + private final User user; + private final char[] hash; - UserWithHash(User user, SecureString password, Hasher hasher) { - this.user = Objects.requireNonNull(user); + private CachedResult(AuthenticationResult result, Hasher hasher, @Nullable User user, @Nullable SecureString password) { + this.authenticationResult = Objects.requireNonNull(result); + if (authenticationResult.isAuthenticated() && user == null) { + throw new IllegalArgumentException("authentication cannot be successful with a null user"); + } + this.user = user; this.hash = password == null ? null : hasher.hash(password); } - boolean verify(SecureString password) { + private boolean verify(SecureString password) { return hash != null && Hasher.verifyHash(password, hash); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java index ab824dcb34d18..c2c0a2c5fb5a5 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java @@ -58,13 +58,13 @@ public void setup() { } @After - public void stop() throws InterruptedException { + public void stop() { if (threadPool != null) { terminate(threadPool); } } - public void testCacheSettings() throws Exception { + public void testCacheSettings() { String cachingHashAlgo = Hasher.values()[randomIntBetween(0, Hasher.values().length - 1)].name().toLowerCase(Locale.ROOT); int maxUsers = randomIntBetween(10, 100); TimeValue ttl = TimeValue.timeValueMinutes(randomIntBetween(10, 20)); @@ -323,7 +323,7 @@ private void sleepUntil(long until) throws InterruptedException { } } - public void testAuthenticateContract() throws Exception { + public void testAuthenticateContract() { Realm realm = new FailingAuthenticationRealm(globalSettings, threadPool); PlainActionFuture future = new PlainActionFuture<>(); realm.authenticate(new UsernamePasswordToken("user", new SecureString("pass")), future); @@ -337,7 +337,7 @@ public void testAuthenticateContract() throws Exception { assertThat(e.getMessage(), containsString("whatever exception")); } - public void testLookupContract() throws Exception { + public void testLookupContract() { Realm realm = new FailingAuthenticationRealm(globalSettings, threadPool); PlainActionFuture future = new PlainActionFuture<>(); realm.lookupUser("user", future); @@ -351,7 +351,7 @@ public void testLookupContract() throws Exception { assertThat(e.getMessage(), containsString("lookup exception")); } - public void testReturnDifferentObjectFromCache() throws Exception { + public void testReturnDifferentObjectFromCache() { final AtomicReference userArg = new AtomicReference<>(); final AtomicReference result = new AtomicReference<>(); Realm realm = new AlwaysAuthenticateCachingRealm(globalSettings, threadPool) { @@ -444,6 +444,76 @@ protected void doLookupUser(String username, ActionListener listener) { assertEquals(1, authCounter.get()); } + public void testUnauthenticatedResultPropagatesWithSameCreds() throws Exception { + final String username = "username"; + final SecureString password = SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING; + final AtomicInteger authCounter = new AtomicInteger(0); + final Hasher pwdHasher = Hasher.resolve(randomFrom("pbkdf2", "pbkdf2_1000", "bcrypt", "bcrypt9")); + final String passwordHash = new String(pwdHasher.hash(password)); + RealmConfig config = new RealmConfig(new RealmConfig.RealmIdentifier("caching", "test_realm"), globalSettings, + TestEnvironment.newEnvironment(globalSettings), new ThreadContext(Settings.EMPTY)); + + + + final int numberOfProcessors = Runtime.getRuntime().availableProcessors(); + final int numberOfThreads = scaledRandomIntBetween((numberOfProcessors + 1) / 2, numberOfProcessors * 3); + final int numberOfIterations = scaledRandomIntBetween(20, 100); + final CountDownLatch latch = new CountDownLatch(1 + numberOfThreads); + List threads = new ArrayList<>(numberOfThreads); + final SecureString credsToUse = new SecureString(randomAlphaOfLength(12).toCharArray()); + final CachingUsernamePasswordRealm realm = new CachingUsernamePasswordRealm(config, threadPool) { + @Override + protected void doAuthenticate(UsernamePasswordToken token, ActionListener listener) { + authCounter.incrementAndGet(); + // do something slow + if (pwdHasher.verify(token.credentials(), passwordHash.toCharArray())) { + listener.onFailure(new IllegalStateException("password auth should never succeed")); + } else { + listener.onResponse(AuthenticationResult.unsuccessful("password verification failed", null)); + } + } + + @Override + protected void doLookupUser(String username, ActionListener listener) { + listener.onFailure(new UnsupportedOperationException("this method should not be called")); + } + }; + for (int i = 0; i < numberOfThreads; i++) { + threads.add(new Thread(() -> { + try { + latch.countDown(); + latch.await(); + for (int i1 = 0; i1 < numberOfIterations; i1++) { + final UsernamePasswordToken token = new UsernamePasswordToken(username, credsToUse); + + realm.authenticate(token, ActionListener.wrap((result) -> { + if (result.isAuthenticated()) { + throw new IllegalStateException("invalid password led to an authenticated result: " + result); + } + assertThat(result.getMessage(), containsString("password verification failed")); + }, (e) -> { + logger.error("caught exception", e); + fail("unexpected exception - " + e); + })); + } + + } catch (InterruptedException e) { + logger.error("thread was interrupted", e); + Thread.currentThread().interrupt(); + } + })); + } + + for (Thread thread : threads) { + thread.start(); + } + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + assertEquals(numberOfIterations, authCounter.get()); + } + public void testCacheConcurrency() throws Exception { final String username = "username"; final SecureString password = SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING; @@ -675,27 +745,4 @@ protected void doLookupUser(String username, ActionListener listener) { listener.onResponse(new User(username, new String[]{"lookupRole1", "lookupRole2"})); } } - - static class LookupNotSupportedRealm extends CachingUsernamePasswordRealm { - - public final AtomicInteger authInvocationCounter = new AtomicInteger(0); - public final AtomicInteger lookupInvocationCounter = new AtomicInteger(0); - - LookupNotSupportedRealm(Settings globalSettings, ThreadPool threadPool) { - super(new RealmConfig(new RealmConfig.RealmIdentifier("caching", "lookup-notsupported-test"), globalSettings, - TestEnvironment.newEnvironment(globalSettings), threadPool.getThreadContext()), threadPool); - } - - @Override - protected void doAuthenticate(UsernamePasswordToken token, ActionListener listener) { - authInvocationCounter.incrementAndGet(); - listener.onResponse(AuthenticationResult.success(new User(token.principal(), new String[]{"testRole1", "testRole2"}))); - } - - @Override - protected void doLookupUser(String username, ActionListener listener) { - lookupInvocationCounter.incrementAndGet(); - listener.onFailure(new UnsupportedOperationException("don't call lookup if lookup isn't supported!!!")); - } - } } From afd1ecda49474ce793d586a824b73e67b38e93d3 Mon Sep 17 00:00:00 2001 From: jaymode Date: Fri, 21 Dec 2018 14:20:32 -0700 Subject: [PATCH 2/2] fix test --- .../CachingUsernamePasswordRealmTests.java | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java index c2c0a2c5fb5a5..57cb464a654d3 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java @@ -453,11 +453,8 @@ public void testUnauthenticatedResultPropagatesWithSameCreds() throws Exception RealmConfig config = new RealmConfig(new RealmConfig.RealmIdentifier("caching", "test_realm"), globalSettings, TestEnvironment.newEnvironment(globalSettings), new ThreadContext(Settings.EMPTY)); - - final int numberOfProcessors = Runtime.getRuntime().availableProcessors(); final int numberOfThreads = scaledRandomIntBetween((numberOfProcessors + 1) / 2, numberOfProcessors * 3); - final int numberOfIterations = scaledRandomIntBetween(20, 100); final CountDownLatch latch = new CountDownLatch(1 + numberOfThreads); List threads = new ArrayList<>(numberOfThreads); final SecureString credsToUse = new SecureString(randomAlphaOfLength(12).toCharArray()); @@ -483,19 +480,17 @@ protected void doLookupUser(String username, ActionListener listener) { try { latch.countDown(); latch.await(); - for (int i1 = 0; i1 < numberOfIterations; i1++) { - final UsernamePasswordToken token = new UsernamePasswordToken(username, credsToUse); - - realm.authenticate(token, ActionListener.wrap((result) -> { - if (result.isAuthenticated()) { - throw new IllegalStateException("invalid password led to an authenticated result: " + result); - } - assertThat(result.getMessage(), containsString("password verification failed")); - }, (e) -> { - logger.error("caught exception", e); - fail("unexpected exception - " + e); - })); - } + final UsernamePasswordToken token = new UsernamePasswordToken(username, credsToUse); + + realm.authenticate(token, ActionListener.wrap((result) -> { + if (result.isAuthenticated()) { + throw new IllegalStateException("invalid password led to an authenticated result: " + result); + } + assertThat(result.getMessage(), containsString("password verification failed")); + }, (e) -> { + logger.error("caught exception", e); + fail("unexpected exception - " + e); + })); } catch (InterruptedException e) { logger.error("thread was interrupted", e); @@ -511,7 +506,7 @@ protected void doLookupUser(String username, ActionListener listener) { for (Thread thread : threads) { thread.join(); } - assertEquals(numberOfIterations, authCounter.get()); + assertEquals(1, authCounter.get()); } public void testCacheConcurrency() throws Exception {