@@ -307,16 +307,16 @@ private Mono<String> token(final ConnectionContext connectionContext) {
307307 * and there can be concurrent callers during the subscription/execution of the Mono:
308308 * - The latter is very harmful: This may lead to concurrent execution of the logic
309309 * written in requestToken and hence to multiple requests to the UAA server using
310- * the same *value for the refresh token*! The UAA server will sequentialize them,
310+ * the same *value for the refresh token*! The UAA server will sequentialize them,
311311 * one will go through just as normal and a new refresh token gets issued.
312- * The UAA server invalidates the old refresh token. The second request then arrives
312+ * The UAA server invalidates the old refresh token. The second request then arrives
313313 * with the old refresh token and gets rejected. In an earlier version this led
314314 * to caching the second request, hence to cache an error. This caused deadlocks.
315315 * - The first is only "not nice", if the second issue is resolved: It causes that
316316 * we will request two access tokens from the UAA server shortly after the other,
317317 * but having use appropriate refresh tokens sequentially. The second request
318318 * simply is to be considered waste.
319- *
319+ *
320320 * The coding below fixes both issues: It ensures that the execution of the Mono
321321 * is synchronized and it ensures that two threads arriving to fetch the JWT in a
322322 * non-caching situation does not trigger "wasteful" requests to the UAA server.
@@ -327,54 +327,80 @@ private Mono<String> token(final ConnectionContext connectionContext) {
327327 * during creation of the Mono (where it is of little relevance), but
328328 * during the execution/subscription.
329329 */
330- return Mono .defer (() -> {
331- // Check if there's already an active token request
332- final Mono <String > existingRequest = this .activeTokenRequests .get (connectionContext );
333- if (existingRequest != null ) {
334- LOGGER .debug ("Reusing existing UAA JWT token request for connection context" );
335- return existingRequest ;
336- }
337-
338- final Mono <String > baseTokenRequest = createTokenRequest (connectionContext )
339- .doOnSubscribe (s -> LOGGER .debug ("Starting new UAA JWT token request" ))
340- .doOnSuccess (token -> LOGGER .debug ("UAA JWT token request completed successfully" ))
341- .doOnError (error -> LOGGER .debug ("UAA JWT token request failed" , error ))
342- .doFinally (signal -> {
343- // Clear the active request when done (success or error)
344- this .activeTokenRequests .remove (connectionContext );
345- });
346-
347- // Apply cache duration from connection context
348- final Mono <String > newTokenRequest = connectionContext
349- .getCacheDuration ()
350- .map (baseTokenRequest ::cache )
351- .orElseGet (baseTokenRequest ::cache )
352- /*
353- * Ensure execution on single thread.
354- * This prevents sending requests to the UAA server with expired refresh tokens.
355- */
356- .publishOn (connectionContext .getTokenScheduler ());
357-
358- // Store the active request atomically
359- final Mono <String > actualRequest = this .activeTokenRequests .putIfAbsent (connectionContext , newTokenRequest );
360- if (actualRequest != null ) {
361- // Another thread beat us to it, use their request. This prevents "wasteful" requests.
362- LOGGER .debug ("Another thread created token request first, using theirs instead" );
363- return actualRequest ;
364- }
365-
366- // We successfully stored our request, use it
367- return newTokenRequest ;
368- });
330+ return Mono .defer (
331+ () -> {
332+ // Check if there's already an active token request
333+ final Mono <String > existingRequest =
334+ this .activeTokenRequests .get (connectionContext );
335+ if (existingRequest != null ) {
336+ LOGGER .debug (
337+ "Reusing existing UAA JWT token request for connection context" );
338+ return existingRequest ;
339+ }
340+
341+ final Mono <String > baseTokenRequest =
342+ createTokenRequest (connectionContext )
343+ .doOnSubscribe (
344+ s -> LOGGER .debug ("Starting new UAA JWT token request" ))
345+ .doOnSuccess (
346+ token ->
347+ LOGGER .debug (
348+ "UAA JWT token request completed"
349+ + " successfully" ))
350+ .doOnError (
351+ error ->
352+ LOGGER .debug (
353+ "UAA JWT token request failed" , error ))
354+ .doFinally (
355+ signal -> {
356+ // Clear the active request when done (success or
357+ // error)
358+ this .activeTokenRequests .remove (connectionContext );
359+ });
360+
361+ // Apply cache duration from connection context
362+ final Mono <String > newTokenRequest =
363+ connectionContext
364+ .getCacheDuration ()
365+ .map (baseTokenRequest ::cache )
366+ .orElseGet (baseTokenRequest ::cache )
367+ /*
368+ * Ensure execution on single thread.
369+ * This prevents sending requests to the UAA server with expired refresh tokens.
370+ */
371+ .publishOn (connectionContext .getTokenScheduler ());
372+
373+ // Store the active request atomically
374+ final Mono <String > actualRequest =
375+ this .activeTokenRequests .putIfAbsent (
376+ connectionContext , newTokenRequest );
377+ if (actualRequest != null ) {
378+ // Another thread beat us to it, use their request. This prevents "wasteful"
379+ // requests.
380+ LOGGER .debug (
381+ "Another thread created token request first, using theirs instead" );
382+ return actualRequest ;
383+ }
384+
385+ // We successfully stored our request, use it
386+ return newTokenRequest ;
387+ });
369388 }
370389
371390 private Mono <String > createTokenRequest (final ConnectionContext connectionContext ) {
372391 return this .refreshTokens
373392 .getOrDefault (connectionContext , Mono .empty ())
374- .flatMap (refreshToken -> refreshToken (connectionContext , refreshToken )
375- .doOnSubscribe (s -> LOGGER .debug ("Negotiating using refresh token" )))
376- .switchIfEmpty (primaryToken (connectionContext )
377- .doOnSubscribe (s -> LOGGER .debug ("Negotiating using token provider" )))
393+ .flatMap (
394+ refreshToken ->
395+ refreshToken (connectionContext , refreshToken )
396+ .doOnSubscribe (
397+ s ->
398+ LOGGER .debug (
399+ "Negotiating using refresh token" )))
400+ .switchIfEmpty (
401+ primaryToken (connectionContext )
402+ .doOnSubscribe (
403+ s -> LOGGER .debug ("Negotiating using token provider" )))
378404 .checkpoint ();
379405 }
380406
0 commit comments