-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Make index and delete operation execute as a single bulk item #21964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make index and delete operation execute as a single bulk item #21964
Conversation
|
@areek thanks. I quickly looked at it and I have a question. What happens when people issue an index request now via the transport client? I believe they still go through the the transportIndexAction mechanics? we need to make sure it's translated and we never send a reroute request to the primaries (I expected some change in the execute method of that action). Also I think we need to change the way the rest layer work (we can do it as a follow up if you prefer) |
|
Thanks for the feedback @bleskes!
when a transport client issues an index request it still goes through transportIndexAction (just like it does for node clients). The only difference in this PR is we use shardBulk action to execute the actual operation.
I took this approach initially, i.e. delegate to bulk action on the transportIndexAction's |
7989e01 to
c8fe192
Compare
|
@bleskes I have updated the PR to use single-item bulk action to execute index and delete operation. Now we delegate single write operation requests to bulk action in
As |
Performance testing by @danielmitterdorfer revealed single index/delete operations have similar performance (indexing throughput) to equivalent single item bulk request. This PR reduces the code paths to executing single write operations, by reusing the logic in (shard) bulk action for executing single operation as a single-item bulk request.
c8fe192 to
c5b09ad
Compare
|
@areek I think with this change, we don't need the logic for IndexRequests in |
| final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); | ||
| final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); | ||
| builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); | ||
| builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to keep INDEX TP around?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
INDEX TP is currently only used by TransportUpdateAction. I agree that we should remove it or at least rename it to better reflect where it is used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, right, renaming it to update would be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can migrate TransportUpdateAction to redirect to a single item bulk as well (as a follow up). At that point we can rename bulk to index :)
|
thanks @martijnvg for the comment, I will remove the logic index requests logic in |
|
@bleskes Can you take a look, I reverted index and delete transport actions to be |
bleskes
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great. Left very minor comments and questions. I also asked @jaymode and @martijnvg for some validation.
| } | ||
|
|
||
| public <Request extends ReplicatedWriteRequest<Request>, Response extends ReplicationResponse & WriteResponse> | ||
| WritePrimaryResult<Request, Response> executeSingleItemBulkRequestOnPrimary( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wondering if we can put this something else .. maybe a static method on TransportIndexAction ? It's a shame to cluster this class with BWC code..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have moved these BWC method to static methods to SingleWriteOperationUtility, (suggestion for better name welcomed). The only downside is now shardOperationOnPrimary and shardOperationOnReplica for bulk shard action is made public, as they are called from transport index and delete actions
| Request request, IndexShard primary) throws Exception { | ||
| BulkItemRequest[] itemRequests = new BulkItemRequest[1]; | ||
| WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy(); | ||
| request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
feels weird to mutate the incoming request here. Why is it needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for pointing it out. This was left over from using the bulk action directly, I removed mutating the request
| * Performs the delete operation. | ||
| */ | ||
| public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteRequest,DeleteResponse> { | ||
| public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteRequest, DeleteResponse> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wondering - should we mark this as deprecated, so we want use internally? not sure how much it buys us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I marked both transport index/delete action as deprecated, now the only other usage of these actions are in TransportUpdateAction. should we use the bulk action directly as a follow up?
|
|
||
| @Override | ||
| protected void doExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) { | ||
| ClusterState state = clusterService.state(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we can't override the execute method itself, I think it will be good to get a sanity check from @jaymode and @martijnvg about potential implications of running the request filter chain twice
| bulkAction.execute(task, bulkRequest, new ActionListener<BulkResponse>() { | ||
| @Override | ||
| public void onResponse(BulkResponse bulkItemResponses) { | ||
| assert bulkItemResponses.getItems().length == 1: "expected only one item in bulk request"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is also the same code here as in delete, I wonder if we should add a generic BWC class that will do all this shared munging (and the code from the bulk action as well).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I converted them to static helper functions and moved them to SingleWriteOperationUtilty
| * Result of taking the action on the primary. | ||
| */ | ||
| protected class WritePrimaryResult extends PrimaryResult implements RespondingWriteResult { | ||
| protected static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wondering - why was this needed (the change to static and the explicit generic references)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is needed because we deal with two types of WritePrimaryResult in indexAction/deleteAction.shardOperationOnPrimary/Replica. WritePrimaryResult with type BulkShardRequest/BulkShardResponse and Index/delete request and response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. kk.
| final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); | ||
| final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); | ||
| builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); | ||
| builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can migrate TransportUpdateAction to redirect to a single item bulk as well (as a follow up). At that point we can rename bulk to index :)
| import static org.mockito.Mockito.verifyZeroInteractions; | ||
| import static org.mockito.Mockito.when; | ||
|
|
||
| public class TransportIndexActionIngestTests extends ESTestCase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have any test that makes sure that single item index ops work with ingest?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is already test for ingest with bulk action (TransportBulkActionIngestTests) which covers ingest with single item ops
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we still need a way to make sure igest via single item index works?
| checkWriteAction(timeout, | ||
| client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()).setTimeout(timeout)); | ||
|
|
||
| checkWriteAction(autoCreateIndex, timeout, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why can we remove autoCreateIndex? I mean it can still be created? what am I missing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually due to TransportUpdateAction behaving differently (on how it handles cluster blocks when autoCreateIndex is true, instead of throwing cluster block exception, transport update throws master not discovered exception). IMO, update action should be fixed to be consistent here and throw a cluster block exception instead. I re-worked the tests, so the distinction is clear.
| private IndexNameExpressionResolver indexNameExpressionResolver; | ||
| private AutoCreateIndex autoCreateIndex; | ||
| private Settings settings; | ||
| private TransportIndexAction transportIndexAction; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should stop using it :) - maybe have a test utility to index a single item using a bulk action?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to using bulk action instead
3136ae8 to
180ceef
Compare
|
Thanks @bleskes for the feedback, I addressed your comments. After running bwc tests |
|
thx @areek . I played with this a bit and came to this: areek/elasticsearch@enhancement/use_shard_bulk_for_single_ops...bleskes:use_shard_bulk_for_single_ops What do you think? Also - I think the wait for refresh flag issues are unrelated. They have been showing up on normal CI. I started chasing them too |
|
retest this please |
bleskes
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. @martijnvg do you mind taking a look at the ingest tests?
|
@danielmitterdorfer has gracefully agreed to benchmark this PR to validate that performance is comparable to his initial research |
martijnvg
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, left one minor comment.
| completionHandler.getValue().accept(null); | ||
| assertTrue(action.isExecuted); | ||
| assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one | ||
| verifyZeroInteractions(transportService); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also verify that interaction happened with ingestService mock here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Index: core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java (date 1482389279000)
+++ core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java (revision )
@@ -28,15 +28,12 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
-import org.elasticsearch.cluster.action.shard.ShardStateAction;
-import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
-import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.tasks.Task;
@@ -54,7 +51,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
-import java.util.function.Supplier;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.sameInstance;
@@ -64,6 +60,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
@@ -224,6 +221,7 @@
verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());
+ verify(executionService, times(1)).executeBulkRequest(any(), any(), any());
// now check success
Iterator<DocWriteRequest> req = bulkDocsItr.getValue().iterator();
@@ -311,6 +309,7 @@
} else {
assertSame(remoteNode1, node.getValue());
}
+ verifyZeroInteractions(executionService);
}
public void testSingleItemBulkActionIngestForward() throws Exception {
|
@areek, @bleskes I have benchmarked the single index operation with async translog fsync *) of the latest version on Areek's branch ("contender") against the merge base (baseline 8aca504). The overhead is basically not measurable:
I used a custom track (ltaxis) which is a stripped down version of the standard nyc_taxis track ("only" 15 million records). The main reason is that this benchmark is doing so much requests that it would not be feasible to run the full nyc_taxis track. *) async translog fsync is enabled to avoid accidentally bottlenecking on the fsync operation. |
…rt of elastic#21964) Performance testing by @danielmitterdorfer revealed single index/delete operations have similar performance (indexing throughput) to equivalent single item bulk request. This PR reduces the code paths to executing single write operations, by reusing the logic in (shard) bulk action for executing single operation as a single-item bulk request. relates to elastic#21964
… of #21964) (#22812) * Make index and delete operation execute as a single bulk item (backport of #21964) Performance testing by @danielmitterdorfer revealed single index/delete operations have similar performance (indexing throughput) to equivalent single item bulk request. This PR reduces the code paths to executing single write operations, by reusing the logic in (shard) bulk action for executing single operation as a single-item bulk request. relates to #21964 * remove awaitfix for IndexWithShadowReplica tests
Currently, update action internally uses deprecated index and delete transport actions. As of elastic#21964, these tranport actions were deprecated in favour of using single item bulk request. In this commit, update action uses single item bulk action.
Currently, update action internally uses deprecated index and delete transport actions. As of #21964, these tranport actions were deprecated in favour of using single item bulk request. In this commit, update action uses single item bulk action.
As @bleskes pointed out in elastic#23069 there were inconsistencies in version handling on 5.x and 5.3 from master due to backport of elastic#21964. This change ensures versions are handled uniformly and fixes minor issues in shard bulk action to be similar to master fixes elastic#23069
Performance testing by @danielmitterdorfer revealed single
index/delete operations have similar performance (indexing
throughput) to equivalent single item bulk request.
This PR reduces the code paths to executing single write
operations, by reusing the logic in (shard) bulk action for
executing single operation as a single-item bulk request.