Skip to content

Commit 67be92c

Browse files
authored
[client] Add simple support for gzip compression (#63230)
Adds a `RestClient.setCompressionEnabled()` setting that will gzip- compress request bodies and add a `Accept-Encoding: gzip` header so that the ES server can send compressed responses.
1 parent 62a74d0 commit 67be92c

File tree

6 files changed

+279
-18
lines changed

6 files changed

+279
-18
lines changed

client/rest/src/main/java/org/elasticsearch/client/RestClient.java

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.http.HttpResponse;
2929
import org.apache.http.client.AuthCache;
3030
import org.apache.http.client.ClientProtocolException;
31+
import org.apache.http.client.entity.GzipCompressingEntity;
3132
import org.apache.http.client.entity.GzipDecompressingEntity;
3233
import org.apache.http.client.config.RequestConfig;
3334
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
@@ -50,8 +51,11 @@
5051
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
5152

5253
import javax.net.ssl.SSLHandshakeException;
54+
import java.io.ByteArrayInputStream;
55+
import java.io.ByteArrayOutputStream;
5356
import java.io.Closeable;
5457
import java.io.IOException;
58+
import java.io.InputStream;
5559
import java.net.ConnectException;
5660
import java.net.SocketTimeoutException;
5761
import java.net.URI;
@@ -76,6 +80,7 @@
7680
import java.util.concurrent.ExecutionException;
7781
import java.util.concurrent.atomic.AtomicInteger;
7882
import java.util.stream.Collectors;
83+
import java.util.zip.GZIPOutputStream;
7984

8085
import static java.nio.charset.StandardCharsets.UTF_8;
8186
import static java.util.Collections.singletonList;
@@ -112,15 +117,18 @@ public class RestClient implements Closeable {
112117
private final NodeSelector nodeSelector;
113118
private volatile NodeTuple<List<Node>> nodeTuple;
114119
private final WarningsHandler warningsHandler;
120+
private final boolean compressionEnabled;
115121

116122
RestClient(CloseableHttpAsyncClient client, Header[] defaultHeaders, List<Node> nodes, String pathPrefix,
117-
FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode) {
123+
FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode,
124+
boolean compressionEnabled) {
118125
this.client = client;
119126
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
120127
this.failureListener = failureListener;
121128
this.pathPrefix = pathPrefix;
122129
this.nodeSelector = nodeSelector;
123130
this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE;
131+
this.compressionEnabled = compressionEnabled;
124132
setNodes(nodes);
125133
}
126134

@@ -543,34 +551,37 @@ private static void addSuppressedException(Exception suppressedException, Except
543551
}
544552
}
545553

546-
private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
554+
private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity, boolean compressionEnabled) {
547555
switch(method.toUpperCase(Locale.ROOT)) {
548556
case HttpDeleteWithEntity.METHOD_NAME:
549-
return addRequestBody(new HttpDeleteWithEntity(uri), entity);
557+
return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled);
550558
case HttpGetWithEntity.METHOD_NAME:
551-
return addRequestBody(new HttpGetWithEntity(uri), entity);
559+
return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled);
552560
case HttpHead.METHOD_NAME:
553-
return addRequestBody(new HttpHead(uri), entity);
561+
return addRequestBody(new HttpHead(uri), entity, compressionEnabled);
554562
case HttpOptions.METHOD_NAME:
555-
return addRequestBody(new HttpOptions(uri), entity);
563+
return addRequestBody(new HttpOptions(uri), entity, compressionEnabled);
556564
case HttpPatch.METHOD_NAME:
557-
return addRequestBody(new HttpPatch(uri), entity);
565+
return addRequestBody(new HttpPatch(uri), entity, compressionEnabled);
558566
case HttpPost.METHOD_NAME:
559567
HttpPost httpPost = new HttpPost(uri);
560-
addRequestBody(httpPost, entity);
568+
addRequestBody(httpPost, entity, compressionEnabled);
561569
return httpPost;
562570
case HttpPut.METHOD_NAME:
563-
return addRequestBody(new HttpPut(uri), entity);
571+
return addRequestBody(new HttpPut(uri), entity, compressionEnabled);
564572
case HttpTrace.METHOD_NAME:
565-
return addRequestBody(new HttpTrace(uri), entity);
573+
return addRequestBody(new HttpTrace(uri), entity, compressionEnabled);
566574
default:
567575
throw new UnsupportedOperationException("http method not supported: " + method);
568576
}
569577
}
570578

571-
private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity) {
579+
private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity, boolean compressionEnabled) {
572580
if (entity != null) {
573581
if (httpRequest instanceof HttpEntityEnclosingRequestBase) {
582+
if (compressionEnabled) {
583+
entity = new ContentCompressingEntity(entity);
584+
}
574585
((HttpEntityEnclosingRequestBase)httpRequest).setEntity(entity);
575586
} else {
576587
throw new UnsupportedOperationException(httpRequest.getMethod() + " with body is not supported");
@@ -732,7 +743,7 @@ private class InternalRequest {
732743
String ignoreString = params.remove("ignore");
733744
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod());
734745
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
735-
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());
746+
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity(), compressionEnabled);
736747
this.cancellable = Cancellable.fromRequest(httpRequest);
737748
setHeaders(httpRequest, request.getOptions().getHeaders());
738749
setRequestConfig(httpRequest, request.getOptions().getRequestConfig());
@@ -752,6 +763,9 @@ private void setHeaders(HttpRequest httpRequest, Collection<Header> requestHeade
752763
httpRequest.addHeader(defaultHeader);
753764
}
754765
}
766+
if (compressionEnabled) {
767+
httpRequest.addHeader("Accept-Encoding", "gzip");
768+
}
755769
}
756770

757771
private void setRequestConfig(HttpRequestBase httpRequest, RequestConfig requestConfig) {
@@ -874,4 +888,36 @@ private static Exception extractAndWrapCause(Exception exception) {
874888
}
875889
return new RuntimeException("error while performing request", exception);
876890
}
891+
892+
/**
893+
* A gzip compressing entity that also implements {@code getContent()}.
894+
*/
895+
public static class ContentCompressingEntity extends GzipCompressingEntity {
896+
897+
public ContentCompressingEntity(HttpEntity entity) {
898+
super(entity);
899+
}
900+
901+
@Override
902+
public InputStream getContent() throws IOException {
903+
ByteArrayInputOutputStream out = new ByteArrayInputOutputStream(1024);
904+
try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) {
905+
wrappedEntity.writeTo(gzipOut);
906+
}
907+
return out.asInput();
908+
}
909+
}
910+
911+
/**
912+
* A ByteArrayOutputStream that can be turned into an input stream without copying the underlying buffer.
913+
*/
914+
private static class ByteArrayInputOutputStream extends ByteArrayOutputStream {
915+
ByteArrayInputOutputStream(int size) {
916+
super(size);
917+
}
918+
919+
public InputStream asInput() {
920+
return new ByteArrayInputStream(this.buf, 0, this.count);
921+
}
922+
}
877923
}

client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public final class RestClientBuilder {
5555
private String pathPrefix;
5656
private NodeSelector nodeSelector = NodeSelector.ANY;
5757
private boolean strictDeprecationMode = false;
58+
private boolean compressionEnabled = false;
5859

5960
/**
6061
* Creates a new builder instance and sets the hosts that the client will send requests to.
@@ -181,6 +182,15 @@ public RestClientBuilder setStrictDeprecationMode(boolean strictDeprecationMode)
181182
return this;
182183
}
183184

185+
/**
186+
* Whether the REST client should compress requests using gzip content encoding and add the "Accept-Encoding: gzip"
187+
* header to receive compressed responses.
188+
*/
189+
public RestClientBuilder setCompressionEnabled(boolean compressionEnabled) {
190+
this.compressionEnabled = compressionEnabled;
191+
return this;
192+
}
193+
184194
/**
185195
* Creates a new {@link RestClient} based on the provided configuration.
186196
*/
@@ -191,7 +201,7 @@ public RestClient build() {
191201
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
192202
(PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient);
193203
RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
194-
pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
204+
pathPrefix, failureListener, nodeSelector, strictDeprecationMode, compressionEnabled);
195205
httpClient.start();
196206
return restClient;
197207
}
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client;
21+
22+
import com.sun.net.httpserver.HttpExchange;
23+
import com.sun.net.httpserver.HttpHandler;
24+
import com.sun.net.httpserver.HttpServer;
25+
import org.apache.http.HttpEntity;
26+
import org.apache.http.HttpHost;
27+
import org.apache.http.entity.ContentType;
28+
import org.apache.http.entity.StringEntity;
29+
import org.elasticsearch.mocksocket.MockHttpServer;
30+
import org.junit.AfterClass;
31+
import org.junit.Assert;
32+
import org.junit.BeforeClass;
33+
34+
import java.io.ByteArrayOutputStream;
35+
import java.io.IOException;
36+
import java.io.InputStream;
37+
import java.io.OutputStream;
38+
import java.net.InetAddress;
39+
import java.net.InetSocketAddress;
40+
import java.nio.charset.StandardCharsets;
41+
import java.util.concurrent.CompletableFuture;
42+
import java.util.zip.GZIPInputStream;
43+
import java.util.zip.GZIPOutputStream;
44+
45+
public class RestClientGzipCompressionTests extends RestClientTestCase {
46+
47+
private static HttpServer httpServer;
48+
49+
@BeforeClass
50+
public static void startHttpServer() throws Exception {
51+
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
52+
httpServer.createContext("/", new GzipResponseHandler());
53+
httpServer.start();
54+
}
55+
56+
@AfterClass
57+
public static void stopHttpServers() throws IOException {
58+
httpServer.stop(0);
59+
httpServer = null;
60+
}
61+
62+
/**
63+
* A response handler that accepts gzip-encoded data and replies request and response encoding values
64+
* followed by the request body. The response is compressed if "Accept-Encoding" is "gzip".
65+
*/
66+
private static class GzipResponseHandler implements HttpHandler {
67+
@Override
68+
public void handle(HttpExchange exchange) throws IOException {
69+
70+
// Decode body (if any)
71+
String contentEncoding = exchange.getRequestHeaders().getFirst("Content-Encoding");
72+
InputStream body = exchange.getRequestBody();
73+
if ("gzip".equals(contentEncoding)) {
74+
body = new GZIPInputStream(body);
75+
}
76+
byte[] bytes = readAll(body);
77+
78+
boolean compress = "gzip".equals(exchange.getRequestHeaders().getFirst("Accept-Encoding"));
79+
if (compress) {
80+
exchange.getResponseHeaders().add("Content-Encoding", "gzip");
81+
}
82+
83+
exchange.sendResponseHeaders(200, 0);
84+
85+
// Encode response if needed
86+
OutputStream out = exchange.getResponseBody();
87+
if (compress) {
88+
out = new GZIPOutputStream(out);
89+
}
90+
91+
// Outputs <request-encoding|null>#<response-encoding|null>#<request-body>
92+
out.write(String.valueOf(contentEncoding).getBytes(StandardCharsets.UTF_8));
93+
out.write('#');
94+
out.write((compress ? "gzip" : "null").getBytes(StandardCharsets.UTF_8));
95+
out.write('#');
96+
out.write(bytes);
97+
out.close();
98+
99+
exchange.close();
100+
}
101+
}
102+
103+
/** Read all bytes of an input stream and close it. */
104+
private static byte[] readAll(InputStream in) throws IOException {
105+
byte[] buffer = new byte[1024];
106+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
107+
int len = 0;
108+
while ((len = in.read(buffer)) > 0) {
109+
bos.write(buffer, 0, len);
110+
}
111+
in.close();
112+
return bos.toByteArray();
113+
}
114+
115+
private RestClient createClient(boolean enableCompression) {
116+
InetSocketAddress address = httpServer.getAddress();
117+
return RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
118+
.setCompressionEnabled(enableCompression)
119+
.build();
120+
}
121+
122+
public void testGzipHeaderSync() throws Exception {
123+
RestClient restClient = createClient(false);
124+
125+
// Send non-compressed request, expect compressed response
126+
Request request = new Request("POST", "/");
127+
request.setEntity(new StringEntity("plain request, gzip response", ContentType.TEXT_PLAIN));
128+
request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Accept-Encoding", "gzip").build());
129+
130+
Response response = restClient.performRequest(request);
131+
132+
HttpEntity entity = response.getEntity();
133+
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
134+
Assert.assertEquals("null#gzip#plain request, gzip response", content);
135+
136+
restClient.close();
137+
}
138+
139+
public void testGzipHeaderAsync() throws Exception {
140+
RestClient restClient = createClient(false);
141+
142+
// Send non-compressed request, expect compressed response
143+
Request request = new Request("POST", "/");
144+
request.setEntity(new StringEntity("plain request, gzip response", ContentType.TEXT_PLAIN));
145+
request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Accept-Encoding", "gzip").build());
146+
147+
FutureResponse futureResponse = new FutureResponse();
148+
restClient.performRequestAsync(request, futureResponse);
149+
Response response = futureResponse.get();
150+
151+
HttpEntity entity = response.getEntity();
152+
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
153+
Assert.assertEquals("null#gzip#plain request, gzip response", content);
154+
155+
restClient.close();
156+
}
157+
158+
public void testCompressingClientSync() throws Exception {
159+
RestClient restClient = createClient(true);
160+
161+
Request request = new Request("POST", "/");
162+
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));
163+
164+
Response response = restClient.performRequest(request);
165+
166+
HttpEntity entity = response.getEntity();
167+
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
168+
Assert.assertEquals("gzip#gzip#compressing client", content);
169+
170+
restClient.close();
171+
}
172+
173+
public void testCompressingClientAsync() throws Exception {
174+
InetSocketAddress address = httpServer.getAddress();
175+
RestClient restClient = RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
176+
.setCompressionEnabled(true)
177+
.build();
178+
179+
Request request = new Request("POST", "/");
180+
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));
181+
182+
FutureResponse futureResponse = new FutureResponse();
183+
restClient.performRequestAsync(request, futureResponse);
184+
Response response = futureResponse.get();
185+
186+
// Server should report it had a compressed request and sent back a compressed response
187+
HttpEntity entity = response.getEntity();
188+
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
189+
Assert.assertEquals("gzip#gzip#compressing client", content);
190+
191+
restClient.close();
192+
}
193+
194+
public static class FutureResponse extends CompletableFuture<Response> implements ResponseListener {
195+
@Override
196+
public void onSuccess(Response response) {
197+
this.complete(response);
198+
}
199+
200+
@Override
201+
public void onFailure(Exception exception) {
202+
this.completeExceptionally(exception);
203+
}
204+
}
205+
}

client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public RestClient createRestClient(NodeSelector nodeSelector) {
6767
}
6868
nodes = Collections.unmodifiableList(nodes);
6969
failureListener = new HostsTrackingFailureListener();
70-
return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false);
70+
return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false, false);
7171
}
7272

7373
/**

0 commit comments

Comments
 (0)