Skip to content

Commit cbe8cbb

Browse files
authored
Safe response consumer for rest5client (#1049)
* safe response consumer for rest5client * unused import
1 parent ac80e07 commit cbe8cbb

File tree

3 files changed

+296
-1
lines changed

3 files changed

+296
-1
lines changed

java-client/src/main/java/co/elastic/clients/transport/rest5_client/Rest5ClientOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ public Rest5ClientOptions build() {
209209
}
210210

211211
static Rest5ClientOptions initialOptions() {
212-
return new Rest5ClientOptions(RequestOptions.DEFAULT, false);
212+
return new Rest5ClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS, false);
213213
}
214214

215215
private static RequestOptions.Builder addBuiltinHeaders(RequestOptions.Builder builder) {
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package co.elastic.clients.transport.rest5_client;
21+
22+
import co.elastic.clients.transport.rest5_client.low_level.HttpAsyncResponseConsumerFactory;
23+
import co.elastic.clients.transport.rest5_client.low_level.RequestOptions;
24+
import org.apache.hc.core5.concurrent.FutureCallback;
25+
import org.apache.hc.core5.http.EntityDetails;
26+
import org.apache.hc.core5.http.Header;
27+
import org.apache.hc.core5.http.HttpResponse;
28+
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
29+
import org.apache.hc.core5.http.nio.CapacityChannel;
30+
import org.apache.hc.core5.http.protocol.HttpContext;
31+
32+
import java.nio.ByteBuffer;
33+
import java.util.List;
34+
35+
/**
36+
* A response consumer that will propagate Errors as RuntimeExceptions to avoid crashing the IOReactor.
37+
*/
38+
public class SafeResponseConsumer<T> implements AsyncResponseConsumer<T> {
39+
40+
private final AsyncResponseConsumer<T> delegate;
41+
42+
public SafeResponseConsumer(AsyncResponseConsumer<T> delegate) {
43+
this.delegate = delegate;
44+
}
45+
46+
/**
47+
* A consumer factory that safely wraps the one provided by {@code RequestOptions.DEFAULT}.
48+
*/
49+
public static final HttpAsyncResponseConsumerFactory DEFAULT_FACTORY = () -> new SafeResponseConsumer<>(
50+
RequestOptions.DEFAULT.getHttpAsyncResponseConsumerFactory().createHttpAsyncResponseConsumer()
51+
);
52+
53+
/**
54+
* Same as {@code RequestOptions.DEFAULT} with a safe consumer factory
55+
*/
56+
public static final RequestOptions DEFAULT_REQUEST_OPTIONS = RequestOptions.DEFAULT
57+
.toBuilder()
58+
.setHttpAsyncResponseConsumerFactory(DEFAULT_FACTORY)
59+
.build();
60+
61+
@SuppressWarnings("unchecked")
62+
private static <T extends Throwable> void throwUnchecked(Throwable thr) throws T {
63+
throw (T) thr;
64+
}
65+
66+
@Override
67+
public void consumeResponse(HttpResponse response, EntityDetails entityDetails, HttpContext context,
68+
FutureCallback<T> resultCallback) {
69+
try {
70+
delegate.consumeResponse(response, entityDetails, context, resultCallback);
71+
} catch (Exception e) {
72+
throwUnchecked(e);
73+
} catch (Throwable e) {
74+
throw new RuntimeException("Error consuming response", e);
75+
}
76+
}
77+
78+
@Override
79+
public void informationResponse(HttpResponse response, HttpContext context) {
80+
try {
81+
delegate.informationResponse(response, context);
82+
} catch (Exception e) {
83+
throwUnchecked(e);
84+
} catch (Throwable e) {
85+
throw new RuntimeException("Error information response", e);
86+
}
87+
}
88+
89+
@Override
90+
public void failed(Exception cause) {
91+
try {
92+
delegate.failed(cause);
93+
} catch (Exception e) {
94+
throwUnchecked(e);
95+
} catch (Throwable e) {
96+
throw new RuntimeException("Error handling failure", e);
97+
}
98+
}
99+
100+
@Override
101+
public void updateCapacity(CapacityChannel capacityChannel) {
102+
try {
103+
delegate.updateCapacity(capacityChannel);
104+
} catch (Exception e) {
105+
throwUnchecked(e);
106+
} catch (Throwable e) {
107+
throw new RuntimeException("Error updating capacity", e);
108+
}
109+
}
110+
111+
@Override
112+
public void consume(ByteBuffer src) {
113+
try {
114+
delegate.consume(src);
115+
} catch (Exception e) {
116+
throwUnchecked(e);
117+
} catch (Throwable e) {
118+
throw new RuntimeException("Error consuming data", e);
119+
}
120+
}
121+
122+
@Override
123+
public void streamEnd(List<? extends Header> trailers) {
124+
try {
125+
delegate.streamEnd(trailers);
126+
} catch (Exception e) {
127+
throwUnchecked(e);
128+
} catch (Throwable e) {
129+
throw new RuntimeException("Error triggering stream end", e);
130+
}
131+
}
132+
133+
@Override
134+
public void releaseResources() {
135+
delegate.releaseResources();
136+
}
137+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package co.elastic.clients.transport.rest5_client.low_level;
21+
22+
import co.elastic.clients.transport.rest5_client.SafeResponseConsumer;
23+
import com.sun.net.httpserver.HttpServer;
24+
import org.apache.hc.core5.http.ContentType;
25+
import org.apache.hc.core5.http.HttpHost;
26+
import org.apache.hc.core5.http.HttpResponse;
27+
import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
28+
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
29+
import org.apache.hc.core5.http.protocol.HttpContext;
30+
import org.junit.jupiter.api.AfterAll;
31+
import org.junit.jupiter.api.Assertions;
32+
import org.junit.jupiter.api.BeforeAll;
33+
import org.junit.jupiter.api.Test;
34+
35+
import java.net.InetAddress;
36+
import java.net.InetSocketAddress;
37+
import java.nio.charset.StandardCharsets;
38+
39+
public class SafeResponseConsumerTest {
40+
41+
static HttpServer Server;
42+
static HttpHost ESHost;
43+
44+
// A consumer factory that throws an Error, to simulate the effect of an OOME
45+
static HttpAsyncResponseConsumerFactory FailingConsumerFactory =
46+
() -> new BasicAsyncResponseConsumer(new BufferedByteConsumer(100 * 1024 * 1024)) {
47+
@Override
48+
public void informationResponse(HttpResponse response, HttpContext context) {
49+
super.informationResponse(response, context);
50+
}
51+
52+
@Override
53+
protected BasicClassicHttpResponse buildResult(HttpResponse response, ByteArrayEntity entity,
54+
ContentType contentType) {
55+
super.buildResult(response, entity, contentType);
56+
throw new Error("Error in buildResult");
57+
}
58+
};
59+
60+
@BeforeAll
61+
public static void setup() throws Exception {
62+
Server = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
63+
Server.start();
64+
65+
Server.createContext("/", exchange -> {
66+
String path = exchange.getRequestURI().getPath();
67+
exchange.getResponseHeaders().set("Content-Type", "application/json");
68+
exchange.getResponseHeaders().set("X-Elastic-Product", "Elasticsearch");
69+
70+
if (path.equals("/")) {
71+
byte[] bytes = Info.getBytes(StandardCharsets.UTF_8);
72+
exchange.sendResponseHeaders(200, bytes.length);
73+
exchange.getResponseBody().write(bytes);
74+
exchange.close();
75+
return;
76+
}
77+
78+
exchange.sendResponseHeaders(404, -1);
79+
exchange.close();
80+
});
81+
82+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
83+
try {
84+
Server.stop(1);
85+
} catch (Exception e) {
86+
// Ignore
87+
}
88+
}));
89+
90+
ESHost = new HttpHost(Server.getAddress().getAddress(), Server.getAddress().getPort());
91+
}
92+
93+
@AfterAll
94+
public static void tearDown() {
95+
Server.stop(0);
96+
}
97+
98+
// testReactorDeath cannot be tested, as the io reactor thread gets stuck and the test never completes
99+
100+
@Test
101+
public void testReactorSurvival() throws Exception {
102+
103+
// Request options that will simulate an OOME and wrapped in the safe consumer that will
104+
// avoid the reactor's death
105+
RequestOptions.Builder protectedFailingOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
106+
protectedFailingOptionsBuilder.setHttpAsyncResponseConsumerFactory(() ->
107+
new SafeResponseConsumer<>(FailingConsumerFactory.createHttpAsyncResponseConsumer())
108+
);
109+
RequestOptions protectedFailingOptions = protectedFailingOptionsBuilder.build();
110+
111+
Rest5Client restClient = Rest5Client.builder(ESHost).build();
112+
113+
// First request, to warm things up.
114+
// An "indice exists" request, that has no response body
115+
Request existsReq = new Request("HEAD", "/index-name");
116+
restClient.performRequest(existsReq);
117+
118+
try {
119+
Request infoReq = new Request("GET", "/");
120+
infoReq.setOptions(protectedFailingOptions);
121+
122+
restClient.performRequest(infoReq);
123+
Assertions.fail("First request should not succeed");
124+
} catch (Exception t) {
125+
System.err.println("Request 1 error");
126+
}
127+
{
128+
// 2nd request with no specific options
129+
Request infoReq = new Request("GET", "/");
130+
131+
Response resp = restClient.performRequest(infoReq);
132+
Assertions.assertEquals(200, resp.getStatusCode());
133+
}
134+
{
135+
// final request to make sure that the reactor isn't closed
136+
restClient.performRequest(existsReq);
137+
}
138+
restClient.close();
139+
}
140+
141+
private static final String Info = "{\n" +
142+
" \"cluster_name\": \"foo\",\n" +
143+
" \"cluster_uuid\": \"bar\",\n" +
144+
" \"version\": {\n" +
145+
" \"build_date\": \"2022-01-28T08:36:04.875279988Z\",\n" +
146+
" \"minimum_wire_compatibility_version\": \"6.8.0\",\n" +
147+
" \"build_hash\": \"bee86328705acaa9a6daede7140defd4d9ec56bd\",\n" +
148+
" \"number\": \"7.17.0\",\n" +
149+
" \"lucene_version\": \"8.11.1\",\n" +
150+
" \"minimum_index_compatibility_version\": \"6.0.0-beta1\",\n" +
151+
" \"build_flavor\": \"default\",\n" +
152+
" \"build_snapshot\": false,\n" +
153+
" \"build_type\": \"docker\"\n" +
154+
" },\n" +
155+
" \"name\": \"instance-0000000000\",\n" +
156+
" \"tagline\": \"You Know, for Search\"\n" +
157+
"}";
158+
}

0 commit comments

Comments
 (0)