diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index a661c7140..28f73836b 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -43,6 +43,7 @@ * * @author Olga Maciaszek-Sharma * @author Roman Matiushchenko + * @author Roman Chigvintsev * @since 2.2.0 */ public class HealthCheckServiceInstanceListSupplier @@ -73,9 +74,9 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega .onlyIf(repeatContext -> this.healthCheck.getRefetchInstances()) .fixedBackoff(healthCheck.getRefetchInstancesInterval()); Flux> aliveInstancesFlux = Flux.defer(delegate) + .repeatWhen(aliveInstancesReplayRepeat) .switchMap(serviceInstances -> healthCheckFlux(serviceInstances).map( - alive -> Collections.unmodifiableList(new ArrayList<>(alive)))) - .repeatWhen(aliveInstancesReplayRepeat); + alive -> Collections.unmodifiableList(new ArrayList<>(alive)))); aliveInstancesReplay = aliveInstancesFlux .delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay())) .replay(1).refCount(1); diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java index 2318ba38a..78954c94e 100644 --- a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java +++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java @@ -56,6 +56,7 @@ * * @author Olga Maciaszek-Sharma * @author Roman Matiushchenko + * @author Roman Chigvintsev */ @ExtendWith(SpringExtension.class) @SpringBootTest( @@ -457,6 +458,39 @@ protected Mono isAlive(ServiceInstance serviceInstance) { .verify(VERIFY_TIMEOUT); } + @Test + void shouldRefetchInstancesWithRepeatingHealthCheck() { + healthCheck.setInitialDelay(1000); + healthCheck.setRepeatHealthCheck(true); + healthCheck.setRefetchInstancesInterval(Duration.ofSeconds(1)); + healthCheck.setRefetchInstances(true); + ServiceInstance serviceInstance1 = new DefaultServiceInstance("ignored-service-1", + SERVICE_ID, "127.0.0.1", port, false); + ServiceInstance serviceInstance2 = new DefaultServiceInstance("ignored-service-2", + SERVICE_ID, "127.0.0.2", port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = mock( + ServiceInstanceListSupplier.class); + when(delegate.get()) + .thenReturn(Flux.just(Collections.singletonList(serviceInstance1))) + .thenReturn(Flux.just(Collections.singletonList(serviceInstance2))); + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, + healthCheck, webClient) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return Mono.just(true); + } + }; + return listSupplier.get(); + }).expectSubscription() + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) + .expectNext(Lists.list(serviceInstance1)) + .thenAwait(healthCheck.getRefetchInstancesInterval()) + .expectNext(Lists.list(serviceInstance2)).thenCancel() + .verify(VERIFY_TIMEOUT); + } + @Test void shouldCacheResultIfAfterPropertiesSetInvoked() { healthCheck.setInitialDelay(1000);