Simplify bulk request execution #20109
Currently, bulk item requests can be any ActionRequest; this commit restricts bulk item requests to DocumentRequest. This simplifies handling failures during bulk requests. Additionally, a new enum is added to DocumentRequest to represent the intended operation to be performed by a document request. The index operation type now also uses the new enum to specify whether the request should create or index a document.
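To make the enum idea concrete, here is a minimal sketch of what such an operation-type enum could look like. It is modeled on the `OpType(int op)` constructor fragment quoted in the review below; the constant ids, the `fromId` helper, and the accessor names are assumptions for illustration, not the actual Elasticsearch source.

```java
import java.util.Locale;

// Hypothetical stand-in for the operation-type enum described in this PR.
// The numeric ids below are assumed for illustration only.
enum OpType {
    INDEX(0),
    CREATE(1),
    UPDATE(2),
    DELETE(3);

    private final byte id;
    private final String lowercase;

    OpType(int id) {
        this.id = (byte) id;
        // Locale.ROOT keeps the lowercase form independent of the JVM default locale.
        this.lowercase = name().toLowerCase(Locale.ROOT);
    }

    byte getId() {
        return id;
    }

    String getLowercase() {
        return lowercase;
    }

    // Resolve an id read from the wire; unknown ids fail loudly instead of being ignored.
    static OpType fromId(byte id) {
        for (OpType type : values()) {
            if (type.id == id) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown opType: [" + id + "]");
    }
}
```

With a single enum, an index request can express "create" versus "index" through the same type that bulk items use, instead of mixing strings with a separate index-only operation type.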
ff103b3 to
f207ecc
Compare
This commit refactors execution of shard-level bulk operations to use the same failure handling for index, delete and update operations.
f207ecc to
14908f8
Compare
1fa265b to
5677cb6
Compare
5677cb6 to
248ac24
Compare
e515328 to
40b4f39
Compare
bleskes
left a comment
I went through this very carefully and it looks great. It's an amazing restructuring and I'm happy you took it on yourself. I left some minor comments. I also think we can potentially simplify further and make the DocumentRequest inherit from WriteReplicationRequest (and call it DocWriteRequest). I'm not sure but it would be great if you can give it a go.
| UpdateRequest updateRequest = new UpdateRequest(); | ||
| updateRequest.readFrom(in); | ||
| request = updateRequest; | ||
| } |
add an else and throw an exception?
Thanks for the suggestion
| OpType(int op) { | ||
| this.op = (byte) op; | ||
| this.lowercase = this.toString().toLowerCase(Locale.ENGLISH); |
don't we typically use ROOT for these things?
changed it to ROOT, but there are a few places (e.g. DocWriteResponse) where we use ENGLISH too. maybe we should change it to ROOT as well?
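As a side note on why an explicit, neutral locale matters for identifiers like these: `Locale.ROOT` and `Locale.ENGLISH` lowercase ASCII identically, so the choice between them is mostly convention, but a locale-sensitive conversion can change the result. The class and method names in this small sketch are hypothetical.

```java
import java.util.Locale;

// Hypothetical demo class; shows how the locale argument affects toLowerCase.
class LocaleLowercaseDemo {
    static String lower(String value, Locale locale) {
        return value.toLowerCase(locale);
    }

    public static void main(String[] args) {
        Locale turkish = Locale.forLanguageTag("tr");
        // Locale.ROOT gives the locale-neutral mapping.
        System.out.println(lower("INDEX", Locale.ROOT));
        // Turkish casing rules map 'I' to the dotless 'ı' (U+0131).
        System.out.println(lower("INDEX", turkish));
    }
}
```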
| out.writeVInt(requests.size()); | ||
| for (ActionRequest<?> request : requests) { | ||
| for (DocumentRequest<?> request : requests) { | ||
| if (request instanceof IndexRequest) { |
shall we fold these reads and writes into static methods of DocumentRequest?
done
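For readers following along, this is roughly what folding the type-dispatching reads and writes into static methods can look like, including the "else, throw" branch suggested earlier in the review. It is a sketch using plain `java.io` streams; the class names, the type bytes, and the single `id` field are assumptions, and the real code uses Elasticsearch's own stream classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch; not the actual DocumentRequest implementation.
class BulkItemSerializationSketch {
    abstract static class DocRequest {
        String id = "";

        // Write a one-byte type tag followed by the request body.
        static void write(DataOutput out, DocRequest request) throws IOException {
            if (request instanceof IndexReq) {
                out.writeByte(0);
            } else if (request instanceof DeleteReq) {
                out.writeByte(1);
            } else {
                throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + "]");
            }
            out.writeUTF(request.id);
        }

        // Read the type tag and build the matching request; unknown tags are rejected.
        static DocRequest read(DataInput in) throws IOException {
            byte type = in.readByte();
            DocRequest request;
            if (type == 0) {
                request = new IndexReq();
            } else if (type == 1) {
                request = new DeleteReq();
            } else {
                throw new IllegalStateException("invalid request type [" + type + "]");
            }
            request.id = in.readUTF();
            return request;
        }
    }

    static final class IndexReq extends DocRequest {}

    static final class DeleteReq extends DocRequest {}

    // Helper for trying the pair out: serialize to bytes and read back.
    static DocRequest roundTrip(DocRequest request) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DocRequest.write(new DataOutputStream(bytes), request);
            return DocRequest.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping both directions in one place means the bulk request and the shard-level bulk item no longer need their own copies of the `instanceof` chain.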
| if (request == null) { | ||
| continue; | ||
| } | ||
| String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName(); |
nice
| setResponse(item, item.getPrimaryResponse()); | ||
| BulkItemRequest item = request.items()[requestIndex]; | ||
| DocumentRequest<?> documentRequest = item.request(); | ||
| if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { |
can we stay consistent and use isConflictException? (I know it was like this before)
fixed
| location = locationToSync(location, result.getLocation()); | ||
| WriteResult<? extends DocWriteResponse> writeResult = innerExecuteBulkItemRequest(metaData, indexShard, | ||
| request, requestIndex); | ||
| if (writeResult.getResponse().getResult() != DocWriteResponse.Result.NOOP) { |
why not check if writeResult.getLocation() != null?
I wanted to be more strict as only a noop result can be valid with a null location. changed it to be an assert instead.
| public void writeTo(StreamOutput out) throws IOException { | ||
| out.writeVInt(id); | ||
| out.writeString(opType); | ||
| out.writeByte(opType.getId()); |
we need bwc here too
bwc added
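The backward-compatibility concern here is that the wire format changes from a string to a single byte, so a node talking to an older peer has to keep writing and reading the old form. A minimal sketch of version-gated serialization follows; the version constant, the integer version scheme, and the use of enum ordinals as ids are all assumptions for illustration and do not reflect how Elasticsearch actually encodes versions or op types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Locale;

// Hypothetical sketch of a version-gated wire format change.
class BwcOpTypeDemo {
    // Assumed: peers at or above this version understand the byte encoding.
    static final int NEW_FORMAT_VERSION = 2;

    enum Op { INDEX, CREATE, UPDATE, DELETE }

    static void writeOpType(DataOutput out, int peerVersion, Op op) throws IOException {
        if (peerVersion >= NEW_FORMAT_VERSION) {
            out.writeByte(op.ordinal());
        } else {
            // Older peers still expect the lowercase string form.
            out.writeUTF(op.name().toLowerCase(Locale.ROOT));
        }
    }

    static Op readOpType(DataInput in, int peerVersion) throws IOException {
        if (peerVersion >= NEW_FORMAT_VERSION) {
            int id = in.readByte();
            if (id < 0 || id >= Op.values().length) {
                throw new IOException("unknown op type id [" + id + "]");
            }
            return Op.values()[id];
        }
        return Op.valueOf(in.readUTF().toUpperCase(Locale.ROOT));
    }

    static byte[] encode(Op op, int peerVersion) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeOpType(new DataOutputStream(bytes), peerVersion, op);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Op roundTrip(Op op, int peerVersion) {
        try {
            return readOpType(new DataInputStream(new ByteArrayInputStream(encode(op, peerVersion))), peerVersion);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both sides must gate on the same version, otherwise a reader would interpret the length prefix of the old string form as an op-type id.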
Currently, update action delegates to index and delete actions for replication using a dedicated transport action. This change makes update a replication operation, removing the dedicated transport action. This simplifies bulk execution and removes duplicate logic for update retries and translation. This consolidates the interface for single document write requests. Now on the primary, the update request is translated to an index or delete request before execution and the translated request is sent to copies for replication.
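The flow described above (translate the update on the primary, then replicate the translated request) can be sketched with in-memory maps standing in for shard copies. Everything here is hypothetical: the `Kind` enum, the `Translated` holder, and the rule that a `null` source means "delete" are simplifications chosen for the example, not the PR's actual types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: each Map<String, String> plays the role of one shard copy (id -> source).
class UpdateTranslationSketch {
    enum Kind { INDEX, DELETE, NOOP }

    static final class Translated {
        final Kind kind;
        final String id;
        final String source;

        Translated(Kind kind, String id, String source) {
            this.kind = kind;
            this.id = id;
            this.source = source;
        }
    }

    // Runs on the primary only: decide what the update amounts to given the current document.
    static Translated translate(Map<String, String> primary, String id, String newSource) {
        String current = primary.get(id);
        if (newSource == null) {
            return new Translated(current == null ? Kind.NOOP : Kind.DELETE, id, null);
        }
        if (newSource.equals(current)) {
            return new Translated(Kind.NOOP, id, current);
        }
        return new Translated(Kind.INDEX, id, newSource);
    }

    // Applies an already-translated operation to any copy.
    static void apply(Map<String, String> copy, Translated op) {
        switch (op.kind) {
            case INDEX:
                copy.put(op.id, op.source);
                break;
            case DELETE:
                copy.remove(op.id);
                break;
            case NOOP:
                break;
        }
    }

    // Primary translates and applies; replicas receive only the translated index/delete.
    static Kind update(Map<String, String> primary, Map<String, String> replica, String id, String newSource) {
        Translated op = translate(primary, id, newSource);
        apply(primary, op);
        if (op.kind != Kind.NOOP) {
            apply(replica, op);
        }
        return op.kind;
    }

    public static void main(String[] args) {
        Map<String, String> primary = new HashMap<>();
        Map<String, String> replica = new HashMap<>();
        System.out.println(update(primary, replica, "1", "{\"a\":1}"));
        System.out.println(update(primary, replica, "1", "{\"a\":1}"));
        System.out.println(update(primary, replica, "1", null));
    }
}
```

The point of the design is that replicas never re-evaluate the update; they only see the resulting index or delete.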
e68723a to
eee0d18
Compare
Thanks @bleskes for the review :). I addressed all the minor comments.
I really like the idea of making DocumentRequest inherit from WriteReplicationRequest. While giving this a go, I refactored the update operation to be a replication operation (an update is a DocumentWriteRequest but doesn't use the replication operation; instead it delegates to index and delete operations for replication). This cleans up the dedicated transport update action (
8f01c60 to
02ea4b8
Compare
02ea4b8 to
42bc2d1
Compare
bleskes
left a comment
Thanks @areek. Let's leave the merger/inheritance of DocumentRequest and ReplicationRequest alone. None of the solutions seem ideal (update request is not a replication request and the fact it's running on the primary is not a good thing). This PR is great enough and it would be a shame to delay it. We can revisit / evaluate again when we have a better idea.
I do have two small asks (next to a question I left in the comments):
- Can we rename `DocumentRequest` to `DocWriteRequest`? It will be in line with the `DocWriteResponse` we have today.
- It seems we can get rid of the `<?>` in all the `DocumentRequest<?>`. I think it will make it simpler to read. See https://gist.github.com/bleskes/7ec26991d7be6c816103b8cac65353a4
| return new UpdateResult(translate, indexRequest, retry, cause, null); | ||
| } else { | ||
| assert translate.getResponseResult() == DocWriteResponse.Result.UPDATED; | ||
| update.setGetResult(updateHelper.extractGetResult(updateRequest, updateRequest.concreteIndex(), indexResponse.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), indexSourceAsBytes)); |
why do we now return always the get result on updates? it seems to be different before
I changed it to match the logic in TransportUpdateAction.shardOperation, but that caused CI failures, so I reverted the change to be the same as the update operation in bulk.
It also seems CI isn't happy.
Thanks @bleskes for the feedback. I updated the PR, addressing your comments and the CI is happy. Could you take a look?
LGTM. Awesome @areek
This is a bespoke backport of elastic#20109 for 5.x: Currently, bulk item requests can be any ActionRequest; this PR restricts bulk item requests to DocumentRequest. This simplifies handling failures during bulk requests. Additionally, a new enum is added to DocumentRequest to represent the intended operation to be performed by a document request (create, index, update and delete), which was previously represented with a mix of strings and index request operation type. Now, index request operation type reuses the new enum to specify whether the request should create or index a document. Restricting bulk requests to DocumentRequest further simplifies execution of shard-level bulk operations to use the same failure handling for index, delete and update operations. This PR also fixes a bug which executed delete operations twice for replica copies while executing bulk requests. Relates to elastic#19105 and elastic#20109
Currently, bulk item requests can be any ActionRequest, this PR
restricts bulk item requests to DocumentRequest. This simplifies
handling failures during bulk requests. Additionally, a new enum
is added to DocumentRequest to represent the intended operation
to be performed by a document request (`create`, `index`, `update` and
`delete`), which was previously represented with a mix of strings
and index request operation type.
Now, index request operation type reuses the new enum to specify
whether the request should `create` or `index` a document.
Restricting bulk requests to DocumentRequest further simplifies
execution of shard-level bulk operations to use the same failure
handling for index, delete and update operations.
This PR also fixes a bug which executed delete operations twice for
replica copies while executing bulk requests.
relates #19105