diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/60_deprecated.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/60_deprecated.yml index 06d1dd1ceb92b..9b2f263ddcbfb 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/60_deprecated.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/60_deprecated.yml @@ -3,8 +3,8 @@ "Deprecated parameters should produce warning in Bulk query": - skip: - version: " - 6.6.99" - reason: versioned operations were deprecated in 6.7 + version: " - 6.8.0" + reason: versioned operations were deprecated in 6.7 but we won't issue deprecation unless all nodes are upgraded features: "warnings" - do: diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index 807ed1a387009..11638a742a024 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -18,13 +18,14 @@ */ package org.elasticsearch.action; +import org.elasticsearch.Version; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.VersionType; @@ -253,15 +254,9 @@ static void writeDocumentRequest(StreamOutput out, DocWriteRequest request) thr } } - static void logDeprecationWarnings(DocWriteRequest request, DeprecationLogger logger) { - if (request.versionType() == VersionType.INTERNAL && - request.version() != Versions.MATCH_ANY && - request.version() != Versions.MATCH_DELETED) { - logger.deprecatedAndMaybeLog("occ_internal_version", - "Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. Please use" + - " the `if_seq_no` and `if_primary_term` parameters instead. (request for index [{}], type [{}], id [{}])", - request.index(), request.type(), request.id()); - } + /** Tests if the cluster is ready for compare and write using sequence numbers. */ + static boolean canUseIfSeqNo(ClusterState clusterState) { + return clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_6_0); } static ActionRequestValidationException validateSeqNoBasedCASParams( diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 09fdcd652dedf..468e061f4e662 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -49,10 +49,13 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.DeprecationLogger; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; @@ -81,6 +84,7 @@ public class TransportShardBulkAction extends TransportWriteAction performOnPrimary( BulkShardRequest request, IndexShard primary, UpdateHelper updateHelper, + boolean canUseIfSeqNo, LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary); - return performOnPrimary(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); + return performOnPrimary(context, updateHelper, canUseIfSeqNo, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); } private static WritePrimaryResult performOnPrimary( - BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, + BulkPrimaryExecutionContext context, UpdateHelper updateHelper, boolean canUseIfSeqNo, LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) throws Exception { while (context.hasMoreOperationsToExecute()) { - executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); + executeBulkItemRequest(context, updateHelper, canUseIfSeqNo, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); assert context.isInitial(); // either completed and moved to next or reset } return new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), @@ -164,16 +169,16 @@ private static WritePrimaryResult performOn } /** Executes bulk item requests and handles request execution exceptions */ - static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, - MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) - throws Exception { + static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, + boolean canUseIfSeqNo, LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater, + CheckedRunnable waitForMappingUpdate) throws Exception { + validateDocWriteRequest(context.getCurrent(), canUseIfSeqNo); final DocWriteRequest.OpType opType = context.getCurrent().opType(); - final UpdateHelper.Result updateResult; if (opType == DocWriteRequest.OpType.UPDATE) { final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); try { - updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier); + updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), canUseIfSeqNo, nowInMillisSupplier); } catch (Exception failure) { // we may fail translating a update to index or delete operation // we use index result to communicate failure while translating update request @@ -209,7 +214,6 @@ static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHe } assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state - if (context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE) { executeDeleteRequestOnPrimary(context, mappingUpdater); } else { @@ -501,4 +505,24 @@ public void updateMappings(final Mapping update, final ShardId shardId, final St mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update); } } + + private static void validateDocWriteRequest(DocWriteRequest request, boolean canUseIfSeqNo) { + if (canUseIfSeqNo) { + if (request.versionType() == VersionType.INTERNAL + && request.version() != Versions.MATCH_ANY + && request.version() != Versions.MATCH_DELETED) { + DEPRECATION_LOGGER.deprecatedAndMaybeLog("occ_internal_version", + "Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. Please use" + + " the `if_seq_no` and `if_primary_term` parameters instead. (request for index [{}], type [{}], id [{}])", + request.index(), request.type(), request.id()); + } + } else { + if (request.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + || request.ifPrimaryTerm() != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { + assert false : "ifSeqNo [" + request.ifSeqNo() + "], ifPrimaryTerm [" + request.ifPrimaryTerm() + "]"; + throw new IllegalStateException( + "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher."); + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 48a81f60ee6fe..652e0b5a9c282 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.delete; -import org.apache.logging.log4j.LogManager; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; @@ -29,7 +28,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.ShardId; @@ -53,7 +51,6 @@ */ public class DeleteRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { - private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(LogManager.getLogger(DeleteRequest.class)); private String type; private String id; @@ -102,8 +99,6 @@ public ActionRequestValidationException validate() { validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); - DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); - return validationException; } @@ -286,7 +281,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeZLong(ifSeqNo); out.writeVLong(ifPrimaryTerm); } else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { - assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]"; + assert false : "ifSeqNo [" + ifSeqNo + "], ifPrimaryTerm [" + ifPrimaryTerm + "]"; throw new IllegalStateException( "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " + "Stream version [" + out.getVersion() + "]"); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 75a8f17a64bce..65a42df04ba60 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.index; -import org.apache.logging.log4j.LogManager; import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; @@ -37,7 +36,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -75,7 +73,6 @@ * @see org.elasticsearch.client.Client#index(IndexRequest) */ public class IndexRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { - private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(LogManager.getLogger(IndexRequest.class)); /** * Max length of the source document to include into string() @@ -200,9 +197,6 @@ public ActionRequestValidationException validate() { validationException = addValidationError("pipeline cannot be an empty string", validationException); } - - DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); - return validationException; } @@ -654,7 +648,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeZLong(ifSeqNo); out.writeVLong(ifPrimaryTerm); } else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { - assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]"; + assert false : "ifSeqNo [" + ifSeqNo + "], ifPrimaryTerm [" + ifPrimaryTerm + "]"; throw new IllegalStateException( "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " + "Stream version [" + out.getVersion() + "]"); diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index f9747ca2e5bd0..9b8dc4033c4e2 100644 --- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -174,7 +175,8 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< final ShardId shardId = request.getShardId(); final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.getId()); - final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis); + final UpdateHelper.Result result = updateHelper.prepare( + request, indexShard, DocWriteRequest.canUseIfSeqNo(clusterService.state()), threadPool::absoluteTimeInMillis); switch (result.getResponseResult()) { case CREATED: IndexRequest upsertRequest = result.action(); diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 5a4e50b4d7375..5155bc6d40229 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -22,12 +22,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; @@ -55,7 +53,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import java.util.function.BooleanSupplier; import java.util.function.LongSupplier; /** @@ -66,27 +63,21 @@ public class UpdateHelper { private static final Logger logger = LogManager.getLogger(UpdateHelper.class); private final ScriptService scriptService; - private final BooleanSupplier canUseIfSeqNo; - public UpdateHelper(ScriptService scriptService, ClusterService clusterService) { - this(scriptService, () -> clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_6_0)); - } - - UpdateHelper(ScriptService scriptService, BooleanSupplier canUseIfSeqNo) { + public UpdateHelper(ScriptService scriptService) { this.scriptService = scriptService; - this.canUseIfSeqNo = canUseIfSeqNo; } /** * Prepares an update request by converting it into an index or delete request or an update response (no action). */ - public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) { - if (canUseIfSeqNo.getAsBoolean() == false) { + public Result prepare(UpdateRequest request, IndexShard indexShard, boolean canUseIfSeqNo, LongSupplier nowInMillis) { + if (canUseIfSeqNo == false) { ensureIfSeqNoNotProvided(request.ifSeqNo(), request.ifPrimaryTerm()); } final GetResult getResult = indexShard.getService().getForUpdate( request.type(), request.id(), request.version(), request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm()); - return prepare(indexShard.shardId(), request, getResult, nowInMillis); + return prepare(indexShard.shardId(), request, canUseIfSeqNo, getResult, nowInMillis); } /** @@ -94,7 +85,7 @@ public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier * noop). */ @SuppressWarnings("unchecked") - protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) { + protected Result prepare(ShardId shardId, UpdateRequest request, boolean canUseIfSeqNo, GetResult getResult, LongSupplier nowInMillis) { if (getResult.isExists() == false) { // If the document didn't exist, execute the update request as an upsert return prepareUpsert(shardId, request, getResult, nowInMillis); @@ -103,10 +94,10 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult throw new DocumentSourceMissingException(shardId, request.type(), request.id()); } else if (request.script() == null && request.doc() != null) { // The request has no script, it is a new doc that should be merged with the old document - return prepareUpdateIndexRequest(shardId, request, getResult, request.detectNoop()); + return prepareUpdateIndexRequest(shardId, request, canUseIfSeqNo, getResult, request.detectNoop()); } else { // The request has a script (or empty script), execute the script and prepare a new index request - return prepareUpdateScriptRequest(shardId, request, getResult, nowInMillis); + return prepareUpdateScriptRequest(shardId, request, canUseIfSeqNo, getResult, nowInMillis); } } @@ -223,7 +214,8 @@ static String calculateParent(GetResult getResult, @Nullable IndexRequest update * Prepare the request for merging the existing document with a new one, can optionally detect a noop change. Returns a {@code Result} * containing a new {@code IndexRequest} to be executed on the primary and replicas. */ - Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) { + Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, boolean canUseIfSeqNo, + GetResult getResult, boolean detectNoop) { final IndexRequest currentRequest = request.doc(); final String routing = calculateRouting(getResult, currentRequest); final String parent = calculateParent(getResult, currentRequest); @@ -247,7 +239,7 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu .source(updatedSourceAsMap, updateSourceContentType) .waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout()) .setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean()) { + if (canUseIfSeqNo) { finalIndexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { finalIndexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); @@ -261,7 +253,8 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu * either a new {@code IndexRequest} or {@code DeleteRequest} (depending on the script's returned "op" value) to be executed on the * primary and replicas. */ - Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetResult getResult, LongSupplier nowInMillis) { + Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, boolean canUseIfSeqNo, + GetResult getResult, LongSupplier nowInMillis) { final IndexRequest currentRequest = request.doc(); final String routing = calculateRouting(getResult, currentRequest); final String parent = calculateParent(getResult, currentRequest); @@ -293,7 +286,7 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes .source(updatedSourceAsMap, updateSourceContentType) .waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout()) .setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean()) { + if (canUseIfSeqNo) { indexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { indexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); @@ -304,7 +297,7 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes .type(request.type()).id(request.id()).routing(routing).parent(parent) .waitForActiveShards(request.waitForActiveShards()) .timeout(request.timeout()).setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean()) { + if (canUseIfSeqNo) { deleteRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { deleteRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); @@ -394,7 +387,7 @@ public static GetResult extractGetResult(final UpdateRequest request, String con private void ensureIfSeqNoNotProvided(long ifSeqNo, long ifPrimaryTerm) { if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { - assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]"; + assert false : "ifSeqNo [" + ifSeqNo + "], ifPrimaryTerm [" + ifPrimaryTerm + "]"; throw new IllegalStateException( "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher."); } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index bd47b89b52f12..f90267eddf35d 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -161,8 +161,6 @@ public ActionRequestValidationException validate() { validationException = addValidationError("doc must be specified if doc_as_upsert is enabled", validationException); } - DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); - return validationException; } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 1ecf385b3e3ec..cb9f211e2c0af 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -582,7 +582,7 @@ protected Node( b.bind(Transport.class).toInstance(transport); b.bind(TransportService.class).toInstance(transportService); b.bind(NetworkService.class).toInstance(networkService); - b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptModule.getScriptService(), clusterService)); + b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptModule.getScriptService())); b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService); b.bind(ClusterInfoService.class).toInstance(clusterInfoService); b.bind(GatewayMetaState.class).toInstance(gatewayMetaState); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 2bb64d62e739c..dc1532b8301eb 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -316,8 +316,6 @@ public void testToValidateUpsertRequestAndVersionInBulkRequest() throws IOExcept bulkRequest.add(data, null, null, xContentType); assertThat(bulkRequest.validate().validationErrors(), contains("can't provide both upsert request and a version", "can't provide version in upsert request")); - assertWarnings("Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. " + - "Please use the `if_seq_no` and `if_primary_term` parameters instead. (request for index [index], type [type], id [id])"); } public void testBulkTerminatedByNewline() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 4cb496c96f12f..9ef584ab0ee61 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; @@ -53,6 +54,8 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -148,7 +151,7 @@ public void testExecuteBulkIndexRequest() throws Exception { UpdateHelper updateHelper = null; BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, randomBoolean(), threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -175,7 +178,7 @@ public void testExecuteBulkIndexRequest() throws Exception { BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper, - threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {}); + randomBoolean(), threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); assertNull(secondContext.getLocationToSync()); @@ -226,7 +229,7 @@ public void testSkipBulkIndexRequestIfAborted() throws Exception { UpdateHelper updateHelper = null; WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( - bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), + bulkShardRequest, shard, updateHelper, randomBoolean(), threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); // since at least 1 item passed, the tran log location should exist, @@ -280,7 +283,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { // Pretend the mappings haven't made it to the node yet BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); AtomicInteger updateCalled = new AtomicInteger(); - TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, null, randomBoolean(), threadPool::absoluteTimeInMillis, (update, shardId, type) -> { // There should indeed be a mapping update assertNotNull(update); @@ -297,7 +300,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) .thenReturn(success); - TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, null, randomBoolean(), threadPool::absoluteTimeInMillis, (update, shardId, type) -> fail("should not have had to update the mappings"), () -> {}); @@ -335,7 +338,7 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, randomBoolean(), threadPool::absoluteTimeInMillis, errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(), errorOnWait ? () -> { throw err; } : () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -376,7 +379,7 @@ public void testExecuteBulkDeleteRequest() throws Exception { randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, randomBoolean(), threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -418,7 +421,7 @@ public void testExecuteBulkDeleteRequest() throws Exception { randomlySetIgnoredPrimaryResponse(items[0]); context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, randomBoolean(), threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -463,7 +466,7 @@ public void testNoopUpdateRequest() throws Exception { IndexShard shard = mock(IndexShard.class); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(noopUpdateResponse, DocWriteResponse.Result.NOOP, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -474,7 +477,7 @@ public void testNoopUpdateRequest() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -509,7 +512,7 @@ public void testUpdateRequestWithFailure() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(updateResponse, randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -520,7 +523,7 @@ public void testUpdateRequestWithFailure() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -558,7 +561,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(updateResponse, randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -569,7 +572,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -605,7 +608,7 @@ public void testUpdateRequestWithSuccess() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(updateResponse, created ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -616,7 +619,7 @@ public void testUpdateRequestWithSuccess() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -651,7 +654,7 @@ public void testUpdateWithDelete() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(updateResponse, DocWriteResponse.Result.DELETED, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -662,7 +665,7 @@ public void testUpdateWithDelete() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -688,7 +691,7 @@ public void testFailureDuringUpdateProcessing() throws Exception { UpdateHelper updateHelper = mock(UpdateHelper.class); final ElasticsearchException err = new ElasticsearchException("oops"); - when(updateHelper.prepare(any(), eq(shard), any())).thenThrow(err); + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenThrow(err); BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); @@ -696,7 +699,7 @@ public void testFailureDuringUpdateProcessing() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -729,7 +732,7 @@ public void testTranslogPositionToSync() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); while (context.hasMoreOperationsToExecute()) { - TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, null, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); } @@ -805,7 +808,7 @@ public void testRetries() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(updateResponse, randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -814,7 +817,7 @@ public void testRetries() throws Exception { new BulkShardRequest(shardId, RefreshPolicy.NONE, items); WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( - bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), + bulkShardRequest, shard, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertThat(result.location, equalTo(resultLocation)); @@ -827,6 +830,64 @@ public void testRetries() throws Exception { assertThat(response.getSeqNo(), equalTo(13L)); } + public void testDeprecationCASUsingVersion() throws Exception { + IndexShard shard = newStartedShard(true); + BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(1, 10)]; + Set deprecatedRequestIds = new HashSet<>(); + for (int i = 0; i < items.length; i++) { + DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id_" + i) + .source(Requests.INDEX_CONTENT_TYPE) + .opType(DocWriteRequest.OpType.INDEX); + if (randomBoolean()) { + writeRequest.version(randomNonNegativeLong()); + deprecatedRequestIds.add(writeRequest.id()); + } + items[i] = new BulkItemRequest(i, writeRequest); + } + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + TransportShardBulkAction.performOnPrimary(bulkShardRequest, shard, null, true, + threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); + closeShards(shard); + if (deprecatedRequestIds.isEmpty() == false) { + assertWarnings(deprecatedRequestIds.stream().map(id -> + "Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. Please use the " + + "`if_seq_no` and `if_primary_term` parameters instead. (request for index [index], type [_doc], id [" + id + "])") + .toArray(String[]::new)); + } + } + + public void testRejectCASUsingSeqNo() throws Exception { + IndexShard shard = newStartedShard(true); + BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(1, 10)]; + Tuple casWithSeqNo = null; + for (int i = 0; i < items.length; i++) { + DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id_" + i) + .source(Requests.INDEX_CONTENT_TYPE) + .opType(DocWriteRequest.OpType.INDEX); + if (randomBoolean()) { + writeRequest.version(randomNonNegativeLong()); + } else { + writeRequest.setIfSeqNo(randomLongBetween(0, Long.MAX_VALUE)); + writeRequest.setIfPrimaryTerm(randomNonNegativeLong()); + if (casWithSeqNo == null) { + casWithSeqNo = Tuple.tuple(writeRequest.ifSeqNo(), writeRequest.ifPrimaryTerm()); + } + } + items[i] = new BulkItemRequest(i, writeRequest); + } + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + if (casWithSeqNo != null) { + AssertionError error = expectThrows(AssertionError.class, () -> + TransportShardBulkAction.performOnPrimary(bulkShardRequest, shard, null, false, + threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {})); + assertThat(error.getMessage(), equalTo("ifSeqNo [" + casWithSeqNo.v1() + "], ifPrimaryTerm [" + casWithSeqNo.v2() + "]")); + } else { + TransportShardBulkAction.performOnPrimary(bulkShardRequest, shard, null, false, + threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); + } + closeShards(shard); + } + private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) { if (randomBoolean()) { // add a response to the request and thereby check that it is ignored for the primary. diff --git a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index a29348caea6aa..00676a614f98b 100644 --- a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -54,7 +54,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import static java.util.Collections.emptyMap; @@ -77,7 +76,6 @@ public class UpdateRequestTests extends ESTestCase { private UpdateHelper updateHelper; - private final AtomicBoolean canUseIfSeqNo = new AtomicBoolean(true); @Override @Before @@ -143,7 +141,7 @@ public void setUp() throws Exception { final MockScriptEngine engine = new MockScriptEngine("mock", scripts, Collections.emptyMap()); Map engines = Collections.singletonMap(engine.getType(), engine); ScriptService scriptService = new ScriptService(baseSettings, engines, ScriptModule.CORE_CONTEXTS); - updateHelper = new UpdateHelper(scriptService, canUseIfSeqNo::get); + updateHelper = new UpdateHelper(scriptService); } public void testFromXContent() throws Exception { @@ -400,7 +398,8 @@ public void testNowInScript() throws IOException { long nowInMillis = randomNonNegativeLong(); // We simulate that the document is not existing yet GetResult getResult = new GetResult("test", "type1", "2", UNASSIGNED_SEQ_NO, 0, 0, false, null, null); - UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis); + UpdateHelper.Result result = updateHelper.prepare( + new ShardId("test", "_na_", 0), updateRequest, randomBoolean(), getResult, () -> nowInMillis); Streamable action = result.action(); assertThat(action, instanceOf(IndexRequest.class)); IndexRequest indexAction = (IndexRequest) action; @@ -413,7 +412,8 @@ public void testNowInScript() throws IOException { .scriptedUpsert(true); // We simulate that the document is not existing yet GetResult getResult = new GetResult("test", "type1", "2", 0, 1, 0, true, new BytesArray("{}"), null); - UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> 42L); + UpdateHelper.Result result = updateHelper.prepare( + new ShardId("test", "_na_", 0), updateRequest, randomBoolean(), getResult, () -> 42L); Streamable action = result.action(); assertThat(action, instanceOf(IndexRequest.class)); } @@ -462,6 +462,7 @@ private void runTimeoutTest(final GetResult getResult, final UpdateRequest updat final UpdateHelper.Result result = updateHelper.prepare( new ShardId("test", "", 0), updateRequest, + randomBoolean(), getResult, ESTestCase::randomNonNegativeLong); final Streamable action = result.action(); @@ -561,8 +562,6 @@ public void testToValidateUpsertRequestAndVersion() { updateRequest.doc("{}", XContentType.JSON); updateRequest.upsert(new IndexRequest("index","type", "id")); assertThat(updateRequest.validate().validationErrors(), contains("can't provide both upsert request and a version")); - assertWarnings("Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. " + - "Please use the `if_seq_no` and `if_primary_term` parameters instead. (request for index [index], type [type], id [id])"); } public void testToValidateUpsertRequestWithVersion() { @@ -638,13 +637,13 @@ public void testNoopDetection() throws Exception { UpdateRequest request = new UpdateRequest("test", "type1", "1").fromXContent( createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}"))); - UpdateHelper.Result result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true); + UpdateHelper.Result result = updateHelper.prepareUpdateIndexRequest(shardId, request, randomBoolean(), getResult, true); assertThat(result.action(), instanceOf(UpdateResponse.class)); assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.NOOP)); // Try again, with detectNoop turned off - result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, false); + result = updateHelper.prepareUpdateIndexRequest(shardId, request, randomBoolean(), getResult, false); assertThat(result.action(), instanceOf(IndexRequest.class)); assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED)); assertThat(result.updatedSourceAsMap().get("body").toString(), equalTo("foo")); @@ -652,7 +651,7 @@ public void testNoopDetection() throws Exception { // Change the request to be a different doc request = new UpdateRequest("test", "type1", "1").fromXContent( createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"bar\"}}"))); - result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true); + result = updateHelper.prepareUpdateIndexRequest(shardId, request, randomBoolean(), getResult, true); assertThat(result.action(), instanceOf(IndexRequest.class)); assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED)); @@ -669,8 +668,8 @@ public void testUpdateScript() throws Exception { UpdateRequest request = new UpdateRequest("test", "type1", "1") .script(mockInlineScript("ctx._source.body = \"foo\"")); - UpdateHelper.Result result = updateHelper.prepareUpdateScriptRequest(shardId, request, getResult, - ESTestCase::randomNonNegativeLong); + UpdateHelper.Result result = updateHelper.prepareUpdateScriptRequest( + shardId, request, randomBoolean(), getResult, ESTestCase::randomNonNegativeLong); assertThat(result.action(), instanceOf(IndexRequest.class)); assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED)); @@ -679,7 +678,7 @@ public void testUpdateScript() throws Exception { // Now where the script changes the op to "delete" request = new UpdateRequest("test", "type1", "1").script(mockInlineScript("ctx.op = delete")); - result = updateHelper.prepareUpdateScriptRequest(shardId, request, getResult, + result = updateHelper.prepareUpdateScriptRequest(shardId, request, randomBoolean(), getResult, ESTestCase::randomNonNegativeLong); assertThat(result.action(), instanceOf(DeleteRequest.class)); @@ -693,7 +692,7 @@ public void testUpdateScript() throws Exception { request = new UpdateRequest("test", "type1", "1").script(mockInlineScript("ctx.op = bad")); } - result = updateHelper.prepareUpdateScriptRequest(shardId, request, getResult, + result = updateHelper.prepareUpdateScriptRequest(shardId, request, randomBoolean(), getResult, ESTestCase::randomNonNegativeLong); assertThat(result.action(), instanceOf(UpdateResponse.class)); @@ -710,29 +709,17 @@ public void testOldClusterFallbackToUseVersion() throws Exception { UpdateRequest request = new UpdateRequest("test", "type1", "1").fromXContent( createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}"))); - canUseIfSeqNo.set(false); - IndexRequest updateUsingVersion = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); + IndexRequest updateUsingVersion = updateHelper.prepare(shardId, request, false, getResult, () -> randomNonNegativeLong()).action(); assertThat(updateUsingVersion.ifSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); assertThat(updateUsingVersion.ifPrimaryTerm(), equalTo(UNASSIGNED_PRIMARY_TERM)); assertThat(updateUsingVersion.version(), equalTo(version)); - canUseIfSeqNo.set(true); - IndexRequest updateUsingSeqNo = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); + IndexRequest updateUsingSeqNo = updateHelper.prepare(shardId, request, true, getResult, () -> randomNonNegativeLong()).action(); assertThat(updateUsingSeqNo.ifSeqNo(), equalTo(seqNo)); assertThat(updateUsingSeqNo.ifPrimaryTerm(), equalTo(primaryTerm)); assertThat(updateUsingSeqNo.version(), equalTo(Versions.MATCH_ANY)); } - public void testOldClusterRejectIfSeqNo() { - canUseIfSeqNo.set(false); - long ifSeqNo = randomNonNegativeLong(); - long ifPrimaryTerm = randomNonNegativeLong(); - UpdateRequest request = new UpdateRequest("test", "type1", "1").setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm); - AssertionError error = expectThrows(AssertionError.class, - () -> updateHelper.prepare(request, null, ESTestCase::randomNonNegativeLong)); - assertThat(error.getMessage(), equalTo("setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]")); - } - public void testToString() throws IOException { UpdateRequest request = new UpdateRequest("test", "type1", "1") .script(mockInlineScript("ctx._source.body = \"foo\"")); @@ -744,4 +731,13 @@ public void testToString() throws IOException { assertThat(request.toString(), equalTo("update {[test][type1][1], doc_as_upsert[false], " + "doc[index {[null][null][null], source[{\"body\":\"bar\"}]}], scripted_upsert[false], detect_noop[true]}")); } + + public void testOldClusterRejectIfSeqNo() { + long ifSeqNo = randomNonNegativeLong(); + long ifPrimaryTerm = randomNonNegativeLong(); + UpdateRequest request = new UpdateRequest("test", "type1", "1").setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm); + AssertionError error = expectThrows(AssertionError.class, + () -> updateHelper.prepare(request, null, false, ESTestCase::randomNonNegativeLong)); + assertThat(error.getMessage(), equalTo("ifSeqNo [" + ifSeqNo + "], ifPrimaryTerm [" + ifPrimaryTerm + "]")); + } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 3bd6d452b2519..a431e60f10b1b 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -544,6 +544,8 @@ public void testRequestFailureReplication() throws Exception { assertThat(indexShard.routingEntry() + " has the wrong number of ops in the translog", indexShard.translogStats().estimatedNumberOfOperations(), equalTo(0)); } + assertWarnings("Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. " + + "Please use the `if_seq_no` and `if_primary_term` parameters instead. (request for index [test], type [type], id [1])"); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index e6357c8a56dac..c06d8598a99cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -788,7 +788,7 @@ private TransportWriteAction.WritePrimaryResult result; try (Releasable ignored = permitAcquiredFuture.actionGet()) { MappingUpdatePerformer noopMappingUpdater = (update, shardId, type) -> { }; - result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater, + result = TransportShardBulkAction.performOnPrimary(request, primary, null, true, System::currentTimeMillis, noopMappingUpdater, null); } TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger);