diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java index 941862d980195..7765b1053443e 100644 --- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java +++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.plugin.noop.action.bulk; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -82,7 +83,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } private static class BulkRestBuilderListener extends RestBuilderListener { - private final BulkItemResponse ITEM_RESPONSE = new BulkItemResponse(1, "update", + private final BulkItemResponse ITEM_RESPONSE = new BulkItemResponse(1, DocWriteRequest.OpType.UPDATE, new UpdateResponse(new ShardId("mock", "", 1), "mock_type", "1", 1L, DocWriteResponse.Result.CREATED)); private final RestRequest request; diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/TransportNoopBulkAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/TransportNoopBulkAction.java index e1efa1356d19f..aff5863bd9298 100644 --- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/TransportNoopBulkAction.java +++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/TransportNoopBulkAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.plugin.noop.action.bulk; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; @@ -34,7 +35,7 @@ import org.elasticsearch.transport.TransportService; public class TransportNoopBulkAction extends HandledTransportAction { - private static final BulkItemResponse ITEM_RESPONSE = new BulkItemResponse(1, "update", + private static final BulkItemResponse ITEM_RESPONSE = new BulkItemResponse(1, DocWriteRequest.OpType.UPDATE, new UpdateResponse(new ShardId("mock", "", 1), "mock_type", "1", 1L, DocWriteResponse.Result.CREATED)); @Inject diff --git a/core/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/core/src/main/java/org/elasticsearch/action/DocWriteRequest.java new file mode 100644 index 0000000000000..09db7089ff629 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -0,0 +1,203 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action; + +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.VersionType; + +import java.io.IOException; +import java.util.Locale; + +/** + * Generic interface to group ActionRequest, which perform writes to a single document + * Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest} + */ +public interface DocWriteRequest extends IndicesRequest { + + /** + * Get the index that this request operates on + * @return the index + */ + String index(); + + /** + * Get the type that this request operates on + * @return the type + */ + String type(); + + /** + * Get the id of the document for this request + * @return the id + */ + String id(); + + /** + * Get the options for this request + * @return the indices options + */ + IndicesOptions indicesOptions(); + + /** + * Set the routing for this request + * @return the Request + */ + T routing(String routing); + + /** + * Get the routing for this request + * @return the Routing + */ + String routing(); + + + /** + * Get the parent for this request + * @return the Parent + */ + String parent(); + + /** + * Get the document version for this request + * @return the document version + */ + long version(); + + /** + * Sets the version, which will perform the operation only if a matching + * version exists and no changes happened on the doc since then. + */ + T version(long version); + + /** + * Get the document version type for this request + * @return the document version type + */ + VersionType versionType(); + + /** + * Sets the versioning type. Defaults to {@link VersionType#INTERNAL}. + */ + T versionType(VersionType versionType); + + /** + * Get the requested document operation type of the request + * @return the operation type {@link OpType} + */ + OpType opType(); + + /** + * Requested operation type to perform on the document + */ + enum OpType { + /** + * Index the source. If there an existing document with the id, it will + * be replaced. + */ + INDEX(0), + /** + * Creates the resource. Simply adds it to the index, if there is an existing + * document with the id, then it won't be removed. + */ + CREATE(1), + /** Updates a document */ + UPDATE(2), + /** Deletes a document */ + DELETE(3); + + private final byte op; + private final String lowercase; + + OpType(int op) { + this.op = (byte) op; + this.lowercase = this.toString().toLowerCase(Locale.ROOT); + } + + public byte getId() { + return op; + } + + public String getLowercase() { + return lowercase; + } + + public static OpType fromId(byte id) { + switch (id) { + case 0: return INDEX; + case 1: return CREATE; + case 2: return UPDATE; + case 3: return DELETE; + default: throw new IllegalArgumentException("Unknown opType: [" + id + "]"); + } + } + + public static OpType fromString(String sOpType) { + String lowerCase = sOpType.toLowerCase(Locale.ROOT); + for (OpType opType : OpType.values()) { + if (opType.getLowercase().equals(lowerCase)) { + return opType; + } + } + throw new IllegalArgumentException("Unknown opType: [" + sOpType + "]"); + } + } + + /** read a document write (index/delete/update) request */ + static DocWriteRequest readDocumentRequest(StreamInput in) throws IOException { + byte type = in.readByte(); + DocWriteRequest docWriteRequest; + if (type == 0) { + IndexRequest indexRequest = new IndexRequest(); + indexRequest.readFrom(in); + docWriteRequest = indexRequest; + } else if (type == 1) { + DeleteRequest deleteRequest = new DeleteRequest(); + deleteRequest.readFrom(in); + docWriteRequest = deleteRequest; + } else if (type == 2) { + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.readFrom(in); + docWriteRequest = updateRequest; + } else { + throw new IllegalStateException("invalid request type [" + type+ " ]"); + } + return docWriteRequest; + } + + /** write a document write (index/delete/update) request*/ + static void writeDocumentRequest(StreamOutput out, DocWriteRequest request) throws IOException { + if (request instanceof IndexRequest) { + out.writeByte((byte) 0); + ((IndexRequest) request).writeTo(out); + } else if (request instanceof DeleteRequest) { + out.writeByte((byte) 1); + ((DeleteRequest) request).writeTo(out); + } else if (request instanceof UpdateRequest) { + out.writeByte((byte) 2); + ((UpdateRequest) request).writeTo(out); + } else { + throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]"); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/action/DocumentRequest.java b/core/src/main/java/org/elasticsearch/action/DocumentRequest.java deleted file mode 100644 index a90f013a6b9ab..0000000000000 --- a/core/src/main/java/org/elasticsearch/action/DocumentRequest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action; - -import org.elasticsearch.action.support.IndicesOptions; - -/** - * Generic interface to group ActionRequest, which work on single document level - * - * Forces this class return index/type/id getters - */ -public interface DocumentRequest extends IndicesRequest { - - /** - * Get the index that this request operates on - * @return the index - */ - String index(); - - /** - * Get the type that this request operates on - * @return the type - */ - String type(); - - /** - * Get the id of the document for this request - * @return the id - */ - String id(); - - /** - * Get the options for this request - * @return the indices options - */ - IndicesOptions indicesOptions(); - - /** - * Set the routing for this request - * @return the Request - */ - T routing(String routing); - - /** - * Get the routing for this request - * @return the Routing - */ - String routing(); - - - /** - * Get the parent for this request - * @return the Parent - */ - String parent(); - -} diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 760c5781aea0a..0f4835de2e9ea 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.bulk; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -36,7 +36,7 @@ public class BulkItemRequest implements Streamable { private int id; - private ActionRequest request; + private DocWriteRequest request; private volatile BulkItemResponse primaryResponse; private volatile boolean ignoreOnReplica; @@ -44,7 +44,7 @@ public class BulkItemRequest implements Streamable { } - public BulkItemRequest(int id, ActionRequest request) { + public BulkItemRequest(int id, DocWriteRequest request) { assert request instanceof IndicesRequest; this.id = id; this.request = request; @@ -54,14 +54,13 @@ public int id() { return id; } - public ActionRequest request() { + public DocWriteRequest request() { return request; } public String index() { - IndicesRequest indicesRequest = (IndicesRequest) request; - assert indicesRequest.indices().length == 1; - return indicesRequest.indices()[0]; + assert request.indices().length == 1; + return request.indices()[0]; } BulkItemResponse getPrimaryResponse() { @@ -92,15 +91,7 @@ public static BulkItemRequest readBulkItem(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException { id = in.readVInt(); - byte type = in.readByte(); - if (type == 0) { - request = new IndexRequest(); - } else if (type == 1) { - request = new DeleteRequest(); - } else if (type == 2) { - request = new UpdateRequest(); - } - request.readFrom(in); + request = DocWriteRequest.readDocumentRequest(in); if (in.readBoolean()) { primaryResponse = BulkItemResponse.readBulkItem(in); } @@ -110,14 +101,7 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); - if (request instanceof IndexRequest) { - out.writeByte((byte) 0); - } else if (request instanceof DeleteRequest) { - out.writeByte((byte) 1); - } else if (request instanceof UpdateRequest) { - out.writeByte((byte) 2); - } - request.writeTo(out); + DocWriteRequest.writeDocumentRequest(out, request); out.writeOptionalStreamable(primaryResponse); out.writeBoolean(ignoreOnReplica); } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 711ee3254020d..094769d48cdc5 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -21,7 +21,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateResponse; @@ -51,7 +53,7 @@ public RestStatus status() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.startObject(opType); + builder.startObject(opType.getLowercase()); if (failure == null) { response.innerToXContent(builder, params); builder.field(Fields.STATUS, response.status().getStatus()); @@ -185,7 +187,7 @@ public String toString() { private int id; - private String opType; + private OpType opType; private DocWriteResponse response; @@ -195,13 +197,13 @@ public String toString() { } - public BulkItemResponse(int id, String opType, DocWriteResponse response) { + public BulkItemResponse(int id, OpType opType, DocWriteResponse response) { this.id = id; this.opType = opType; this.response = response; } - public BulkItemResponse(int id, String opType, Failure failure) { + public BulkItemResponse(int id, OpType opType, Failure failure) { this.id = id; this.opType = opType; this.failure = failure; @@ -217,7 +219,7 @@ public int getItemId() { /** * The operation type ("index", "create" or "delete"). */ - public String getOpType() { + public OpType getOpType() { return this.opType; } @@ -302,7 +304,11 @@ public static BulkItemResponse readBulkItem(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException { id = in.readVInt(); - opType = in.readString(); + if (in.getVersion().onOrAfter(Version.V_5_3_0_UNRELEASED)) { + opType = OpType.fromId(in.readByte()); + } else { + opType = OpType.fromString(in.readString()); + } byte type = in.readByte(); if (type == 0) { response = new IndexResponse(); @@ -323,7 +329,11 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); - out.writeString(opType); + if (out.getVersion().onOrAfter(Version.V_5_3_0_UNRELEASED)) { + out.writeByte(opType.getId()); + } else { + out.writeString(opType.getLowercase()); + } if (response == null) { out.writeByte((byte) 2); } else { diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 966809881ca04..6dacb21b23903 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.bulk; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; @@ -250,24 +250,24 @@ public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws Inter * (for example, if no id is provided, one will be generated, or usage of the create flag). */ public BulkProcessor add(IndexRequest request) { - return add((ActionRequest) request); + return add((DocWriteRequest) request); } /** * Adds an {@link DeleteRequest} to the list of actions to execute. */ public BulkProcessor add(DeleteRequest request) { - return add((ActionRequest) request); + return add((DocWriteRequest) request); } /** * Adds either a delete or an index request. */ - public BulkProcessor add(ActionRequest request) { + public BulkProcessor add(DocWriteRequest request) { return add(request, null); } - public BulkProcessor add(ActionRequest request, @Nullable Object payload) { + public BulkProcessor add(DocWriteRequest request, @Nullable Object payload) { internalAdd(request, payload); return this; } @@ -282,7 +282,7 @@ protected void ensureOpen() { } } - private synchronized void internalAdd(ActionRequest request, @Nullable Object payload) { + private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) { ensureOpen(); bulkRequest.add(request, payload); executeIfNeeded(); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index afd249be2dea2..846c76c98c2d5 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; @@ -70,7 +71,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques * {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare * the one with the least casts. */ - final List requests = new ArrayList<>(); + final List requests = new ArrayList<>(); List payloads = null; protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; @@ -85,14 +86,14 @@ public BulkRequest() { /** * Adds a list of requests to be executed. Either index or delete requests. */ - public BulkRequest add(ActionRequest... requests) { - for (ActionRequest request : requests) { + public BulkRequest add(DocWriteRequest... requests) { + for (DocWriteRequest request : requests) { add(request, null); } return this; } - public BulkRequest add(ActionRequest request) { + public BulkRequest add(DocWriteRequest request) { return add(request, null); } @@ -102,7 +103,7 @@ public BulkRequest add(ActionRequest request) { * @param payload Optional payload * @return the current bulk request */ - public BulkRequest add(ActionRequest request, @Nullable Object payload) { + public BulkRequest add(DocWriteRequest request, @Nullable Object payload) { if (request instanceof IndexRequest) { add((IndexRequest) request, payload); } else if (request instanceof DeleteRequest) { @@ -118,8 +119,8 @@ public BulkRequest add(ActionRequest request, @Nullable Object payload) { /** * Adds a list of requests to be executed. Either index or delete requests. */ - public BulkRequest add(Iterable requests) { - for (ActionRequest request : requests) { + public BulkRequest add(Iterable requests) { + for (DocWriteRequest request : requests) { add(request); } return this; @@ -205,10 +206,10 @@ private void addPayload(Object payload) { /** * The list of requests in this bulk request. */ - public List requests() { + public List requests() { return this.requests; } - + /** * The list of optional payloads associated with requests in the same order as the requests. Note, elements within * it might be null if no payload has been provided. @@ -507,7 +508,7 @@ private int findNextMarker(byte marker, int from, BytesReference data, int lengt * @return Whether this bulk request contains index request with an ingest pipeline enabled. */ public boolean hasIndexRequestsWithPipelines() { - for (ActionRequest actionRequest : requests) { + for (DocWriteRequest actionRequest : requests) { if (actionRequest instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) actionRequest; if (Strings.hasText(indexRequest.getPipeline())) { @@ -525,13 +526,13 @@ public ActionRequestValidationException validate() { if (requests.isEmpty()) { validationException = addValidationError("no requests added", validationException); } - for (ActionRequest request : requests) { + for (DocWriteRequest request : requests) { // We first check if refresh has been set if (((WriteRequest) request).getRefreshPolicy() != RefreshPolicy.NONE) { validationException = addValidationError( "RefreshPolicy is not supported on an item request. Set it on the BulkRequest instead.", validationException); } - ActionRequestValidationException ex = request.validate(); + ActionRequestValidationException ex = ((WriteRequest) request).validate(); if (ex != null) { if (validationException == null) { validationException = new ActionRequestValidationException(); @@ -549,20 +550,7 @@ public void readFrom(StreamInput in) throws IOException { waitForActiveShards = ActiveShardCount.readFrom(in); int size = in.readVInt(); for (int i = 0; i < size; i++) { - byte type = in.readByte(); - if (type == 0) { - IndexRequest request = new IndexRequest(); - request.readFrom(in); - requests.add(request); - } else if (type == 1) { - DeleteRequest request = new DeleteRequest(); - request.readFrom(in); - requests.add(request); - } else if (type == 2) { - UpdateRequest request = new UpdateRequest(); - request.readFrom(in); - requests.add(request); - } + requests.add(DocWriteRequest.readDocumentRequest(in)); } refreshPolicy = RefreshPolicy.readFrom(in); timeout = new TimeValue(in); @@ -573,15 +561,8 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); waitForActiveShards.writeTo(out); out.writeVInt(requests.size()); - for (ActionRequest request : requests) { - if (request instanceof IndexRequest) { - out.writeByte((byte) 0); - } else if (request instanceof DeleteRequest) { - out.writeByte((byte) 1); - } else if (request instanceof UpdateRequest) { - out.writeByte((byte) 2); - } - request.writeTo(out); + for (DocWriteRequest request : requests) { + DocWriteRequest.writeDocumentRequest(out, request); } refreshPolicy.writeTo(out); timeout.writeTo(out); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 1a512ccff9a56..0fa20bd5f1216 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -28,7 +28,7 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.DocumentRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -142,15 +142,9 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener autoCreateIndices = new HashSet<>(); - for (ActionRequest request : bulkRequest.requests) { - if (request instanceof DocumentRequest) { - DocumentRequest req = (DocumentRequest) request; - autoCreateIndices.add(req.index()); - } else { - throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName()); - } - } + final Set autoCreateIndices = bulkRequest.requests.stream() + .map(DocWriteRequest::index) + .collect(Collectors.toSet()); final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); ClusterState state = clusterService.state(); for (String index : autoCreateIndices) { @@ -176,7 +170,7 @@ public void onFailure(Exception e) { if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) { // fail all requests involving this index, if create didnt work for (int i = 0; i < bulkRequest.requests.size(); i++) { - ActionRequest request = bulkRequest.requests.get(i); + DocWriteRequest request = bulkRequest.requests.get(i); if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { bulkRequest.requests.set(i, null); } @@ -211,30 +205,13 @@ boolean shouldAutoCreate(String index, ClusterState state) { return autoCreateIndex.shouldAutoCreate(index, state); } - private boolean setResponseFailureIfIndexMatches(AtomicArray responses, int idx, ActionRequest request, String index, Exception e) { - if (request instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) request; - if (index.equals(indexRequest.index())) { - responses.set(idx, new BulkItemResponse(idx, "index", new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e))); - return true; - } - } else if (request instanceof DeleteRequest) { - DeleteRequest deleteRequest = (DeleteRequest) request; - if (index.equals(deleteRequest.index())) { - responses.set(idx, new BulkItemResponse(idx, "delete", new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), e))); - return true; - } - } else if (request instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) request; - if (index.equals(updateRequest.index())) { - responses.set(idx, new BulkItemResponse(idx, "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), e))); - return true; - } - } else { - throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName()); - } - return false; - } + private boolean setResponseFailureIfIndexMatches(AtomicArray responses, int idx, DocWriteRequest request, String index, Exception e) { + if (index.equals(request.index())) { + responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(), request.id(), e))); + return true; + } + return false; + } /** * This method executes the {@link BulkRequest} and calls the given listener once the request returns. @@ -259,95 +236,56 @@ void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeN final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver); MetaData metaData = clusterState.metaData(); for (int i = 0; i < bulkRequest.requests.size(); i++) { - ActionRequest request = bulkRequest.requests.get(i); + DocWriteRequest docWriteRequest = bulkRequest.requests.get(i); //the request can only be null because we set it to null in the previous step, so it gets ignored - if (request == null) { + if (docWriteRequest == null) { continue; } - DocumentRequest documentRequest = (DocumentRequest) request; - if (addFailureIfIndexIsUnavailable(documentRequest, bulkRequest, responses, i, concreteIndices, metaData)) { + if (addFailureIfIndexIsUnavailable(docWriteRequest, bulkRequest, responses, i, concreteIndices, metaData)) { continue; } - Index concreteIndex = concreteIndices.resolveIfAbsent(documentRequest); - if (request instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) request; - MappingMetaData mappingMd = null; - final IndexMetaData indexMetaData = metaData.index(concreteIndex); - if (indexMetaData != null) { - mappingMd = indexMetaData.mappingOrDefault(indexRequest.type()); - } - try { - indexRequest.resolveRouting(metaData); - indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName()); - } catch (ElasticsearchParseException | RoutingMissingException e) { - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), indexRequest.type(), indexRequest.id(), e); - BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure); - responses.set(i, bulkItemResponse); - // make sure the request gets never processed again - bulkRequest.requests.set(i, null); - } - } else if (request instanceof DeleteRequest) { - try { - TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest)request); - } catch(RoutingMissingException e) { - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), documentRequest.type(), documentRequest.id(), e); - BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "delete", failure); - responses.set(i, bulkItemResponse); - // make sure the request gets never processed again - bulkRequest.requests.set(i, null); - } - - } else if (request instanceof UpdateRequest) { - try { - TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest)request); - } catch(RoutingMissingException e) { - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), documentRequest.type(), documentRequest.id(), e); - BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "update", failure); - responses.set(i, bulkItemResponse); - // make sure the request gets never processed again - bulkRequest.requests.set(i, null); - } - } else { - throw new AssertionError("request type not supported: [" + request.getClass().getName() + "]"); - } + Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest); + try { + switch (docWriteRequest.opType()) { + case CREATE: + case INDEX: + IndexRequest indexRequest = (IndexRequest) docWriteRequest; + MappingMetaData mappingMd = null; + final IndexMetaData indexMetaData = metaData.index(concreteIndex); + if (indexMetaData != null) { + mappingMd = indexMetaData.mappingOrDefault(indexRequest.type()); + } + indexRequest.resolveRouting(metaData); + indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName()); + break; + case UPDATE: + TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest); + break; + case DELETE: + TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest) docWriteRequest); + break; + default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]"); + } + } catch (ElasticsearchParseException | RoutingMissingException e) { + BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e); + BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure); + responses.set(i, bulkItemResponse); + // make sure the request gets never processed again + bulkRequest.requests.set(i, null); + } } // first, go over all the requests and create a ShardId -> Operations mapping Map> requestsByShard = new HashMap<>(); - for (int i = 0; i < bulkRequest.requests.size(); i++) { - ActionRequest request = bulkRequest.requests.get(i); - if (request instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) request; - String concreteIndex = concreteIndices.getConcreteIndex(indexRequest.index()).getName(); - ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, indexRequest.id(), indexRequest.routing()).shardId(); - List list = requestsByShard.get(shardId); - if (list == null) { - list = new ArrayList<>(); - requestsByShard.put(shardId, list); - } - list.add(new BulkItemRequest(i, request)); - } else if (request instanceof DeleteRequest) { - DeleteRequest deleteRequest = (DeleteRequest) request; - String concreteIndex = concreteIndices.getConcreteIndex(deleteRequest.index()).getName(); - ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, deleteRequest.id(), deleteRequest.routing()).shardId(); - List list = requestsByShard.get(shardId); - if (list == null) { - list = new ArrayList<>(); - requestsByShard.put(shardId, list); - } - list.add(new BulkItemRequest(i, request)); - } else if (request instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) request; - String concreteIndex = concreteIndices.getConcreteIndex(updateRequest.index()).getName(); - ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, updateRequest.id(), updateRequest.routing()).shardId(); - List list = requestsByShard.get(shardId); - if (list == null) { - list = new ArrayList<>(); - requestsByShard.put(shardId, list); - } - list.add(new BulkItemRequest(i, request)); + DocWriteRequest request = bulkRequest.requests.get(i); + if (request == null) { + continue; } + String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName(); + ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId(); + List shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()); + shardRequests.add(new BulkItemRequest(i, request)); } if (requestsByShard.isEmpty()) { @@ -387,19 +325,9 @@ public void onFailure(Exception e) { // create failures for all relevant requests for (BulkItemRequest request : requests) { final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); - if (request.request() instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) request.request(); - responses.set(request.id(), new BulkItemResponse(request.id(), indexRequest.opType().toString().toLowerCase(Locale.ENGLISH), - new BulkItemResponse.Failure(indexName, indexRequest.type(), indexRequest.id(), e))); - } else if (request.request() instanceof DeleteRequest) { - DeleteRequest deleteRequest = (DeleteRequest) request.request(); - responses.set(request.id(), new BulkItemResponse(request.id(), "delete", - new BulkItemResponse.Failure(indexName, deleteRequest.type(), deleteRequest.id(), e))); - } else if (request.request() instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) request.request(); - responses.set(request.id(), new BulkItemResponse(request.id(), "update", - new BulkItemResponse.Failure(indexName, updateRequest.type(), updateRequest.id(), e))); - } + DocWriteRequest docWriteRequest = request.request(); + responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(), + new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e))); } if (counter.decrementAndGet() == 0) { finishHim(); @@ -413,9 +341,9 @@ private void finishHim() { } } - private boolean addFailureIfIndexIsUnavailable(DocumentRequest request, BulkRequest bulkRequest, AtomicArray responses, int idx, - final ConcreteIndices concreteIndices, - final MetaData metaData) { + private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, BulkRequest bulkRequest, AtomicArray responses, int idx, + final ConcreteIndices concreteIndices, + final MetaData metaData) { Index concreteIndex = concreteIndices.getConcreteIndex(request.index()); Exception unavailableException = null; if (concreteIndex == null) { @@ -436,15 +364,7 @@ private boolean addFailureIfIndexIsUnavailable(DocumentRequest request, BulkRequ if (unavailableException != null) { BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(), unavailableException); - String operationType = "unknown"; - if (request instanceof IndexRequest) { - operationType = "index"; - } else if (request instanceof DeleteRequest) { - operationType = "delete"; - } else if (request instanceof UpdateRequest) { - operationType = "update"; - } - BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, operationType, failure); + BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, request.opType(), failure); responses.set(idx, bulkItemResponse); // make sure the request gets never processed again bulkRequest.requests.set(idx, null); @@ -467,7 +387,7 @@ Index getConcreteIndex(String indexOrAlias) { return indices.get(indexOrAlias); } - Index resolveIfAbsent(DocumentRequest request) { + Index resolveIfAbsent(DocWriteRequest request) { Index concreteIndex = indices.get(request.index()); if (concreteIndex == null) { concreteIndex = indexNameExpressionResolver.concreteSingleIndex(state, request); @@ -508,7 +428,7 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen }); } - static final class BulkRequestModifier implements Iterator { + static final class BulkRequestModifier implements Iterator { final BulkRequest bulkRequest; final SparseFixedBitSet failedSlots; @@ -524,7 +444,7 @@ static final class BulkRequestModifier implements Iterator { } @Override - public ActionRequest next() { + public DocWriteRequest next() { return bulkRequest.requests().get(++currentSlot); } @@ -543,10 +463,10 @@ BulkRequest getBulkRequest() { modifiedBulkRequest.timeout(bulkRequest.timeout()); int slot = 0; - List requests = bulkRequest.requests(); + List requests = bulkRequest.requests(); originalSlots = new int[requests.size()]; // oversize, but that's ok for (int i = 0; i < requests.size(); i++) { - ActionRequest request = requests.get(i); + DocWriteRequest request = requests.get(i); if (failedSlots.get(i) == false) { modifiedBulkRequest.add(request); originalSlots[slot++] = i; @@ -575,7 +495,7 @@ void markCurrentItemAsFailed(Exception e) { // 3) Continue with the next request in the bulk. failedSlots.set(currentSlot); BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e); - itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType().lowercase(), failure)); + itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure)); } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index c1303c6b56cfd..f34c89c3f013b 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -23,7 +23,8 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.TransportDeleteAction; @@ -31,7 +32,6 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.update.UpdateHelper; @@ -57,7 +57,6 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog.Location; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -67,14 +66,9 @@ import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException; import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException; -/** - * Performs the index operation. - */ +/** Performs shard-level bulk (index, delete or update) operations */ public class TransportShardBulkAction extends TransportWriteAction { - private static final String OP_TYPE_UPDATE = "update"; - private static final String OP_TYPE_DELETE = "delete"; - public static final String ACTION_NAME = BulkAction.NAME + "[s]"; private final UpdateHelper updateHelper; @@ -116,8 +110,7 @@ protected WriteResult onPrimaryShard(BulkShardRequest request VersionType[] preVersionTypes = new VersionType[request.items().length]; Translog.Location location = null; for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) { - BulkItemRequest item = request.items()[requestIndex]; - location = handleItem(metaData, request, primary, preVersions, preVersionTypes, location, requestIndex, item); + location = executeBulkItemRequest(metaData, primary, request, preVersions, preVersionTypes, location, requestIndex); } BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; @@ -129,203 +122,87 @@ protected WriteResult onPrimaryShard(BulkShardRequest request return new WriteResult<>(response, location); } - private Translog.Location handleItem(IndexMetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) { - if (item.request() instanceof IndexRequest) { - location = index(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item); - } else if (item.request() instanceof DeleteRequest) { - location = delete(request, indexShard, preVersions, preVersionTypes, location, requestIndex, item); - } else if (item.request() instanceof UpdateRequest) { - Tuple tuple = update(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item); - location = tuple.v1(); - item = tuple.v2(); - } else { - throw new IllegalStateException("Unexpected index operation: " + item.request()); - } - - assert item.getPrimaryResponse() != null; - assert preVersionTypes[requestIndex] != null; - return location; - } - - private Translog.Location index(IndexMetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) { - IndexRequest indexRequest = (IndexRequest) item.request(); - preVersions[requestIndex] = indexRequest.version(); - preVersionTypes[requestIndex] = indexRequest.versionType(); + /** Executes bulk item requests and handles request execution exceptions */ + private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexShard indexShard, + BulkShardRequest request, + long[] preVersions, VersionType[] preVersionTypes, + Translog.Location location, int requestIndex) { + preVersions[requestIndex] = request.items()[requestIndex].request().version(); + preVersionTypes[requestIndex] = request.items()[requestIndex].request().versionType(); + DocWriteRequest.OpType opType = request.items()[requestIndex].request().opType(); try { - WriteResult result = shardIndexOperation(request, indexRequest, metaData, indexShard, true); - location = locationToSync(location, result.getLocation()); - // add the response - IndexResponse indexResponse = result.getResponse(); - setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse)); - } catch (Exception e) { - // rethrow the failure if we are going to retry on primary and let parent failure to handle it - if (retryPrimaryException(e)) { - // restore updated versions... - for (int j = 0; j < requestIndex; j++) { - applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]); - } - throw (ElasticsearchException) e; - } - logFailure(e, "index", request.shardId(), indexRequest); - // if its a conflict failure, and we already executed the request on a primary (and we execute it - // again, due to primary relocation and only processing up to N bulk items when the shard gets closed) - // then just use the response we got from the successful execution - if (item.getPrimaryResponse() != null && isConflictException(e)) { - setResponse(item, item.getPrimaryResponse()); + WriteResult writeResult = innerExecuteBulkItemRequest(metaData, indexShard, + request, requestIndex); + if (writeResult.getLocation() != null) { + location = locationToSync(location, writeResult.getLocation()); } else { - setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), - new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e))); + assert writeResult.getResponse().getResult() == DocWriteResponse.Result.NOOP + : "only noop operation can have null next operation"; } - } - return location; - } - - private > void logFailure(Throwable t, String operation, ShardId shardId, ReplicationRequest request) { - if (ExceptionsHelper.status(t) == RestStatus.CONFLICT) { - logger.trace((Supplier) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", shardId, operation, request), t); - } else { - logger.debug((Supplier) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", shardId, operation, request), t); - } - } - - private Translog.Location delete(BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) { - DeleteRequest deleteRequest = (DeleteRequest) item.request(); - preVersions[requestIndex] = deleteRequest.version(); - preVersionTypes[requestIndex] = deleteRequest.versionType(); - - try { + // update the bulk item request because update request execution can mutate the bulk item request + BulkItemRequest item = request.items()[requestIndex]; // add the response - final WriteResult writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard); - DeleteResponse deleteResponse = writeResult.getResponse(); - location = locationToSync(location, writeResult.getLocation()); - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse)); + setResponse(item, new BulkItemResponse(item.id(), opType, writeResult.getResponse())); } catch (Exception e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(e)) { // restore updated versions... for (int j = 0; j < requestIndex; j++) { - applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]); + DocWriteRequest docWriteRequest = request.items()[j].request(); + docWriteRequest.version(preVersions[j]); + docWriteRequest.versionType(preVersionTypes[j]); } throw (ElasticsearchException) e; } - logFailure(e, "delete", request.shardId(), deleteRequest); + BulkItemRequest item = request.items()[requestIndex]; + DocWriteRequest docWriteRequest = item.request(); + if (isConflictException(e)) { + logger.trace((Supplier) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", + request.shardId(), docWriteRequest.opType().getLowercase(), request), e); + } else { + logger.debug((Supplier) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", + request.shardId(), docWriteRequest.opType().getLowercase(), request), e); + } // if its a conflict failure, and we already executed the request on a primary (and we execute it // again, due to primary relocation and only processing up to N bulk items when the shard gets closed) // then just use the response we got from the successful execution if (item.getPrimaryResponse() != null && isConflictException(e)) { setResponse(item, item.getPrimaryResponse()); } else { - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, - new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e))); + setResponse(item, new BulkItemResponse(item.id(), docWriteRequest.opType(), + new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), e))); } } + assert request.items()[requestIndex].getPrimaryResponse() != null; + assert preVersionTypes[requestIndex] != null; return location; } - private Tuple update(IndexMetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) { - UpdateRequest updateRequest = (UpdateRequest) item.request(); - preVersions[requestIndex] = updateRequest.version(); - preVersionTypes[requestIndex] = updateRequest.versionType(); - // We need to do the requested retries plus the initial attempt. We don't do < 1+retry_on_conflict because retry_on_conflict may be Integer.MAX_VALUE - for (int updateAttemptsCount = 0; updateAttemptsCount <= updateRequest.retryOnConflict(); updateAttemptsCount++) { - UpdateResult updateResult; - try { - updateResult = shardUpdateOperation(metaData, request, updateRequest, indexShard); - } catch (Exception t) { - updateResult = new UpdateResult(null, null, false, t, null); - } - if (updateResult.success()) { - if (updateResult.writeResult != null) { - location = locationToSync(location, updateResult.writeResult.getLocation()); - } - switch (updateResult.result.getResponseResult()) { - case CREATED: - case UPDATED: - @SuppressWarnings("unchecked") - WriteResult result = updateResult.writeResult; - IndexRequest indexRequest = updateResult.request(); - BytesReference indexSourceAsBytes = indexRequest.source(); - // add the response - IndexResponse indexResponse = result.getResponse(); - UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult()); - if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) || - (updateRequest.fields() != null && updateRequest.fields().length > 0)) { - Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); - updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); - } - item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest); - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse)); - break; - case DELETED: - @SuppressWarnings("unchecked") - WriteResult writeResult = updateResult.writeResult; - DeleteResponse response = writeResult.getResponse(); - DeleteRequest deleteRequest = updateResult.request(); - updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); - updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null)); - // Replace the update request to the translated delete request to execute on the replica. - item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest); - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse)); - break; - case NOOP: - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResult.noopResult)); - item.setIgnoreOnReplica(); // no need to go to the replica - break; - default: - throw new IllegalStateException("Illegal operation " + updateResult.result.getResponseResult()); + private WriteResult innerExecuteBulkItemRequest(IndexMetaData metaData, IndexShard indexShard, + BulkShardRequest request, int requestIndex) throws Exception { + DocWriteRequest itemRequest = request.items()[requestIndex].request(); + switch (itemRequest.opType()) { + case CREATE: + case INDEX: + return TransportIndexAction.executeIndexRequestOnPrimary(((IndexRequest) itemRequest), indexShard, mappingUpdatedAction); + case UPDATE: + int maxAttempts = ((UpdateRequest) itemRequest).retryOnConflict(); + for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) { + try { + return shardUpdateOperation(metaData, indexShard, request, requestIndex, ((UpdateRequest) itemRequest)); + } catch (Exception e) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + if (attemptCount == maxAttempts // bubble up exception when we run out of attempts + || (cause instanceof VersionConflictEngineException) == false) { // or when exception is not a version conflict + throw e; + } + } } - // NOTE: Breaking out of the retry_on_conflict loop! - break; - } else if (updateResult.failure()) { - Throwable e = updateResult.error; - if (updateResult.retry) { - // updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration - if (updateAttemptsCount >= updateRequest.retryOnConflict()) { - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, - new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e))); - } - } else { - // rethrow the failure if we are going to retry on primary and let parent failure to handle it - if (retryPrimaryException(e)) { - // restore updated versions... - for (int j = 0; j < requestIndex; j++) { - applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]); - } - throw (ElasticsearchException) e; - } - // if its a conflict failure, and we already executed the request on a primary (and we execute it - // again, due to primary relocation and only processing up to N bulk items when the shard gets closed) - // then just use the response we got from the successful execution - if (item.getPrimaryResponse() != null && isConflictException(e)) { - setResponse(item, item.getPrimaryResponse()); - } else if (updateResult.result == null) { - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e))); - } else { - switch (updateResult.result.getResponseResult()) { - case CREATED: - case UPDATED: - IndexRequest indexRequest = updateResult.request(); - logFailure(e, "index", request.shardId(), indexRequest); - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, - new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e))); - break; - case DELETED: - DeleteRequest deleteRequest = updateResult.request(); - logFailure(e, "delete", request.shardId(), deleteRequest); - setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, - new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e))); - break; - default: - throw new IllegalStateException("Illegal operation " + updateResult.result.getResponseResult()); - } - } - // NOTE: Breaking out of the retry_on_conflict loop! - break; - } - - } + throw new IllegalStateException("version conflict exception should bubble up on last attempt"); + case DELETE: + return TransportDeleteAction.executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), indexShard); + default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found"); } - return Tuple.tuple(location, item); } private void setResponse(BulkItemRequest request, BulkItemResponse response) { @@ -338,106 +215,49 @@ private void setResponse(BulkItemRequest request, BulkItemResponse response) { } } - private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, IndexMetaData metaData, - IndexShard indexShard, boolean processed) throws Exception { - - MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type()); - if (!processed) { - indexRequest.process(mappingMd, allowIdGeneration, request.index()); - } - return TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction); - } - - static class UpdateResult { - - final UpdateHelper.Result result; - final ActionRequest actionRequest; - final boolean retry; - final Throwable error; - final WriteResult writeResult; - final UpdateResponse noopResult; - - UpdateResult(UpdateHelper.Result result, ActionRequest actionRequest, boolean retry, Throwable error, WriteResult writeResult) { - this.result = result; - this.actionRequest = actionRequest; - this.retry = retry; - this.error = error; - this.writeResult = writeResult; - this.noopResult = null; - } - - UpdateResult(UpdateHelper.Result result, ActionRequest actionRequest, WriteResult writeResult) { - this.result = result; - this.actionRequest = actionRequest; - this.writeResult = writeResult; - this.retry = false; - this.error = null; - this.noopResult = null; - } - - public UpdateResult(UpdateHelper.Result result, UpdateResponse updateResponse) { - this.result = result; - this.noopResult = updateResponse; - this.actionRequest = null; - this.writeResult = null; - this.retry = false; - this.error = null; - } - - - boolean failure() { - return error != null; - } - - boolean success() { - return noopResult != null || writeResult != null; - } - - @SuppressWarnings("unchecked") - T request() { - return (T) actionRequest; - } - - - } - - private UpdateResult shardUpdateOperation(IndexMetaData metaData, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) { + /** + * Executes update request, doing a get and translating update to a index or delete operation + * NOTE: all operations except NOOP, reassigns the bulk item request + */ + private WriteResult shardUpdateOperation(IndexMetaData metaData, IndexShard indexShard, + BulkShardRequest request, + int requestIndex, UpdateRequest updateRequest) + throws Exception { + // Todo: capture read version conflicts, missing documents and malformed script errors in the write result due to get request UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard, threadPool::estimatedTimeInMillis); switch (translate.getResponseResult()) { case CREATED: case UPDATED: IndexRequest indexRequest = translate.action(); - try { - WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, metaData, indexShard, false); - return new UpdateResult(translate, indexRequest, result); - } catch (Exception e) { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - boolean retry = false; - if (cause instanceof VersionConflictEngineException) { - retry = true; - } - return new UpdateResult(translate, indexRequest, retry, cause, null); + MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type()); + indexRequest.process(mappingMd, allowIdGeneration, request.index()); + WriteResult writeResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction); + BytesReference indexSourceAsBytes = indexRequest.source(); + IndexResponse indexResponse = writeResult.getResponse(); + UpdateResponse update = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult()); + if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) || + (updateRequest.fields() != null && updateRequest.fields().length > 0)) { + Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); + update.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); } + // Replace the update request to the translated index request to execute on the replica. + request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest); + return new WriteResult<>(update, writeResult.getLocation()); case DELETED: DeleteRequest deleteRequest = translate.action(); - try { - WriteResult result = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard); - return new UpdateResult(translate, deleteRequest, result); - } catch (Exception e) { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - boolean retry = false; - if (cause instanceof VersionConflictEngineException) { - retry = true; - } - return new UpdateResult(translate, deleteRequest, retry, cause, null); - } + WriteResult deleteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard); + DeleteResponse response = deleteResult.getResponse(); + UpdateResponse deleteUpdateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); + deleteUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null)); + // Replace the update request to the translated delete request to execute on the replica. + request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest); + return new WriteResult<>(deleteUpdateResponse, deleteResult.getLocation()); case NOOP: - UpdateResponse updateResponse = translate.action(); + BulkItemRequest item = request.items()[requestIndex]; indexShard.noopUpdate(updateRequest.type()); - return new UpdateResult(translate, updateResponse); - default: - throw new IllegalStateException("Illegal update operation " + translate.getResponseResult()); - } + item.setIgnoreOnReplica(); // no need to go to the replica + return new WriteResult<>(translate.action(), null); + default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult()); } } @Override @@ -448,50 +268,32 @@ protected Location onReplicaShard(BulkShardRequest request, IndexShard indexShar if (item == null || item.isIgnoreOnReplica()) { continue; } - if (item.request() instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) item.request(); - try { - Engine.Index operation = TransportIndexAction.executeIndexRequestOnReplica(indexRequest, indexShard); - location = locationToSync(location, operation.getTranslogLocation()); - } catch (Exception e) { - // if its not an ignore replica failure, we need to make sure to bubble up the failure - // so we will fail the shard - if (!ignoreReplicaException(e)) { - throw e; - } + DocWriteRequest docWriteRequest = item.request(); + final Engine.Operation operation; + try { + switch (docWriteRequest.opType()) { + case CREATE: + case INDEX: + operation = TransportIndexAction.executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), indexShard); + break; + case DELETE: + operation = TransportDeleteAction.executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), indexShard); + break; + default: throw new IllegalStateException("Unexpected request operation type on replica: " + + docWriteRequest.opType().getLowercase()); } - } else if (item.request() instanceof DeleteRequest) { - DeleteRequest deleteRequest = (DeleteRequest) item.request(); - try { - Engine.Delete delete = TransportDeleteAction.executeDeleteRequestOnReplica(deleteRequest, indexShard); - indexShard.delete(delete); - location = locationToSync(location, delete.getTranslogLocation()); - } catch (Exception e) { - // if its not an ignore replica failure, we need to make sure to bubble up the failure - // so we will fail the shard - if (!ignoreReplicaException(e)) { - throw e; - } + location = locationToSync(location, operation.getTranslogLocation()); + } catch (Exception e) { + // if its not an ignore replica failure, we need to make sure to bubble up the failure + // so we will fail the shard + if (!ignoreReplicaException(e)) { + throw e; } - } else { - throw new IllegalStateException("Unexpected index operation: " + item.request()); } } return location; } - private void applyVersion(BulkItemRequest item, long version, VersionType versionType) { - if (item.request() instanceof IndexRequest) { - ((IndexRequest) item.request()).version(version).versionType(versionType); - } else if (item.request() instanceof DeleteRequest) { - ((DeleteRequest) item.request()).version(version).versionType(); - } else if (item.request() instanceof UpdateRequest) { - ((UpdateRequest) item.request()).version(version).versionType(); - } else { - // log? - } - } - private Translog.Location locationToSync(Translog.Location current, Translog.Location next) { /* here we are moving forward in the translog with each operation. Under the hood * this might cross translog files which is ok since from the user perspective diff --git a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index b033b15e8e3fd..3e72cdaf43393 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.delete; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.DocumentRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -45,7 +45,7 @@ * @see org.elasticsearch.client.Client#delete(DeleteRequest) * @see org.elasticsearch.client.Requests#deleteRequest(String) */ -public class DeleteRequest extends ReplicatedWriteRequest implements DocumentRequest { +public class DeleteRequest extends ReplicatedWriteRequest implements DocWriteRequest { private String type; private String id; @@ -170,28 +170,33 @@ public String routing() { return this.routing; } - /** - * Sets the version, which will cause the delete operation to only be performed if a matching - * version exists and no changes happened on the doc since then. - */ + @Override public DeleteRequest version(long version) { this.version = version; return this; } + @Override public long version() { return this.version; } + @Override public DeleteRequest versionType(VersionType versionType) { this.versionType = versionType; return this; } + @Override public VersionType versionType() { return this.versionType; } + @Override + public OpType opType() { + return OpType.DELETE; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 0c263c8c8d631..71df06e35f3dc 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -22,7 +22,7 @@ import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.DocumentRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.TimestampParsingException; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; @@ -70,68 +70,7 @@ * @see org.elasticsearch.client.Requests#indexRequest(String) * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexRequest extends ReplicatedWriteRequest implements DocumentRequest { - - /** - * Operation type controls if the type of the index operation. - */ - public enum OpType { - /** - * Index the source. If there an existing document with the id, it will - * be replaced. - */ - INDEX((byte) 0), - /** - * Creates the resource. Simply adds it to the index, if there is an existing - * document with the id, then it won't be removed. - */ - CREATE((byte) 1); - - private final byte id; - private final String lowercase; - - OpType(byte id) { - this.id = id; - this.lowercase = this.toString().toLowerCase(Locale.ENGLISH); - } - - /** - * The internal representation of the operation type. - */ - public byte id() { - return id; - } - - public String lowercase() { - return this.lowercase; - } - - /** - * Constructs the operation type from its internal representation. - */ - public static OpType fromId(byte id) { - if (id == 0) { - return INDEX; - } else if (id == 1) { - return CREATE; - } else { - throw new IllegalArgumentException("No type match for [" + id + "]"); - } - } - - public static OpType fromString(String sOpType) { - String lowersOpType = sOpType.toLowerCase(Locale.ROOT); - switch (lowersOpType) { - case "create": - return OpType.CREATE; - case "index": - return OpType.INDEX; - default: - throw new IllegalArgumentException("opType [" + sOpType + "] not allowed, either [index] or [create] are allowed"); - } - } - - } +public class IndexRequest extends ReplicatedWriteRequest implements DocWriteRequest { private String type; private String id; @@ -513,16 +452,27 @@ public IndexRequest source(byte[] source, int offset, int length) { * Sets the type of operation to perform. */ public IndexRequest opType(OpType opType) { + if (opType != OpType.CREATE && opType != OpType.INDEX) { + throw new IllegalArgumentException("opType must be 'create' or 'index', found: [" + opType + "]"); + } this.opType = opType; return this; } /** - * Sets a string representation of the {@link #opType(org.elasticsearch.action.index.IndexRequest.OpType)}. Can + * Sets a string representation of the {@link #opType(OpType)}. Can * be either "index" or "create". */ public IndexRequest opType(String opType) { - return opType(OpType.fromString(opType)); + String op = opType.toLowerCase(Locale.ROOT); + if (op.equals("create")) { + opType(OpType.CREATE); + } else if (op.equals("index")) { + opType(OpType.INDEX); + } else { + throw new IllegalArgumentException("opType must be 'create' or 'index', found: [" + opType + "]"); + } + return this; } @@ -537,17 +487,12 @@ public IndexRequest create(boolean create) { } } - /** - * The type of operation to perform. - */ + @Override public OpType opType() { return this.opType; } - /** - * Sets the version, which will cause the index operation to only be performed if a matching - * version exists and no changes happened on the doc since then. - */ + @Override public IndexRequest version(long version) { this.version = version; return this; @@ -557,6 +502,7 @@ public IndexRequest version(long version) { * Returns stored version. If currently stored version is {@link Versions#MATCH_ANY} and * opType is {@link OpType#CREATE}, returns {@link Versions#MATCH_DELETED}. */ + @Override public long version() { return resolveVersionDefaults(); } @@ -572,14 +518,13 @@ private long resolveVersionDefaults() { } } - /** - * Sets the versioning type. Defaults to {@link VersionType#INTERNAL}. - */ + @Override public IndexRequest versionType(VersionType versionType) { this.versionType = versionType; return this; } + @Override public VersionType versionType() { return this.versionType; } @@ -671,7 +616,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(timestamp); out.writeOptionalWriteable(ttl); out.writeBytesReference(source); - out.writeByte(opType.id()); + out.writeByte(opType.getId()); // ES versions below 5.1.2 don't know about resolveVersionDefaults but resolve the version eagerly (which messes with validation). if (out.getVersion().before(Version.V_5_1_2_UNRELEASED)) { out.writeLong(resolveVersionDefaults()); diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java index f5d676b3e6072..ba012af4f220a 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.index; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.WriteRequestBuilder; import org.elasticsearch.action.support.replication.ReplicationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; @@ -172,7 +173,7 @@ public IndexRequestBuilder setContentType(XContentType contentType) { /** * Sets the type of operation to perform. */ - public IndexRequestBuilder setOpType(IndexRequest.OpType opType) { + public IndexRequestBuilder setOpType(DocWriteRequest.OpType opType) { request.opType(opType); return this; } diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java index a660ede0ba84d..b83713e3a6afc 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsRequest.java @@ -21,7 +21,6 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.RealtimeRequest; import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.get.MultiGetRequest; @@ -56,7 +55,7 @@ * Note, the {@link #index()}, {@link #type(String)} and {@link #id(String)} are * required. */ -public class TermVectorsRequest extends SingleShardRequest implements DocumentRequest, RealtimeRequest { +public class TermVectorsRequest extends SingleShardRequest implements RealtimeRequest { private String type; @@ -200,7 +199,6 @@ public TermVectorsRequest type(String type) { /** * Returns the type of document to get the term vector for. */ - @Override public String type() { return type; } @@ -208,7 +206,6 @@ public String type() { /** * Returns the id of document the term vector is requested for. */ - @Override public String id() { return id; } @@ -250,18 +247,15 @@ public TermVectorsRequest doc(BytesReference doc, boolean generateRandomId) { /** * @return The routing for this request. */ - @Override public String routing() { return routing; } - @Override public TermVectorsRequest routing(String routing) { this.routing = routing; return this; } - @Override public String parent() { return parent; } diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index a57c3c3479c5c..460a59848a350 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.update; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.DocumentRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; @@ -53,7 +53,7 @@ /** */ public class UpdateRequest extends InstanceShardOperationRequest - implements DocumentRequest, WriteRequest { + implements DocWriteRequest, WriteRequest { private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(Loggers.getLogger(UpdateRequest.class)); @@ -470,31 +470,33 @@ public int retryOnConflict() { return this.retryOnConflict; } - /** - * Sets the version, which will cause the index operation to only be performed if a matching - * version exists and no changes happened on the doc since then. - */ + @Override public UpdateRequest version(long version) { this.version = version; return this; } + @Override public long version() { return this.version; } - /** - * Sets the versioning type. Defaults to {@link VersionType#INTERNAL}. - */ + @Override public UpdateRequest versionType(VersionType versionType) { this.versionType = versionType; return this; } + @Override public VersionType versionType() { return this.versionType; } + @Override + public OpType opType() { + return OpType.UPDATE; + } + @Override public UpdateRequest setRefreshPolicy(RefreshPolicy refreshPolicy) { this.refreshPolicy = refreshPolicy; diff --git a/core/src/main/java/org/elasticsearch/index/mapper/Uid.java b/core/src/main/java/org/elasticsearch/index/mapper/Uid.java index 2a8938b4ab7ff..344c8dc0cc091 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/Uid.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/Uid.java @@ -21,12 +21,10 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; -import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.common.lucene.BytesRefs; import java.util.Collection; import java.util.Collections; -import java.util.List; /** * diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index 57acb5598f0f8..8129c66e76641 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -19,7 +19,7 @@ package org.elasticsearch.ingest; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateApplier; @@ -68,7 +68,7 @@ protected void doRun() throws Exception { }); } - public void executeBulkRequest(Iterable actionRequests, + public void executeBulkRequest(Iterable actionRequests, BiConsumer itemFailureHandler, Consumer completionHandler) { threadPool.executor(ThreadPool.Names.BULK).execute(new AbstractRunnable() { @@ -80,7 +80,7 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { - for (ActionRequest actionRequest : actionRequests) { + for (DocWriteRequest actionRequest : actionRequests) { if ((actionRequest instanceof IndexRequest)) { IndexRequest indexRequest = (IndexRequest) actionRequest; if (Strings.hasText(indexRequest.getPipeline())) { diff --git a/core/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java index 3b1cf48c40d1c..0e67c1ac49c86 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java @@ -84,7 +84,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } if (sOpType != null) { - indexRequest.opType(IndexRequest.OpType.fromString(sOpType)); + indexRequest.opType(sOpType); } return channel -> diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java index 491314cbd19ba..149ae086a38d6 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java @@ -26,7 +26,7 @@ import java.util.Set; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.index.shard.ShardId; @@ -113,12 +113,12 @@ public void onFailure(Exception e) { }); List originalResponses = new ArrayList<>(); - for (ActionRequest actionRequest : bulkRequest.requests()) { + for (DocWriteRequest actionRequest : bulkRequest.requests()) { IndexRequest indexRequest = (IndexRequest) actionRequest; IndexResponse indexResponse = new IndexResponse(new ShardId("index", "_na_", 0), indexRequest.type(), indexRequest.id(), 1, true); originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), - indexRequest.opType().lowercase(), indexResponse)); + indexRequest.opType(), indexResponse)); } bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[originalResponses.size()]), 0)); diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 8c383e98a8b82..a4a5f6f5ba658 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -20,8 +20,8 @@ package org.elasticsearch.action.bulk; import org.apache.lucene.util.Constants; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; @@ -113,7 +113,7 @@ public void testBulkAllowExplicitIndex() throws Exception { public void testBulkAddIterable() { BulkRequest bulkRequest = Requests.bulkRequest(); - List requests = new ArrayList<>(); + List requests = new ArrayList<>(); requests.add(new IndexRequest("test", "test", "id").source("field", "value")); requests.add(new UpdateRequest("test", "test", "id").doc("field", "value")); requests.add(new DeleteRequest("test", "test", "id")); diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java index 7e3b44f79bb09..a824b61a8a699 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java @@ -47,6 +47,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.function.Function; +import static org.elasticsearch.action.DocWriteRequest.OpType; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import org.elasticsearch.script.ScriptType; @@ -316,7 +317,7 @@ public void testBulkUpdateLargerVolume() throws Exception { assertThat(response.getItems()[i].getVersion(), equalTo(1L)); assertThat(response.getItems()[i].getIndex(), equalTo("test")); assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); assertThat(response.getItems()[i].getResponse().getId(), equalTo(Integer.toString(i))); assertThat(response.getItems()[i].getResponse().getVersion(), equalTo(1L)); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue(), equalTo(1)); @@ -354,7 +355,7 @@ public void testBulkUpdateLargerVolume() throws Exception { assertThat(response.getItems()[i].getVersion(), equalTo(2L)); assertThat(response.getItems()[i].getIndex(), equalTo("test")); assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); assertThat(response.getItems()[i].getResponse().getId(), equalTo(Integer.toString(i))); assertThat(response.getItems()[i].getResponse().getVersion(), equalTo(2L)); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue(), equalTo(2)); @@ -378,7 +379,7 @@ public void testBulkUpdateLargerVolume() throws Exception { assertThat(response.getItems()[i].getVersion(), equalTo(3L)); assertThat(response.getItems()[i].getIndex(), equalTo("test")); assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); } } @@ -395,7 +396,7 @@ public void testBulkUpdateLargerVolume() throws Exception { assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); assertThat(response.getItems()[i].getIndex(), equalTo("test")); assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); } builder = client().prepareBulk(); @@ -411,7 +412,7 @@ public void testBulkUpdateLargerVolume() throws Exception { assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); assertThat(response.getItems()[i].getIndex(), equalTo("test")); assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); for (int j = 0; j < 5; j++) { GetResponse getResponse = client().prepareGet("test", "type1", Integer.toString(i)).get(); assertThat(getResponse.isExists(), equalTo(false)); @@ -754,12 +755,12 @@ public void testThatFailedUpdateRequestReturnsCorrectType() throws Exception { assertNoFailures(indexBulkItemResponse); assertThat(bulkItemResponse.getItems().length, is(6)); - assertThat(bulkItemResponse.getItems()[0].getOpType(), is("index")); - assertThat(bulkItemResponse.getItems()[1].getOpType(), is("index")); - assertThat(bulkItemResponse.getItems()[2].getOpType(), is("update")); - assertThat(bulkItemResponse.getItems()[3].getOpType(), is("update")); - assertThat(bulkItemResponse.getItems()[4].getOpType(), is("delete")); - assertThat(bulkItemResponse.getItems()[5].getOpType(), is("delete")); + assertThat(bulkItemResponse.getItems()[0].getOpType(), is(OpType.INDEX)); + assertThat(bulkItemResponse.getItems()[1].getOpType(), is(OpType.INDEX)); + assertThat(bulkItemResponse.getItems()[2].getOpType(), is(OpType.UPDATE)); + assertThat(bulkItemResponse.getItems()[3].getOpType(), is(OpType.UPDATE)); + assertThat(bulkItemResponse.getItems()[4].getOpType(), is(OpType.DELETE)); + assertThat(bulkItemResponse.getItems()[5].getOpType(), is(OpType.DELETE)); } private static String indexOrAlias() { @@ -804,9 +805,9 @@ public void testFailedRequestsOnClosedIndex() throws Exception { assertThat(bulkResponse.hasFailures(), is(true)); BulkItemResponse[] responseItems = bulkResponse.getItems(); assertThat(responseItems.length, is(3)); - assertThat(responseItems[0].getOpType(), is("index")); - assertThat(responseItems[1].getOpType(), is("update")); - assertThat(responseItems[2].getOpType(), is("delete")); + assertThat(responseItems[0].getOpType(), is(OpType.INDEX)); + assertThat(responseItems[1].getOpType(), is(OpType.UPDATE)); + assertThat(responseItems[2].getOpType(), is(OpType.DELETE)); } // issue 9821 @@ -816,9 +817,9 @@ public void testInvalidIndexNamesCorrectOpType() { .add(client().prepareUpdate().setIndex("INVALID.NAME").setType("type1").setId("1").setDoc("field", randomInt())) .add(client().prepareDelete().setIndex("INVALID.NAME").setType("type1").setId("1")).get(); assertThat(bulkResponse.getItems().length, is(3)); - assertThat(bulkResponse.getItems()[0].getOpType(), is("index")); - assertThat(bulkResponse.getItems()[1].getOpType(), is("update")); - assertThat(bulkResponse.getItems()[2].getOpType(), is("delete")); + assertThat(bulkResponse.getItems()[0].getOpType(), is(OpType.INDEX)); + assertThat(bulkResponse.getItems()[1].getOpType(), is(OpType.UPDATE)); + assertThat(bulkResponse.getItems()[2].getOpType(), is(OpType.DELETE)); } } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/RetryTests.java b/core/src/test/java/org/elasticsearch/action/bulk/RetryTests.java index 0cb4fee3ce72d..ea36597499528 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/RetryTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/RetryTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; @@ -229,11 +230,11 @@ public void bulk(BulkRequest request, ActionListener listener) { } private BulkItemResponse successfulResponse() { - return new BulkItemResponse(1, "update", new DeleteResponse()); + return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse()); } private BulkItemResponse failedResponse() { - return new BulkItemResponse(1, "update", new BulkItemResponse.Failure("test", "test", "1", new EsRejectedExecutionException("pool full"))); + return new BulkItemResponse(1, OpType.INDEX, new BulkItemResponse.Failure("test", "test", "1", new EsRejectedExecutionException("pool full"))); } } } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 306711278455a..f18b9354a3971 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -79,7 +80,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { @Captor ArgumentCaptor> remoteResponseHandler; @Captor - ArgumentCaptor> bulkDocsItr; + ArgumentCaptor> bulkDocsItr; /** The actual action we want to test, with real indexing mocked */ TestTransportBulkAction action; @@ -191,7 +192,7 @@ public void testIngestLocal() throws Exception { assertTrue(failureCalled.get()); // now check success - Iterator req = bulkDocsItr.getValue().iterator(); + Iterator req = bulkDocsItr.getValue().iterator(); failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request indexRequest2.setPipeline(null); // this is done by the real pipeline execution service when processing completionHandler.getValue().accept(null); diff --git a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index f5252f39fa2f2..8873620d1a953 100644 --- a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.index; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.common.unit.TimeValue; @@ -46,18 +47,24 @@ public void testIndexRequestOpTypeFromString() throws Exception { String createUpper = "CREATE"; String indexUpper = "INDEX"; - assertThat(IndexRequest.OpType.fromString(create), equalTo(IndexRequest.OpType.CREATE)); - assertThat(IndexRequest.OpType.fromString(index), equalTo(IndexRequest.OpType.INDEX)); - assertThat(IndexRequest.OpType.fromString(createUpper), equalTo(IndexRequest.OpType.CREATE)); - assertThat(IndexRequest.OpType.fromString(indexUpper), equalTo(IndexRequest.OpType.INDEX)); + IndexRequest indexRequest = new IndexRequest(""); + indexRequest.opType(create); + assertThat(indexRequest.opType() , equalTo(DocWriteRequest.OpType.CREATE)); + indexRequest.opType(createUpper); + assertThat(indexRequest.opType() , equalTo(DocWriteRequest.OpType.CREATE)); + indexRequest.opType(index); + assertThat(indexRequest.opType() , equalTo(DocWriteRequest.OpType.INDEX)); + indexRequest.opType(indexUpper); + assertThat(indexRequest.opType() , equalTo(DocWriteRequest.OpType.INDEX)); } public void testReadBogusString() { try { - IndexRequest.OpType.fromString("foobar"); + IndexRequest indexRequest = new IndexRequest(""); + indexRequest.opType("foobar"); fail("Expected IllegalArgumentException"); } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("opType [foobar] not allowed")); + assertThat(e.getMessage(), equalTo("opType must be 'create' or 'index', found: [foobar]")); } } diff --git a/core/src/test/java/org/elasticsearch/document/DocumentActionsIT.java b/core/src/test/java/org/elasticsearch/document/DocumentActionsIT.java index 91c2655b3c2ca..2c20c819de75c 100644 --- a/core/src/test/java/org/elasticsearch/document/DocumentActionsIT.java +++ b/core/src/test/java/org/elasticsearch/document/DocumentActionsIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.document; +import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; @@ -191,31 +192,31 @@ public void testBulk() throws Exception { assertThat(bulkResponse.getItems().length, equalTo(5)); assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false)); - assertThat(bulkResponse.getItems()[0].getOpType(), equalTo("index")); + assertThat(bulkResponse.getItems()[0].getOpType(), equalTo(OpType.INDEX)); assertThat(bulkResponse.getItems()[0].getIndex(), equalTo(getConcreteIndexName())); assertThat(bulkResponse.getItems()[0].getType(), equalTo("type1")); assertThat(bulkResponse.getItems()[0].getId(), equalTo("1")); assertThat(bulkResponse.getItems()[1].isFailed(), equalTo(false)); - assertThat(bulkResponse.getItems()[1].getOpType(), equalTo("create")); + assertThat(bulkResponse.getItems()[1].getOpType(), equalTo(OpType.CREATE)); assertThat(bulkResponse.getItems()[1].getIndex(), equalTo(getConcreteIndexName())); assertThat(bulkResponse.getItems()[1].getType(), equalTo("type1")); assertThat(bulkResponse.getItems()[1].getId(), equalTo("2")); assertThat(bulkResponse.getItems()[2].isFailed(), equalTo(false)); - assertThat(bulkResponse.getItems()[2].getOpType(), equalTo("index")); + assertThat(bulkResponse.getItems()[2].getOpType(), equalTo(OpType.INDEX)); assertThat(bulkResponse.getItems()[2].getIndex(), equalTo(getConcreteIndexName())); assertThat(bulkResponse.getItems()[2].getType(), equalTo("type1")); String generatedId3 = bulkResponse.getItems()[2].getId(); assertThat(bulkResponse.getItems()[3].isFailed(), equalTo(false)); - assertThat(bulkResponse.getItems()[3].getOpType(), equalTo("delete")); + assertThat(bulkResponse.getItems()[3].getOpType(), equalTo(OpType.DELETE)); assertThat(bulkResponse.getItems()[3].getIndex(), equalTo(getConcreteIndexName())); assertThat(bulkResponse.getItems()[3].getType(), equalTo("type1")); assertThat(bulkResponse.getItems()[3].getId(), equalTo("1")); assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(true)); - assertThat(bulkResponse.getItems()[4].getOpType(), equalTo("index")); + assertThat(bulkResponse.getItems()[4].getOpType(), equalTo(OpType.INDEX)); assertThat(bulkResponse.getItems()[4].getIndex(), equalTo(getConcreteIndexName())); assertThat(bulkResponse.getItems()[4].getType(), equalTo("type1")); diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 8edb4eaf59afd..947cb3f18d1c4 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -21,7 +21,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -320,7 +320,7 @@ public void testBulkRequestExecutionWithFailures() throws Exception { int numRequest = scaledRandomIntBetween(8, 64); int numIndexRequests = 0; for (int i = 0; i < numRequest; i++) { - ActionRequest request; + DocWriteRequest request; if (randomBoolean()) { if (randomBoolean()) { request = new DeleteRequest("_index", "_type", "_id"); diff --git a/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java b/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java index d8cf1e7b5ec61..2490134db4e85 100644 --- a/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java +++ b/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java @@ -20,6 +20,7 @@ package org.elasticsearch.routing; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -259,7 +260,7 @@ public void testRequiredRoutingBulk() throws Exception { for (BulkItemResponse bulkItemResponse : bulkResponse) { assertThat(bulkItemResponse.isFailed(), equalTo(true)); - assertThat(bulkItemResponse.getOpType(), equalTo("index")); + assertThat(bulkItemResponse.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST)); assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class)); assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]")); @@ -280,7 +281,7 @@ public void testRequiredRoutingBulk() throws Exception { for (BulkItemResponse bulkItemResponse : bulkResponse) { assertThat(bulkItemResponse.isFailed(), equalTo(true)); - assertThat(bulkItemResponse.getOpType(), equalTo("update")); + assertThat(bulkItemResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST)); assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class)); assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]")); @@ -301,7 +302,7 @@ public void testRequiredRoutingBulk() throws Exception { for (BulkItemResponse bulkItemResponse : bulkResponse) { assertThat(bulkItemResponse.isFailed(), equalTo(true)); - assertThat(bulkItemResponse.getOpType(), equalTo("delete")); + assertThat(bulkItemResponse.getOpType(), equalTo(DocWriteRequest.OpType.DELETE)); assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST)); assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class)); assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]")); diff --git a/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java b/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java index fe76fb4f97b14..bb0aea0704469 100644 --- a/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java +++ b/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java @@ -21,9 +21,9 @@ import org.apache.lucene.util.TestUtil; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.lucene.uid.Versions; @@ -719,7 +719,7 @@ public void testDeleteNotLost() throws Exception { client() .prepareIndex("test", "type", "id") .setSource("foo", "bar") - .setOpType(IndexRequest.OpType.INDEX) + .setOpType(DocWriteRequest.OpType.INDEX) .setVersion(10) .setVersionType(VersionType.EXTERNAL) .execute() @@ -788,7 +788,7 @@ public void testGCDeletesZero() throws Exception { client() .prepareIndex("test", "type", "id") .setSource("foo", "bar") - .setOpType(IndexRequest.OpType.INDEX) + .setOpType(DocWriteRequest.OpType.INDEX) .setVersion(10) .setVersionType(VersionType.EXTERNAL) .execute() diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index 1e14fed000ad0..eaf23f9cbde98 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -367,20 +368,20 @@ void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) { } switch (item.getOpType()) { - case "index": - case "create": - IndexResponse ir = item.getResponse(); - if (ir.getResult() == DocWriteResponse.Result.CREATED) { - task.countCreated(); - } else { - task.countUpdated(); - } - break; - case "delete": - task.countDeleted(); - break; - default: - throw new IllegalArgumentException("Unknown op type: " + item.getOpType()); + case CREATE: + case INDEX: + if (item.getResponse().getResult() == DocWriteResponse.Result.CREATED) { + task.countCreated(); + } else { + task.countUpdated(); + } + break; + case UPDATE: + task.countUpdated(); + break; + case DELETE: + task.countDeleted(); + break; } // Track the indexes we've seen so we can refresh them if requested destinationIndicesThisBatch.add(item.getIndex()); @@ -512,7 +513,7 @@ void setScroll(String scroll) { /** * Wrapper for the {@link IndexRequest} or {@link DeleteRequest} that are used in this action class. */ - interface RequestWrapper { + interface RequestWrapper> { void setIndex(String index); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index a55c3725bb591..f800cd04a532d 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -28,6 +28,8 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.DocWriteResponse.Result; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -47,7 +49,6 @@ import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; @@ -259,35 +260,36 @@ public void testBulkResponseSetsLotsOfStatus() { BulkItemResponse[] responses = new BulkItemResponse[randomIntBetween(0, 100)]; for (int i = 0; i < responses.length; i++) { ShardId shardId = new ShardId(new Index("name", "uid"), 0); - String opType; if (rarely()) { - opType = randomSimpleString(random()); versionConflicts++; - responses[i] = new BulkItemResponse(i, opType, new Failure(shardId.getIndexName(), "type", "id" + i, - new VersionConflictEngineException(shardId, "type", "id", "test"))); + responses[i] = new BulkItemResponse(i, randomFrom(DocWriteRequest.OpType.values()), + new Failure(shardId.getIndexName(), "type", "id" + i, + new VersionConflictEngineException(shardId, "type", "id", "test"))); continue; } boolean createdResponse; + DocWriteRequest.OpType opType; switch (randomIntBetween(0, 2)) { case 0: - opType = randomFrom("index", "create"); createdResponse = true; + opType = DocWriteRequest.OpType.CREATE; created++; break; case 1: - opType = randomFrom("index", "create"); createdResponse = false; + opType = randomFrom(DocWriteRequest.OpType.INDEX, DocWriteRequest.OpType.UPDATE); updated++; break; case 2: - opType = "delete"; createdResponse = false; + opType = DocWriteRequest.OpType.DELETE; deleted++; break; default: throw new RuntimeException("Bad scenario"); } - responses[i] = new BulkItemResponse(i, opType, new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse)); + responses[i] = new BulkItemResponse(i, opType, + new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse)); } new DummyAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0)); assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts()); @@ -360,7 +362,8 @@ public void testSearchTimeoutsAbortRequest() throws Exception { public void testBulkFailuresAbortRequest() throws Exception { Failure failure = new Failure("index", "type", "id", new RuntimeException("test")); DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction(); - BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, "index", failure)}, randomLong()); + BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[] + {new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, failure)}, randomLong()); action.onBulkResponse(timeValueNanos(System.nanoTime()), bulkResponse); BulkIndexByScrollResponse response = listener.get(); assertThat(response.getBulkFailures(), contains(failure)); @@ -779,33 +782,29 @@ RequestBuilder extends ActionRequestBuilder> } BulkItemResponse[] responses = new BulkItemResponse[bulk.requests().size()]; for (int i = 0; i < bulk.requests().size(); i++) { - ActionRequest item = bulk.requests().get(i); - String opType; + DocWriteRequest item = bulk.requests().get(i); DocWriteResponse response; - ShardId shardId = new ShardId(new Index(((ReplicationRequest) item).index(), "uuid"), 0); + ShardId shardId = new ShardId(new Index(item.index(), "uuid"), 0); if (item instanceof IndexRequest) { IndexRequest index = (IndexRequest) item; - opType = index.opType().lowercase(); response = new IndexResponse(shardId, index.type(), index.id(), randomIntBetween(0, Integer.MAX_VALUE), true); } else if (item instanceof UpdateRequest) { UpdateRequest update = (UpdateRequest) item; - opType = "update"; response = new UpdateResponse(shardId, update.type(), update.id(), - randomIntBetween(0, Integer.MAX_VALUE), DocWriteResponse.Result.CREATED); + randomIntBetween(0, Integer.MAX_VALUE), Result.CREATED); } else if (item instanceof DeleteRequest) { DeleteRequest delete = (DeleteRequest) item; - opType = "delete"; response = new DeleteResponse(shardId, delete.type(), delete.id(), randomIntBetween(0, Integer.MAX_VALUE), true); } else { throw new RuntimeException("Unknown request: " + item); } if (i == toReject) { - responses[i] = new BulkItemResponse(i, opType, + responses[i] = new BulkItemResponse(i, item.opType(), new Failure(response.getIndex(), response.getType(), response.getId(), new EsRejectedExecutionException())); } else { - responses[i] = new BulkItemResponse(i, opType, response); + responses[i] = new BulkItemResponse(i, item.opType(), response); } } listener.onResponse((Response) new BulkResponse(responses, 1)); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailureTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailureTests.java index b81be4a1bb24f..9bfa41da7f31a 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailureTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailureTests.java @@ -28,7 +28,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import static org.elasticsearch.action.index.IndexRequest.OpType.CREATE; +import static org.elasticsearch.action.DocWriteRequest.OpType.CREATE; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexVersioningTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexVersioningTests.java index 2988fcb5ca6b1..1ab0613103f09 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexVersioningTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexVersioningTests.java @@ -21,7 +21,7 @@ import org.elasticsearch.action.get.GetResponse; -import static org.elasticsearch.action.index.IndexRequest.OpType.CREATE; +import static org.elasticsearch.action.DocWriteRequest.OpType.CREATE; import static org.elasticsearch.index.VersionType.EXTERNAL; import static org.elasticsearch.index.VersionType.INTERNAL;