diff --git a/README.adoc b/README.adoc
index 6aeed1c4a..8fdcaee70 100644
--- a/README.adoc
+++ b/README.adoc
@@ -27,7 +27,7 @@ Extract the files into the JDK/jre/lib/security folder for whichever version of
== Building
-:jdkversion: 1.7
+:jdkversion: 1.8
=== Basic Compile and Test
diff --git a/docs/src/main/asciidoc/_configprops.adoc b/docs/src/main/asciidoc/_configprops.adoc
index 0f7126ae7..dd749d138 100644
--- a/docs/src/main/asciidoc/_configprops.adoc
+++ b/docs/src/main/asciidoc/_configprops.adoc
@@ -33,6 +33,10 @@
|spring.cloud.loadbalancer.health-check.refetch-instances | false | Indicates whether the instances should be refetched by the HealthCheckServiceInstanceListSupplier. This can be used if the instances can be updated and the underlying delegate does not provide an ongoing flux.
|spring.cloud.loadbalancer.health-check.refetch-instances-interval | 25s | Interval for refetching available service instances.
|spring.cloud.loadbalancer.health-check.repeat-health-check | true | Indicates whether health checks should keep repeating. It might be useful to set it to false if periodically refetching the instances, as every refetch will also trigger a healthcheck.
+|spring.cloud.loadbalancer.retry.backoff.enabled | false | Indicates whether Reactor Retry backoffs should be applied.
+|spring.cloud.loadbalancer.retry.backoff.jitter | 0.5 | Used to set {@link RetryBackoffSpec#jitter}.
+|spring.cloud.loadbalancer.retry.backoff.max-backoff | | Used to set {@link RetryBackoffSpec#maxBackoff}.
+|spring.cloud.loadbalancer.retry.backoff.min-backoff | 5ms | Used to set {@link RetryBackoffSpec#minBackoff}.
|spring.cloud.loadbalancer.retry.enabled | true |
|spring.cloud.loadbalancer.retry.max-retries-on-next-service-instance | 1 | Number of retries to be executed on the next ServiceInstance. A ServiceInstance is chosen before each retry call.
|spring.cloud.loadbalancer.retry.max-retries-on-same-service-instance | 0 | Number of retries to be executed on the same ServiceInstance.
diff --git a/docs/src/main/asciidoc/spring-cloud-commons.adoc b/docs/src/main/asciidoc/spring-cloud-commons.adoc
index ceba4ef73..39a135a4a 100644
--- a/docs/src/main/asciidoc/spring-cloud-commons.adoc
+++ b/docs/src/main/asciidoc/spring-cloud-commons.adoc
@@ -452,11 +452,67 @@ set the `spring.cloud.loadbalancer.ribbon.enabled` property to `false`.
A load-balanced `RestTemplate` can be configured to retry failed requests.
By default, this logic is disabled.
-You can enable it by adding link:https://github.com/spring-projects/spring-retry[Spring Retry] to your application's classpath.
+For the non-reactive version (with `RestTemplate`), you can enable it by adding link:https://github.com/spring-projects/spring-retry[Spring Retry] to your application's classpath.
+
+To use the reactive version of load-balanced retries in the Hoxton release train, you will need to instantiate your own `RetryableLoadBalancerExchangeFilterFunction` bean:
+
+[source,java,indent=0]
+----
+@Configuration
+public class MyConfiguration {
+
+ @Bean
+ RetryableLoadBalancerExchangeFilterFunction retryableLoadBalancerExchangeFilterFunction(
+ LoadBalancerRetryProperties properties,
+ ReactiveLoadBalancer.Factory factory) {
+ return new RetryableLoadBalancerExchangeFilterFunction(
+ new RetryableExchangeFilterFunctionLoadBalancerRetryPolicy(
+ properties),
+ factory, properties);
+ }
+}
+----
+
+Then you can use it as a filter while building `webClient` instances:
+
+[source,java,indent=0]
+----
+public class MyClass {
+ @Autowired
+ private RetryableLoadBalancerExchangeFilterFunction retryableLbFunction;
+
+ public Mono doOtherStuff() {
+ return WebClient.builder().baseUrl("http://stores")
+ .filter(retryableLbFunction)
+ .build()
+ .get()
+ .uri("/stores")
+ .retrieve()
+ .bodyToMono(String.class);
+ }
+}
+----
If you would like to disable the retry logic with Spring Retry on the classpath, you can set `spring.cloud.loadbalancer.retry.enabled=false`.
-If you would like to implement a `BackOffPolicy` in your retries, you need to create a bean of type `LoadBalancedRetryFactory` and override the `createBackOffPolicy()` method.
+For the non-reactive implementation, if you would like to implement a `BackOffPolicy` in your retries, you need to create a bean of type `LoadBalancedRetryFactory` and override the `createBackOffPolicy()` method.
+
+For the reactive implementation, you just need to enable it by setting `spring.cloud.loadbalancer.retry.backoff.enabled` to `false`.
+
+You can set:
+
+- `spring.cloud.loadbalancer.retry.maxRetriesOnSameServiceInstance` - indicates how many times a request should be retried on the same `ServiceInstance` (counted separately for every selected instance)
+- `spring.cloud.loadbalancer.retry.maxRetriesOnNextServiceInstance` - indicates how many times a request should be retried a newly selected `ServiceInstance`
+- `spring.cloud.loadbalancer.retry.retryableStatusCodes` - the status codes on which to always retry a failed request.
+
+For the reactive implementation, you can additionally set:
+- `spring.cloud.loadbalancer.retry.backoff.minBackoff` - Sets the minimum backoff duration (by default, 5 milliseconds)
+- `spring.cloud.loadbalancer.retry.backoff.maxBackoff` - Sets the maximum backoff duration (by default, max long value of milliseconds)
+- `spring.cloud.loadbalancer.retry.backoff.jitter` - Sets the jitter used for calculationg the actual backoff duration for each call (by default, 0.5).
+
+For the reactive implementation, you can also implement your own `LoadBalancerRetryPolicy` to have more detailed control over the load-balanced call retries.
+
+WARN:: For the non-reactive version, if you chose to override the `LoadBalancedRetryFactory` while using the Spring Cloud LoadBalancer-backed approach, make sure you annotate your bean with `@Order` and set it to a higher precedence than `1000`, which is the order set on the `BlockingLoadBalancedRetryFactory`.
===== Ribbon-based retries
@@ -475,23 +531,6 @@ For the Spring Cloud LoadBalancer-backed implementation, you can set:
WARN:: If you chose to override the `LoadBalancedRetryFactory` while using the Spring Cloud LoadBalancer-backed approach, make sure you annotate your bean with `@Order` and set it to a higher precedence than `1000`, which is the order set on the `BlockingLoadBalancedRetryFactory`.
-====
-[source,java,indent=0]
-----
-@Configuration
-public class MyConfiguration {
- @Bean
- LoadBalancedRetryFactory retryFactory() {
- return new LoadBalancedRetryFactory() {
- @Override
- public BackOffPolicy createBackOffPolicy(String service) {
- return new ExponentialBackOffPolicy();
- }
- };
- }
-}
-----
-====
NOTE: `client` in the preceding examples should be replaced with your Ribbon client's name.
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerRetryProperties.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerRetryProperties.java
index fbded2dd7..58156e6bd 100644
--- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerRetryProperties.java
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerRetryProperties.java
@@ -16,9 +16,12 @@
package org.springframework.cloud.client.loadbalancer;
+import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
+import reactor.util.retry.RetryBackoffSpec;
+
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.http.HttpMethod;
@@ -54,6 +57,11 @@ public class LoadBalancerRetryProperties {
*/
private Set retryableStatusCodes = new HashSet<>();
+ /**
+ * Properties for Reactor Retry backoffs in Spring Cloud LoadBalancer.
+ */
+ private Backoff backoff = new Backoff();
+
/**
* Returns true if the load balancer should retry failed requests.
* @return True if the load balancer should retry failed requests; false otherwise.
@@ -102,4 +110,68 @@ public void setRetryableStatusCodes(Set retryableStatusCodes) {
this.retryableStatusCodes = retryableStatusCodes;
}
+ public Backoff getBackoff() {
+ return backoff;
+ }
+
+ public void setBackoff(Backoff backoff) {
+ this.backoff = backoff;
+ }
+
+ public static class Backoff {
+
+ /**
+ * Indicates whether Reactor Retry backoffs should be applied.
+ */
+ private boolean enabled = false;
+
+ /**
+ * Used to set {@link RetryBackoffSpec#minBackoff}.
+ */
+ private Duration minBackoff = Duration.ofMillis(5);
+
+ /**
+ * Used to set {@link RetryBackoffSpec#maxBackoff}.
+ */
+ private Duration maxBackoff = Duration.ofMillis(Long.MAX_VALUE);
+
+ /**
+ * Used to set {@link RetryBackoffSpec#jitter}.
+ */
+ private double jitter = 0.5d;
+
+ public Duration getMinBackoff() {
+ return minBackoff;
+ }
+
+ public void setMinBackoff(Duration minBackoff) {
+ this.minBackoff = minBackoff;
+ }
+
+ public Duration getMaxBackoff() {
+ return maxBackoff;
+ }
+
+ public void setMaxBackoff(Duration maxBackoff) {
+ this.maxBackoff = maxBackoff;
+ }
+
+ public double getJitter() {
+ return jitter;
+ }
+
+ public void setJitter(double jitter) {
+ this.jitter = jitter;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ }
+
}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/RetryableRequestContext.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/RetryableRequestContext.java
new file mode 100644
index 000000000..b49a20931
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/RetryableRequestContext.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2012-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer;
+
+import java.util.Objects;
+
+import org.springframework.cloud.client.ServiceInstance;
+import org.springframework.core.style.ToStringCreator;
+
+/**
+ * A request context object that allows storing information on previously used service
+ * instances.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 2.2.7
+ */
+public class RetryableRequestContext extends DefaultRequestContext {
+
+ private ServiceInstance previousServiceInstance;
+
+ public RetryableRequestContext(ServiceInstance previousServiceInstance) {
+ this.previousServiceInstance = previousServiceInstance;
+ }
+
+ public RetryableRequestContext(ServiceInstance previousServiceInstance, String hint) {
+ super(hint);
+ this.previousServiceInstance = previousServiceInstance;
+ }
+
+ public ServiceInstance getPreviousServiceInstance() {
+ return previousServiceInstance;
+ }
+
+ public void setPreviousServiceInstance(ServiceInstance previousServiceInstance) {
+ this.previousServiceInstance = previousServiceInstance;
+ }
+
+ @Override
+ public String toString() {
+ ToStringCreator to = new ToStringCreator(this);
+ to.append("previousServiceInstance", previousServiceInstance);
+ return to.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof RetryableRequestContext)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ RetryableRequestContext context = (RetryableRequestContext) o;
+ return Objects.equals(previousServiceInstance, context.previousServiceInstance);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), previousServiceInstance);
+ }
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ExchangeFilterFunctionUtils.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ExchangeFilterFunctionUtils.java
new file mode 100644
index 000000000..8e4ce1512
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ExchangeFilterFunctionUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2012-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import java.net.URI;
+
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+
+/**
+ * A utility class for load-balanced {@link ExchangeFilterFunction} instances.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 2.2.7
+ */
+public final class ExchangeFilterFunctionUtils {
+
+ private ExchangeFilterFunctionUtils() {
+ throw new IllegalStateException("Can't instantiate a utility class.");
+ }
+
+ static ClientRequest buildClientRequest(ClientRequest request, URI uri) {
+ return ClientRequest.create(request.method(), uri)
+ .headers(headers -> headers.addAll(request.headers()))
+ .cookies(cookies -> cookies.addAll(request.cookies()))
+ .attributes(attributes -> attributes.putAll(request.attributes()))
+ .body(request.body()).build();
+ }
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryContext.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryContext.java
new file mode 100644
index 000000000..607215602
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryContext.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2012-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import org.springframework.http.HttpMethod;
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ClientResponse;
+
+/**
+ * Stores the data for a load-balanced call that is being retried.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 2.2.7
+ */
+class LoadBalancerRetryContext {
+
+ private final ClientRequest request;
+
+ private ClientResponse clientResponse;
+
+ private Integer retriesSameServiceInstance = 0;
+
+ private Integer retriesNextServiceInstance = 0;
+
+ LoadBalancerRetryContext(ClientRequest request) {
+ this.request = request;
+ }
+
+ ClientRequest getRequest() {
+ return request;
+ }
+
+ ClientResponse getClientResponse() {
+ return clientResponse;
+ }
+
+ void setClientResponse(ClientResponse clientResponse) {
+ this.clientResponse = clientResponse;
+ }
+
+ Integer getRetriesSameServiceInstance() {
+ return retriesSameServiceInstance;
+ }
+
+ void incrementRetriesSameServiceInstance() {
+ retriesSameServiceInstance++;
+ }
+
+ void resetRetriesSameServiceInstance() {
+ retriesSameServiceInstance = 0;
+ }
+
+ Integer getRetriesNextServiceInstance() {
+ return retriesNextServiceInstance;
+ }
+
+ void incrementRetriesNextServiceInstance() {
+ retriesNextServiceInstance++;
+ }
+
+ Integer getResponseStatusCode() {
+ return clientResponse.statusCode().value();
+ }
+
+ HttpMethod getRequestMethod() {
+ return request.method();
+ }
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryPolicy.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryPolicy.java
new file mode 100644
index 000000000..35e696674
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerRetryPolicy.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * Pluggable policy used to establish whether a given load-balanced call should be
+ * retried.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 2.2.7
+ */
+public interface LoadBalancerRetryPolicy {
+
+ /**
+ * Return true to retry on the same service instance.
+ * @param context the context for the retry operation
+ * @return true to retry on the same service instance
+ */
+ boolean canRetrySameServiceInstance(LoadBalancerRetryContext context);
+
+ /**
+ * Return true to retry on the next service instance.
+ * @param context the context for the retry operation
+ * @return true to retry on the same service instance
+ */
+ boolean canRetryNextServiceInstance(LoadBalancerRetryContext context);
+
+ /**
+ * Return true to retry on the provided HTTP status code.
+ * @param statusCode the HTTP status code
+ * @return true to retry on the provided HTTP status code
+ */
+ boolean retryableStatusCode(int statusCode);
+
+ /**
+ * Return true to retry on the provided HTTP method.
+ * @param method the HTTP request method
+ * @return true to retry on the provided HTTP method
+ */
+ boolean canRetryOnMethod(HttpMethod method);
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunction.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunction.java
index 537f2b8ef..88fdd6731 100644
--- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunction.java
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/ReactorLoadBalancerExchangeFilterFunction.java
@@ -30,6 +30,8 @@
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
+import static org.springframework.cloud.client.loadbalancer.reactive.ExchangeFilterFunctionUtils.buildClientRequest;
+
/**
* An {@link ExchangeFilterFunction} that uses {@link ReactiveLoadBalancer} to execute
* requests against a correct {@link ServiceInstance}.
@@ -102,12 +104,4 @@ private String serviceInstanceUnavailableMessage(String serviceId) {
return "Load balancer does not contain an instance for the service " + serviceId;
}
- private ClientRequest buildClientRequest(ClientRequest request, URI uri) {
- return ClientRequest.create(request.method(), uri)
- .headers(headers -> headers.addAll(request.headers()))
- .cookies(cookies -> cookies.addAll(request.cookies()))
- .attributes(attributes -> attributes.putAll(request.attributes()))
- .body(request.body()).build();
- }
-
}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableExchangeFilterFunctionLoadBalancerRetryPolicy.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableExchangeFilterFunctionLoadBalancerRetryPolicy.java
new file mode 100644
index 000000000..13f082b42
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableExchangeFilterFunctionLoadBalancerRetryPolicy.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2012-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import org.springframework.cloud.client.loadbalancer.LoadBalancerRetryProperties;
+import org.springframework.http.HttpMethod;
+
+/**
+ * The default implementation of {@link LoadBalancerRetryPolicy}.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 2.2.7
+ */
+public class RetryableExchangeFilterFunctionLoadBalancerRetryPolicy
+ implements LoadBalancerRetryPolicy {
+
+ private final LoadBalancerRetryProperties properties;
+
+ public RetryableExchangeFilterFunctionLoadBalancerRetryPolicy(
+ LoadBalancerRetryProperties properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public boolean canRetrySameServiceInstance(LoadBalancerRetryContext context) {
+ return context.getRetriesSameServiceInstance() < properties
+ .getMaxRetriesOnSameServiceInstance();
+ }
+
+ @Override
+ public boolean canRetryNextServiceInstance(LoadBalancerRetryContext context) {
+ return context.getRetriesNextServiceInstance() < properties
+ .getMaxRetriesOnNextServiceInstance();
+ }
+
+ @Override
+ public boolean retryableStatusCode(int statusCode) {
+ return properties.getRetryableStatusCodes().contains(statusCode);
+ }
+
+ @Override
+ public boolean canRetryOnMethod(HttpMethod method) {
+ return HttpMethod.GET.equals(method) || properties.isRetryOnAllOperations();
+ }
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunction.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunction.java
new file mode 100644
index 000000000..c213c4ed3
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunction.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2012-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+import reactor.util.retry.RetrySpec;
+
+import org.springframework.cloud.client.ServiceInstance;
+import org.springframework.cloud.client.loadbalancer.LoadBalancerRetryProperties;
+import org.springframework.cloud.client.loadbalancer.RetryableRequestContext;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.ExchangeFunction;
+
+import static org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools.reconstructURI;
+import static org.springframework.cloud.client.loadbalancer.reactive.ExchangeFilterFunctionUtils.buildClientRequest;
+
+/**
+ * An {@link ExchangeFilterFunction} that uses {@link ReactiveLoadBalancer} to execute
+ * requests against a correct {@link ServiceInstance} and Reactor Retries to retry the
+ * call both against the same and the next service instance, based on the provided
+ * {@link LoadBalancerRetryPolicy}.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 2.2.7
+ */
+public class RetryableLoadBalancerExchangeFilterFunction
+ implements ExchangeFilterFunction {
+
+ private static final Log LOG = LogFactory
+ .getLog(RetryableLoadBalancerExchangeFilterFunction.class);
+
+ private static final List> exceptions = Arrays.asList(
+ IOException.class, TimeoutException.class,
+ RetryableStatusCodeException.class);
+
+ private final LoadBalancerRetryPolicy retryPolicy;
+
+ private final LoadBalancerRetryProperties retryProperties;
+
+ private final ReactiveLoadBalancer.Factory loadBalancerFactory;
+
+ public RetryableLoadBalancerExchangeFilterFunction(
+ LoadBalancerRetryPolicy retryPolicy,
+ ReactiveLoadBalancer.Factory loadBalancerFactory,
+ LoadBalancerRetryProperties retryProperties) {
+ this.retryPolicy = retryPolicy;
+ this.loadBalancerFactory = loadBalancerFactory;
+ this.retryProperties = retryProperties;
+ }
+
+ public RetryableLoadBalancerExchangeFilterFunction(
+ ReactiveLoadBalancer.Factory loadBalancerFactory,
+ LoadBalancerRetryProperties retryProperties) {
+ this.retryPolicy = new RetryableExchangeFilterFunctionLoadBalancerRetryPolicy(
+ retryProperties);
+ this.loadBalancerFactory = loadBalancerFactory;
+ this.retryProperties = retryProperties;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public Mono filter(ClientRequest clientRequest,
+ ExchangeFunction next) {
+ LoadBalancerRetryContext loadBalancerRetryContext = new LoadBalancerRetryContext(
+ clientRequest);
+ Retry exchangeRetry = buildRetrySpec(
+ retryProperties.getMaxRetriesOnSameServiceInstance(), true);
+ Retry filterRetry = buildRetrySpec(
+ retryProperties.getMaxRetriesOnNextServiceInstance(), false);
+
+ URI originalUrl = clientRequest.url();
+ String serviceId = originalUrl.getHost();
+ if (serviceId == null) {
+ String message = String.format(
+ "Request URI does not contain a valid hostname: %s",
+ originalUrl.toString());
+ if (LOG.isWarnEnabled()) {
+ LOG.warn(message);
+ }
+ return Mono.just(
+ ClientResponse.create(HttpStatus.BAD_REQUEST).body(message).build());
+ }
+ DefaultRequest lbRequest = new DefaultRequest<>(
+ new RetryableRequestContext(null));
+ return Mono.defer(() -> choose(serviceId, lbRequest).flatMap(lbResponse -> {
+ ServiceInstance instance = lbResponse.getServer();
+ lbRequest.setContext(new RetryableRequestContext(instance));
+ if (instance == null) {
+ String message = "LoadBalancer does not contain an instance for the service "
+ + serviceId;
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("LoadBalancer does not contain an instance for the service "
+ + serviceId);
+ }
+ return Mono.just(ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE)
+ .body(message).build());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "LoadBalancer has retrieved the instance for service %s: %s",
+ serviceId, instance.getUri()));
+ }
+ ClientRequest newRequest = buildClientRequest(clientRequest,
+ reconstructURI(instance, originalUrl));
+ return next.exchange(newRequest).map(clientResponse -> {
+ loadBalancerRetryContext.setClientResponse(clientResponse);
+ if (shouldRetrySameServiceInstance(loadBalancerRetryContext)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Retrying on status code: %d",
+ clientResponse.statusCode().value()));
+ }
+ throw new RetryableStatusCodeException();
+ }
+ return clientResponse;
+
+ });
+ }).map(clientResponse -> {
+ loadBalancerRetryContext.setClientResponse(clientResponse);
+ if (shouldRetryNextServiceInstance(loadBalancerRetryContext)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Retrying on status code: %d",
+ clientResponse.statusCode().value()));
+ }
+ throw new RetryableStatusCodeException();
+ }
+ return clientResponse;
+
+ }).retryWhen(exchangeRetry)).retryWhen(filterRetry);
+ }
+
+ private Retry buildRetrySpec(int max, boolean transientErrors) {
+ LoadBalancerRetryProperties.Backoff backoffProperties = retryProperties
+ .getBackoff();
+ if (backoffProperties.isEnabled()) {
+ return RetrySpec.backoff(max, backoffProperties.getMinBackoff())
+ .filter(this::isRetryException)
+ .maxBackoff(backoffProperties.getMaxBackoff())
+ .jitter(backoffProperties.getJitter())
+ .transientErrors(transientErrors);
+ }
+ return RetrySpec.max(max).filter(this::isRetryException)
+ .transientErrors(transientErrors);
+ }
+
+ private boolean shouldRetrySameServiceInstance(
+ LoadBalancerRetryContext loadBalancerRetryContext) {
+ boolean shouldRetry = retryPolicy
+ .retryableStatusCode(loadBalancerRetryContext.getResponseStatusCode())
+ && retryPolicy
+ .canRetryOnMethod(loadBalancerRetryContext.getRequestMethod())
+ && retryPolicy.canRetrySameServiceInstance(loadBalancerRetryContext);
+ if (shouldRetry) {
+ loadBalancerRetryContext.incrementRetriesSameServiceInstance();
+ }
+ return shouldRetry;
+ }
+
+ private boolean shouldRetryNextServiceInstance(
+ LoadBalancerRetryContext loadBalancerRetryContext) {
+ boolean shouldRetry = retryPolicy
+ .retryableStatusCode(loadBalancerRetryContext.getResponseStatusCode())
+ && retryPolicy
+ .canRetryOnMethod(loadBalancerRetryContext.getRequestMethod())
+ && retryPolicy.canRetryNextServiceInstance(loadBalancerRetryContext);
+ if (shouldRetry) {
+ loadBalancerRetryContext.incrementRetriesNextServiceInstance();
+ loadBalancerRetryContext.resetRetriesSameServiceInstance();
+ }
+ return shouldRetry;
+ }
+
+ private boolean isRetryException(Throwable throwable) {
+ return exceptions.stream()
+ .anyMatch(exception -> exception.isInstance(throwable)
+ || throwable != null && exception.isInstance(throwable.getCause())
+ || Exceptions.isRetryExhausted(throwable));
+ }
+
+ protected Mono> choose(String serviceId,
+ Request request) {
+ ReactiveLoadBalancer loadBalancer = loadBalancerFactory
+ .getInstance(serviceId);
+ if (loadBalancer == null) {
+ return Mono.just(
+ new org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse());
+ }
+ return Mono.from(loadBalancer.choose(request));
+ }
+
+}
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableStatusCodeException.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableStatusCodeException.java
new file mode 100644
index 000000000..b4548b8ae
--- /dev/null
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableStatusCodeException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2012-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+/**
+ * An {@link IllegalStateException} used to trigger retries based on the returned HTTP
+ * status code.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 2.2.7
+ */
+class RetryableStatusCodeException extends IllegalStateException {
+
+}
diff --git a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunctionIntegrationTests.java b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunctionIntegrationTests.java
new file mode 100644
index 000000000..9f57080cc
--- /dev/null
+++ b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunctionIntegrationTests.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2012-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.cloud.client.DefaultServiceInstance;
+import org.springframework.cloud.client.ServiceInstance;
+import org.springframework.cloud.client.discovery.DiscoveryClient;
+import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryProperties;
+import org.springframework.cloud.client.loadbalancer.LoadBalancerRetryProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.WebClient;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.BDDAssertions.then;
+import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
+
+/**
+ * Integration tests for {@link RetryableLoadBalancerExchangeFilterFunction}.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 2.2.7
+ */
+@SpringBootTest(webEnvironment = RANDOM_PORT)
+class RetryableLoadBalancerExchangeFilterFunctionIntegrationTests {
+
+ @Autowired
+ private RetryableLoadBalancerExchangeFilterFunction loadBalancerFunction;
+
+ @Autowired
+ private SimpleDiscoveryProperties properties;
+
+ @Autowired
+ private LoadBalancerRetryProperties retryProperties;
+
+ @LocalServerPort
+ private int port;
+
+ @BeforeEach
+ void setUp() {
+ DefaultServiceInstance instance = new DefaultServiceInstance();
+ instance.setServiceId("testservice");
+ instance.setUri(URI.create("http://localhost:" + port));
+ DefaultServiceInstance instanceWithNoLifecycleProcessors = new DefaultServiceInstance();
+ instanceWithNoLifecycleProcessors
+ .setServiceId("serviceWithNoLifecycleProcessors");
+ instanceWithNoLifecycleProcessors.setUri(URI.create("http://localhost:" + port));
+ properties.getInstances().put("testservice", Collections.singletonList(instance));
+ properties.getInstances().put("serviceWithNoLifecycleProcessors",
+ Collections.singletonList(instanceWithNoLifecycleProcessors));
+ }
+
+ @Test
+ void correctResponseReturnedForExistingHostAndInstancePresent() {
+ ClientResponse clientResponse = WebClient.builder().baseUrl("http://testservice")
+ .filter(this.loadBalancerFunction).build().get().uri("/hello").exchange()
+ .block();
+
+ then(clientResponse.statusCode()).isEqualTo(HttpStatus.OK);
+ then(clientResponse.bodyToMono(String.class).block()).isEqualTo("Hello World");
+ }
+
+ @Test
+ void correctResponseReturnedAfterRetryingOnSameServiceInstance() {
+ retryProperties.setMaxRetriesOnSameServiceInstance(1);
+ retryProperties.getRetryableStatusCodes().add(500);
+
+ ClientResponse clientResponse = WebClient.builder().baseUrl("http://testservice")
+ .filter(this.loadBalancerFunction).build().get().uri("/exception")
+ .exchange().block();
+
+ then(clientResponse.statusCode()).isEqualTo(HttpStatus.OK);
+ then(clientResponse.bodyToMono(String.class).block()).isEqualTo("Hello World!");
+ }
+
+ // FIXME - flaky test
+ @Disabled
+ @Test
+ void correctResponseReturnedAfterRetryingOnNextServiceInstanceWithBackoff() {
+ retryProperties.getBackoff().setEnabled(true);
+ retryProperties.setMaxRetriesOnSameServiceInstance(1);
+ DefaultServiceInstance goodRetryTestInstance = new DefaultServiceInstance();
+ goodRetryTestInstance.setServiceId("retrytest");
+ goodRetryTestInstance.setUri(URI.create("http://localhost:" + port));
+ DefaultServiceInstance badRetryTestInstance = new DefaultServiceInstance();
+ badRetryTestInstance.setServiceId("retrytest");
+ badRetryTestInstance.setUri(URI.create("http://localhost:" + 8080));
+ properties.getInstances().put("retrytest",
+ Arrays.asList(badRetryTestInstance, goodRetryTestInstance));
+ retryProperties.getRetryableStatusCodes().add(500);
+
+ ClientResponse clientResponse = WebClient.builder().baseUrl("http://retrytest")
+ .filter(this.loadBalancerFunction).build().get().uri("/hello").exchange()
+ .block();
+
+ then(clientResponse.statusCode()).isEqualTo(HttpStatus.OK);
+ then(clientResponse.bodyToMono(String.class).block()).isEqualTo("Hello World");
+
+ ClientResponse secondClientResponse = WebClient.builder()
+ .baseUrl("http://retrytest").filter(this.loadBalancerFunction).build()
+ .get().uri("/hello").exchange().block();
+
+ then(secondClientResponse.statusCode()).isEqualTo(HttpStatus.OK);
+ then(secondClientResponse.bodyToMono(String.class).block())
+ .isEqualTo("Hello World");
+ }
+
+ @Test
+ void serviceUnavailableReturnedWhenNoInstancePresent() {
+ ClientResponse clientResponse = WebClient.builder().baseUrl("http://xxx")
+ .filter(this.loadBalancerFunction).build().get().exchange().block();
+
+ then(clientResponse.statusCode()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE);
+ }
+
+ @Test
+ @Disabled
+ // FIXME 3.0.0
+ void badRequestReturnedForIncorrectHost() {
+ ClientResponse clientResponse = WebClient.builder().baseUrl("http:///xxx")
+ .filter(this.loadBalancerFunction).build().get().exchange().block();
+
+ then(clientResponse.statusCode()).isEqualTo(HttpStatus.BAD_REQUEST);
+ }
+
+ @Test
+ void exceptionNotThrownWhenFactoryReturnsNullLifecycleProcessorsMap() {
+ assertThatCode(() -> WebClient.builder()
+ .baseUrl("http://serviceWithNoLifecycleProcessors")
+ .filter(this.loadBalancerFunction).build().get().uri("/hello").exchange()
+ .block()).doesNotThrowAnyException();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @EnableDiscoveryClient
+ @EnableAutoConfiguration
+ @SpringBootConfiguration(proxyBeanMethods = false)
+ @RestController
+ static class Config {
+
+ AtomicInteger exceptionCallsCount = new AtomicInteger();
+
+ @GetMapping("/hello")
+ public String hello() {
+ return "Hello World";
+ }
+
+ @GetMapping("/callback")
+ String callbackTestResult() {
+ return "callbackTestResult";
+ }
+
+ @GetMapping("/exception")
+ String exception() {
+ int callCount = exceptionCallsCount.incrementAndGet();
+ if (callCount % 2 != 0) {
+ throw new IllegalStateException("Test!");
+ }
+ return "Hello World!";
+ }
+
+ @Bean
+ ReactiveLoadBalancer.Factory reactiveLoadBalancerFactory(
+ DiscoveryClient discoveryClient) {
+ return serviceId -> new DiscoveryClientBasedReactiveLoadBalancer(serviceId,
+ discoveryClient);
+ }
+
+ @Bean
+ LoadBalancerRetryProperties loadBalancerRetryProperties() {
+ return new LoadBalancerRetryProperties();
+ }
+
+ @Bean
+ RetryableLoadBalancerExchangeFilterFunction exchangeFilterFunction(
+ LoadBalancerRetryProperties properties,
+ ReactiveLoadBalancer.Factory factory) {
+ return new RetryableLoadBalancerExchangeFilterFunction(
+ new RetryableExchangeFilterFunctionLoadBalancerRetryPolicy(
+ properties),
+ factory, properties);
+ }
+
+ }
+
+}
diff --git a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunctionTests.java b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunctionTests.java
new file mode 100644
index 000000000..c0ed3a6dd
--- /dev/null
+++ b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/reactive/RetryableLoadBalancerExchangeFilterFunctionTests.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2012-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.client.loadbalancer.reactive;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+import reactor.core.publisher.Mono;
+
+import org.springframework.cloud.client.ServiceInstance;
+import org.springframework.cloud.client.loadbalancer.LoadBalancerRetryProperties;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.ExchangeFunction;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link RetryableLoadBalancerExchangeFilterFunction}.
+ *
+ * @author Olga Maciaszek-Sharma
+ * @since 2.2.7
+ */
+@SuppressWarnings("unchecked")
+class RetryableLoadBalancerExchangeFilterFunctionTests {
+
+ private final LoadBalancerRetryProperties properties = new LoadBalancerRetryProperties();
+
+ private final LoadBalancerRetryPolicy policy = new RetryableExchangeFilterFunctionLoadBalancerRetryPolicy(
+ properties);
+
+ private final ReactiveLoadBalancer.Factory factory = mock(
+ ReactiveLoadBalancer.Factory.class);
+
+ private final RetryableLoadBalancerExchangeFilterFunction filterFunction = new RetryableLoadBalancerExchangeFilterFunction(
+ policy, factory, properties);
+
+ private final ClientRequest clientRequest = mock(ClientRequest.class);
+
+ private final ExchangeFunction next = mock(ExchangeFunction.class);
+
+ private final ClientResponse clientResponse = mock(ClientResponse.class);
+
+ private final InOrder inOrder = inOrder(next, factory);
+
+ @BeforeEach
+ void setUp() {
+ properties.setMaxRetriesOnSameServiceInstance(1);
+ properties.getRetryableStatusCodes().add(404);
+ when(clientRequest.url()).thenReturn(URI.create("http://test"));
+ when(factory.getInstance("test")).thenReturn(new TestReactiveLoadBalancer());
+ when(clientRequest.headers()).thenReturn(new HttpHeaders());
+ when(clientRequest.cookies()).thenReturn(new HttpHeaders());
+
+ }
+
+ @Test
+ void shouldRetryOnSameAndNextServiceInstanceOnException() {
+ when(clientRequest.method()).thenReturn(HttpMethod.GET);
+ when(clientResponse.statusCode()).thenReturn(HttpStatus.OK);
+ when(next.exchange(any()))
+ .thenThrow(new IllegalStateException(new IOException()));
+
+ try {
+ filterFunction.filter(clientRequest, next).subscribe();
+ }
+ catch (Exception ignored) {
+ }
+
+ inOrder.verify(factory, times(1)).getInstance(any());
+ inOrder.verify(next, times(2)).exchange(any());
+ inOrder.verify(factory, times(1)).getInstance(any());
+ inOrder.verify(next, times(2)).exchange(any());
+ }
+
+ @Test
+ void shouldRetryOnSameAndNextServiceInstanceOnRetryableStatusCode() {
+ when(clientRequest.method()).thenReturn(HttpMethod.GET);
+ when(clientResponse.statusCode()).thenReturn(HttpStatus.NOT_FOUND);
+ when(next.exchange(any())).thenReturn(Mono.just(clientResponse));
+
+ filterFunction.filter(clientRequest, next).subscribe();
+
+ inOrder.verify(factory, times(1)).getInstance(any());
+ inOrder.verify(next, times(2)).exchange(any());
+ inOrder.verify(factory, times(1)).getInstance(any());
+ inOrder.verify(next, times(2)).exchange(any());
+ }
+
+ @Test
+ void shouldNotRetryWhenNoRetryableExceptionOrStatusCode() {
+ when(clientRequest.method()).thenReturn(HttpMethod.GET);
+ when(clientResponse.statusCode()).thenReturn(HttpStatus.OK);
+ when(next.exchange(any())).thenReturn(Mono.just(clientResponse));
+
+ filterFunction.filter(clientRequest, next).subscribe();
+
+ verify(next, times(1)).exchange(any());
+ verify(factory, times(1)).getInstance(any());
+ }
+
+ @Test
+ void shouldNotRetryOnMethodOtherThanGet() {
+ when(clientRequest.method()).thenReturn(HttpMethod.POST);
+ when(clientResponse.statusCode()).thenReturn(HttpStatus.NOT_FOUND);
+ when(next.exchange(any())).thenReturn(Mono.just(clientResponse));
+
+ filterFunction.filter(clientRequest, next).subscribe();
+
+ verify(next, times(1)).exchange(any());
+ verify(factory, times(1)).getInstance(any());
+ }
+
+ @Test
+ void shouldRetryOnMethodOtherThanGetWhenEnabled() {
+ LoadBalancerRetryProperties properties = new LoadBalancerRetryProperties();
+ properties.setRetryOnAllOperations(true);
+ properties.setMaxRetriesOnSameServiceInstance(1);
+ properties.getRetryableStatusCodes().add(404);
+ LoadBalancerRetryPolicy policy = new RetryableExchangeFilterFunctionLoadBalancerRetryPolicy(
+ properties);
+ RetryableLoadBalancerExchangeFilterFunction filterFunction = new RetryableLoadBalancerExchangeFilterFunction(
+ policy, factory, properties);
+ when(clientRequest.method()).thenReturn(HttpMethod.POST);
+ when(clientResponse.statusCode()).thenReturn(HttpStatus.NOT_FOUND);
+ when(next.exchange(any())).thenReturn(Mono.just(clientResponse));
+
+ filterFunction.filter(clientRequest, next).subscribe();
+
+ inOrder.verify(factory, times(1)).getInstance(any());
+ inOrder.verify(next, times(2)).exchange(any());
+ inOrder.verify(factory, times(1)).getInstance(any());
+ inOrder.verify(next, times(2)).exchange(any());
+ }
+
+}