
Conversation

Contributor

@Hakky54 Hakky54 commented Mar 13, 2020

This pull request enables the RestHighLevelClient to handle an Elasticsearch response with compressed content. It was already possible to do this with the low level client, see AsyncElasticsearchCompressedRequest.
Compression can be enabled within a node configuration with the following property: http.compression: true. Compression is triggered by a request from a client: the client also needs to indicate within the request header that it wants Elasticsearch to compress the response. That is possible with the following RequestOptions:

RequestOptions requestOptions = RequestOptions.DEFAULT.toBuilder()
    .addHeader("Accept-Encoding", "gzip")
    .build();

With these two properties Elasticsearch will return a gzip-compressed response body. The RestHighLevelClient could already send a request with the above request options, but it could not yet handle the compressed response. This pull request adds the option to handle a response that is compressed with gzip.
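
For illustration, here is a minimal end-to-end sketch of the flow described above (not part of the PR itself). It assumes a local node started with http.compression: true; the host and the my-index index name are placeholders.

import java.io.IOException;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class CompressedSearchExample {

    public static void main(String[] args) throws IOException {
        // Placeholder host; assumes the node was started with http.compression: true
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

            // Ask Elasticsearch to gzip-compress the response body
            RequestOptions options = RequestOptions.DEFAULT.toBuilder()
                .addHeader("Accept-Encoding", "gzip")
                .build();

            // With this pull request applied, the high level client can also
            // parse the compressed response
            SearchResponse response = client.search(new SearchRequest("my-index"), options);
            System.out.println(response.status());
        }
    }
}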

Contributor Author

Hakky54 commented Mar 14, 2020

An alternative would be to implement this at the level of org.elasticsearch.client.RestClient.convertResponse(InternalRequest request, Node node, HttpResponse httpResponse), at line number 272. That would result in less duplicated code compared to the changes already made within this pull request.

That would look like the following code snippet:

    private ResponseOrResponseException convertResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException {
        RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);
        int statusCode = httpResponse.getStatusLine().getStatusCode();
        Response response = new Response(request.httpRequest.getRequestLine(), node.getHost(), decompressResponseEntityIfRequired(httpResponse));
        if (isSuccessfulResponse(statusCode) || request.ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
            onResponse(node);
            if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) {
                throw new WarningFailureException(response);
            }
            return new ResponseOrResponseException(response);
        }
        ResponseException responseException = new ResponseException(response);
        if (isRetryStatus(statusCode)) {
            //mark host dead and retry against next one
            onFailure(node);
            return new ResponseOrResponseException(responseException);
        }
        //mark host alive and don't retry, as the error should be a request problem
        onResponse(node);
        throw responseException;
    }

    private HttpResponse decompressResponseEntityIfRequired(HttpResponse httpResponse) throws IOException {
        // Replace a gzip-compressed entity with its decompressed content and drop the
        // Content-Encoding header, so that consumers see a plain response
        for (Header header : httpResponse.getHeaders(HttpHeaders.CONTENT_ENCODING)) {
            if ("gzip".equalsIgnoreCase(header.getValue())) {
                String decompressedContent = decompressWithGzip(EntityUtils.toByteArray(httpResponse.getEntity()));
                HttpEntity httpEntity = new NStringEntity(decompressedContent, ContentType.get(httpResponse.getEntity()));
                httpResponse.setEntity(httpEntity);
                httpResponse.removeHeader(header);
                break;
            }
        }
        return httpResponse;
    }

    private static String decompressWithGzip(byte[] compressedBytes) throws IOException {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedBytes);
             GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gzipInputStream, StandardCharsets.ISO_8859_1))) {

            // Note: joining with "\n" normalizes line endings and drops any trailing newline
            return bufferedReader.lines()
                .collect(Collectors.joining("\n"));
        }
    }

This code will decompress responses for requests made with the Rest High Level Client as well as the Low Level Client. The only downside of this alternative code (the one within this comment, not the one within the code changes) is that it is not backwards compatible with existing consumers who already have their own solution for decompression at the low-level-client. They will no longer get a compressed response from the low-level-client, as it has already been decompressed. The change within the rest-high-level-client will probably be backwards compatible: decompression was never supported there, so no consumer will be affected by this change.

I like the change at the RestClient class as it has less code duplication and both the low-level-client and the rest-high-level-client can benefit from this feature in one go, but I am afraid that users of the low-level-client will be affected. So it would be safer to go for the change within the RestHighLevelClient class. But I am curious what the reviewers are thinking :)

@elasticmachine
Collaborator

Pinging @elastic/es-core-features (:Core/Features/Java High Level REST Client)

@jakelandis jakelandis requested a review from andreidan March 27, 2020 14:09
Contributor

@andreidan andreidan left a comment


Thanks for using Elasticsearch @Hakky54 and for putting together this PR. Great catch on the lack of support for decompressing the response entity.

We use the Apache HTTP client, which already has support for decompressing the http entity via GzipDecompressingEntity. I propose adding support for decompressing the entity in the parseEntity method when the Content-Encoding header contains gzip, by using the Apache GzipDecompressingEntity to wrap the HttpEntity.
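
A minimal sketch of what such a wrapping check might look like (the helper class and its name are illustrative only, not the actual implementation):

import org.apache.http.HttpEntity;
import org.apache.http.client.entity.GzipDecompressingEntity;

final class EntityDecompressionSketch {

    /**
     * Wraps the entity in Apache's GzipDecompressingEntity when the response
     * declares "gzip" in its Content-Encoding header, so that the parsing code
     * can consume the content transparently.
     */
    static HttpEntity decompressIfGzipped(HttpEntity entity) {
        if (entity != null
            && entity.getContentEncoding() != null
            && "gzip".equalsIgnoreCase(entity.getContentEncoding().getValue())) {
            return new GzipDecompressingEntity(entity);
        }
        return entity;
    }
}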

I also propose we enhance the HttpCompressionIT with an integration test for the high level rest client.
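
As a hedged sketch (not the actual HttpCompressionIT code), such a test could request a gzip-encoded response through the high level client and verify that the parsed result is usable, which proves the decompression happened transparently. Here highLevelClient and assertNotNull are assumed test fixtures:

// Assumed fixtures: highLevelClient wired against a test cluster started
// with http.compression: true, and a JUnit-style assertNotNull
RequestOptions gzipOptions = RequestOptions.DEFAULT.toBuilder()
    .addHeader("Accept-Encoding", "gzip")
    .build();
// info() parses the response body; parsing would fail if the body stayed compressed
MainResponse info = highLevelClient.info(gzipOptions);
assertNotNull(info.getVersion());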

I believe the low level rest client should remain as it is for now, as that client puts the user in control of most things related to the HTTP interaction.

Thanks for working on this.

Contributor Author

Hakky54 commented Mar 31, 2020

Hi @andreidan, thank you for your feedback and I am glad that you appreciate my pull request! It looks like my initial solution was too verbose; I didn't know of the existence of the GzipDecompressingEntity. I have refactored it and pushed it here. I also enhanced the integration test within the HttpCompressionIT. Please let me know what else could be improved :)

@Hakky54 Hakky54 requested a review from andreidan March 31, 2020 09:38
Contributor

@andreidan andreidan left a comment


Thanks for iterating on this @Hakky54. This looks pretty good so far.

I've left a few (mostly minor) suggestions. Thanks for working on this!

@Hakky54 Hakky54 requested a review from andreidan April 2, 2020 09:34
@andreidan
Contributor

@elasticmachine ok to test

@andreidan andreidan merged commit 4a195b5 into elastic:master Apr 2, 2020
@Hakky54 Hakky54 deleted the feature/add-support-for-decompression-of-compressed-response branch April 2, 2020 16:51
Contributor Author

Hakky54 commented Apr 2, 2020

Thanks @andreidan It feels awesome to contribute back to the community and Elastic!

Member

javanna commented Apr 3, 2020

Heya, I had left a comment on the original issue, see #53555 (comment) .

We use the async http client rather than the http client, although the former depends on the latter. We are free to use classes from the blocking variant, but they are not always a good fit. The discussion in https://issues.apache.org/jira/browse/HTTPCLIENT-1822 seems to suggest that more work is required to implement automatic decompression using the async primitives that the async client offers, and highlights the steps needed, which I suppose are valid whether implemented within the async http client or by a user of it.

I wonder if the implementation that we have introduced is equivalent, and why we have introduced it in our own client rather than upstream, where all the other users of the async http client could benefit from it.

@andreidan
Contributor

@javanna thanks for bringing the apache issue to our attention, I was not aware of it. I agree it would be nice for this to be handled transparently by the underlying client (which would be the appropriate place for it), but until there's support in the httpclient I think it's alright for us to support it.

We're handling the decompression when parsing the response, which keeps the networking async, so I think this remains in line with the async client guarantees. Did we miss something in terms of being aligned with the async client invariants?

Member

javanna commented Apr 3, 2020

until there's support in the httpclient I think it's alright for us to support it.

Does this mean that we are going to contribute transparent content decompression back to apache http async client?

I guess it is ok to decompress this way after all for now. I would open the discussion on making automatic content decompression available in the low-level client: it may be possible to use the same approach given the limited set of features it exposes from the underlying async client, or we may have to first contribute back a solution to the async http client library. I don't think users expect to have to decompress stuff themselves. And backwards compatibility should not prevent us from making this improvement.

ollik1 pushed a commit to ollik1/elasticsearch that referenced this pull request Apr 6, 2020
Backporting PR elastic#53533 to
the fork. This can be dropped once we upgrade to 7.8.0 or later.
@andreidan
Contributor

Does this mean that we are going to contribute transparent content decompression back to apache http async client?

The way this is implemented at this stage, we can't port it back; it would have to be a new implementation altogether, as this one is ES HLRC specific. There isn't anyone in our team (afaik) actively pursuing this.

With regard to using automatic decompression in the LLRC, I think you're correct about what the options are.

andreidan pushed a commit to andreidan/elasticsearch that referenced this pull request Apr 6, 2020
…ghLevelClient (elastic#53533)

Added decompression of gzip when the gzip value is returned as a header from Elasticsearch

(cherry picked from commit 4a195b5)
Signed-off-by: Andrei Dan <[email protected]>
andreidan added a commit that referenced this pull request Apr 6, 2020
…ghLevelClient (#53533) (#54811)

Added decompression of gzip when the gzip value is returned as a header from Elasticsearch

(cherry picked from commit 4a195b5)
Signed-off-by: Andrei Dan <[email protected]>

Co-authored-by: Hakky54 <[email protected]>
Contributor Author

Hakky54 commented Apr 6, 2020

Should I move the code changes to the LLRC, or wait until this has been discussed within your team?

@andreidan
Contributor

@Hakky54 sorry for the late reply. We discussed this today and agreed that having this supported in the LLRC would be very useful, so if you'd like to continue working on this that would be very much appreciated.

Contributor Author

Hakky54 commented Apr 17, 2020

Hi @andreidan, that's awesome to hear! I couldn't push the changes to this pull request because I deleted the repository after the branch got merged. I applied the changes within a new feature branch and created a new pull request here: #55413. Could you have a look at it?

Thanks @javanna for proposing to move this option to the llrc!


rand0m86 commented Sep 15, 2020

Hi @andreidan, I see you removed the backport pending label from this ticket, so let me just double check - are there any plans to add this support to the 6.8 branch?

I would be totally fine if the methods on RestHighLevelClient were not final; that would allow us to override the parseEntity method and add the required functionality ourselves. But what are the alternatives? Do you have a code snippet for achieving the same result, or do we need to reimplement the high level client?

@rand0m86

For anyone else interested in getting GZIP working without updating to version 7+: you need to tweak a few places.

The most important bit is the response consumer; it should wrap the ContentBufferEntity in a GzipDecompressingEntity:

import java.io.IOException;

import org.apache.http.ContentTooLongException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.entity.ContentBufferEntity;
import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
import org.apache.http.nio.util.ByteBufferAllocator;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.nio.util.SimpleInputBuffer;
import org.apache.http.protocol.HttpContext;
import org.elasticsearch.client.HeapBufferedAsyncResponseConsumer;

/**
 * Basically a copy of {@link HeapBufferedAsyncResponseConsumer} with support for gzip encoding.
 *
 * Once ES provides out of the box support for compression this class should be dropped.
 */
class GzipSupportingResponseConsumer extends AbstractAsyncResponseConsumer<HttpResponse> {
    private static final int DEFAULT_BUFFER_LENGTH = 4096;
    private final int bufferLimitBytes;
    private volatile HttpResponse response;
    private volatile SimpleInputBuffer buf;

    /**
     * Creates a new instance of this consumer with the provided buffer limit
     */
    GzipSupportingResponseConsumer(int bufferLimit) {
        if (bufferLimit <= 0) {
            throw new IllegalArgumentException("bufferLimit must be greater than 0");
        }
        this.bufferLimitBytes = bufferLimit;
    }

    /**
     * Get the limit of the buffer.
     */
    public int getBufferLimit() {
        return bufferLimitBytes;
    }

    @Override
    protected void onResponseReceived(HttpResponse response) {
        this.response = response;
    }

    @Override
    protected void onEntityEnclosed(HttpEntity entity, ContentType contentType) throws IOException {
        long len = entity.getContentLength();
        if (len > bufferLimitBytes) {
            throw new ContentTooLongException("entity content is too long [" + len +
                    "] for the configured buffer limit [" + bufferLimitBytes + "]");
        }
        if (len < 0) {
            len = DEFAULT_BUFFER_LENGTH;
        }
        this.buf = new SimpleInputBuffer((int) len, getByteBufferAllocator());
        ContentBufferEntity cbEntity = new ContentBufferEntity(entity, this.buf);
        this.response.setEntity(new GzipDecompressingEntity(cbEntity));
    }

    /**
     * Returns the instance of {@link ByteBufferAllocator} to use for content buffering.
     * Allows to plug in any {@link ByteBufferAllocator} implementation.
     */
    protected ByteBufferAllocator getByteBufferAllocator() {
        return HeapByteBufferAllocator.INSTANCE;
    }

    @Override
    protected void onContentReceived(ContentDecoder decoder, IOControl ioctrl) throws IOException {
        this.buf.consumeContent(decoder);
    }

    @Override
    protected HttpResponse buildResult(HttpContext context) {
        return response;
    }

    @Override
    protected void releaseResources() {
        response = null;
    }
}

Next, implement a custom HttpAsyncResponseConsumerFactory:

import org.apache.http.HttpResponse;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;

/**
 * Consumer factory which provides {@link GzipSupportingResponseConsumer} instead of the
 * default {@link HeapBufferedResponseConsumerFactory} to enable GZIP support for ES calls.
 */
class CompressedHttpAsyncResponseConsumerFactory implements HttpAsyncResponseConsumerFactory {
    //default buffer limit is 100MB
    static final int DEFAULT_BUFFER_LIMIT = 100 * 1024 * 1024;

    @Override
    public HttpAsyncResponseConsumer<HttpResponse> createHttpAsyncResponseConsumer() {
        return new GzipSupportingResponseConsumer(DEFAULT_BUFFER_LIMIT);
    }
}

Next, use custom-made RequestOptions for every high level client call:

// Create the options once and reuse them; there is no point recreating them on each call.
private final RequestOptions customRequestOptions = createCustomRequestOptions();

private RequestOptions createCustomRequestOptions() {
    RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
    builder.setHttpAsyncResponseConsumerFactory(new CompressedHttpAsyncResponseConsumerFactory());
    return builder.build();
}

private void whatever(SearchRequest searchRequest) throws IOException {
    SearchResponse response = client.search(searchRequest, customRequestOptions);
}

And now you can enable GZIP at the HTTP level:

RestClientBuilder builder = RestClient.builder(httpHost);
builder.setHttpClientConfigCallback(clientBuilder -> {
    clientBuilder.addInterceptorFirst((HttpRequestInterceptor) (request, context) ->
        request.setHeader("Accept-Encoding", "gzip"));

    // Bonus time - compress all your request payloads, not only ES responses!
    clientBuilder.addInterceptorLast((HttpRequestInterceptor) (request, context) -> {
        if (request instanceof HttpEntityEnclosingRequest) {
            HttpEntityEnclosingRequest requestWithPayload = (HttpEntityEnclosingRequest) request;
            requestWithPayload.setEntity(new GzipCompressingEntity(requestWithPayload.getEntity()));
        }
    });
    return clientBuilder;
});

swallez pushed a commit to swallez/elasticsearch that referenced this pull request Sep 30, 2020
…ghLevelClient (elastic#53533)

Added decompression of gzip when the gzip value is returned as a header from Elasticsearch
Contributor

swallez commented Sep 30, 2020

@rand0m86 we have finally decided to backport this PR to the 6.8 branch. This has been done in PR #63087

@rand0m86

Awesome news, thanks @swallez!

Once it's released I can really simplify my configuration code.
Do you have an idea when the next 6.8.x release version will be available?

Also, any plans to use my approach for compressing request payloads for #62044 (and backport it to 6.8), or do you need help contributing to that one?

Contributor

swallez commented Oct 1, 2020

@rand0m86 we are targeting a maintenance release for 6.8 in the near future, likely the end of October. So the wait shouldn't be very long.

About request payloads we're considering adding something like RestClientBuilder.enableCompression() that will enable compression for both requests and responses. Stay tuned!
