diff --git a/docs/src/main/asciidoc/_configprops.adoc b/docs/src/main/asciidoc/_configprops.adoc
index 2bf644f24..98d8e166c 100644
--- a/docs/src/main/asciidoc/_configprops.adoc
+++ b/docs/src/main/asciidoc/_configprops.adoc
@@ -32,6 +32,10 @@
|spring.cloud.loadbalancer.health-check.path | |
|spring.cloud.loadbalancer.hint | | Allows setting the value of hint
that is passed on to the LoadBalancer request and can subsequently be used in {@link ReactiveLoadBalancer} implementations.
|spring.cloud.loadbalancer.retry.avoid-previous-instance | true | Enables wrapping ServiceInstanceListSupplier beans with `RetryAwareServiceInstanceListSupplier` if Spring-Retry is in the classpath.
+|spring.cloud.loadbalancer.retry.backoff.enabled | false | Indicates whether Reactor Retry backoffs should be applied.
+|spring.cloud.loadbalancer.retry.backoff.jitter | 0.5 | Used to set {@link RetryBackoffSpec#jitter}.
+|spring.cloud.loadbalancer.retry.backoff.max-backoff | | Used to set {@link RetryBackoffSpec#maxBackoff}.
+|spring.cloud.loadbalancer.retry.backoff.min-backoff | 5ms | Used to set {@link RetryBackoffSpec#minBackoff}.
|spring.cloud.loadbalancer.retry.enabled | true |
|spring.cloud.loadbalancer.retry.max-retries-on-next-service-instance | 1 | Number of retries to be executed on the next ServiceInstance
. A ServiceInstance
is chosen before each retry call.
|spring.cloud.loadbalancer.retry.max-retries-on-same-service-instance | 0 | Number of retries to be executed on the same ServiceInstance
.
diff --git a/docs/src/main/asciidoc/spring-cloud-commons.adoc b/docs/src/main/asciidoc/spring-cloud-commons.adoc
index 404fe6e39..8916b5a36 100644
--- a/docs/src/main/asciidoc/spring-cloud-commons.adoc
+++ b/docs/src/main/asciidoc/spring-cloud-commons.adoc
@@ -437,11 +437,13 @@ Then, `ReactiveLoadBalancer` is used underneath.
A load-balanced `RestTemplate` can be configured to retry failed requests.
By default, this logic is disabled.
-You can enable it by adding link:https://github.com/spring-projects/spring-retry[Spring Retry] to your application's classpath.
+For the non-reactive version (with `RestTemplate`), you can enable it by adding link:https://github.com/spring-projects/spring-retry[Spring Retry] to your application's classpath. For the reactive version (with `WebTestClient), you need to set `spring.cloud.loadbalancer.retry.enabled=false`.
-If you would like to disable the retry logic with Spring Retry on the classpath, you can set `spring.cloud.loadbalancer.retry.enabled=false`.
+If you would like to disable the retry logic with Spring Retry or Reactive Retry on the classpath, you can set `spring.cloud.loadbalancer.retry.enabled=false`.
-If you would like to implement a `BackOffPolicy` in your retries, you need to create a bean of type `LoadBalancedRetryFactory` and override the `createBackOffPolicy()` method.
+For the non-reactive implementation, if you would like to implement a `BackOffPolicy` in your retries, you need to create a bean of type `LoadBalancedRetryFactory` and override the `createBackOffPolicy()` method.
+
+For the reactive implementation, you just need to enable it by setting `spring.cloud.loadbalancer.retry.backoff.enabled` to `false`.
You can set:
@@ -449,6 +451,13 @@ You can set:
- `spring.cloud.loadbalancer.retry.maxRetriesOnNextServiceInstance` - indicates how many times a request should be retried a newly selected `ServiceInstance`
- `spring.cloud.loadbalancer.retry.retryableStatusCodes` - the status codes on which to always retry a failed request.
+For the reactive implementation, you can additionally set:
+ - `spring.cloud.loadbalancer.retry.backoff.minBackoff` - Sets the minimum backoff duration (by default, 5 milliseconds)
+ - `spring.cloud.loadbalancer.retry.backoff.maxBackoff` - Sets the maximum backoff duration (by default, max long value of milliseconds)
+ - `spring.cloud.loadbalancer.retry.backoff.jitter` - Sets the jitter used for calculationg the actual backoff duration for each call (by default, 0.5).
+
+For the reactive implementation, you can also implement your own `LoadBalancerRetryPolicy` to have more detailed control over the load-balanced call retries.
+
NOTE: For load-balanced retries, by default, we wrap the `ServiceInstanceListSupplier` bean with `RetryAwareServiceInstanceListSupplier` to select a different instance from the one previously chosen, if available. You can disable this behavior by setting the value of `spring.cloud.loadbalancer.retry.avoidPreviousInstance` to `false`.
====
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/ClientRequestContext.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/ClientRequestContext.java
index 4772f351e..8bdbab38f 100644
--- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/ClientRequestContext.java
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/ClientRequestContext.java
@@ -16,6 +16,7 @@
package org.springframework.cloud.client.loadbalancer;
+import org.springframework.http.HttpMethod;
import org.springframework.web.reactive.function.client.ClientRequest;
/**
@@ -36,4 +37,8 @@ public ClientRequest getClientRequest() {
return (ClientRequest) super.getClientRequest();
}
+ public HttpMethod method() {
+ return ((ClientRequest) super.getClientRequest()).method();
+ }
+
}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ExchangeFilterFunctionUtils.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ExchangeFilterFunctionUtils.java
new file mode 100644
index 000000000..18e7124ec
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ExchangeFilterFunctionUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2012-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+
+/**
+ * A utility class for load-balanced {@link ExchangeFilterFunction} instances.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 3.0.0
+ */
+public final class ExchangeFilterFunctionUtils {
+
+ private ExchangeFilterFunctionUtils() {
+ throw new IllegalStateException("Can't instantiate a utility class.");
+ }
+
+ static String getHint(String serviceId, Map hints) {
+ String defaultHint = hints.getOrDefault("default", "default");
+ String hintPropertyValue = hints.get(serviceId);
+ return hintPropertyValue != null ? hintPropertyValue : defaultHint;
+ }
+
+ static ClientRequest buildClientRequest(ClientRequest request, URI uri) {
+ return ClientRequest.create(request.method(), uri).headers(headers -> headers.addAll(request.headers()))
+ .cookies(cookies -> cookies.addAll(request.cookies()))
+ .attributes(attributes -> attributes.putAll(request.attributes())).body(request.body()).build();
+ }
+
+ static String serviceInstanceUnavailableMessage(String serviceId) {
+ return "LoadBalancer does not contain an instance for the service " + serviceId;
+ }
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancedExchangeFilterFunction.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancedExchangeFilterFunction.java
new file mode 100644
index 000000000..a6ba1d031
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancedExchangeFilterFunction.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2012-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+
+/**
+ * A marker interface for load-balanced {@link ExchangeFilterFunction} instances.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 3.0.0
+ */
+public interface LoadBalancedExchangeFilterFunction extends ExchangeFilterFunction {
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerBeanPostProcessorAutoConfiguration.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerBeanPostProcessorAutoConfiguration.java
index 3af428078..8e61f33c0 100644
--- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerBeanPostProcessorAutoConfiguration.java
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerBeanPostProcessorAutoConfiguration.java
@@ -58,8 +58,8 @@ protected static class ReactorDeferringLoadBalancerFilterConfig {
@Bean
@Primary
- DeferringLoadBalancerExchangeFilterFunction reactorDeferringLoadBalancerExchangeFilterFunction(
- ObjectProvider exchangeFilterFunctionProvider) {
+ DeferringLoadBalancerExchangeFilterFunction reactorDeferringLoadBalancerExchangeFilterFunction(
+ ObjectProvider exchangeFilterFunctionProvider) {
return new DeferringLoadBalancerExchangeFilterFunction<>(exchangeFilterFunctionProvider);
}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerProperties.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerProperties.java
index 5bbc89df1..c25a43091 100644
--- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerProperties.java
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerProperties.java
@@ -21,6 +21,8 @@
import java.util.Map;
import java.util.Set;
+import reactor.util.retry.RetryBackoffSpec;
+
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.http.HttpMethod;
import org.springframework.util.LinkedCaseInsensitiveMap;
@@ -47,7 +49,7 @@ public class LoadBalancerProperties {
private Map hint = new LinkedCaseInsensitiveMap<>();
/**
- * Properties for Spring-Retry support in Spring Cloud LoadBalancer.
+ * Properties for Spring-Retry and Reactor Retry support in Spring Cloud LoadBalancer.
*/
private Retry retry = new Retry();
@@ -141,6 +143,11 @@ public static class Retry {
*/
private Set retryableStatusCodes = new HashSet<>();
+ /**
+ * Properties for Reactor Retry backoffs in Spring Cloud LoadBalancer.
+ */
+ private Backoff backoff = new Backoff();
+
/**
* Returns true if the load balancer should retry failed requests.
* @return True if the load balancer should retry failed requests; false
@@ -190,6 +197,70 @@ public void setRetryableStatusCodes(Set retryableStatusCodes) {
this.retryableStatusCodes = retryableStatusCodes;
}
+ public Backoff getBackoff() {
+ return backoff;
+ }
+
+ public void setBackoff(Backoff backoff) {
+ this.backoff = backoff;
+ }
+
+ public static class Backoff {
+
+ /**
+ * Indicates whether Reactor Retry backoffs should be applied.
+ */
+ private boolean enabled = false;
+
+ /**
+ * Used to set {@link RetryBackoffSpec#minBackoff}.
+ */
+ private Duration minBackoff = Duration.ofMillis(5);
+
+ /**
+ * Used to set {@link RetryBackoffSpec#maxBackoff}.
+ */
+ private Duration maxBackoff = Duration.ofMillis(Long.MAX_VALUE);
+
+ /**
+ * Used to set {@link RetryBackoffSpec#jitter}.
+ */
+ private double jitter = 0.5d;
+
+ public Duration getMinBackoff() {
+ return minBackoff;
+ }
+
+ public void setMinBackoff(Duration minBackoff) {
+ this.minBackoff = minBackoff;
+ }
+
+ public Duration getMaxBackoff() {
+ return maxBackoff;
+ }
+
+ public void setMaxBackoff(Duration maxBackoff) {
+ this.maxBackoff = maxBackoff;
+ }
+
+ public double getJitter() {
+ return jitter;
+ }
+
+ public void setJitter(double jitter) {
+ this.jitter = jitter;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ }
+
}
}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryContext.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryContext.java
new file mode 100644
index 000000000..f6e55c776
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryContext.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2012-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import org.springframework.http.HttpMethod;
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ClientResponse;
+
+/**
+ * Stores the data for a load-balanced call that is being retried.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 3.0.0
+ */
+class LoadBalancerRetryContext {
+
+ private final ClientRequest request;
+
+ private ClientResponse clientResponse;
+
+ private Integer retriesSameServiceInstance = 0;
+
+ private Integer retriesNextServiceInstance = 0;
+
+ LoadBalancerRetryContext(ClientRequest request) {
+ this.request = request;
+ }
+
+ ClientRequest getRequest() {
+ return request;
+ }
+
+ ClientResponse getClientResponse() {
+ return clientResponse;
+ }
+
+ void setClientResponse(ClientResponse clientResponse) {
+ this.clientResponse = clientResponse;
+ }
+
+ Integer getRetriesSameServiceInstance() {
+ return retriesSameServiceInstance;
+ }
+
+ void incrementRetriesSameServiceInstance() {
+ retriesSameServiceInstance++;
+ }
+
+ void resetRetriesSameServiceInstance() {
+ retriesSameServiceInstance = 0;
+ }
+
+ Integer getRetriesNextServiceInstance() {
+ return retriesNextServiceInstance;
+ }
+
+ void incrementRetriesNextServiceInstance() {
+ retriesNextServiceInstance++;
+ }
+
+ Integer getResponseStatusCode() {
+ return clientResponse.statusCode().value();
+ }
+
+ HttpMethod getRequestMethod() {
+ return request.method();
+ }
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryPolicy.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryPolicy.java
new file mode 100644
index 000000000..0d2edad87
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryPolicy.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * Pluggable policy used to establish whether a given load-balanced call should be
+ * retried.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 3.0.0
+ */
+public interface LoadBalancerRetryPolicy {
+
+ /**
+ * Return true
to retry on the same service instance.
+ * @param context the context for the retry operation
+ * @return true to retry on the same service instance
+ */
+ boolean canRetrySameServiceInstance(LoadBalancerRetryContext context);
+
+ /**
+ * Return true
to retry on the next service instance.
+ * @param context the context for the retry operation
+ * @return true to retry on the same service instance
+ */
+ boolean canRetryNextServiceInstance(LoadBalancerRetryContext context);
+
+ /**
+ * Return true
to retry on the provided HTTP status code.
+ * @param statusCode the HTTP status code
+ * @return true to retry on the provided HTTP status code
+ */
+ boolean retryableStatusCode(int statusCode);
+
+ /**
+ * Return true
to retry on the provided HTTP method.
+ * @param method the HTTP request method
+ * @return true to retry on the provided HTTP method
+ */
+ boolean canRetryOnMethod(HttpMethod method);
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerClientAutoConfiguration.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerClientAutoConfiguration.java
index 3a994e2de..3af26e461 100644
--- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerClientAutoConfiguration.java
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerClientAutoConfiguration.java
@@ -19,6 +19,8 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -38,10 +40,28 @@
public class ReactorLoadBalancerClientAutoConfiguration {
@ConditionalOnMissingBean
+ @ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", havingValue = "false",
+ matchIfMissing = true)
@Bean
public ReactorLoadBalancerExchangeFilterFunction loadBalancerExchangeFilterFunction(
- ReactiveLoadBalancer.Factory loadBalancerFactory, LoadBalancerProperties properties) {
+ ReactiveLoadBalancer.Factory loadBalancerFactory, LoadBalancerProperties properties) {
return new ReactorLoadBalancerExchangeFilterFunction(loadBalancerFactory, properties);
}
+ @ConditionalOnMissingBean
+ @ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", havingValue = "true")
+ @Bean
+ public RetryableLoadBalancerExchangeFilterFunction retryableLoadBalancerExchangeFilterFunction(
+ ReactiveLoadBalancer.Factory loadBalancerFactory, LoadBalancerProperties properties,
+ LoadBalancerRetryPolicy retryPolicy) {
+ return new RetryableLoadBalancerExchangeFilterFunction(retryPolicy, loadBalancerFactory, properties);
+ }
+
+ @ConditionalOnMissingBean
+ @ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", havingValue = "true")
+ @Bean
+ public LoadBalancerRetryPolicy loadBalancerRetryPolicy(LoadBalancerProperties properties) {
+ return new RetryableExchangeFilterFunctionLoadBalancerRetryPolicy(properties);
+ }
+
}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunction.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunction.java
index 0615bed59..6b2c903fa 100644
--- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunction.java
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunction.java
@@ -30,7 +30,6 @@
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.LoadBalancerLifecycle;
import org.springframework.cloud.client.loadbalancer.LoadBalancerLifecycleValidator;
-import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.http.HttpStatus;
@@ -39,6 +38,11 @@
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
+import static org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools.reconstructURI;
+import static org.springframework.cloud.client.loadbalancer.reactive.ExchangeFilterFunctionUtils.buildClientRequest;
+import static org.springframework.cloud.client.loadbalancer.reactive.ExchangeFilterFunctionUtils.getHint;
+import static org.springframework.cloud.client.loadbalancer.reactive.ExchangeFilterFunctionUtils.serviceInstanceUnavailableMessage;
+
/**
* An {@link ExchangeFilterFunction} that uses {@link ReactiveLoadBalancer} to execute
* requests against a correct {@link ServiceInstance}.
@@ -47,7 +51,7 @@
* @since 2.2.0
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
-public class ReactorLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {
+public class ReactorLoadBalancerExchangeFilterFunction implements LoadBalancedExchangeFilterFunction {
private static final Log LOG = LogFactory.getLog(ReactorLoadBalancerExchangeFilterFunction.class);
@@ -76,7 +80,7 @@ public Mono filter(ClientRequest clientRequest, ExchangeFunction
.getSupportedLifecycleProcessors(
loadBalancerFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
ClientRequestContext.class, ClientResponse.class, ServiceInstance.class);
- String hint = getHint(serviceId);
+ String hint = getHint(serviceId, properties.getHint());
DefaultRequest lbRequest = new DefaultRequest<>(
new ClientRequestContext(clientRequest, hint));
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
@@ -94,7 +98,7 @@ public Mono filter(ClientRequest clientRequest, ExchangeFunction
}
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Load balancer has retrieved the instance for service %s: %s", serviceId,
+ LOG.debug(String.format("LoadBalancer has retrieved the instance for service %s: %s", serviceId,
instance.getUri()));
}
ClientRequest newRequest = buildClientRequest(clientRequest, reconstructURI(instance, originalUrl));
@@ -108,10 +112,6 @@ public Mono filter(ClientRequest clientRequest, ExchangeFunction
});
}
- protected URI reconstructURI(ServiceInstance instance, URI original) {
- return LoadBalancerUriTools.reconstructURI(instance, original);
- }
-
protected Mono> choose(String serviceId, Request request) {
ReactiveLoadBalancer loadBalancer = loadBalancerFactory.getInstance(serviceId);
if (loadBalancer == null) {
@@ -120,20 +120,4 @@ protected Mono> choose(String serviceId, Request headers.addAll(request.headers()))
- .cookies(cookies -> cookies.addAll(request.cookies()))
- .attributes(attributes -> attributes.putAll(request.attributes())).body(request.body()).build();
- }
-
- private String getHint(String serviceId) {
- String defaultHint = properties.getHint().getOrDefault("default", "default");
- String hintPropertyValue = properties.getHint().get(serviceId);
- return hintPropertyValue != null ? hintPropertyValue : defaultHint;
- }
-
}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableExchangeFilterFunctionLoadBalancerRetryPolicy.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableExchangeFilterFunctionLoadBalancerRetryPolicy.java
new file mode 100644
index 000000000..acb2c95a1
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableExchangeFilterFunctionLoadBalancerRetryPolicy.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2012-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * The default implementation of {@link LoadBalancerRetryPolicy}.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 3.0.0
+ */
+public class RetryableExchangeFilterFunctionLoadBalancerRetryPolicy implements LoadBalancerRetryPolicy {
+
+ private final LoadBalancerProperties properties;
+
+ public RetryableExchangeFilterFunctionLoadBalancerRetryPolicy(LoadBalancerProperties properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public boolean canRetrySameServiceInstance(LoadBalancerRetryContext context) {
+ return context.getRetriesSameServiceInstance() < properties.getRetry().getMaxRetriesOnSameServiceInstance();
+ }
+
+ @Override
+ public boolean canRetryNextServiceInstance(LoadBalancerRetryContext context) {
+ return context.getRetriesNextServiceInstance() < properties.getRetry().getMaxRetriesOnNextServiceInstance();
+ }
+
+ @Override
+ public boolean retryableStatusCode(int statusCode) {
+ return properties.getRetry().getRetryableStatusCodes().contains(statusCode);
+ }
+
+ @Override
+ public boolean canRetryOnMethod(HttpMethod method) {
+ return HttpMethod.GET.equals(method) || properties.getRetry().isRetryOnAllOperations();
+ }
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunction.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunction.java
new file mode 100644
index 000000000..8864e9733
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunction.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2012-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+import reactor.util.retry.RetrySpec;
+
+import org.springframework.cloud.client.ServiceInstance;
+import org.springframework.cloud.client.loadbalancer.ClientRequestContext;
+import org.springframework.cloud.client.loadbalancer.CompletionContext;
+import org.springframework.cloud.client.loadbalancer.DefaultRequest;
+import org.springframework.cloud.client.loadbalancer.EmptyResponse;
+import org.springframework.cloud.client.loadbalancer.LoadBalancerLifecycle;
+import org.springframework.cloud.client.loadbalancer.LoadBalancerLifecycleValidator;
+import org.springframework.cloud.client.loadbalancer.Request;
+import org.springframework.cloud.client.loadbalancer.Response;
+import org.springframework.cloud.client.loadbalancer.RetryableRequestContext;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.ExchangeFunction;
+
+import static org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools.reconstructURI;
+import static org.springframework.cloud.client.loadbalancer.reactive.ExchangeFilterFunctionUtils.buildClientRequest;
+import static org.springframework.cloud.client.loadbalancer.reactive.ExchangeFilterFunctionUtils.getHint;
+import static org.springframework.cloud.client.loadbalancer.reactive.ExchangeFilterFunctionUtils.serviceInstanceUnavailableMessage;
+
+/**
+ * An {@link ExchangeFilterFunction} that uses {@link ReactiveLoadBalancer} to execute
+ * requests against a correct {@link ServiceInstance} and Reactor Retries to retry the
+ * call both against the same and the next service instance, based on the provided
+ * {@link LoadBalancerRetryPolicy}.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 3.0.0
+ */
+public class RetryableLoadBalancerExchangeFilterFunction implements LoadBalancedExchangeFilterFunction {
+
+ private static final Log LOG = LogFactory.getLog(RetryableLoadBalancerExchangeFilterFunction.class);
+
+ private static final List> exceptions = Arrays.asList(IOException.class,
+ TimeoutException.class,
+ org.springframework.cloud.client.loadbalancer.reactive.RetryableStatusCodeException.class);
+
+ private final LoadBalancerRetryPolicy retryPolicy;
+
+ private final LoadBalancerProperties properties;
+
+ private final ReactiveLoadBalancer.Factory loadBalancerFactory;
+
+ public RetryableLoadBalancerExchangeFilterFunction(LoadBalancerRetryPolicy retryPolicy,
+ ReactiveLoadBalancer.Factory loadBalancerFactory, LoadBalancerProperties properties) {
+ this.retryPolicy = retryPolicy;
+ this.loadBalancerFactory = loadBalancerFactory;
+ this.properties = properties;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public Mono filter(ClientRequest clientRequest, ExchangeFunction next) {
+ LoadBalancerRetryContext loadBalancerRetryContext = new LoadBalancerRetryContext(clientRequest);
+ Retry exchangeRetry = buildRetrySpec(properties.getRetry().getMaxRetriesOnSameServiceInstance(), true);
+ Retry filterRetry = buildRetrySpec(properties.getRetry().getMaxRetriesOnNextServiceInstance(), false);
+
+ URI originalUrl = clientRequest.url();
+ String serviceId = originalUrl.getHost();
+ if (serviceId == null) {
+ String message = String.format("Request URI does not contain a valid hostname: %s", originalUrl.toString());
+ if (LOG.isWarnEnabled()) {
+ LOG.warn(message);
+ }
+ return Mono.just(ClientResponse.create(HttpStatus.BAD_REQUEST).body(message).build());
+ }
+ Set supportedLifecycleProcessors = LoadBalancerLifecycleValidator
+ .getSupportedLifecycleProcessors(
+ loadBalancerFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
+ ClientRequestContext.class, ClientResponse.class, ServiceInstance.class);
+ String hint = getHint(serviceId, properties.getHint());
+ DefaultRequest lbRequest = new DefaultRequest<>(
+ new RetryableRequestContext(null, clientRequest, hint));
+ supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
+ return Mono.defer(() -> choose(serviceId, lbRequest).flatMap(lbResponse -> {
+ ServiceInstance instance = lbResponse.getServer();
+ lbRequest.setContext(new RetryableRequestContext(instance, clientRequest, hint));
+ if (instance == null) {
+ String message = serviceInstanceUnavailableMessage(serviceId);
+ if (LOG.isWarnEnabled()) {
+ LOG.warn(message);
+ }
+ supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
+ .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbResponse)));
+ return Mono.just(ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE)
+ .body(serviceInstanceUnavailableMessage(serviceId)).build());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("LoadBalancer has retrieved the instance for service %s: %s", serviceId,
+ instance.getUri()));
+ }
+ ClientRequest newRequest = buildClientRequest(clientRequest, reconstructURI(instance, originalUrl));
+ return next.exchange(newRequest)
+ .doOnError(throwable -> supportedLifecycleProcessors.forEach(
+ lifecycle -> lifecycle.onComplete(new CompletionContext(
+ CompletionContext.Status.FAILED, throwable, lbResponse))))
+ .doOnSuccess(clientResponse -> supportedLifecycleProcessors.forEach(
+ lifecycle -> lifecycle.onComplete(new CompletionContext(
+ CompletionContext.Status.SUCCESS, lbResponse, clientResponse))))
+ .map(clientResponse -> {
+ loadBalancerRetryContext.setClientResponse(clientResponse);
+ if (shouldRetrySameServiceInstance(loadBalancerRetryContext)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Retrying on status code: %d",
+ clientResponse.statusCode().value()));
+ }
+ throw new RetryableStatusCodeException();
+ }
+ return clientResponse;
+
+ });
+ }).map(clientResponse -> {
+ loadBalancerRetryContext.setClientResponse(clientResponse);
+ if (shouldRetryNextServiceInstance(loadBalancerRetryContext)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Retrying on status code: %d", clientResponse.statusCode().value()));
+ }
+ throw new RetryableStatusCodeException();
+ }
+ return clientResponse;
+
+ }).retryWhen(exchangeRetry)).retryWhen(filterRetry);
+ }
+
+ private Retry buildRetrySpec(int max, boolean transientErrors) {
+ LoadBalancerProperties.Retry.Backoff backoffProperties = properties.getRetry().getBackoff();
+ if (backoffProperties.isEnabled()) {
+ return RetrySpec.backoff(max, backoffProperties.getMinBackoff()).filter(this::isRetryException)
+ .maxBackoff(backoffProperties.getMaxBackoff()).jitter(backoffProperties.getJitter())
+ .transientErrors(transientErrors);
+ }
+ return RetrySpec.max(max).filter(this::isRetryException).transientErrors(transientErrors);
+ }
+
+ private boolean shouldRetrySameServiceInstance(LoadBalancerRetryContext loadBalancerRetryContext) {
+ boolean shouldRetry = retryPolicy.retryableStatusCode(loadBalancerRetryContext.getResponseStatusCode())
+ && retryPolicy.canRetryOnMethod(loadBalancerRetryContext.getRequestMethod())
+ && retryPolicy.canRetrySameServiceInstance(loadBalancerRetryContext);
+ if (shouldRetry) {
+ loadBalancerRetryContext.incrementRetriesSameServiceInstance();
+ }
+ return shouldRetry;
+ }
+
+ private boolean shouldRetryNextServiceInstance(LoadBalancerRetryContext loadBalancerRetryContext) {
+ boolean shouldRetry = retryPolicy.retryableStatusCode(loadBalancerRetryContext.getResponseStatusCode())
+ && retryPolicy.canRetryOnMethod(loadBalancerRetryContext.getRequestMethod())
+ && retryPolicy.canRetryNextServiceInstance(loadBalancerRetryContext);
+ if (shouldRetry) {
+ loadBalancerRetryContext.incrementRetriesNextServiceInstance();
+ loadBalancerRetryContext.resetRetriesSameServiceInstance();
+ }
+ return shouldRetry;
+ }
+
+ private boolean isRetryException(Throwable throwable) {
+ return exceptions.stream()
+ .anyMatch(exception -> exception.isInstance(throwable)
+ || throwable != null && exception.isInstance(throwable.getCause())
+ || Exceptions.isRetryExhausted(throwable));
+ }
+
+ protected Mono> choose(String serviceId, Request request) {
+ ReactiveLoadBalancer loadBalancer = loadBalancerFactory.getInstance(serviceId);
+ if (loadBalancer == null) {
+ return Mono.just(new EmptyResponse());
+ }
+ return Mono.from(loadBalancer.choose(request));
+ }
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableStatusCodeException.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableStatusCodeException.java
new file mode 100644
index 000000000..c4814aab3
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableStatusCodeException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2012-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+/**
+ * An {@link IllegalStateException} used to trigger retries based on the returned HTTP
+ * status code.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 3.0.0
+ */
+class RetryableStatusCodeException extends IllegalStateException {
+
+}
diff --git a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/DiscoveryClientBasedReactiveLoadBalancer.java b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/DiscoveryClientBasedReactiveLoadBalancer.java
new file mode 100644
index 000000000..36b3678c6
--- /dev/null
+++ b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/DiscoveryClientBasedReactiveLoadBalancer.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2012-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import java.util.List;
+import java.util.Random;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Mono;
+
+import org.springframework.cloud.client.ServiceInstance;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.cloud.client.loadbalancer.DefaultResponse;
+import org.springframework.cloud.client.loadbalancer.EmptyResponse;
+import org.springframework.cloud.client.loadbalancer.Request;
+import org.springframework.cloud.client.loadbalancer.Response;
+import org.springframework.cloud.client.loadbalancer.RetryableRequestContext;
+
+/**
+ * A {@link ReactiveLoadBalancer} implementation used for tests.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 3.0.0
+ */
+class DiscoveryClientBasedReactiveLoadBalancer implements ReactiveLoadBalancer {
+
+ private final Random random = new Random();
+
+ private final String serviceId;
+
+ private final DiscoveryClient discoveryClient;
+
+ DiscoveryClientBasedReactiveLoadBalancer(String serviceId, DiscoveryClient discoveryClient) {
+ this.serviceId = serviceId;
+ this.discoveryClient = discoveryClient;
+ }
+
+ @Override
+ public Publisher> choose() {
+ List instances = discoveryClient.getInstances(serviceId);
+ if (instances.size() == 0) {
+ return Mono.just(new EmptyResponse());
+ }
+ int instanceIdx = this.random.nextInt(instances.size());
+ return Mono.just(new DefaultResponse(instances.get(instanceIdx)));
+ }
+
+ @Override
+ public Publisher> choose(Request request) {
+
+ List instances = discoveryClient.getInstances(serviceId);
+ if (request.getContext() instanceof RetryableRequestContext) {
+ RetryableRequestContext context = (RetryableRequestContext) request.getContext();
+ if (context.getPreviousServiceInstance() != null) {
+ List instancesCopy = discoveryClient.getInstances(serviceId);
+ instancesCopy.remove(context.getPreviousServiceInstance());
+ if (!instancesCopy.isEmpty()) {
+ instances = instancesCopy;
+ }
+ }
+ }
+ if (instances.size() == 0) {
+ return Mono.just(new EmptyResponse());
+ }
+ int instanceIdx = this.random.nextInt(instances.size());
+ return Mono.just(new DefaultResponse(instances.get(instanceIdx)));
+ }
+
+}
diff --git a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerClientAutoConfigurationTests.java b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerClientAutoConfigurationTests.java
index cac43fc80..455d40d5b 100644
--- a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerClientAutoConfigurationTests.java
+++ b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerClientAutoConfigurationTests.java
@@ -59,6 +59,27 @@ void loadBalancerFilterAddedToWebClientBuilder() {
assertLoadBalanced(testService.webClient, ReactorLoadBalancerExchangeFilterFunction.class);
}
+ @Test
+ void loadBalancerFilterAddedToWebClientBuilderWithRetryEnabled() {
+ System.setProperty("spring.cloud.loadbalancer.retry.enabled", "true");
+ ConfigurableApplicationContext context = init(OneWebClientBuilder.class);
+ final Map webClientBuilders = context.getBeansOfType(WebClient.Builder.class);
+
+ then(webClientBuilders).isNotNull().hasSize(1);
+ WebClient.Builder webClientBuilder = webClientBuilders.values().iterator().next();
+ then(webClientBuilder).isNotNull();
+
+ assertLoadBalanced(webClientBuilder, RetryableLoadBalancerExchangeFilterFunction.class);
+
+ final Map testServiceMap = context
+ .getBeansOfType(OneWebClientBuilder.TestService.class);
+ then(testServiceMap).isNotNull().hasSize(1);
+ OneWebClientBuilder.TestService testService = testServiceMap.values().stream().findFirst().get();
+ assertLoadBalanced(testService.webClient, RetryableLoadBalancerExchangeFilterFunction.class);
+
+ System.clearProperty("spring.cloud.loadbalancer.retry.enabled");
+ }
+
@Test
void loadBalancerFilterAddedOnlyToLoadBalancedWebClientBuilder() {
ConfigurableApplicationContext context = init(TwoWebClientBuilders.class);
@@ -75,6 +96,24 @@ void loadBalancerFilterAddedOnlyToLoadBalancedWebClientBuilder() {
then(getFilters(two.nonLoadBalanced)).isNullOrEmpty();
}
+ @Test
+ void loadBalancerFilterAddedOnlyToLoadBalancedWebClientBuilderWithRetryEnabled() {
+ System.setProperty("spring.cloud.loadbalancer.retry.enabled", "true");
+ ConfigurableApplicationContext context = init(TwoWebClientBuilders.class);
+ final Map webClientBuilders = context.getBeansOfType(WebClient.Builder.class);
+
+ then(webClientBuilders).hasSize(2);
+
+ TwoWebClientBuilders.Two two = context.getBean(TwoWebClientBuilders.Two.class);
+
+ then(two.loadBalanced).isNotNull();
+ assertLoadBalanced(two.loadBalanced, RetryableLoadBalancerExchangeFilterFunction.class);
+
+ then(two.nonLoadBalanced).isNotNull();
+ then(getFilters(two.nonLoadBalanced)).isNullOrEmpty();
+ System.clearProperty("spring.cloud.loadbalancer.retry.enabled");
+ }
+
@Test
void noCustomWebClientBuilders() {
ConfigurableApplicationContext context = init(NoWebClientBuilder.class);
diff --git a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunctionTests.java b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunctionTests.java
index cf76b5d36..30887182a 100644
--- a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunctionTests.java
+++ b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunctionTests.java
@@ -20,17 +20,13 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import org.reactivestreams.Publisher;
-import reactor.core.publisher.Mono;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringBootConfiguration;
@@ -44,11 +40,8 @@
import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryProperties;
import org.springframework.cloud.client.loadbalancer.CompletionContext;
import org.springframework.cloud.client.loadbalancer.DefaultRequestContext;
-import org.springframework.cloud.client.loadbalancer.DefaultResponse;
-import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.LoadBalancerLifecycle;
import org.springframework.cloud.client.loadbalancer.Request;
-import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -247,33 +240,3 @@ protected String getName() {
}
}
-
-class DiscoveryClientBasedReactiveLoadBalancer implements ReactiveLoadBalancer {
-
- private final Random random = new Random();
-
- private final String serviceId;
-
- private final DiscoveryClient discoveryClient;
-
- DiscoveryClientBasedReactiveLoadBalancer(String serviceId, DiscoveryClient discoveryClient) {
- this.serviceId = serviceId;
- this.discoveryClient = discoveryClient;
- }
-
- @Override
- public Publisher> choose() {
- List instances = discoveryClient.getInstances(serviceId);
- if (instances.size() == 0) {
- return Mono.just(new EmptyResponse());
- }
- int instanceIdx = this.random.nextInt(instances.size());
- return Mono.just(new DefaultResponse(instances.get(instanceIdx)));
- }
-
- @Override
- public Publisher> choose(Request request) {
- return choose();
- }
-
-}
diff --git a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunctionIntegrationTests.java b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunctionIntegrationTests.java
new file mode 100644
index 000000000..f03d6e567
--- /dev/null
+++ b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunctionIntegrationTests.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2012-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.cloud.client.DefaultServiceInstance;
+import org.springframework.cloud.client.ServiceInstance;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryProperties;
+import org.springframework.cloud.client.loadbalancer.CompletionContext;
+import org.springframework.cloud.client.loadbalancer.DefaultRequestContext;
+import org.springframework.cloud.client.loadbalancer.LoadBalancerLifecycle;
+import org.springframework.cloud.client.loadbalancer.Request;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.WebClient;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.BDDAssertions.then;
+import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
+
+/**
+ * Integration tests for {@link RetryableLoadBalancerExchangeFilterFunction}.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 3.0.0
+ */
+@SpringBootTest(webEnvironment = RANDOM_PORT)
+class RetryableLoadBalancerExchangeFilterFunctionIntegrationTests {
+
+ @Autowired
+ private RetryableLoadBalancerExchangeFilterFunction loadBalancerFunction;
+
+ @Autowired
+ private SimpleDiscoveryProperties properties;
+
+ @Autowired
+ private LoadBalancerProperties loadBalancerProperties;
+
+ @Autowired
+ private ReactiveLoadBalancer.Factory factory;
+
+ @LocalServerPort
+ private int port;
+
+ @BeforeEach
+ void setUp() {
+ DefaultServiceInstance instance = new DefaultServiceInstance();
+ instance.setServiceId("testservice");
+ instance.setUri(URI.create("http://localhost:" + port));
+ DefaultServiceInstance instanceWithNoLifecycleProcessors = new DefaultServiceInstance();
+ instanceWithNoLifecycleProcessors.setServiceId("serviceWithNoLifecycleProcessors");
+ instanceWithNoLifecycleProcessors.setUri(URI.create("http://localhost:" + port));
+ properties.getInstances().put("testservice", Collections.singletonList(instance));
+ properties.getInstances().put("serviceWithNoLifecycleProcessors",
+ Collections.singletonList(instanceWithNoLifecycleProcessors));
+ }
+
+ @Test
+ void loadBalancerLifecycleCallbacksExecuted() {
+ final String callbackTestHint = "callbackTestHint";
+ loadBalancerProperties.getHint().put("testservice", "callbackTestHint");
+ final String result = "callbackTestResult";
+
+ ClientResponse clientResponse = WebClient.builder().baseUrl("http://testservice")
+ .filter(this.loadBalancerFunction).build().get().uri("/callback").exchange().block();
+
+ Collection> lifecycleLogRequests = ((TestLoadBalancerLifecycle) factory
+ .getInstances("testservice", LoadBalancerLifecycle.class).get("loadBalancerLifecycle")).getStartLog()
+ .values();
+ Collection> anotherLifecycleLogRequests = ((AnotherLoadBalancerLifecycle) factory
+ .getInstances("testservice", LoadBalancerLifecycle.class).get("anotherLoadBalancerLifecycle"))
+ .getCompleteLog().values();
+ then(clientResponse.statusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(lifecycleLogRequests).extracting(request -> ((DefaultRequestContext) request.getContext()).getHint())
+ .contains(callbackTestHint);
+ assertThat(anotherLifecycleLogRequests)
+ .extracting(completionContext -> ((ClientResponse) completionContext.getClientResponse())
+ .bodyToMono(String.class).block())
+ .contains(result);
+ }
+
+ @Test
+ void correctResponseReturnedForExistingHostAndInstancePresent() {
+ ClientResponse clientResponse = WebClient.builder().baseUrl("http://testservice")
+ .filter(this.loadBalancerFunction).build().get().uri("/hello").exchange().block();
+
+ then(clientResponse.statusCode()).isEqualTo(HttpStatus.OK);
+ then(clientResponse.bodyToMono(String.class).block()).isEqualTo("Hello World");
+ }
+
+ @Test
+ void correctResponseReturnedAfterRetryingOnSameServiceInstance() {
+ loadBalancerProperties.getRetry().setMaxRetriesOnSameServiceInstance(1);
+ loadBalancerProperties.getRetry().getRetryableStatusCodes().add(500);
+
+ ClientResponse clientResponse = WebClient.builder().baseUrl("http://testservice")
+ .filter(this.loadBalancerFunction).build().get().uri("/exception").exchange().block();
+
+ then(clientResponse.statusCode()).isEqualTo(HttpStatus.OK);
+ then(clientResponse.bodyToMono(String.class).block()).isEqualTo("Hello World!");
+ }
+
+ @Test
+ void correctResponseReturnedAfterRetryingOnNextServiceInstanceWithBackoff() {
+ loadBalancerProperties.getRetry().getBackoff().setEnabled(true);
+ loadBalancerProperties.getRetry().setMaxRetriesOnSameServiceInstance(1);
+ DefaultServiceInstance goodRetryTestInstance = new DefaultServiceInstance();
+ goodRetryTestInstance.setServiceId("retrytest");
+ goodRetryTestInstance.setUri(URI.create("http://localhost:" + port));
+ DefaultServiceInstance badRetryTestInstance = new DefaultServiceInstance();
+ badRetryTestInstance.setServiceId("retrytest");
+ badRetryTestInstance.setUri(URI.create("http://localhost:" + 8080));
+ properties.getInstances().put("retrytest", Arrays.asList(badRetryTestInstance, goodRetryTestInstance));
+ loadBalancerProperties.getRetry().getRetryableStatusCodes().add(500);
+
+ ClientResponse clientResponse = WebClient.builder().baseUrl("http://retrytest")
+ .filter(this.loadBalancerFunction).build().get().uri("/hello").exchange().block();
+
+ then(clientResponse.statusCode()).isEqualTo(HttpStatus.OK);
+ then(clientResponse.bodyToMono(String.class).block()).isEqualTo("Hello World");
+
+ ClientResponse secondClientResponse = WebClient.builder().baseUrl("http://retrytest")
+ .filter(this.loadBalancerFunction).build().get().uri("/hello").exchange().block();
+
+ then(secondClientResponse.statusCode()).isEqualTo(HttpStatus.OK);
+ then(secondClientResponse.bodyToMono(String.class).block()).isEqualTo("Hello World");
+ }
+
+ @Test
+ void serviceUnavailableReturnedWhenNoInstancePresent() {
+ ClientResponse clientResponse = WebClient.builder().baseUrl("http://xxx").filter(this.loadBalancerFunction)
+ .build().get().exchange().block();
+
+ then(clientResponse.statusCode()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE);
+ }
+
+ @Test
+ @Disabled
+ // FIXME 3.0.0
+ void badRequestReturnedForIncorrectHost() {
+ ClientResponse clientResponse = WebClient.builder().baseUrl("http:///xxx").filter(this.loadBalancerFunction)
+ .build().get().exchange().block();
+
+ then(clientResponse.statusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
+ }
+
+ @Test
+ void exceptionNotThrownWhenFactoryReturnsNullLifecycleProcessorsMap() {
+ assertThatCode(() -> WebClient.builder().baseUrl("http://serviceWithNoLifecycleProcessors")
+ .filter(this.loadBalancerFunction).build().get().uri("/hello").exchange().block())
+ .doesNotThrowAnyException();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @EnableDiscoveryClient
+ @EnableAutoConfiguration
+ @SpringBootConfiguration(proxyBeanMethods = false)
+ @RestController
+ static class Config {
+
+ AtomicInteger exceptionCallsCount = new AtomicInteger();
+
+ @GetMapping("/hello")
+ public String hello() {
+ return "Hello World";
+ }
+
+ @GetMapping("/callback")
+ String callbackTestResult() {
+ return "callbackTestResult";
+ }
+
+ @GetMapping("/exception")
+ String exception() {
+ int callCount = exceptionCallsCount.incrementAndGet();
+ if (callCount % 2 != 0) {
+ throw new IllegalStateException("Test!");
+ }
+ return "Hello World!";
+ }
+
+ @Bean
+ ReactiveLoadBalancer.Factory reactiveLoadBalancerFactory(DiscoveryClient discoveryClient) {
+ return new ReactiveLoadBalancer.Factory() {
+
+ private final TestLoadBalancerLifecycle testLoadBalancerLifecycle = new TestLoadBalancerLifecycle();
+
+ private final TestLoadBalancerLifecycle anotherLoadBalancerLifecycle = new AnotherLoadBalancerLifecycle();
+
+ @Override
+ public ReactiveLoadBalancer getInstance(String serviceId) {
+ return new org.springframework.cloud.client.loadbalancer.reactive.DiscoveryClientBasedReactiveLoadBalancer(
+ serviceId, discoveryClient);
+ }
+
+ @Override
+ public Map getInstances(String name, Class type) {
+ if (name.equals("serviceWithNoLifecycleProcessors")) {
+ return null;
+ }
+ Map lifecycleProcessors = new HashMap<>();
+ lifecycleProcessors.put("loadBalancerLifecycle", testLoadBalancerLifecycle);
+ lifecycleProcessors.put("anotherLoadBalancerLifecycle", anotherLoadBalancerLifecycle);
+ return lifecycleProcessors;
+ }
+
+ @Override
+ public X getInstance(String name, Class> clazz, Class>... generics) {
+ return null;
+ }
+ };
+ }
+
+ @Bean
+ LoadBalancerProperties loadBalancerProperties() {
+ return new LoadBalancerProperties();
+ }
+
+ @Bean
+ RetryableLoadBalancerExchangeFilterFunction exchangeFilterFunction(LoadBalancerProperties properties,
+ ReactiveLoadBalancer.Factory factory) {
+ return new RetryableLoadBalancerExchangeFilterFunction(
+ new RetryableExchangeFilterFunctionLoadBalancerRetryPolicy(properties), factory, properties);
+ }
+
+ }
+
+ protected static class TestLoadBalancerLifecycle implements LoadBalancerLifecycle