Skip to content

Commit b04d425

Browse files
committed
Fix bug of service instance refetching when repeating health check is activated
Fixes gh-899
1 parent 70ec68f commit b04d425

File tree

2 files changed

+31
-2
lines changed

2 files changed

+31
-2
lines changed

spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega
6868
.onlyIf(repeatContext -> this.healthCheck.getRefetchInstances())
6969
.fixedBackoff(healthCheck.getRefetchInstancesInterval());
7070
Flux<List<ServiceInstance>> aliveInstancesFlux = Flux.defer(delegate)
71+
.repeatWhen(aliveInstancesReplayRepeat)
7172
.switchMap(serviceInstances -> healthCheckFlux(serviceInstances)
72-
.map(alive -> Collections.unmodifiableList(new ArrayList<>(alive))))
73-
.repeatWhen(aliveInstancesReplayRepeat);
73+
.map(alive -> Collections.unmodifiableList(new ArrayList<>(alive))));
7474
aliveInstancesReplay = aliveInstancesFlux.delaySubscription(healthCheck.getInitialDelay()).replay(1)
7575
.refCount(1);
7676
}

spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.function.BiFunction;
2425

2526
import org.assertj.core.api.Assertions;
2627
import org.assertj.core.util.Lists;
@@ -440,6 +441,34 @@ protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
440441
.thenCancel().verify(VERIFY_TIMEOUT);
441442
}
442443

444+
@Test
445+
void shouldRefetchInstancesWithRepeatingHealthCheck() {
446+
healthCheck.setInitialDelay(Duration.ofSeconds(1));
447+
healthCheck.setRepeatHealthCheck(true);
448+
healthCheck.setRefetchInstancesInterval(Duration.ofSeconds(1));
449+
healthCheck.setRefetchInstances(true);
450+
ServiceInstance serviceInstance1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1",
451+
port, false);
452+
ServiceInstance serviceInstance2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, "127.0.0.2",
453+
port, false);
454+
455+
StepVerifier.withVirtualTime(() -> {
456+
ServiceInstanceListSupplier delegate = mock(ServiceInstanceListSupplier.class);
457+
when(delegate.get()).thenReturn(Flux.just(Collections.singletonList(serviceInstance1)))
458+
.thenReturn(Flux.just(Collections.singletonList(serviceInstance2)));
459+
BiFunction<ServiceInstance, String, Mono<Boolean>> healthCheckFunc = healthCheckFunction(webClient);
460+
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, healthCheck, healthCheckFunc) {
461+
@Override
462+
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
463+
return Mono.just(true);
464+
}
465+
};
466+
return listSupplier.get();
467+
}).expectSubscription().expectNoEvent(healthCheck.getInitialDelay()).expectNext(Lists.list(serviceInstance1))
468+
.thenAwait(healthCheck.getRefetchInstancesInterval()).expectNext(Lists.list(serviceInstance2))
469+
.thenCancel().verify(VERIFY_TIMEOUT);
470+
}
471+
443472
@Test
444473
void shouldCacheResultIfAfterPropertiesSetInvoked() {
445474
healthCheck.setInitialDelay(Duration.ofSeconds(1));

0 commit comments

Comments
 (0)