55 */
66package org .elasticsearch .xpack .security .authc .support ;
77
8- import org .apache .lucene .util .SetOnce ;
98import org .elasticsearch .action .ActionListener ;
109import org .elasticsearch .common .cache .Cache ;
1110import org .elasticsearch .common .cache .CacheBuilder ;
12- import org .elasticsearch .common .collect .Tuple ;
1311import org .elasticsearch .common .settings .SecureString ;
1412import org .elasticsearch .common .unit .TimeValue ;
1513import org .elasticsearch .common .util .concurrent .ListenableFuture ;
3028
3129public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm implements CachingRealm {
3230
33- private final Cache <String , ListenableFuture <Tuple < AuthenticationResult , UserWithHash > >> cache ;
31+ private final Cache <String , ListenableFuture <UserWithHash >> cache ;
3432 private final ThreadPool threadPool ;
3533 final Hasher cacheHasher ;
3634
3735 protected CachingUsernamePasswordRealm (String type , RealmConfig config , ThreadPool threadPool ) {
3836 super (type , config );
3937 cacheHasher = Hasher .resolve (CachingUsernamePasswordRealmSettings .CACHE_HASH_ALGO_SETTING .get (config .settings ()));
4038 this .threadPool = threadPool ;
41- TimeValue ttl = CachingUsernamePasswordRealmSettings .CACHE_TTL_SETTING .get (config .settings ());
39+ final TimeValue ttl = CachingUsernamePasswordRealmSettings .CACHE_TTL_SETTING .get (config .settings ());
4240 if (ttl .getNanos () > 0 ) {
43- cache = CacheBuilder .<String , ListenableFuture <Tuple < AuthenticationResult , UserWithHash > >>builder ()
41+ cache = CacheBuilder .<String , ListenableFuture <UserWithHash >>builder ()
4442 .setExpireAfterWrite (ttl )
4543 .setMaximumWeight (CachingUsernamePasswordRealmSettings .CACHE_MAX_USERS_SETTING .get (config .settings ()))
4644 .build ();
@@ -49,13 +47,15 @@ protected CachingUsernamePasswordRealm(String type, RealmConfig config, ThreadPo
4947 }
5048 }
5149
50+ @ Override
5251 public final void expire (String username ) {
5352 if (cache != null ) {
5453 logger .trace ("invalidating cache for user [{}] in realm [{}]" , username , name ());
5554 cache .invalidate (username );
5655 }
5756 }
5857
58+ @ Override
5959 public final void expireAll () {
6060 if (cache != null ) {
6161 logger .trace ("invalidating cache for all users in realm [{}]" , name ());
@@ -72,108 +72,84 @@ public final void expireAll() {
7272 */
7373 @ Override
7474 public final void authenticate (AuthenticationToken authToken , ActionListener <AuthenticationResult > listener ) {
75- UsernamePasswordToken token = (UsernamePasswordToken ) authToken ;
75+ final UsernamePasswordToken token = (UsernamePasswordToken ) authToken ;
7676 try {
7777 if (cache == null ) {
7878 doAuthenticate (token , listener );
7979 } else {
8080 authenticateWithCache (token , listener );
8181 }
82- } catch (Exception e ) {
82+ } catch (final Exception e ) {
8383 // each realm should handle exceptions, if we get one here it should be considered fatal
8484 listener .onFailure (e );
8585 }
8686 }
8787
88+ /**
89+ * This validates the {@code token} while making sure there is only one inflight
90+ * request to the authentication source. Only successful responses are cached
91+ * and any subsequent requests, bearing the <b>same</b> password, will succeed
92+ * without reaching to the authentication source. A different password in a
93+ * subsequent request, however, will clear the cache and <b>try</b> to reach to
94+ * the authentication source.
95+ *
96+ * @param token The authentication token
97+ * @param listener to be called at completion
98+ */
8899 private void authenticateWithCache (UsernamePasswordToken token , ActionListener <AuthenticationResult > listener ) {
89100 try {
90- final SetOnce <User > authenticatedUser = new SetOnce <>();
91- final AtomicBoolean createdAndStartedFuture = new AtomicBoolean (false );
92- final ListenableFuture <Tuple <AuthenticationResult , UserWithHash >> future = cache .computeIfAbsent (token .principal (), k -> {
93- final ListenableFuture <Tuple <AuthenticationResult , UserWithHash >> created = new ListenableFuture <>();
94- if (createdAndStartedFuture .compareAndSet (false , true ) == false ) {
95- throw new IllegalStateException ("something else already started this. how?" );
96- }
97- return created ;
101+ final AtomicBoolean authenticationInCache = new AtomicBoolean (true );
102+ final ListenableFuture <UserWithHash > listenableCacheEntry = cache .computeIfAbsent (token .principal (), k -> {
103+ authenticationInCache .set (false );
104+ return new ListenableFuture <>();
98105 });
99-
100- if (createdAndStartedFuture .get ()) {
101- doAuthenticate (token , ActionListener .wrap (result -> {
102- if (result .isAuthenticated ()) {
103- final User user = result .getUser ();
104- authenticatedUser .set (user );
105- final UserWithHash userWithHash = new UserWithHash (user , token .credentials (), cacheHasher );
106- future .onResponse (new Tuple <>(result , userWithHash ));
107- } else {
108- future .onResponse (new Tuple <>(result , null ));
109- }
110- }, future ::onFailure ));
111- }
112-
113- future .addListener (ActionListener .wrap (tuple -> {
114- if (tuple != null ) {
115- final UserWithHash userWithHash = tuple .v2 ();
116- final boolean performedAuthentication = createdAndStartedFuture .get () && userWithHash != null &&
117- tuple .v2 ().user == authenticatedUser .get ();
118- handleResult (future , createdAndStartedFuture .get (), performedAuthentication , token , tuple , listener );
119- } else {
120- handleFailure (future , createdAndStartedFuture .get (), token , new IllegalStateException ("unknown error authenticating" ),
121- listener );
122- }
123- }, e -> handleFailure (future , createdAndStartedFuture .get (), token , e , listener )),
124- threadPool .executor (ThreadPool .Names .GENERIC ));
125- } catch (ExecutionException e ) {
126- listener .onResponse (AuthenticationResult .unsuccessful ("" , e ));
127- }
128- }
129-
130- private void handleResult (ListenableFuture <Tuple <AuthenticationResult , UserWithHash >> future , boolean createdAndStartedFuture ,
131- boolean performedAuthentication , UsernamePasswordToken token ,
132- Tuple <AuthenticationResult , UserWithHash > result , ActionListener <AuthenticationResult > listener ) {
133- final AuthenticationResult authResult = result .v1 ();
134- if (authResult == null ) {
135- // this was from a lookup; clear and redo
136- cache .invalidate (token .principal (), future );
137- authenticateWithCache (token , listener );
138- } else if (authResult .isAuthenticated ()) {
139- if (performedAuthentication ) {
140- listener .onResponse (authResult );
141- } else {
142- UserWithHash userWithHash = result .v2 ();
143- if (userWithHash .verify (token .credentials ())) {
144- if (userWithHash .user .enabled ()) {
145- User user = userWithHash .user ;
146- logger .debug ("realm [{}] authenticated user [{}], with roles [{}]" ,
147- name (), token .principal (), user .roles ());
106+ if (authenticationInCache .get ()) {
107+ // there is a cached or an inflight authenticate request
108+ listenableCacheEntry .addListener (ActionListener .wrap (authenticatedUserWithHash -> {
109+ if (authenticatedUserWithHash != null && authenticatedUserWithHash .verify (token .credentials ())) {
110+ // cached credential hash matches the credential hash for this forestalled request
111+ final User user = authenticatedUserWithHash .user ;
112+ logger .debug ("realm [{}] authenticated user [{}], with roles [{}], from cache" , name (), token .principal (),
113+ user .roles ());
148114 listener .onResponse (AuthenticationResult .success (user ));
149115 } else {
150- // re-auth to see if user has been enabled
151- cache .invalidate (token .principal (), future );
116+ // The inflight request has failed or its credential hash does not match the
117+ // hash of the credential for this forestalled request.
118+ // clear cache and try to reach the authentication source again because password
119+ // might have changed there and the local cached hash got stale
120+ cache .invalidate (token .principal (), listenableCacheEntry );
152121 authenticateWithCache (token , listener );
153122 }
154- } else {
155- // could be a password change?
156- cache .invalidate (token .principal (), future );
123+ }, e -> {
124+ // the inflight request failed, so try again, but first (always) make sure cache
125+ // is cleared of the failed authentication
126+ cache .invalidate (token .principal (), listenableCacheEntry );
157127 authenticateWithCache (token , listener );
158- }
159- }
160- } else {
161- cache .invalidate (token .principal (), future );
162- if (createdAndStartedFuture ) {
163- listener .onResponse (authResult );
128+ }), threadPool .executor (ThreadPool .Names .GENERIC ));
164129 } else {
165- authenticateWithCache (token , listener );
130+ // attempt authentication against the authentication source
131+ doAuthenticate (token , ActionListener .wrap (authResult -> {
132+ if (authResult .isAuthenticated () && authResult .getUser ().enabled ()) {
133+ // compute the credential hash of this successful authentication request
134+ final UserWithHash userWithHash = new UserWithHash (authResult .getUser (), token .credentials (), cacheHasher );
135+ // notify any forestalled request listeners; they will not reach to the
136+ // authentication request and instead will use this hash for comparison
137+ listenableCacheEntry .onResponse (userWithHash );
138+ } else {
139+ // notify any forestalled request listeners; they will retry the request
140+ listenableCacheEntry .onResponse (null );
141+ }
142+ // notify the listener of the inflight authentication request; this request is not retried
143+ listener .onResponse (authResult );
144+ }, e -> {
145+ // notify any staved off listeners; they will retry the request
146+ listenableCacheEntry .onFailure (e );
147+ // notify the listener of the inflight authentication request; this request is not retried
148+ listener .onFailure (e );
149+ }));
166150 }
167- }
168- }
169-
170- private void handleFailure (ListenableFuture <Tuple <AuthenticationResult , UserWithHash >> future , boolean createdAndStarted ,
171- UsernamePasswordToken token , Exception e , ActionListener <AuthenticationResult > listener ) {
172- cache .invalidate (token .principal (), future );
173- if (createdAndStarted ) {
151+ } catch (final ExecutionException e ) {
174152 listener .onFailure (e );
175- } else {
176- authenticateWithCache (token , listener );
177153 }
178154 }
179155
@@ -193,38 +169,57 @@ protected int getCacheSize() {
193169
194170 @ Override
195171 public final void lookupUser (String username , ActionListener <User > listener ) {
196- if (cache != null ) {
197- try {
198- ListenableFuture <Tuple <AuthenticationResult , UserWithHash >> future = cache .computeIfAbsent (username , key -> {
199- ListenableFuture <Tuple <AuthenticationResult , UserWithHash >> created = new ListenableFuture <>();
200- doLookupUser (username , ActionListener .wrap (user -> {
201- if (user != null ) {
202- UserWithHash userWithHash = new UserWithHash (user , null , null );
203- created .onResponse (new Tuple <>(null , userWithHash ));
204- } else {
205- created .onResponse (new Tuple <>(null , null ));
206- }
207- }, created ::onFailure ));
208- return created ;
209- });
210-
211- future .addListener (ActionListener .wrap (tuple -> {
212- if (tuple != null ) {
213- if (tuple .v2 () == null ) {
214- cache .invalidate (username , future );
215- listener .onResponse (null );
216- } else {
217- listener .onResponse (tuple .v2 ().user );
218- }
172+ try {
173+ if (cache == null ) {
174+ doLookupUser (username , listener );
175+ } else {
176+ lookupWithCache (username , listener );
177+ }
178+ } catch (final Exception e ) {
179+ // each realm should handle exceptions, if we get one here it should be
180+ // considered fatal
181+ listener .onFailure (e );
182+ }
183+ }
184+
185+ private void lookupWithCache (String username , ActionListener <User > listener ) {
186+ try {
187+ final AtomicBoolean lookupInCache = new AtomicBoolean (true );
188+ final ListenableFuture <UserWithHash > listenableCacheEntry = cache .computeIfAbsent (username , key -> {
189+ lookupInCache .set (false );
190+ return new ListenableFuture <>();
191+ });
192+ if (false == lookupInCache .get ()) {
193+ // attempt lookup against the user directory
194+ doLookupUser (username , ActionListener .wrap (user -> {
195+ if (user != null ) {
196+ // user found
197+ final UserWithHash userWithHash = new UserWithHash (user , null , null );
198+ // notify forestalled request listeners
199+ listenableCacheEntry .onResponse (userWithHash );
219200 } else {
220- listener .onResponse (null );
201+ // user not found, invalidate cache so that subsequent requests are forwarded to
202+ // the user directory
203+ cache .invalidate (username , listenableCacheEntry );
204+ // notify forestalled request listeners
205+ listenableCacheEntry .onResponse (null );
221206 }
222- }, listener ::onFailure ), threadPool .executor (ThreadPool .Names .GENERIC ));
223- } catch (ExecutionException e ) {
224- listener .onFailure (e );
207+ }, e -> {
208+ // the next request should be forwarded, not halted by a failed lookup attempt
209+ cache .invalidate (username , listenableCacheEntry );
210+ // notify forestalled listeners
211+ listenableCacheEntry .onFailure (e );
212+ }));
225213 }
226- } else {
227- doLookupUser (username , listener );
214+ listenableCacheEntry .addListener (ActionListener .wrap (userWithHash -> {
215+ if (userWithHash != null ) {
216+ listener .onResponse (userWithHash .user );
217+ } else {
218+ listener .onResponse (null );
219+ }
220+ }, listener ::onFailure ), threadPool .executor (ThreadPool .Names .GENERIC ));
221+ } catch (final ExecutionException e ) {
222+ listener .onFailure (e );
228223 }
229224 }
230225
0 commit comments