Bulk processor#awaitClose to close scheduler #29263
When the `BulkProcessor` is used with the high-level REST client, it internally creates a scheduler for scheduling tasks. This scheduler is not exposed to users and needs to be closed once the `BulkProcessor` is closed. There are two ways to close the `BulkProcessor`, though: the ordinary `close` method and `awaitClose`. The former closes the scheduler while the latter doesn't, leaving threads lingering. I discovered this when adding some tests for the bulk processor: we mainly test it with the transport client, and running the same tests with the high-level REST client already uncovered a bug.
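To make the described behavior concrete, here is a minimal sketch of the fixed shape, not the actual `BulkProcessor` code. `SketchProcessor`, `schedulerShutDown` and the no-op flush task are hypothetical names invented for illustration; only `close`, `awaitClose` and the `onClose` hook mirror names from this PR. Both close paths run the same `onClose` hook, so the internal scheduler is shut down either way.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a processor that owns an internal scheduler. */
class SketchProcessor implements AutoCloseable {
    // Internal scheduler, never exposed to callers.
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    // Hook that releases internal resources; here it only shuts the scheduler down.
    private final Runnable onClose = scheduler::shutdown;
    private boolean closed = false;

    SketchProcessor() {
        // Assumed periodic flush task (a no-op here); it keeps the scheduler thread alive.
        scheduler.scheduleWithFixedDelay(() -> {}, 1, 1, TimeUnit.SECONDS);
    }

    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        onClose.run();
    }

    public synchronized boolean awaitClose(long timeout, TimeUnit unit) {
        if (closed) {
            return true;
        }
        closed = true;
        // The sketch has no in-flight requests, so there is nothing to wait for
        // and timeout/unit go unused.
        boolean completed = true;
        // Without this call the scheduler thread would linger after awaitClose.
        onClose.run();
        return completed;
    }

    /** Test helper: reports whether the internal scheduler has been shut down. */
    boolean schedulerShutDown() {
        return scheduler.isShutdown();
    }
}
```

In this sketch, dropping the `onClose.run()` call from `awaitClose` reproduces the leak: the scheduler's non-daemon thread stays alive and can keep the JVM from exiting.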
Pinging @elastic/es-core-infra
nik9000
left a comment
LGTM
tlrx
left a comment
This looks great, thanks a lot for all the tests. I added minor comments.
Response response = client().performRequest("PUT", "/test-ro", Collections.emptyMap(), entity);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));

//ensureGreen();
nit: can you remove this?
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
        .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
        //set interval and size to high values
        .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
It looks like we always use these values; maybe we could set them in `initBulkProcessorBuilder`?
We almost always use the same values, but not exactly the same; there are subtle differences. I took these tests from the existing `BulkProcessorIT` tests and adapted them from the transport client to the high-level REST client. The idea was to not touch anything and see whether things still work the same way, which they more or less do (up to bugs).
Okay
  }
- return this.bulkRequestHandler.awaitClose(timeout, unit);
+ boolean awaitClose = this.bulkRequestHandler.awaitClose(timeout, unit);
+ onClose.run();
I'm wondering if we should ensure that `onClose.run()` is always executed (e.g. in a `finally` block) if something goes wrong in `bulkRequestHandler.awaitClose()`. I don't have a strong feeling about this, and if we decide to add it we should also have a test for it, but I think it would be safer.
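For reference, the `finally` variant suggested here could look roughly like the sketch below. It is an illustration under assumptions, not the code in this PR: `AwaitCloseFinallySketch` and its nested `Handler` interface are hypothetical stand-ins for the processor and for `bulkRequestHandler`, and the cleanup hook is passed in as a plain `Runnable`.

```java
import java.util.concurrent.TimeUnit;

class AwaitCloseFinallySketch {
    /** Hypothetical stand-in for the request handler that waits for in-flight bulk requests. */
    interface Handler {
        boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
    }

    /**
     * Waits for the handler, then runs the cleanup hook whether the wait
     * returned normally or threw (for example on interruption).
     */
    static boolean awaitClose(Handler handler, Runnable onClose, long timeout, TimeUnit unit)
            throws InterruptedException {
        try {
            return handler.awaitClose(timeout, unit);
        } finally {
            // Runs on both the normal and the exceptional path.
            onClose.run();
        }
    }
}
```

A test for this path could pass a handler that throws `InterruptedException` and assert that the hook still ran.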
tlrx
left a comment
LGTM