diff --git a/README.md b/README.md index ffed2f36f4..58bb8269b3 100644 --- a/README.md +++ b/README.md @@ -253,7 +253,7 @@ Beyond that, it is helpful to capture the following information: If you open a Github issue with a request for help, please include as much of the information above as possible and do not forget to sanitize any request/response data posted. ## Development -The project depends on Java 8. To build from source and install to your local Maven cache, run the following: +The project depends on Java 8 to 21. To build from source and install to your local Maven cache, run the following: ```shell $ git submodule update --init --recursive @@ -297,6 +297,7 @@ Name | Description `TEST_PROXY_PORT` | _(Optional)_ The port of a proxy to route all requests through. Defaults to `8080`. `TEST_PROXY_USERNAME` | _(Optional)_ The username for a proxy to route all requests through `TEST_SKIPSSLVALIDATION` | _(Optional)_ Whether to skip SSL validation when connecting to the Cloud Foundry instance. Defaults to `false`. +`UAA_API_REQUEST_LIMIT` | _(Optional)_ If your UAA server does rate limiting and returns 429 errors, set this variable to a value smaller than the limit. Defaults to `0` (no limit) If you do not have access to a CloudFoundry instance with admin access, you can run one locally using [bosh-deployment](https://github.com/cloudfoundry/bosh-deployment) & [cf-deployment](https://github.com/cloudfoundry/cf-deployment/) and Virtualbox. diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/AbstractUaaOperations.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/AbstractUaaOperations.java index d4ab1ab9d2..69529ccb91 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/AbstractUaaOperations.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/AbstractUaaOperations.java @@ -24,6 +24,7 @@ import org.cloudfoundry.reactor.ConnectionContext; import org.cloudfoundry.reactor.TokenProvider; import org.cloudfoundry.reactor.client.QueryBuilder; +import org.cloudfoundry.reactor.uaa.UaaThrottler.Token; import org.cloudfoundry.reactor.util.AbstractReactorOperations; import org.cloudfoundry.reactor.util.ErrorPayloadMappers; import org.cloudfoundry.reactor.util.Operator; @@ -43,62 +44,114 @@ protected AbstractUaaOperations( super(connectionContext, root, tokenProvider, requestTags); } - @Override - protected Mono createOperator() { - return super.createOperator().map(this::attachErrorPayloadMapper); + private Mono createOperator(Token token) { + return this.root + .map(super::buildOperatorContext) + .flatMap( + context -> + Mono.just( + new UaaOperator( + context, + this.connectionContext.getHttpClient(), + token, + "")) + .map(op -> op.headers(super::addHeaders)) + .map(op -> op.headersWhen(super::addHeadersWhen))) + .map(this::attachErrorPayloadMapper); } protected final Mono delete( Object requestPayload, Class responseType, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> delete(requestPayload, responseType, uriTransformer, token)); + } + + private Mono delete( + Object requestPayload, + Class responseType, + Function uriTransformer, + Token token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .delete() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .delete() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } protected final Mono get( Object requestPayload, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> get(requestPayload, uriTransformer, token)); + } + + private Mono get( + Object requestPayload, + Function uriTransformer, + Token token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .get() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .response() - .get()); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .get() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .response() + .get(); + }); } protected final Mono get( Object requestPayload, Function uriTransformer, Consumer headersTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> get(requestPayload, uriTransformer, headersTransformer, token)); + } + + private Mono get( + Object requestPayload, + Function uriTransformer, + Consumer headersTransformer, + Token token) { + return createOperator(token) .flatMap( - operator -> - operator.headers( - headers -> - addHeaders( - headers, - requestPayload, - headersTransformer)) - .get() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .response() - .get()); + operator -> { + return operator.headers( + headers -> + addHeaders( + headers, + requestPayload, + headersTransformer)) + .get() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .response() + .get(); + }); } protected final Mono get( @@ -106,38 +159,74 @@ protected final Mono get( Function uriTransformer, Consumer headersTransformer, Function> headersWhenTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) .flatMap( - operator -> - operator.headers( - headers -> - addHeaders( - headers, - requestPayload, - headersTransformer)) - .headersWhen(headersWhenTransformer) - .get() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .response() - .get()); + token -> + get( + requestPayload, + uriTransformer, + headersTransformer, + headersWhenTransformer, + token)); + } + + private Mono get( + Object requestPayload, + Function uriTransformer, + Consumer headersTransformer, + Function> headersWhenTransformer, + Token token) { + return createOperator(token) + .flatMap( + operator -> { + return operator.headers( + headers -> + addHeaders( + headers, + requestPayload, + headersTransformer)) + .headersWhen(headersWhenTransformer) + .get() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .response() + .get(); + }); } protected final Mono get( Object requestPayload, Class responseType, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> get(requestPayload, responseType, uriTransformer, token)); + } + + private Mono get( + Object requestPayload, + Class responseType, + Function uriTransformer, + Token token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .get() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .get() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .response() + .parseBody(responseType); + }); } protected final Mono get( @@ -145,38 +234,74 @@ protected final Mono get( Class responseType, Function uriTransformer, Consumer headersTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap( + token -> + get( + requestPayload, + responseType, + uriTransformer, + headersTransformer, + token)); + } + + private Mono get( + Object requestPayload, + Class responseType, + Function uriTransformer, + Consumer headersTransformer, + Token token) { + return createOperator(token) .flatMap( - operator -> - operator.headers( - headers -> - addHeaders( - headers, - requestPayload, - headersTransformer)) - .get() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers( + headers -> + addHeaders( + headers, + requestPayload, + headersTransformer)) + .get() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .response() + .parseBody(responseType); + }); } protected final Mono patch( Object requestPayload, Class responseType, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> patch(requestPayload, responseType, uriTransformer, token)); + } + + private Mono patch( + Object requestPayload, + Class responseType, + Function uriTransformer, + Token token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .patch() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .patch() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } protected final Mono post( @@ -184,22 +309,44 @@ protected final Mono post( Class responseType, Function uriTransformer, Consumer headersTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap( + token -> + post( + requestPayload, + responseType, + uriTransformer, + headersTransformer, + token)); + } + + private Mono post( + Object requestPayload, + Class responseType, + Function uriTransformer, + Consumer headersTransformer, + Token token) { + return createOperator(token) .flatMap( - operator -> - operator.headers( - headers -> - addHeaders( - headers, - requestPayload, - headersTransformer)) - .post() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers( + headers -> + addHeaders( + headers, + requestPayload, + headersTransformer)) + .post() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } protected final Mono post( @@ -208,57 +355,109 @@ protected final Mono post( Function uriTransformer, Consumer headersTransformer, Function> headersWhenTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) .flatMap( - operator -> - operator.headers( - headers -> - addHeaders( - headers, - requestPayload, - headersTransformer)) - .headersWhen(headersWhenTransformer) - .post() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + token -> + post( + requestPayload, + responseType, + uriTransformer, + headersTransformer, + headersWhenTransformer, + token)); + } + + private Mono post( + Object requestPayload, + Class responseType, + Function uriTransformer, + Consumer headersTransformer, + Function> headersWhenTransformer, + Token token) { + return createOperator(token) + .flatMap( + operator -> { + return operator.headers( + headers -> + addHeaders( + headers, + requestPayload, + headersTransformer)) + .headersWhen(headersWhenTransformer) + .post() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } protected final Mono post( Object requestPayload, Class responseType, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> post(requestPayload, responseType, uriTransformer, token)); + } + + private Mono post( + Object requestPayload, + Class responseType, + Function uriTransformer, + Token token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .post() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .post() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } protected final Mono put( Object requestPayload, Class responseType, Function uriTransformer) { - return createOperator() + return UaaThrottler.getInstance() + .acquire( + queryTransformer(requestPayload) + .andThen(uriTransformer) + .apply(UriComponentsBuilder.fromPath("")) + .build() + .toUriString()) + .flatMap(token -> put(requestPayload, responseType, uriTransformer, token)); + } + + private Mono put( + Object requestPayload, + Class responseType, + Function uriTransformer, + Token token) { + return createOperator(token) .flatMap( - operator -> - operator.headers(headers -> addHeaders(headers, requestPayload)) - .put() - .uri( - queryTransformer(requestPayload) - .andThen(uriTransformer)) - .send(requestPayload) - .response() - .parseBody(responseType)); + operator -> { + return operator.headers(headers -> addHeaders(headers, requestPayload)) + .put() + .uri(queryTransformer(requestPayload).andThen(uriTransformer)) + .send(requestPayload) + .response() + .parseBody(responseType); + }); } private static void addHeaders( @@ -274,9 +473,14 @@ private static void addHeaders(HttpHeaders httpHeaders, Object requestPayload) { VersionBuilder.augment(httpHeaders, requestPayload); } - private Operator attachErrorPayloadMapper(Operator operator) { - return operator.withErrorPayloadMapper( - ErrorPayloadMappers.uaa(this.connectionContext.getObjectMapper())); + private UaaOperator attachErrorPayloadMapper(Operator operator) { + if (operator instanceof UaaOperator) { + UaaOperator op = (UaaOperator) operator; + return op.withErrorPayloadMapper( + ErrorPayloadMappers.uaa(this.connectionContext.getObjectMapper())); + } else { + throw new RuntimeException("Wrong class of operator " + operator.getClass().toString()); + } } private Function queryTransformer( diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/ReactorRatelimit.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/ReactorRatelimit.java new file mode 100644 index 0000000000..c350367d61 --- /dev/null +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/ReactorRatelimit.java @@ -0,0 +1,53 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.reactor.uaa; + +import java.util.Map; +import org.cloudfoundry.reactor.ConnectionContext; +import org.cloudfoundry.reactor.TokenProvider; +import org.cloudfoundry.uaa.ratelimit.Ratelimit; +import org.cloudfoundry.uaa.ratelimit.RatelimitRequest; +import org.cloudfoundry.uaa.ratelimit.RatelimitResponse; +import reactor.core.publisher.Mono; + +public final class ReactorRatelimit extends AbstractUaaOperations implements Ratelimit { + + /** + * Creates an instance + * + * @param connectionContext the {@link ConnectionContext} to use when communicating with the server + * @param root the root URI of the server. Typically something like {@code https://uaa.run.pivotal.io}. + * @param tokenProvider the {@link TokenProvider} to use when communicating with the server + * @param requestTags map with custom http headers which will be added to web request + */ + public ReactorRatelimit( + ConnectionContext connectionContext, + Mono root, + TokenProvider tokenProvider, + Map requestTags) { + super(connectionContext, root, tokenProvider, requestTags); + } + + @Override + public Mono getRatelimit(RatelimitRequest request) { + return get( + request, + RatelimitResponse.class, + builder -> builder.pathSegment("RateLimitingStatus")) + .checkpoint(); + } +} diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaOperator.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaOperator.java new file mode 100644 index 0000000000..3656f6e0a9 --- /dev/null +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaOperator.java @@ -0,0 +1,84 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.reactor.uaa; + +import io.netty.handler.codec.http.HttpHeaders; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Function; +import org.cloudfoundry.reactor.uaa.UaaThrottler.Token; +import org.cloudfoundry.reactor.util.ErrorPayloadMapper; +import org.cloudfoundry.reactor.util.Operator; +import org.cloudfoundry.reactor.util.OperatorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; + +public class UaaOperator extends Operator { + + private Token token = null; + private static final Logger LOGGER = LoggerFactory.getLogger("cloudfoundry-client.test"); + + public UaaOperator(OperatorContext context, HttpClient httpClient, Token value, String caller) { + super(context, httpClient); + token = Objects.requireNonNull(value, "value must not be null"); + if (token != UaaThrottler.NON_UAA_TOKEN) { + LOGGER.trace("UaaOperator creating instance for " + value.id() + " caller " + caller); + } + } + + @Override + public UaaOperator followRedirects() { + return new UaaOperator( + this.context, super.getHttpClient().followRedirect(true), this.token, "follow"); + } + + @Override + public UaaOperator headers(Consumer headersTransformer) { + return new UaaOperator( + this.context, + super.getHttpClient().headers(headersTransformer), + this.token, + "headers"); + } + + @Override + public UaaOperator headersWhen( + Function> headersWhenTransformer) { + return new UaaOperator( + this.context, + super.getHttpClient().headersWhen(headersWhenTransformer), + this.token, + "headersWhen"); + } + + @Override + public UaaOperator withErrorPayloadMapper(ErrorPayloadMapper errorPayloadMapper) { + return new UaaOperator( + this.context.withErrorPayloadMapper(errorPayloadMapper), + super.getHttpClient(), + this.token, + "errorPayload"); + } + + @Override + protected HttpClient attachRequestLogger(HttpClient httpClient) { + return super.attachRequestLogger(httpClient) + .doAfterRequest((response, connection) -> token.activate()); + } +} diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaThrottler.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaThrottler.java new file mode 100644 index 0000000000..73e7049d37 --- /dev/null +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/UaaThrottler.java @@ -0,0 +1,280 @@ +package org.cloudfoundry.reactor.uaa; + +import java.io.PrintStream; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.cloudfoundry.uaa.ratelimit.LimiterMapping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; + +/** + * Throttle calls to uaa in order to avoid running into a rate limit. + * If your UAA server is configured with a rate limit, the number of request within a given time + * must be set here in order to slow down the client and avoid http 429-responses. + * + * @author D034003 + * + */ +public class UaaThrottler { + public static Token NON_UAA_TOKEN; + private static UaaThrottler instance = null; + + // List of all rate limits. Each url may have a different value. For each limit, the list of + // calls that are not outdated is kept. + private List mappings; + + // All requests that must not run now because they would violate a limit. + private Queue> waitingQueue; + + // Time base of the limits in seconds. + private int minDelay; + + private static final Logger LOGGER = LoggerFactory.getLogger("cloudfoundry-client.test"); + + public static UaaThrottler getInstance() { + if (instance == null) { + instance = new UaaThrottler(); + } + return instance; + } + + private UaaThrottler() { + NON_UAA_TOKEN = Token.empty(); + mappings = new ArrayList<>(); + waitingQueue = new ConcurrentLinkedQueue<>(); + minDelay = Integer.MAX_VALUE; + } + + /** + * Add one entry for every mapping configured on the server. + * @param mapping + */ + public void addLimiterMapping(LimiterMapping mapping) { + mappings.add(new LimiterMappingWithRunningRequests(mapping)); + int delay = mapping.timeBase(); + if (minDelay > delay) { + minDelay = delay; + } + } + + public Mono acquire(String url) { + if ((mappings.size() > 0)) { + LOGGER.trace( + "UaaThrottler: about to acquire one token " + + this.logString() + + " for url " + + url); + return Mono.defer( + () -> { + Sinks.One waiter = Sinks.one(); + waitingQueue.add(waiter); // first add, so no signal get's lost. + return tryAcquire(url, waiter); + }) + .subscribeOn(Schedulers.parallel()); + } else { + return Mono.just(NON_UAA_TOKEN); + } + } + + private synchronized Mono tryAcquire(String url, Sinks.One waiter) { + if (checkDelayNeededAndResume(url)) { + // LOGGER.debug("UaaThrottler: Delay needed for "+url+", "+ this.logString()); + // too much noise + return doDelay(url, waiter); + } else { + waitingQueue.remove(waiter); // no need to wait, so delete the waiter. + // LOGGER.debug("UaaThrottler: no Delay needed for "+ url); too much noise + Token token = new Token(url); + for (LimiterMappingWithRunningRequests mapping : mappings) { + if (mapping.limiter.matches(url)) { + mapping.runningRequests.add(token); + } + } + return Mono.just(token); + } + } + + /** + * check if all windows in all mappers are below limit and return that value. + * As a side effect, clean up windows and trigger resume of waiting requests. + * + * @param url + * @return + */ + synchronized boolean checkDelayNeededAndResume(String url) { + Instant now = Instant.now(); + Set removed = new HashSet<>(); + try { + for (LimiterMappingWithRunningRequests mapping : mappings) { + if (mapping.limiter.matches(url)) { + removed.addAll(clearOutdatedTokens(mapping, now)); + if (mapping.runningRequests.size() >= mapping.limiter.limit()) { + return true; + } + } + } + } finally { + checkIfResumeIsPossible(removed); + } + return false; + } + + /** + * explicit method to allow unit testing. Do not use. + */ + Mono doDelay(String url, Sinks.One waiter) { + return Mono.delay(Duration.ofMillis(minDelay)) + .then(Mono.defer(() -> tryAcquire(url, waiter))); + } + + private Set clearOutdatedTokens(LimiterMappingWithRunningRequests mapping, Instant now) { + int window = mapping.limiter.timeBase(); + Instant windowStart = now.minus(Duration.ofSeconds(window)); + Iterator it = mapping.runningRequests.iterator(); + Set removed = new HashSet<>(); + while (it.hasNext()) { + Token token = it.next(); + if (token.startTime != null && token.startTime.isBefore(windowStart)) { + removed.add(token); + it.remove(); + } else { + // not started or still counting, do nothing + } + } + return removed; + } + + // if a token that is outdated in one mapping is not listed in any of the other mapping-windows, + // we can resume. + private void checkIfResumeIsPossible(Set removed) { + for (Token oneToken : removed) { + boolean isUsed = false; + for (LimiterMappingWithRunningRequests mapping : mappings) { + if (mapping.runningRequests.contains(oneToken)) { + isUsed = true; + break; + } + } + if (!isUsed) { + resume(oneToken.id); + } + } + } + + private void resume(String url) { + Sinks.One next = waitingQueue.poll(); + LOGGER.trace("UaaThrottler: Releasing because " + url + " is out of window."); + if (next != null) { + LOGGER.trace("UaaThrottler: waking up " + next.name()); + next.tryEmitEmpty(); // wake up one waiting caller + } else { + LOGGER.trace("UaaThrottler: waiting queue is empty"); + } + } + + private String logString() { + String mappingValues = ""; + for (LimiterMappingWithRunningRequests mapping : mappings) { + mappingValues += + "mapping limiter " + + mapping.limiter.name() + + " limit " + + mapping.limiter.limit() + + "/" + + mapping.limiter.timeBase() + + "s for path " + + mapping.limiter.pathSelectors() + + "; "; + } + return "QueueLenght " + + waitingQueue.size() + + " number of mappings " + + mappings.size() + + mappingValues; + } + + /** + * This method should only be used to get a defined state in test coding. + */ + static void reset() { + instance = null; + } + + /** only for unit testing. + * + * @param spy + */ + static void setInstance(UaaThrottler spy) { + instance = spy; + } + + /** + * Only for unittests. + * @param out + * @return + */ + boolean verifyAllQueuesEmpty(PrintStream out) { + boolean result = true; + if (waitingQueue.size() > 0) { + result = false; + waitingQueue.forEach( + sink -> { + out.println("Sinks entry is left over"); + }); + } + for (LimiterMappingWithRunningRequests mapping : mappings) { + if (mapping.runningRequests.size() > 0) { + out.println( + "Mapping for " + + mapping.limiter.name() + + " is not 0 as expected, but " + + mapping.runningRequests.size()); + result = false; + } + } + return result; + } + + public static class Token { + private final String id; + private Instant startTime; + + private Token(String url) { + this.id = url; + } + + private static Token empty() { + return new Token(null); + } + + public void activate() { + if (this != NON_UAA_TOKEN) { + startTime = Instant.now(); + } + } + + public String id() { + return id; + } + } + + private static class LimiterMappingWithRunningRequests { + private LimiterMapping limiter; + private Set runningRequests; + + private LimiterMappingWithRunningRequests(LimiterMapping mapping) { + limiter = mapping; + runningRequests = new HashSet<>(); + } + } +} diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_ReactorUaaClient.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_ReactorUaaClient.java index 1b2d613c4e..a1b28ca848 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_ReactorUaaClient.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_ReactorUaaClient.java @@ -33,6 +33,7 @@ import org.cloudfoundry.uaa.groups.Groups; import org.cloudfoundry.uaa.identityproviders.IdentityProviders; import org.cloudfoundry.uaa.identityzones.IdentityZones; +import org.cloudfoundry.uaa.ratelimit.Ratelimit; import org.cloudfoundry.uaa.serverinformation.ServerInformation; import org.cloudfoundry.uaa.tokens.Tokens; import org.cloudfoundry.uaa.users.Users; @@ -104,6 +105,12 @@ public Users users() { return new ReactorUsers(getConnectionContext(), getRoot(), getTokenProvider(), getRequestTags()); } + @Override + @Value.Derived + public Ratelimit rateLimit() { + return new ReactorRatelimit(getConnectionContext(), getRoot(), getTokenProvider(), getRequestTags()); + } + /** * The connection context */ diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_UaaRatelimit.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_UaaRatelimit.java new file mode 100644 index 0000000000..054620e752 --- /dev/null +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/_UaaRatelimit.java @@ -0,0 +1,17 @@ +package org.cloudfoundry.reactor.uaa; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; + +import org.cloudfoundry.Nullable; +import org.immutables.value.Value; + +@JsonDeserialize +@Value.Immutable +abstract class _UaaRatelimit { + + @JsonProperty("limiterMappings") + @Nullable + public abstract Integer getRatelimit(); + + +} diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/AbstractReactorOperations.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/AbstractReactorOperations.java index ec97861f60..f80486807c 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/AbstractReactorOperations.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/AbstractReactorOperations.java @@ -58,19 +58,19 @@ protected Mono createOperator() { .map(operator -> operator.headersWhen(this::addHeadersWhen)); } - private void addHeaders(HttpHeaders httpHeaders) { + public void addHeaders(HttpHeaders httpHeaders) { UserAgent.setUserAgent(httpHeaders); JsonCodec.setDecodeHeaders(httpHeaders); this.requestTags.forEach(httpHeaders::set); } - private Mono addHeadersWhen(HttpHeaders httpHeaders) { + public Mono addHeadersWhen(HttpHeaders httpHeaders) { return this.tokenProvider .getToken(this.connectionContext) .map(token -> httpHeaders.set(AUTHORIZATION, token)); } - private OperatorContext buildOperatorContext(String root) { + public OperatorContext buildOperatorContext(String root) { return OperatorContext.builder() .connectionContext(this.connectionContext) .root(root) diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java index bc06912326..ba7ae43052 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java @@ -61,6 +61,10 @@ public UriConfiguration delete() { return request(HttpMethod.DELETE); } + protected HttpClient getHttpClient() { + return this.httpClient; + } + public Operator followRedirects() { return new Operator(this.context, this.httpClient.followRedirect(true)); } @@ -104,7 +108,7 @@ public Operator withErrorPayloadMapper(ErrorPayloadMapper errorPayloadMapper) { this.context.withErrorPayloadMapper(errorPayloadMapper), this.httpClient); } - private static HttpClient attachRequestLogger(HttpClient httpClient) { + protected HttpClient attachRequestLogger(HttpClient httpClient) { RequestLogger requestLogger = new RequestLogger(); return httpClient .doAfterRequest((request, connection) -> requestLogger.request(request)) @@ -129,18 +133,30 @@ public ResponseReceiverConstructor send(Object payload) { return send(serialized(payload)); } + private BiFunction> + wrapRequestTransformer( + BiFunction> original) { + return (req, out) -> Mono.from(original.apply(req, out)); + } + public ResponseReceiverConstructor send( BiFunction> requestTransformer) { - HttpClient.ResponseReceiver responseReceiver = - this.requestSender.send(requestTransformer); - return new ResponseReceiverConstructor(this.context, responseReceiver); + return new ResponseReceiverConstructor( + this.context, + this.requestSender.send(wrapRequestTransformer(requestTransformer))); + } + + BiConsumer wrapFormTransformer( + BiConsumer original) { + + return (req, form) -> original.accept(req, form); } public ResponseReceiverConstructor sendForm( BiConsumer requestTransformer) { - HttpClient.ResponseReceiver responseReceiver = - this.requestSender.sendForm(requestTransformer); - return new ResponseReceiverConstructor(this.context, responseReceiver); + return new ResponseReceiverConstructor( + this.context, + this.requestSender.sendForm(wrapFormTransformer(requestTransformer))); } private BiFunction> serialized( diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/RequestLogger.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/RequestLogger.java index d0f3cc5ae3..0acf846658 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/RequestLogger.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/RequestLogger.java @@ -34,7 +34,7 @@ public class RequestLogger { private long requestSentTime; public void request(HttpClientRequest request) { - request(String.format("%-6s {}", request.method()), request.uri()); + request(String.format("%-6s {}", request.method()), request.resourceUrl()); } public void response(HttpClientResponse response) { @@ -50,19 +50,19 @@ public void response(HttpClientResponse response) { RESPONSE_LOGGER.debug( "{} {} ({}, {})", response.status().code(), - response.uri(), + response.resourceUrl(), elapsed, response.responseHeaders().get("X-Vcap-Request-Id")); } else { RESPONSE_LOGGER.debug( - "{} {} ({})", response.status().code(), response.uri(), elapsed); + "{} {} ({})", response.status().code(), response.resourceUrl(), elapsed); } } else { if (RESPONSE_LOGGER.isTraceEnabled()) { RESPONSE_LOGGER.warn( "{} {} ({}, {}) [{}]", response.status().code(), - response.uri(), + response.resourceUrl(), elapsed, response.responseHeaders().get("X-Vcap-Request-Id"), String.join(", ", warnings)); @@ -70,7 +70,7 @@ public void response(HttpClientResponse response) { RESPONSE_LOGGER.warn( "{} {} ({}) [{}]", response.status().code(), - response.uri(), + response.resourceUrl(), elapsed, String.join(", ", warnings)); } diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/AbstractRestTest.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/AbstractRestTest.java index f3f3ad3e1a..df842fa716 100644 --- a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/AbstractRestTest.java +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/AbstractRestTest.java @@ -23,7 +23,9 @@ import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.deser.DeserializationProblemHandler; +import io.netty.handler.codec.http.HttpMethod; import java.io.IOException; +import java.time.Duration; import java.util.*; import mockwebserver3.Dispatcher; import mockwebserver3.MockResponse; @@ -53,7 +55,7 @@ public abstract class AbstractRestTest { protected final Mono root; - final MockWebServer mockWebServer; + protected final MockWebServer mockWebServer; private MultipleRequestDispatcher multipleRequestDispatcher = new MultipleRequestDispatcher(); @@ -78,7 +80,12 @@ protected final void mockRequest(InteractionContext interactionContext) { this.multipleRequestDispatcher.add(interactionContext); } - private static final class FailingDeserializationProblemHandler + protected final void mockRequestParallel( + InteractionContext interactionContext, Duration delay) { + this.multipleRequestDispatcher.addParallel(interactionContext, delay); + } + + public static final class FailingDeserializationProblemHandler extends DeserializationProblemHandler { @Override @@ -98,35 +105,77 @@ public boolean handleUnknownProperty( private static final class MultipleRequestDispatcher extends Dispatcher { - private Queue responses = new LinkedList<>(); + private Queue responsesOrdered = new LinkedList<>(); + private Map responsesParallel = new HashMap<>(); + private Map responsesParallelDelays = new HashMap<>(); private List verifications = new ArrayList<>(); @Override public MockResponse dispatch(RecordedRequest request) { - InteractionContext interactionContext = this.responses.poll(); + InteractionContext interactionContext = this.responsesOrdered.peek(); if (interactionContext == null) { - throw new IllegalStateException( - String.format( - "Unexpected request for %s %s received", - request.getMethod(), request.getPath())); + return dispatchParallel(request); } - interactionContext.setDone(true); - try { interactionContext.getRequest().assertEquals(request); + interactionContext.setDone(true); + this.responsesOrdered.poll(); return interactionContext.getResponse().getMockResponse(); } catch (AssertionError e) { - e.printStackTrace(); - return new MockResponse().setResponseCode(400); + return dispatchParallel(request); + } + } + + private MockResponse dispatchParallel(RecordedRequest request) { + TestRequest testRequest = + TestRequest.builder() + .method(HttpMethod.valueOf(request.getMethod())) + .path(request.getPath()) + .build(); + InteractionContext interactionContext = this.responsesParallel.get(testRequest); + if (interactionContext != null) { + interactionContext.setDone(true); + Duration delay = this.responsesParallelDelays.get(testRequest); + try { + Thread.sleep(delay.toMillis()); + } catch (InterruptedException e) { + throw new IllegalStateException( + String.format( + "Unexpectedly interrupted %s %s", + request.getMethod(), request.getPath())); + } + return interactionContext.getResponse().getMockResponse(); + } else { // implement previous error behavior of responsesOrdered + try { + interactionContext = this.responsesOrdered.poll(); + if (interactionContext == null) { + throw new IllegalStateException( + String.format( + "Unexpected request for %s %s received", + request.getMethod(), request.getPath())); + } + interactionContext.getRequest().assertEquals(request); + } catch (AssertionError e) { + e.printStackTrace(); + return new MockResponse().setResponseCode(400); + } } + return new MockResponse().setResponseCode(500); } private void add(InteractionContext interactionContext) { - this.responses.add(interactionContext); + this.responsesOrdered.add(interactionContext); + this.verifications.add(interactionContext); + } + + private void addParallel(InteractionContext interactionContext, Duration delay) { + TestRequest request = interactionContext.getRequest(); + this.responsesParallel.put(request, interactionContext); this.verifications.add(interactionContext); + this.responsesParallelDelays.put(request, delay); } private void verify() { diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/uaa/UaaThrottlerTest.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/uaa/UaaThrottlerTest.java new file mode 100644 index 0000000000..5590970660 --- /dev/null +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/uaa/UaaThrottlerTest.java @@ -0,0 +1,251 @@ +package org.cloudfoundry.reactor.uaa; + +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.never; + +import java.time.Duration; +import java.util.Collections; +import org.cloudfoundry.reactor.InteractionContext; +import org.cloudfoundry.reactor.TestRequest; +import org.cloudfoundry.reactor.TestResponse; +import org.cloudfoundry.reactor.uaa.groups.ReactorGroups; +import org.cloudfoundry.uaa.Metadata; +import org.cloudfoundry.uaa.groups.GetGroupRequest; +import org.cloudfoundry.uaa.groups.GetGroupResponse; +import org.cloudfoundry.uaa.groups.MemberSummary; +import org.cloudfoundry.uaa.groups.MemberType; +import org.cloudfoundry.uaa.ratelimit.LimiterMapping; +import org.cloudfoundry.uaa.ratelimit.PathSelector; +import org.cloudfoundry.uaa.ratelimit.PathSelectorModel.PathMatchType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +public class UaaThrottlerTest extends AbstractUaaApiTest { + private final ReactorGroups groups = + new ReactorGroups( + CONNECTION_CONTEXT, super.root, TOKEN_PROVIDER, Collections.emptyMap()); + private static final Logger LOGGER = LoggerFactory.getLogger("cloudfoundry-client.test"); + + @BeforeEach + void clean() { + UaaThrottler.reset(); + } + + @Test + void exhaustUaaThrottlerTest() throws InterruptedException { + UaaThrottler throttlerSpy = Mockito.spy(UaaThrottler.getInstance()); + UaaThrottler.setInstance(throttlerSpy); + GetGroupResponse resp1Body = createGroupResponse(); + mockRequestParallel( + createInteractionContext( + "/Groups/test-group-id1", "fixtures/uaa/groups/GET_{id}_response.json"), + Duration.ofMillis(400)); + mockRequestParallel( + createInteractionContext( + "/Groups/test-group-id2", "fixtures/uaa/groups/GET_{id}_response.json"), + Duration.ofMillis(200)); + mockRequestParallel( + createInteractionContext( + "/Groups/test-group-id3", "fixtures/uaa/groups/GET_{id}_response.json"), + Duration.ofMillis(0)); + PathSelector tmp = + PathSelector.builder().type(PathMatchType.startsWith).path("/Groups").build(); + throttlerSpy.addLimiterMapping( + LimiterMapping.builder() + .timeBase(1) + .limit(2) + .name("test") + .pathSelector(tmp) + .build()); + + Mono resp1 = + groups.get(GetGroupRequest.builder().groupId("test-group-id1").build()) + .delaySubscription(Duration.ofMillis(0)) + .doOnSubscribe(s -> LOGGER.trace("Subscribing resp1")) + .doOnNext(v -> LOGGER.trace("resp1 emits: " + v)); + Mono resp2 = + groups.get(GetGroupRequest.builder().groupId("test-group-id2").build()) + .delaySubscription(Duration.ofMillis(100)) + .doOnSubscribe(s -> LOGGER.trace("Subscribing resp2")) + .doOnNext(v -> LOGGER.trace("resp2 emits: " + v)); + Mono resp3 = + groups.get(GetGroupRequest.builder().groupId("test-group-id3").build()) + .delaySubscription(Duration.ofMillis(300)) + .doOnSubscribe(s -> LOGGER.trace("Subscribing resp3")) + .doOnNext(v -> LOGGER.trace("resp3 emits: " + v)); + + Flux merged = Flux.merge(resp1, resp2, resp3); + + StepVerifier.create(merged) + .expectNext(resp1Body) + .expectNext(resp1Body) + .expectNext(resp1Body) + .verifyComplete(); + org.mockito.Mockito.verify(throttlerSpy, atLeast(1)).doDelay(any(), any()); + Thread.sleep(1100); + throttlerSpy.checkDelayNeededAndResume("/Groups/test-group-id3"); + assertThat(throttlerSpy.verifyAllQueuesEmpty(System.out)).isTrue(); + } + + @Test + void slowServerDoesNotCreateTest() throws InterruptedException { + UaaThrottler throttlerSpy = Mockito.spy(UaaThrottler.getInstance()); + UaaThrottler.setInstance(throttlerSpy); + GetGroupResponse resp1Body = createGroupResponse(); + mockRequestParallel( + createInteractionContext( + "/Groups/test-group-id1", "fixtures/uaa/groups/GET_{id}_response.json"), + Duration.ofMillis(0)); + mockRequestParallel( + createInteractionContext( + "/Groups/test-group-id2", "fixtures/uaa/groups/GET_{id}_response.json"), + Duration.ofMillis(0)); + mockRequestParallel( + createInteractionContext( + "/Groups/test-group-id3", "fixtures/uaa/groups/GET_{id}_response.json"), + Duration.ofMillis(0)); + PathSelector tmp = + PathSelector.builder().type(PathMatchType.startsWith).path("/Groups").build(); + throttlerSpy.addLimiterMapping( + LimiterMapping.builder() + .timeBase(1) + .limit(2) + .name("test") + .pathSelector(tmp) + .build()); + + Mono resp1 = + groups.get(GetGroupRequest.builder().groupId("test-group-id1").build()) + .delaySubscription(Duration.ofMillis(0)) + .doOnSubscribe(s -> LOGGER.trace("Subscribing resp1")) + .doOnNext(v -> LOGGER.trace("resp1 emits: " + v)); + Mono resp2 = + groups.get(GetGroupRequest.builder().groupId("test-group-id2").build()) + .delaySubscription(Duration.ofMillis(100)) + .doOnSubscribe(s -> LOGGER.trace("Subscribing resp2")) + .doOnNext(v -> LOGGER.trace("resp2 emits: " + v)); + Mono resp3 = + groups.get(GetGroupRequest.builder().groupId("test-group-id3").build()) + .delaySubscription(Duration.ofMillis(1500)) + .doOnSubscribe(s -> LOGGER.trace("Subscribing resp3")) + .doOnNext(v -> LOGGER.trace("resp3 emits: " + v)); + + Flux merged = Flux.merge(resp1, resp2, resp3); + + StepVerifier.create(merged) + .expectNext(resp1Body) + .expectNext(resp1Body) + .expectNext(resp1Body) + .verifyComplete(); + org.mockito.Mockito.verify(throttlerSpy, never()).doDelay(any(), any()); + } + + @Test + void noopWhenNoLimitIsSetTest() throws InterruptedException { + UaaThrottler throttlerSpy = Mockito.spy(UaaThrottler.getInstance()); + UaaThrottler.setInstance(throttlerSpy); + GetGroupResponse resp1Body = createGroupResponse(); + mockRequestParallel( + createInteractionContext( + "/Groups/test-group-id1", "fixtures/uaa/groups/GET_{id}_response.json"), + Duration.ofMillis(0)); + mockRequestParallel( + createInteractionContext( + "/Groups/test-group-id2", "fixtures/uaa/groups/GET_{id}_response.json"), + Duration.ofMillis(0)); + mockRequestParallel( + createInteractionContext( + "/Groups/test-group-id3", "fixtures/uaa/groups/GET_{id}_response.json"), + Duration.ofMillis(0)); + + Mono resp1 = + groups.get(GetGroupRequest.builder().groupId("test-group-id1").build()) + .doOnSubscribe(s -> LOGGER.trace("Subscribing resp1")) + .doOnNext(v -> LOGGER.trace("resp1 emits: " + v)); + Mono resp2 = + groups.get(GetGroupRequest.builder().groupId("test-group-id2").build()) + .doOnSubscribe(s -> LOGGER.trace("Subscribing resp2")) + .doOnNext(v -> LOGGER.trace("resp2 emits: " + v)); + Mono resp3 = + groups.get(GetGroupRequest.builder().groupId("test-group-id3").build()) + .doOnSubscribe(s -> LOGGER.trace("Subscribing resp3")) + .doOnNext(v -> LOGGER.trace("resp3 emits: " + v)); + + Flux merged = Flux.merge(resp1, resp2, resp3); + + StepVerifier.create(merged) + .expectNext(resp1Body) + .expectNext(resp1Body) + .expectNext(resp1Body) + .verifyComplete(); + org.mockito.Mockito.verify(throttlerSpy, never()).doDelay(any(), any()); + } + + @Test + void isDelayNeeded_worksWithSeveralLimiters() { + UaaThrottler throttler = UaaThrottler.getInstance(); + throttler.addLimiterMapping( + LimiterMapping.builder() + .timeBase(1) + .name("test1") + .limit(-1) + .pathSelector( + PathSelector.builder() + .type(PathMatchType.equals) + .path("/SomethingElse") + .build()) + .build()); + throttler.addLimiterMapping( + LimiterMapping.builder() + .timeBase(1) + .name("test2") + .limit(-1) + .pathSelector( + PathSelector.builder() + .type(PathMatchType.startsWith) + .path("/Groups") + .build()) + .build()); + assertThat(throttler.checkDelayNeededAndResume("/Users")).isFalse(); + assertThat(throttler.checkDelayNeededAndResume("/Groups")).isTrue(); + } + + private GetGroupResponse createGroupResponse() { + return GetGroupResponse.builder() + .id("test-group-id") + .metadata( + Metadata.builder() + .created("2016-06-03T17:59:30.527Z") + .lastModified("2016-06-03T17:59:30.561Z") + .version(1) + .build()) + .description("the cool group") + .displayName("Cooler Group Name for Retrieve") + .member( + MemberSummary.builder() + .origin("uaa") + .type(MemberType.USER) + .memberId("f0e6a061-6e3a-4be9-ace5-142ee24e20b7") + .build()) + .schema("urn:scim:schemas:core:1.0") + .zoneId("uaa") + .build(); + } + + private InteractionContext createInteractionContext(String path, String responseFile) { + return InteractionContext.builder() + .request(TestRequest.builder().method(GET).path(path).build()) + .response(TestResponse.builder().status(OK).payload(responseFile).build()) + .build(); + } +} diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/uaa/groups/ReactorGroupsTest.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/uaa/groups/ReactorGroupsTest.java index 2275afcccd..82e930d62d 100644 --- a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/uaa/groups/ReactorGroupsTest.java +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/uaa/groups/ReactorGroupsTest.java @@ -73,7 +73,7 @@ final class ReactorGroupsTest extends AbstractUaaApiTest { private final ReactorGroups groups = new ReactorGroups( - CONNECTION_CONTEXT, this.root, TOKEN_PROVIDER, Collections.emptyMap()); + CONNECTION_CONTEXT, super.root, TOKEN_PROVIDER, Collections.emptyMap()); @Test void addMember() { diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/UaaClient.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/UaaClient.java index 9c88d37f59..665fb920ee 100644 --- a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/UaaClient.java +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/UaaClient.java @@ -21,6 +21,7 @@ import org.cloudfoundry.uaa.groups.Groups; import org.cloudfoundry.uaa.identityproviders.IdentityProviders; import org.cloudfoundry.uaa.identityzones.IdentityZones; +import org.cloudfoundry.uaa.ratelimit.Ratelimit; import org.cloudfoundry.uaa.serverinformation.ServerInformation; import org.cloudfoundry.uaa.tokens.Tokens; import org.cloudfoundry.uaa.users.Users; @@ -80,4 +81,9 @@ public interface UaaClient { * Main entry point to the UAA User Client API */ Users users(); + + /** + * Main entry point to the UAA Ratelimit API + */ + Ratelimit rateLimit(); } diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/PathSelectorModel.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/PathSelectorModel.java new file mode 100644 index 0000000000..1dbfdd029a --- /dev/null +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/PathSelectorModel.java @@ -0,0 +1,66 @@ +package org.cloudfoundry.uaa.ratelimit; + +import java.util.List; +import org.immutables.value.Value; + +public interface PathSelectorModel { + + enum PathMatchType { + equals, + startsWith, + other + } + + @Value.Immutable + interface _PathSelector { + PathMatchType type(); + + String path(); + + default boolean matches(String id) { + switch (type()) { + case equals: + return path().equals(id); + case startsWith: + return id.startsWith(path()); + case other: + return true; + default: + System.err.println("unhandled enum value " + type()); + } + return true; + } + } + + @Value.Immutable + interface _LimiterMapping { + /** + * time until the ratelimit is reset in seconds. + * @return + */ + @Value.Default + default int timeBase() { + return 1; + } + ; + + String name(); + + @Value.Default + default int limit() { + return 0; + } + ; + + List pathSelectors(); + + default boolean matches(String url) { + for (PathSelector oneSelector : pathSelectors()) { + if (oneSelector.matches(url)) { + return true; + } + } + return false; + } + } +} diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/Ratelimit.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/Ratelimit.java new file mode 100644 index 0000000000..c277fa7331 --- /dev/null +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/Ratelimit.java @@ -0,0 +1,27 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.uaa.ratelimit; + +import reactor.core.publisher.Mono; + +/** + * Main entry point to the UAA Ratelimit Client API + */ +public interface Ratelimit { + + Mono getRatelimit(RatelimitRequest request); +} diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_Current.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_Current.java new file mode 100644 index 0000000000..57811e9e59 --- /dev/null +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_Current.java @@ -0,0 +1,63 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.uaa.ratelimit; + + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; + +import java.util.Date; + +import org.immutables.value.Value; + +/** + * The payload for the uaa ratelimiting + */ +@JsonDeserialize +@Value.Immutable +abstract class _Current { + + /** + * The number of configured limiter mappings + */ + @JsonProperty("limiterMappings") + abstract Integer getLimiterMappings(); + + /** + * Is ratelimit "ACTIVE" or not? Possible values are DISABLED, PENDING, ACTIVE + */ + @JsonProperty("status") + abstract String getStatus(); + + /** + * Timestamp, when this Current was created. + */ + @JsonProperty("asOf") + abstract Date getTimeOfCurrent(); + + /** + * The credentialIdExtractor + */ + @JsonProperty("credentialIdExtractor") + abstract String getCredentialIdExtractor(); + + /** + * The loggingLevel. Valid values include: "OnlyLimited", "AllCalls" and "AllCallsWithDetails" + */ + @JsonProperty("loggingLevel") + abstract String getLoggingLevel(); +} diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitRequest.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitRequest.java new file mode 100644 index 0000000000..be75d58bfa --- /dev/null +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitRequest.java @@ -0,0 +1,24 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.uaa.ratelimit; + +import org.immutables.value.Value; + +@Value.Immutable +abstract class _RatelimitRequest { + +} diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitResponse.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitResponse.java new file mode 100644 index 0000000000..0fe927cbf6 --- /dev/null +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/uaa/ratelimit/_RatelimitResponse.java @@ -0,0 +1,36 @@ +/* + * Copyright 2013-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.uaa.ratelimit; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.cloudfoundry.Nullable; +import org.immutables.value.Value; + +@JsonDeserialize +@Value.Immutable +abstract class _RatelimitResponse { + + @JsonProperty("current") + @Nullable + abstract Current getCurrentData(); + + @JsonProperty("fromSource") + @Nullable + abstract String getFromSource(); + +} diff --git a/integration-test/src/test/java/org/cloudfoundry/IntegrationTestConfiguration.java b/integration-test/src/test/java/org/cloudfoundry/IntegrationTestConfiguration.java index c0b3f44b30..a42a71798b 100644 --- a/integration-test/src/test/java/org/cloudfoundry/IntegrationTestConfiguration.java +++ b/integration-test/src/test/java/org/cloudfoundry/IntegrationTestConfiguration.java @@ -65,6 +65,7 @@ import org.cloudfoundry.reactor.uaa.ReactorUaaClient; import org.cloudfoundry.routing.RoutingClient; import org.cloudfoundry.uaa.UaaClient; +import org.cloudfoundry.uaa.UaaRatelimitInitializer; import org.cloudfoundry.uaa.clients.CreateClientRequest; import org.cloudfoundry.uaa.groups.AddMemberRequest; import org.cloudfoundry.uaa.groups.CreateGroupRequest; @@ -73,6 +74,7 @@ import org.cloudfoundry.uaa.groups.ListGroupsRequest; import org.cloudfoundry.uaa.groups.ListGroupsResponse; import org.cloudfoundry.uaa.groups.MemberType; +import org.cloudfoundry.uaa.ratelimit.Ratelimit; import org.cloudfoundry.uaa.users.CreateUserRequest; import org.cloudfoundry.uaa.users.CreateUserResponse; import org.cloudfoundry.uaa.users.Email; @@ -209,7 +211,9 @@ ReactorUaaClient adminUaaClient( @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono> client( - @Qualifier("admin") UaaClient uaaClient, String clientId, String clientSecret) { + @Qualifier("admin") UaaClient uaaClient, + @Qualifier("clientId") String clientId, + @Qualifier("clientSecret") String clientSecret) { return uaaClient .clients() .create( @@ -243,17 +247,19 @@ String clientSecret(NameFactory nameFactory) { } @Bean + @DependsOn("uaaRateLimitInitializer") CloudFoundryCleaner cloudFoundryCleaner( @Qualifier("admin") CloudFoundryClient cloudFoundryClient, NameFactory nameFactory, @Qualifier("admin") NetworkingClient networkingClient, - Version serverVersion, + @Qualifier("serverVersion") Version serverVersion, @Qualifier("admin") UaaClient uaaClient) { return new CloudFoundryCleaner( cloudFoundryClient, nameFactory, networkingClient, serverVersion, uaaClient); } @Bean + @Qualifier("nonAdmin") ReactorCloudFoundryClient cloudFoundryClient( ConnectionContext connectionContext, TokenProvider tokenProvider) { return ReactorCloudFoundryClient.builder() @@ -264,13 +270,13 @@ ReactorCloudFoundryClient cloudFoundryClient( @Bean DefaultCloudFoundryOperations cloudFoundryOperations( - CloudFoundryClient cloudFoundryClient, + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, DopplerClient dopplerClient, - NetworkingClient networkingClient, + @Qualifier("nonAdmin") NetworkingClient networkingClient, RoutingClient routingClient, - UaaClient uaaClient, - String organizationName, - String spaceName) { + @Qualifier("nonAdmin") UaaClient uaaClient, + @Qualifier("organizationName") String organizationName, + @Qualifier("spaceName") String spaceName) { return DefaultCloudFoundryOperations.builder() .cloudFoundryClient(cloudFoundryClient) .dopplerClient(dopplerClient) @@ -315,10 +321,24 @@ DefaultConnectionContext connectionContext( connectionContext.proxyConfiguration(proxyConfiguration.build()); } - return connectionContext.build(); } + @Bean + public UaaRatelimitInitializer uaaRateLimitInitializer( + Ratelimit ratelimitService, + @Value("${uaa.api.request.limit:#{null}}") Integer commandlineRequestLimit) { + return new UaaRatelimitInitializer(ratelimitService, commandlineRequestLimit); + } + + @Bean + Ratelimit uaaRatelimit( + ConnectionContext connectionContext, + @Value("${test.admin.clientId}") String clientId, + @Value("${test.admin.clientSecret}") String clientSecret) { + return adminUaaClient(connectionContext, clientId, clientSecret).rateLimit(); + } + @Bean DopplerClient dopplerClient(ConnectionContext connectionContext, TokenProvider tokenProvider) { return ReactorDopplerClient.builder() @@ -344,7 +364,9 @@ RandomNameFactory nameFactory() { @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono metricRegistrarServiceInstance( - CloudFoundryClient cloudFoundryClient, Mono spaceId, NameFactory nameFactory) { + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, + @Qualifier("spaceId") Mono spaceId, + NameFactory nameFactory) { return spaceId.flatMap( spaceIdValue -> cloudFoundryClient @@ -360,6 +382,7 @@ Mono metricRegistrarServiceInstance( } @Bean + @Qualifier("nonAdmin") NetworkingClient networkingClient( ConnectionContext connectionContext, TokenProvider tokenProvider) { return ReactorNetworkingClient.builder() @@ -371,10 +394,10 @@ NetworkingClient networkingClient( @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono organizationId( - CloudFoundryClient cloudFoundryClient, - String organizationName, - String organizationQuotaName, - Mono userId) { + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, + @Qualifier("organizationName") String organizationName, + @Qualifier("organizationQuotaName") String organizationQuotaName, + @Qualifier("userId") Mono userId) { return userId.flatMap( userId1 -> cloudFoundryClient @@ -468,12 +491,12 @@ Version serverVersion(@Qualifier("admin") CloudFoundryClient cloudFoundryClient) @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono serviceBrokerId( - CloudFoundryClient cloudFoundryClient, + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, NameFactory nameFactory, - String planName, - String serviceBrokerName, - String serviceName, - Mono spaceId) { + @Qualifier("planName") String planName, + @Qualifier("serviceBrokerName") String serviceBrokerName, + @Qualifier("serviceName") String serviceName, + @Qualifier("spaceId") Mono spaceId) { return spaceId.flatMap( spaceId1 -> ServiceBrokerUtils.createServiceBroker( @@ -510,7 +533,9 @@ String serviceName(NameFactory nameFactory) { @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono spaceId( - CloudFoundryClient cloudFoundryClient, Mono organizationId, String spaceName) { + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, + @Qualifier("organizationId") Mono organizationId, + @Qualifier("spaceName") String spaceName) { return organizationId .flatMap( orgId -> @@ -535,7 +560,9 @@ String spaceName(NameFactory nameFactory) { @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") - Mono stackId(CloudFoundryClient cloudFoundryClient, Mono stackName) { + Mono stackId( + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, + @Qualifier("stackName") Mono stackName) { return stackName .flux() .flatMap( @@ -563,7 +590,7 @@ Mono stackId(CloudFoundryClient cloudFoundryClient, Mono stackNa */ @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") - Mono stackName(CloudFoundryClient cloudFoundryClient) { + Mono stackName(@Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient) { return PaginationUtils.requestClientV2Resources( page -> cloudFoundryClient @@ -581,11 +608,12 @@ Mono stackName(CloudFoundryClient cloudFoundryClient) { @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") Mono testLogCacheApp( - CloudFoundryClient cloudFoundryClient, - Mono spaceId, - Mono metricRegistrarServiceInstance, - String testLogCacheAppName, - String testLogCacheHostName, + @Qualifier("nonAdmin") CloudFoundryClient cloudFoundryClient, + @Qualifier("spaceId") Mono spaceId, + @Qualifier("metricRegistrarServiceInstance") + Mono metricRegistrarServiceInstance, + @Qualifier("testLogCacheAppName") String testLogCacheAppName, + @Qualifier("testLogCacheHostName") String testLogCacheHostName, Path testLogCacheAppbits) { return metricRegistrarServiceInstance .zipWith(spaceId) @@ -633,7 +661,10 @@ String testLogCacheHostName(NameFactory nameFactory) { @Bean @DependsOn({"client", "userId"}) PasswordGrantTokenProvider tokenProvider( - String clientId, String clientSecret, String password, String username) { + @Qualifier("clientId") String clientId, + @Qualifier("clientSecret") String clientSecret, + @Qualifier("password") String password, + @Qualifier("username") String username) { return PasswordGrantTokenProvider.builder() .clientId(clientId) .clientSecret(clientSecret) @@ -643,6 +674,7 @@ PasswordGrantTokenProvider tokenProvider( } @Bean + @Qualifier("nonAdmin") ReactorUaaClient uaaClient(ConnectionContext connectionContext, TokenProvider tokenProvider) { return ReactorUaaClient.builder() .connectionContext(connectionContext) @@ -652,7 +684,10 @@ ReactorUaaClient uaaClient(ConnectionContext connectionContext, TokenProvider to @Bean(initMethod = "block") @DependsOn("cloudFoundryCleaner") - Mono userId(@Qualifier("admin") UaaClient uaaClient, String password, String username) { + Mono userId( + @Qualifier("admin") UaaClient uaaClient, + @Qualifier("password") String password, + @Qualifier("username") String username) { return uaaClient .users() .create( diff --git a/integration-test/src/test/java/org/cloudfoundry/uaa/UaaRatelimitInitializer.java b/integration-test/src/test/java/org/cloudfoundry/uaa/UaaRatelimitInitializer.java new file mode 100644 index 0000000000..5aa37dcf19 --- /dev/null +++ b/integration-test/src/test/java/org/cloudfoundry/uaa/UaaRatelimitInitializer.java @@ -0,0 +1,90 @@ +package org.cloudfoundry.uaa; + +import java.time.Duration; +import org.cloudfoundry.reactor.uaa.UaaThrottler; +import org.cloudfoundry.uaa.ratelimit.Current; +import org.cloudfoundry.uaa.ratelimit.LimiterMapping; +import org.cloudfoundry.uaa.ratelimit.PathSelector; +import org.cloudfoundry.uaa.ratelimit.PathSelectorModel.PathMatchType; +import org.cloudfoundry.uaa.ratelimit.Ratelimit; +import org.cloudfoundry.uaa.ratelimit.RatelimitRequest; +import org.cloudfoundry.uaa.ratelimit.RatelimitResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +@Component +public class UaaRatelimitInitializer implements InitializingBean { + private final Logger logger = LoggerFactory.getLogger("cloudfoundry-client.test"); + + private final Ratelimit ratelimitService; + private Integer environmentRequestlimit; + + public UaaRatelimitInitializer(Ratelimit ratelimitService, Integer envRequestLimit) { + this.ratelimitService = ratelimitService; + this.environmentRequestlimit = envRequestLimit; + } + + private void init() { + int limit = 0; + + ratelimitService + .getRatelimit(RatelimitRequest.builder().build()) + .map(response -> getServerRatelimit(response)) + .timeout(Duration.ofSeconds(5)) + .onErrorResume( + ex -> { + logger.error( + "Warning: could not fetch UAA rate limit, using default" + + " " + + 0 + + ". Cause: " + + ex); + return Mono.just(false); + }) + .block(); + + if (environmentRequestlimit != null) { + limit = environmentRequestlimit.intValue(); + logger.debug("UaaRatelimitInitializer using configured value " + limit); + } + + LimiterMapping tmp = + LimiterMapping.builder() + .limit(limit) + .name("Test") + .timeBase(1) + .pathSelector( + PathSelector.builder().path("").type(PathMatchType.other).build()) + .build(); + UaaThrottler.getInstance().addLimiterMapping(tmp); + } + + private Boolean getServerRatelimit(RatelimitResponse response) { + Current curr = response.getCurrentData(); + if (!"ACTIVE".equals(curr.getStatus())) { + logger.debug( + "UaaRatelimitInitializer server ratelimit is not 'ACTIVE', but " + + curr.getStatus() + + ". Ignoring server value for ratelimit."); + return false; + } + Integer result = curr.getLimiterMappings(); + logger.info( + "Server uses uaa rate limiting. There are " + + result + + " mappings declared in file " + + response.getFromSource()); + logger.info( + "If you encounter 429 return codes, configure uaa rate limiting or set variable" + + " 'UAA_API_REQUEST_LIMIT' to a save value."); + return true; + } + + @Override + public void afterPropertiesSet() throws Exception { + this.init(); + } +}