diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index 6b2cff53e..c9aa93327 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -40,6 +40,7 @@ * * @author Olga Maciaszek-Sharma * @author Roman Matiushchenko + * @author Roman Chigvintsev * @since 2.2.0 */ public class HealthCheckServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier @@ -68,9 +69,9 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega .onlyIf(repeatContext -> this.healthCheck.getRefetchInstances()) .fixedBackoff(healthCheck.getRefetchInstancesInterval()); Flux> aliveInstancesFlux = Flux.defer(delegate) + .repeatWhen(aliveInstancesReplayRepeat) .switchMap(serviceInstances -> healthCheckFlux(serviceInstances) - .map(alive -> Collections.unmodifiableList(new ArrayList<>(alive)))) - .repeatWhen(aliveInstancesReplayRepeat); + .map(alive -> Collections.unmodifiableList(new ArrayList<>(alive)))); aliveInstancesReplay = aliveInstancesFlux.delaySubscription(healthCheck.getInitialDelay()).replay(1) .refCount(1); } diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java index b71d5df28..b75afb09d 100644 --- a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java +++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import org.assertj.core.api.Assertions; import org.assertj.core.util.Lists; @@ -59,6 +60,7 @@ * * @author Olga Maciaszek-Sharma * @author Roman Matiushchenko + * @author Roman Chigvintsev */ @ExtendWith(SpringExtension.class) @SpringBootTest(classes = HealthCheckServiceInstanceListSupplierTests.TestApplication.class, @@ -440,6 +442,34 @@ protected Mono isAlive(ServiceInstance serviceInstance) { .thenCancel().verify(VERIFY_TIMEOUT); } + @Test + void shouldRefetchInstancesWithRepeatingHealthCheck() { + healthCheck.setInitialDelay(Duration.ofSeconds(1)); + healthCheck.setRepeatHealthCheck(true); + healthCheck.setRefetchInstancesInterval(Duration.ofSeconds(1)); + healthCheck.setRefetchInstances(true); + ServiceInstance serviceInstance1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1", + port, false); + ServiceInstance serviceInstance2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, "127.0.0.2", + port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = mock(ServiceInstanceListSupplier.class); + when(delegate.get()).thenReturn(Flux.just(Collections.singletonList(serviceInstance1))) + .thenReturn(Flux.just(Collections.singletonList(serviceInstance2))); + BiFunction> healthCheckFunc = healthCheckFunction(webClient); + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, healthCheck, healthCheckFunc) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return Mono.just(true); + } + }; + return listSupplier.get(); + }).expectSubscription().expectNoEvent(healthCheck.getInitialDelay()).expectNext(Lists.list(serviceInstance1)) + .thenAwait(healthCheck.getRefetchInstancesInterval()).expectNext(Lists.list(serviceInstance2)) + .thenCancel().verify(VERIFY_TIMEOUT); + } + @Test void shouldCacheResultIfAfterPropertiesSetInvoked() { healthCheck.setInitialDelay(Duration.ofSeconds(1));