Skip to content

Commit 7ad23db

Browse files
committed
Base proposal to fix #1146
1 parent 6317464 commit 7ad23db

File tree

1 file changed

+60
-23
lines changed

1 file changed

+60
-23
lines changed

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.util.function.BiConsumer;
3838
import java.util.function.Consumer;
3939
import java.util.function.Function;
40+
import reactor.core.scheduler.Scheduler;
41+
import reactor.core.scheduler.Schedulers;
4042
import org.cloudfoundry.Nullable;
4143
import org.cloudfoundry.reactor.ConnectionContext;
4244
import org.cloudfoundry.reactor.TokenProvider;
@@ -83,6 +85,12 @@ public abstract class AbstractUaaTokenProvider implements TokenProvider {
8385
private final ConcurrentMap<ConnectionContext, Mono<String>> refreshTokens =
8486
new ConcurrentHashMap<>(1);
8587

88+
private final ConcurrentMap<ConnectionContext, Scheduler> tokenSchedulers =
89+
new ConcurrentHashMap<>(1);
90+
91+
private final ConcurrentMap<ConnectionContext, Mono<String>> activeTokenRequests =
92+
new ConcurrentHashMap<>(1);
93+
8694
/**
8795
* The client id. Defaults to {@code cf}.
8896
*/
@@ -297,30 +305,59 @@ private void setAuthorization(HttpHeaders headers) {
297305
headers.set(AUTHORIZATION, String.format("Basic %s", encoded));
298306
}
299307

300-
private Mono<String> token(ConnectionContext connectionContext) {
301-
Mono<String> cached =
302-
this.refreshTokens
303-
.getOrDefault(connectionContext, Mono.empty())
304-
.flatMap(
305-
refreshToken ->
306-
refreshToken(connectionContext, refreshToken)
307-
.doOnSubscribe(
308-
s ->
309-
LOGGER.debug(
310-
"Negotiating using refresh"
311-
+ " token")))
312-
.switchIfEmpty(
313-
primaryToken(connectionContext)
314-
.doOnSubscribe(
315-
s ->
316-
LOGGER.debug(
317-
"Negotiating using token"
318-
+ " provider")));
308+
private Mono<String> token(final ConnectionContext connectionContext) {
309+
// Get or create a single-threaded scheduler for this connection context
310+
final Scheduler tokenScheduler = this.tokenSchedulers.computeIfAbsent(
311+
connectionContext,
312+
ctx -> Schedulers.newSingle("token-" + ctx.hashCode())
313+
);
314+
315+
return Mono.defer(() -> {
316+
// Check if there's already an active token request
317+
final Mono<String> existingRequest = this.activeTokenRequests.get(connectionContext);
318+
if (existingRequest != null) {
319+
LOGGER.debug("Reusing existing token request for connection context");
320+
return existingRequest;
321+
}
322+
323+
// Create new token request with proper synchronization and cache duration
324+
final Mono<String> baseTokenRequest = createTokenRequest(connectionContext)
325+
.publishOn(tokenScheduler) // Ensure execution on single thread
326+
.doOnSubscribe(s -> LOGGER.debug("Starting new token request"))
327+
.doOnSuccess(token -> LOGGER.debug("Token request completed successfully"))
328+
.doOnError(error -> LOGGER.debug("Token request failed", error))
329+
.doFinally(signal -> {
330+
// Clear the active request when done (success or error)
331+
this.activeTokenRequests.remove(connectionContext);
332+
LOGGER.debug("Cleared active token request for connection context");
333+
});
334+
335+
// Apply cache duration from connection context
336+
final Mono<String> newTokenRequest = connectionContext
337+
.getCacheDuration()
338+
.map(baseTokenRequest::cache)
339+
.orElseGet(baseTokenRequest::cache);
340+
341+
// Store the active request atomically
342+
final Mono<String> actualRequest = this.activeTokenRequests.putIfAbsent(connectionContext, newTokenRequest);
343+
if (actualRequest != null) {
344+
// Another thread beat us to it, use their request
345+
LOGGER.debug("Another thread created token request first, using theirs");
346+
return actualRequest;
347+
}
348+
349+
// We successfully stored our request, use it
350+
return newTokenRequest;
351+
});
352+
}
319353

320-
return connectionContext
321-
.getCacheDuration()
322-
.map(cached::cache)
323-
.orElseGet(cached::cache)
354+
private Mono<String> createTokenRequest(final ConnectionContext connectionContext) {
355+
return this.refreshTokens
356+
.getOrDefault(connectionContext, Mono.empty())
357+
.flatMap(refreshToken -> refreshToken(connectionContext, refreshToken)
358+
.doOnSubscribe(s -> LOGGER.debug("Negotiating using refresh token")))
359+
.switchIfEmpty(primaryToken(connectionContext)
360+
.doOnSubscribe(s -> LOGGER.debug("Negotiating using token provider")))
324361
.checkpoint();
325362
}
326363

0 commit comments

Comments
 (0)