Skip to content

Commit 86d8366

Browse files
committed
Polish support for reactive Elasticsearch healthcheck
Fixes gh-21042
1 parent 203878a commit 86d8366

File tree

5 files changed

+182
-18
lines changed

5 files changed

+182
-18
lines changed

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/elasticsearch/ElasticSearchReactiveHealthContributorAutoConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
* {@link ReactiveElasticsearchClient}.
4141
*
4242
* @author Aleksander Lech
43-
* @since 2.3
43+
* @since 2.3.2
4444
*/
4545
@Configuration(proxyBeanMethods = false)
4646
@ConditionalOnClass({ ReactiveElasticsearchClient.class, Flux.class })

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ org.springframework.boot.actuate.autoconfigure.context.properties.ConfigurationP
1414
org.springframework.boot.actuate.autoconfigure.context.ShutdownEndpointAutoConfiguration,\
1515
org.springframework.boot.actuate.autoconfigure.couchbase.CouchbaseHealthContributorAutoConfiguration,\
1616
org.springframework.boot.actuate.autoconfigure.couchbase.CouchbaseReactiveHealthContributorAutoConfiguration,\
17-
org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticSearchRestHealthContributorAutoConfiguration,\
1817
org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticSearchReactiveHealthContributorAutoConfiguration,\
18+
org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticSearchRestHealthContributorAutoConfiguration,\
1919
org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration,\
2020
org.springframework.boot.actuate.autoconfigure.endpoint.jmx.JmxEndpointAutoConfiguration,\
2121
org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointAutoConfiguration,\

spring-boot-project/spring-boot-actuator/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,12 @@ dependencies {
5252
optional("org.springframework.amqp:spring-rabbit")
5353
optional("org.springframework.data:spring-data-cassandra")
5454
optional("org.springframework.data:spring-data-couchbase")
55+
optional("org.springframework.data:spring-data-elasticsearch")
5556
optional("org.springframework.data:spring-data-ldap")
5657
optional("org.springframework.data:spring-data-mongodb")
5758
optional("org.springframework.data:spring-data-neo4j")
5859
optional("org.springframework.data:spring-data-redis")
5960
optional("org.springframework.data:spring-data-rest-webmvc")
60-
optional("org.springframework.data:spring-data-elasticsearch")
6161
optional("org.springframework.data:spring-data-solr")
6262
optional("org.springframework.integration:spring-integration-core")
6363
optional("org.springframework.security:spring-security-core")
@@ -80,6 +80,7 @@ dependencies {
8080
testImplementation("org.mockito:mockito-junit-jupiter")
8181
testImplementation("org.skyscreamer:jsonassert")
8282
testImplementation("org.springframework:spring-test")
83+
testImplementation("com.squareup.okhttp3:mockwebserver")
8384

8485
testRuntimeOnly("io.projectreactor.netty:reactor-netty")
8586
testRuntimeOnly("javax.xml.bind:jaxb-api")

spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/elasticsearch/ElasticsearchReactiveHealthIndicator.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,31 @@
1616

1717
package org.springframework.boot.actuate.elasticsearch;
1818

19-
import java.util.stream.Collectors;
19+
import java.util.Map;
2020

2121
import reactor.core.publisher.Mono;
2222

2323
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
2424
import org.springframework.boot.actuate.health.Health;
2525
import org.springframework.boot.actuate.health.HealthIndicator;
26+
import org.springframework.core.ParameterizedTypeReference;
2627
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
2728

2829
/**
2930
* {@link HealthIndicator} for an Elasticsearch cluster using a
3031
* {@link ReactiveElasticsearchClient}.
3132
*
33+
* @author Brian Clozel
3234
* @author Aleksander Lech
33-
* @since 2.3
35+
* @since 2.3.2
3436
*/
3537
public class ElasticsearchReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
3638

39+
private static final ParameterizedTypeReference<Map<String, Object>> STRING_OBJECT_MAP = new ParameterizedTypeReference<Map<String, Object>>() {
40+
};
41+
42+
private static final String RED_STATUS = "red";
43+
3744
private final ReactiveElasticsearchClient client;
3845

3946
public ElasticsearchReactiveHealthIndicator(ReactiveElasticsearchClient client) {
@@ -43,19 +50,28 @@ public ElasticsearchReactiveHealthIndicator(ReactiveElasticsearchClient client)
4350

4451
@Override
4552
protected Mono<Health> doHealthCheck(Health.Builder builder) {
46-
return this.client.status().map((status) -> {
47-
if (status.isOk()) {
48-
builder.up();
49-
}
50-
else {
51-
builder.down();
52-
}
53-
54-
builder.withDetails(status.hosts().stream().collect(Collectors
55-
.toMap((host) -> host.getEndpoint().getHostString(), (host) -> host.getState().toString())));
56-
57-
return builder.build();
58-
});
53+
return this.client.execute((callback) -> callback.get().uri("/_cluster/health/").exchange())
54+
.flatMap((response) -> {
55+
if (response.statusCode().is2xxSuccessful()) {
56+
return response.bodyToMono(STRING_OBJECT_MAP).map((body) -> {
57+
String status = (String) body.get("status");
58+
if (RED_STATUS.equals(status)) {
59+
builder.outOfService();
60+
}
61+
else {
62+
builder.up();
63+
}
64+
builder.withDetails(body);
65+
return builder.build();
66+
});
67+
}
68+
else {
69+
builder.down();
70+
builder.withDetail("statusCode", response.rawStatusCode());
71+
builder.withDetail("reasonPhrase", response.statusCode().getReasonPhrase());
72+
return response.releaseBody().thenReturn(builder.build());
73+
}
74+
});
5975
}
6076

6177
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright 2012-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.elasticsearch;
18+
19+
import java.util.Map;
20+
21+
import okhttp3.mockwebserver.MockResponse;
22+
import okhttp3.mockwebserver.MockWebServer;
23+
import org.junit.jupiter.api.AfterEach;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
27+
import org.springframework.boot.actuate.health.Health;
28+
import org.springframework.boot.actuate.health.Status;
29+
import org.springframework.data.elasticsearch.client.ClientConfiguration;
30+
import org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient;
31+
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
32+
import org.springframework.http.HttpHeaders;
33+
import org.springframework.http.HttpStatus;
34+
import org.springframework.http.MediaType;
35+
import org.springframework.web.reactive.function.client.WebClient;
36+
37+
import static org.assertj.core.api.Assertions.assertThat;
38+
import static org.assertj.core.api.Assertions.entry;
39+
40+
/**
41+
* Tests for {@link ElasticsearchReactiveHealthIndicator}
42+
*
43+
* @author Brian Clozel
44+
*/
45+
class ElasticsearchReactiveHealthIndicatorTests {
46+
47+
private MockWebServer server;
48+
49+
private WebClient.Builder builder;
50+
51+
private ElasticsearchReactiveHealthIndicator healthIndicator;
52+
53+
@BeforeEach
54+
void setup() throws Exception {
55+
this.server = new MockWebServer();
56+
this.server.start();
57+
this.builder = WebClient.builder().baseUrl(this.server.url("/").toString());
58+
ReactiveElasticsearchClient client = DefaultReactiveElasticsearchClient
59+
.create(ClientConfiguration.create(this.server.getHostName() + ":" + this.server.getPort()));
60+
this.healthIndicator = new ElasticsearchReactiveHealthIndicator(client);
61+
}
62+
63+
@AfterEach
64+
void shutdown() throws Exception {
65+
this.server.shutdown();
66+
}
67+
68+
@Test
69+
void elasticsearchIsUp() {
70+
setupMockResponse(200, "green");
71+
Health health = this.healthIndicator.health().block();
72+
assertThat(health.getStatus()).isEqualTo(Status.UP);
73+
assertHealthDetailsWithStatus(health.getDetails(), "green");
74+
}
75+
76+
@Test
77+
void elasticsearchWithYellowStatusIsUp() {
78+
setupMockResponse(200, "yellow");
79+
Health health = this.healthIndicator.health().block();
80+
assertThat(health.getStatus()).isEqualTo(Status.UP);
81+
assertHealthDetailsWithStatus(health.getDetails(), "yellow");
82+
}
83+
84+
@Test
85+
void elasticsearchIsDown() throws Exception {
86+
this.server.shutdown();
87+
Health health = this.healthIndicator.health().block();
88+
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
89+
assertThat(health.getDetails().get("error")).asString()
90+
.contains("org.springframework.data.elasticsearch.client.NoReachableHostException");
91+
}
92+
93+
@Test
94+
void elasticsearchIsDownByResponseCode() {
95+
// first enqueue an OK response since the HostChecker first sends a HEAD request
96+
// to "/"
97+
this.server.enqueue(new MockResponse().setResponseCode(HttpStatus.OK.value()));
98+
this.server.enqueue(new MockResponse().setResponseCode(HttpStatus.INTERNAL_SERVER_ERROR.value()));
99+
Health health = this.healthIndicator.health().block();
100+
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
101+
assertThat(health.getDetails().get("statusCode")).asString().isEqualTo("500");
102+
assertThat(health.getDetails().get("reasonPhrase")).asString().isEqualTo("Internal Server Error");
103+
}
104+
105+
@Test
106+
void elasticsearchIsOutOfServiceByStatus() {
107+
setupMockResponse(200, "red");
108+
Health health = this.healthIndicator.health().block();
109+
assertThat(health.getStatus()).isEqualTo(Status.OUT_OF_SERVICE);
110+
assertHealthDetailsWithStatus(health.getDetails(), "red");
111+
}
112+
113+
private void assertHealthDetailsWithStatus(Map<String, Object> details, String status) {
114+
assertThat(details).contains(entry("cluster_name", "elasticsearch"), entry("status", status),
115+
entry("timed_out", false), entry("number_of_nodes", 1), entry("number_of_data_nodes", 1),
116+
entry("active_primary_shards", 0), entry("active_shards", 0), entry("relocating_shards", 0),
117+
entry("initializing_shards", 0), entry("unassigned_shards", 0), entry("delayed_unassigned_shards", 0),
118+
entry("number_of_pending_tasks", 0), entry("number_of_in_flight_fetch", 0),
119+
entry("task_max_waiting_in_queue_millis", 0), entry("active_shards_percent_as_number", 100.0));
120+
}
121+
122+
private void setupMockResponse(int responseCode, String status) {
123+
// first enqueue an OK response since the HostChecker first sends a HEAD request
124+
// to "/"
125+
this.server.enqueue(new MockResponse());
126+
MockResponse mockResponse = new MockResponse().setResponseCode(HttpStatus.valueOf(responseCode).value())
127+
.setBody(createJsonResult(responseCode, status))
128+
.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
129+
this.server.enqueue(mockResponse);
130+
}
131+
132+
private String createJsonResult(int responseCode, String status) {
133+
if (responseCode == 200) {
134+
return String.format(
135+
"{\"cluster_name\":\"elasticsearch\","
136+
+ "\"status\":\"%s\",\"timed_out\":false,\"number_of_nodes\":1,"
137+
+ "\"number_of_data_nodes\":1,\"active_primary_shards\":0,"
138+
+ "\"active_shards\":0,\"relocating_shards\":0,\"initializing_shards\":0,"
139+
+ "\"unassigned_shards\":0,\"delayed_unassigned_shards\":0,"
140+
+ "\"number_of_pending_tasks\":0,\"number_of_in_flight_fetch\":0,"
141+
+ "\"task_max_waiting_in_queue_millis\":0,\"active_shards_percent_as_number\":100.0}",
142+
status);
143+
}
144+
return "{\n \"error\": \"Server Error\",\n \"status\": " + responseCode + "\n}";
145+
}
146+
147+
}

0 commit comments

Comments
 (0)