Description
Elasticsearch version (bin/elasticsearch --version):
Tested on both 5.6 and 6.2
Plugins installed: [ None ]
JVM version (java -version): 1.8.0_31-b13
OS version (uname -a if on a Unix-like system): Using the Dockerized version from https://quay.io/repository/pires/docker-elasticsearch
Description of the problem including expected versus actual behavior:
(This is pretty much just a copy of my original description at: https://discuss.elastic.co/t/using-java-high-level-rest-client-does-not-auto-retry-bulk-requests/121724)
If the BulkProcessor is configured to issue requests through the High Level Rest Client, it does not retry rejected requests, even though the responses pass through the Retry handler's canRetry logic.
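For context, here is a minimal stdlib-only model of what a BulkProcessor-style retry handler does. After each attempt it checks whether every failed item is retryable (the canRetry step). If so, it resends only the failed items after the next delay from a backoff policy. The class and method names (`RetryModel`, `sendWithRetry`) and the "one non-retryable failure stops all retries" rule are illustrative assumptions, not the Elasticsearch API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of a retry handler with backoff; not Elasticsearch code.
public class RetryModel {
    /**
     * Sends a batch, then resends only the failed ids after each backoff delay,
     * as long as every failure in the latest response is retryable.
     * Returns the ids that still failed when retrying stopped (empty if all succeeded).
     *
     * @param send hypothetical transport: takes ids, returns a map of failed id to exception
     */
    public static List<String> sendWithRetry(
            List<String> ids,
            Function<List<String>, Map<String, RuntimeException>> send,
            Predicate<RuntimeException> isRetryable,
            List<Long> backoffDelaysMillis) {
        List<String> pending = new ArrayList<>(ids);
        Iterator<Long> delays = backoffDelaysMillis.iterator();
        while (true) {
            Map<String, RuntimeException> failures = send.apply(pending);
            if (failures.isEmpty()) {
                return Collections.emptyList();
            }
            // canRetry step (assumed all-or-nothing): one non-retryable failure stops retries.
            boolean canRetry = true;
            for (RuntimeException e : failures.values()) {
                if (!isRetryable.test(e)) {
                    canRetry = false;
                    break;
                }
            }
            // Give up when a failure is not retryable or the backoff policy is exhausted.
            if (!canRetry || !delays.hasNext()) {
                return new ArrayList<>(failures.keySet());
            }
            try {
                Thread.sleep(delays.next());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return new ArrayList<>(failures.keySet());
            }
            pending = new ArrayList<>(failures.keySet());
        }
    }
}
```

In this model the whole outcome hinges on `isRetryable`: if the predicate does not recognize a rejection, the first failed response is returned as-is, which matches the symptom described above.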
Steps to reproduce:
- Create an index in ES.
- Configure the thread_pool to limit bulk requests and make a rejection more likely. In my case I used a very small queue size of 2.
- To demonstrate the "expected" functionality, create a BulkProcessor which submits requests using the TransportClient:
  BulkProcessor.builder(client, listener);
- Submit requests which result in rejections. You will find that the resulting BulkResponse does not contain failures (unless the BackoffPolicy was exhausted). However, querying /_cat/thread_pool will show rejections, and the document count will have gone up by the total number of documents submitted, indicating that all documents eventually made it in via retries.
- Create a BulkProcessor which submits requests using the High Level Rest Client's client.bulkAsync method:
  BulkProcessor.Builder builder(BulkProcessor.Listener listener) {
      return BulkProcessor.builder(client::bulkAsync, listener);
  }
- Submit requests at a rate high enough to cause rejections.
- Perform the same set of inserts. You will find that the BulkResponse contains failures, and the individual Failure objects have an ElasticsearchException which contains "type=es_rejected_execution_exception".
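Why a queue size of 2 produces rejections so readily: as far as I understand it, the bulk thread pool is a fixed-size executor with a bounded queue, and work arriving while the queue is full is rejected. That is what surfaces as es_rejected_execution_exception and in the rejected column of /_cat/thread_pool. On 5.x/6.x the setting from step 2 is, I believe, `thread_pool.bulk.queue_size`. The sketch below is only a plain `java.util.concurrent` analogy, not Elasticsearch's own executor. It uses one worker thread, a bounded queue, and tasks that block until all submissions are done. The class name `QueueRejectionDemo` is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy for a fixed thread pool with a bounded queue.
public class QueueRejectionDemo {
    /**
     * Submits `tasks` tasks to a one-thread pool whose queue holds `queueSize` entries.
     * Every task blocks until all submissions are done, so the queue never drains
     * mid-loop. Returns how many submissions were rejected.
     */
    public static int countRejections(int queueSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize)); // default AbortPolicy throws on overflow
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the worker so the queue stays full
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown(); // let the running and queued tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }
}
```

For example, `countRejections(2, 10)` returns 7: one task runs on the worker, two wait in the queue, and the remaining seven are rejected.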
Additional Notes
I think the "root" cause is that with the High Level Rest Client, the ElasticsearchException parsed from the response is a plain ElasticsearchException rather than one of its subtypes such as EsRejectedExecutionException (this is actually documented behavior of the fromXContent method of ElasticsearchException).
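The mismatch can be modeled without Elasticsearch on the classpath. In this sketch `GenericEsException` and `RejectedException` are hypothetical stand-ins for ElasticsearchException and EsRejectedExecutionException. `BY_CLASS` reflects the assumption, based on the analysis above, that the retry check tests for the concrete rejection class, which an exception parsed from a REST response never is. `BY_TYPE_NAME` is one possible alternative, shown for illustration only, and is not what Elasticsearch does.

```java
import java.util.function.Predicate;

// Hypothetical model of typed vs. untyped exceptions; not Elasticsearch code.
public class TypedExceptionModel {
    // Stand-in for ElasticsearchException: carries the server-side "type" string.
    public static class GenericEsException extends RuntimeException {
        private final String type;

        public GenericEsException(String type, String message) {
            super(message);
            this.type = type;
        }

        public String type() {
            return type;
        }
    }

    // Stand-in for EsRejectedExecutionException (what the TransportClient path yields).
    public static class RejectedException extends GenericEsException {
        public RejectedException(String message) {
            super("es_rejected_execution_exception", message);
        }
    }

    // Class-based check: only the concrete subclass counts as retryable.
    public static final Predicate<RuntimeException> BY_CLASS =
            e -> e instanceof RejectedException;

    // Type-name check: also recognizes an untyped exception parsed from a REST response.
    public static final Predicate<RuntimeException> BY_TYPE_NAME =
            e -> e instanceof GenericEsException
                    && "es_rejected_execution_exception".equals(((GenericEsException) e).type());
}
```

`BY_CLASS` accepts a `RejectedException` but not a `GenericEsException` whose type is "es_rejected_execution_exception", which is exactly the REST-client situation described above. Checking each failed item's HTTP status (429 Too Many Requests) instead of its exception class would be another way to avoid depending on typed exceptions.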
I made a naive attempt to modify fromXContent to return the correctly typed ElasticsearchException, but in its current form this results in a deadlock during retry attempts, due (I think) to the synchronization that occurs in BulkProcessor. Setting a high enough concurrency avoids the deadlock, but that is only a workaround.
Probably not relevant (except for anyone who might stumble on this same issue): we are using Apache Flink with an Elasticsearch sink. We identified this issue while attempting to upgrade from ES 5.6 to 6.2 to get additional features. However, Flink's pending ES6 support is based on the High Level Rest Client and does not include TransportClient support for 6.2. It has code that attempts to perform retries, but that code is never triggered because of the same issue with typed exceptions (and in fact, it would deadlock in any case).