Skip to content

Commit 2e553a0

Browse files
committed
fixes issues from spring-cloudgh-683
- WebClient response leaks - potential Scheduled task leak - rework polling to plain reactor operators - remove non atomic operations on ServiceInstance lists related to spring-cloudgh-629
1 parent 070e0a8 commit 2e553a0

File tree

1 file changed

+50
-42
lines changed

1 file changed

+50
-42
lines changed

spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java

Lines changed: 50 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@
1616

1717
package org.springframework.cloud.loadbalancer.core;
1818

19+
import java.time.Duration;
1920
import java.util.ArrayList;
2021
import java.util.Collections;
2122
import java.util.List;
22-
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.CopyOnWriteArrayList;
2324

2425
import org.apache.commons.logging.Log;
2526
import org.apache.commons.logging.LogFactory;
27+
import reactor.core.Disposable;
2628
import reactor.core.publisher.Flux;
27-
import reactor.core.publisher.FluxSink;
2829
import reactor.core.publisher.Mono;
2930
import reactor.core.scheduler.Schedulers;
3031

32+
import org.springframework.beans.factory.DisposableBean;
3133
import org.springframework.cloud.client.ServiceInstance;
3234
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerProperties;
3335
import org.springframework.http.HttpStatus;
@@ -40,10 +42,11 @@
4042
* {@link WebClient} to ping the <code>health</code> endpoint of the instances.
4143
*
4244
* @author Olga Maciaszek-Sharma
45+
* @author Roman Matiushchenko
4346
* @since 2.2.0
4447
*/
4548
public class HealthCheckServiceInstanceListSupplier
46-
implements ServiceInstanceListSupplier {
49+
implements ServiceInstanceListSupplier, DisposableBean {
4750

4851
private static final Log LOG = LogFactory
4952
.getLog(HealthCheckServiceInstanceListSupplier.class);
@@ -56,11 +59,11 @@ public class HealthCheckServiceInstanceListSupplier
5659

5760
private final String defaultHealthCheckPath;
5861

59-
private List<ServiceInstance> instances = Collections
60-
.synchronizedList(new ArrayList<>());
62+
private List<ServiceInstance> instances = Collections.emptyList();
6163

62-
private List<ServiceInstance> healthyInstances = Collections
63-
.synchronizedList(new ArrayList<>());
64+
private volatile List<ServiceInstance> healthyInstances = Collections.emptyList();
65+
66+
private Disposable healthCheckDisposable;
6467

6568
public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
6669
LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) {
@@ -74,32 +77,31 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega
7477
}
7578

7679
private void initInstances() {
77-
delegate.get().subscribe(delegateInstances -> {
78-
instances.clear();
79-
instances.addAll(delegateInstances);
80-
});
81-
82-
Flux<List<ServiceInstance>> healthCheckFlux = healthCheckFlux();
83-
84-
healthCheckFlux.subscribe(verifiedInstances -> {
85-
healthyInstances.clear();
86-
healthyInstances.addAll(verifiedInstances);
87-
});
80+
healthCheckDisposable = delegate.get().doOnNext(delegateInstances -> {
81+
instances = Collections.unmodifiableList(new ArrayList<>(delegateInstances));
82+
}).thenMany(healthCheckFlux()).subscribeOn(Schedulers.parallel())
83+
.subscribe(verifiedInstances -> healthyInstances = verifiedInstances);
8884
}
8985

9086
protected Flux<List<ServiceInstance>> healthCheckFlux() {
91-
return Flux.create(emitter -> Schedulers
92-
.newSingle("Health Check Verifier: " + getServiceId(), true)
93-
.schedulePeriodically(() -> {
94-
List<ServiceInstance> verifiedInstances = new ArrayList<>();
95-
Flux.fromIterable(instances).filterWhen(this::isAlive)
96-
.subscribe(serviceInstance -> {
97-
verifiedInstances.add(serviceInstance);
98-
emitter.next(verifiedInstances);
99-
});
100-
}, healthCheck.getInitialDelay(), healthCheck.getInterval().toMillis(),
101-
TimeUnit.MILLISECONDS),
102-
FluxSink.OverflowStrategy.LATEST);
87+
return Flux.defer(() -> {
88+
List<ServiceInstance> result = new CopyOnWriteArrayList<>();
89+
90+
List<Mono<ServiceInstance>> checks = new ArrayList<>();
91+
for (ServiceInstance instance : instances) {
92+
Mono<ServiceInstance> alive = isAlive(instance)
93+
.onErrorResume(throwable -> Mono.empty())
94+
.timeout(healthCheck.getInterval(), Mono.empty()).filter(it -> it)
95+
.map(check -> instance);
96+
97+
checks.add(alive);
98+
}
99+
return Flux.merge(checks).map(alive -> {
100+
result.add(alive);
101+
return result;
102+
}).defaultIfEmpty(result);
103+
}).repeatWhen(restart -> restart.delayElements(healthCheck.getInterval()))
104+
.delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay()));
103105
}
104106

105107
@Override
@@ -109,16 +111,17 @@ public String getServiceId() {
109111

110112
@Override
111113
public Flux<List<ServiceInstance>> get() {
112-
if (!healthyInstances.isEmpty()) {
113-
return Flux.defer(() -> Flux.fromIterable(healthyInstances).collectList());
114-
}
115-
// If there are no healthy instances, it might be better to still retry on all of
116-
// them
117-
if (LOG.isWarnEnabled()) {
118-
LOG.warn(
119-
"No verified healthy instances were found, returning all listed instances.");
120-
}
121-
return Flux.defer(() -> Flux.fromIterable(instances).collectList());
114+
return Flux.defer(() -> {
115+
List<ServiceInstance> it = new ArrayList<>(healthyInstances);
116+
if (it.isEmpty()) {
117+
if (LOG.isWarnEnabled()) {
118+
LOG.warn(
119+
"No verified healthy instances were found, returning all listed instances.");
120+
}
121+
it = instances;
122+
}
123+
return Flux.just(it);
124+
});
122125
}
123126

124127
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
@@ -129,8 +132,13 @@ protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
129132
return webClient.get()
130133
.uri(UriComponentsBuilder.fromUri(serviceInstance.getUri())
131134
.path(healthCheckPath).build().toUri())
132-
.exchange()
133-
.map(clientResponse -> HttpStatus.OK.equals(clientResponse.statusCode()));
135+
.exchange().flatMap(clientResponse -> clientResponse.releaseBody()
136+
.thenReturn(HttpStatus.OK.equals(clientResponse.statusCode())));
137+
}
138+
139+
@Override
140+
public void destroy() {
141+
this.healthCheckDisposable.dispose();
134142
}
135143

136144
}

0 commit comments

Comments
 (0)