From 0260e8f4239ba0d5a04caf0e4fc2fcb08edb497b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 28 May 2019 12:03:07 -0400 Subject: [PATCH 1/8] Forgo deprecation when cluster not ready for ifSeqNo --- .../action/delete/DeleteRequest.java | 20 ++++++++++++++++++- .../action/index/IndexRequest.java | 20 ++++++++++++++++--- .../action/update/UpdateHelper.java | 3 +++ .../action/update/UpdateRequestTests.java | 2 ++ 4 files changed, 41 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 48a81f60ee6fe..905225615753b 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -65,6 +65,8 @@ public class DeleteRequest extends ReplicatedWriteRequest private VersionType versionType = VersionType.INTERNAL; private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + private boolean ignoreCASUsingVersionDeprecation = false; + public DeleteRequest() { } @@ -102,7 +104,9 @@ public ActionRequestValidationException validate() { validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); - DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); + if (ignoreCASUsingVersionDeprecation == false) { + DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); + } return validationException; } @@ -307,4 +311,18 @@ public String toString() { public DeleteRequest setShardId(ShardId shardId) { throw new UnsupportedOperationException("shard id should never be set on DeleteRequest"); } + + /** + * If the primary on 6.6+ but replicas on older versions, we can not use CAS using ifSeqNo since it requires all nodes on 6.6+. + * In this case, we have to fall back to use CAS with _version and should not issue a deprecation warning log during validation. + * This flag is merely used to forgo the deprecation log when the cluster is not ready for ifSeqNo. + */ + public void ignoreCASUsingVersionDeprecation() { + if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { + assert false : ifSeqNo + "[" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"; + throw new IllegalStateException("request already uses sequence number based compare and write; " + + "ifSeqNo [" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"); + } + this.ignoreCASUsingVersionDeprecation = true; + } } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 75a8f17a64bce..8f0f3e68cb8ac 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -113,7 +113,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement private boolean isRetry = false; private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - + private boolean ignoreCASUsingVersionDeprecation = false; public IndexRequest() { } @@ -200,8 +200,9 @@ public ActionRequestValidationException validate() { validationException = addValidationError("pipeline cannot be an empty string", validationException); } - - DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); + if (ignoreCASUsingVersionDeprecation == false) { + DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); + } return validationException; } @@ -708,4 +709,17 @@ public IndexRequest setShardId(ShardId shardId) { throw new UnsupportedOperationException("shard id should never be set on IndexRequest"); } + /** + * If the primary on 6.6+ but replicas on older versions, we can not use CAS using ifSeqNo since it requires all nodes on 6.6+. + * In this case, we have to fall back to use CAS with _version and should not issue a deprecation warning log during validation. + * This flag is merely used to forgo the deprecation log when the cluster is not ready for ifSeqNo. + */ + public void ignoreCASUsingVersionDeprecation() { + if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { + assert false : ifSeqNo + "[" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"; + throw new IllegalStateException("request already uses sequence number based compare and write; " + + "ifSeqNo [" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"); + } + this.ignoreCASUsingVersionDeprecation = true; + } } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 5a4e50b4d7375..8ccd7a17cd829 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -251,6 +251,7 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu finalIndexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { finalIndexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); + finalIndexRequest.ignoreCASUsingVersionDeprecation(); } return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType); } @@ -297,6 +298,7 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes indexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { indexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); + indexRequest.ignoreCASUsingVersionDeprecation(); } return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType); case DELETE: @@ -308,6 +310,7 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes deleteRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { deleteRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); + deleteRequest.ignoreCASUsingVersionDeprecation(); } return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType); default: diff --git a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index a29348caea6aa..d912e11c89556 100644 --- a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -715,12 +715,14 @@ public void testOldClusterFallbackToUseVersion() throws Exception { assertThat(updateUsingVersion.ifSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); assertThat(updateUsingVersion.ifPrimaryTerm(), equalTo(UNASSIGNED_PRIMARY_TERM)); assertThat(updateUsingVersion.version(), equalTo(version)); + assertNull(updateUsingVersion.validate()); canUseIfSeqNo.set(true); IndexRequest updateUsingSeqNo = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); assertThat(updateUsingSeqNo.ifSeqNo(), equalTo(seqNo)); assertThat(updateUsingSeqNo.ifPrimaryTerm(), equalTo(primaryTerm)); assertThat(updateUsingSeqNo.version(), equalTo(Versions.MATCH_ANY)); + assertNull(updateUsingSeqNo.validate()); } public void testOldClusterRejectIfSeqNo() { From b3841acc43ef4e5b0918539e2be6e313c8e91b73 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 28 May 2019 18:25:06 -0400 Subject: [PATCH 2/8] serialize the fallback if needed --- .../action/delete/DeleteRequest.java | 29 ++++++------ .../action/index/IndexRequest.java | 28 +++++------ .../action/update/UpdateHelper.java | 24 ++++------ .../action/delete/DeleteRequestTests.java | 46 +++++++++++++++++++ .../action/index/IndexRequestTests.java | 42 +++++++++++++++++ .../action/update/UpdateRequestTests.java | 21 ++++----- 6 files changed, 138 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 905225615753b..594b4918d4bef 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.DeprecationLogger; @@ -35,6 +36,7 @@ import org.elasticsearch.index.shard.ShardId; import java.io.IOException; +import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; @@ -65,7 +67,7 @@ public class DeleteRequest extends ReplicatedWriteRequest private VersionType versionType = VersionType.INTERNAL; private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - private boolean ignoreCASUsingVersionDeprecation = false; + private Tuple fallbackCASUsingVersion; public DeleteRequest() { @@ -104,7 +106,7 @@ public ActionRequestValidationException validate() { validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); - if (ignoreCASUsingVersionDeprecation == false) { + if (fallbackCASUsingVersion == null) { DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); } @@ -284,12 +286,17 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(id); out.writeOptionalString(routing()); out.writeOptionalString(parent()); - out.writeLong(version); - out.writeByte(versionType.getValue()); + if (out.getVersion().before(Version.V_6_6_0) && fallbackCASUsingVersion != null) { + out.writeLong(fallbackCASUsingVersion.v1()); + out.writeByte(fallbackCASUsingVersion.v2().getValue()); + } else { + out.writeLong(version); + out.writeByte(versionType.getValue()); + } if (out.getVersion().onOrAfter(Version.V_6_6_0)) { out.writeZLong(ifSeqNo); out.writeVLong(ifPrimaryTerm); - } else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { + } else if (fallbackCASUsingVersion == null && (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM)) { assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]"; throw new IllegalStateException( "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " + @@ -315,14 +322,10 @@ public DeleteRequest setShardId(ShardId shardId) { /** * If the primary on 6.6+ but replicas on older versions, we can not use CAS using ifSeqNo since it requires all nodes on 6.6+. * In this case, we have to fall back to use CAS with _version and should not issue a deprecation warning log during validation. - * This flag is merely used to forgo the deprecation log when the cluster is not ready for ifSeqNo. */ - public void ignoreCASUsingVersionDeprecation() { - if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { - assert false : ifSeqNo + "[" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"; - throw new IllegalStateException("request already uses sequence number based compare and write; " + - "ifSeqNo [" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"); - } - this.ignoreCASUsingVersionDeprecation = true; + public void setFallbackCASUsingVersion(long version, VersionType versionType) { + assert ifSeqNo != UNASSIGNED_SEQ_NO && ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM : "ifSeqNo or ifPrimaryTerm is not set"; + assert this.version == Versions.MATCH_ANY && this.versionType == VersionType.INTERNAL : "version and versionType are set already"; + this.fallbackCASUsingVersion = Tuple.tuple(version, Objects.requireNonNull(versionType)); } } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 8f0f3e68cb8ac..f9ef4d1bdeba8 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.DeprecationLogger; @@ -113,7 +114,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement private boolean isRetry = false; private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - private boolean ignoreCASUsingVersionDeprecation = false; + private Tuple fallbackCASUsingVersion; public IndexRequest() { } @@ -200,7 +201,7 @@ public ActionRequestValidationException validate() { validationException = addValidationError("pipeline cannot be an empty string", validationException); } - if (ignoreCASUsingVersionDeprecation == false) { + if (fallbackCASUsingVersion == null) { DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); } @@ -640,8 +641,13 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeBytesReference(source); out.writeByte(opType.getId()); - out.writeLong(version); - out.writeByte(versionType.getValue()); + if (out.getVersion().before(Version.V_6_6_0) && fallbackCASUsingVersion != null) { + out.writeLong(fallbackCASUsingVersion.v1()); + out.writeByte(fallbackCASUsingVersion.v2().getValue()); + } else { + out.writeLong(version); + out.writeByte(versionType.getValue()); + } out.writeOptionalString(pipeline); out.writeBoolean(isRetry); out.writeLong(autoGeneratedTimestamp); @@ -654,7 +660,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_6_0)) { out.writeZLong(ifSeqNo); out.writeVLong(ifPrimaryTerm); - } else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { + } else if (fallbackCASUsingVersion == null && (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM)) { assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]"; throw new IllegalStateException( "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " + @@ -712,14 +718,10 @@ public IndexRequest setShardId(ShardId shardId) { /** * If the primary on 6.6+ but replicas on older versions, we can not use CAS using ifSeqNo since it requires all nodes on 6.6+. * In this case, we have to fall back to use CAS with _version and should not issue a deprecation warning log during validation. - * This flag is merely used to forgo the deprecation log when the cluster is not ready for ifSeqNo. */ - public void ignoreCASUsingVersionDeprecation() { - if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { - assert false : ifSeqNo + "[" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"; - throw new IllegalStateException("request already uses sequence number based compare and write; " + - "ifSeqNo [" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"); - } - this.ignoreCASUsingVersionDeprecation = true; + public void setFallbackCASUsingVersion(long version, VersionType versionType) { + assert ifSeqNo != UNASSIGNED_SEQ_NO && ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM : "ifSeqNo or ifPrimaryTerm is not set"; + assert this.version == Versions.MATCH_ANY && this.versionType == VersionType.INTERNAL : "version and versionType are set already"; + this.fallbackCASUsingVersion = Tuple.tuple(version, Objects.requireNonNull(versionType)); } } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 8ccd7a17cd829..44f0436613341 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -245,13 +245,11 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu final IndexRequest finalIndexRequest = Requests.indexRequest(request.index()) .type(request.type()).id(request.id()).routing(routing).parent(parent) .source(updatedSourceAsMap, updateSourceContentType) + .setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()) .waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout()) .setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean()) { - finalIndexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); - } else { - finalIndexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); - finalIndexRequest.ignoreCASUsingVersionDeprecation(); + if (canUseIfSeqNo.getAsBoolean() == false) { + finalIndexRequest.setFallbackCASUsingVersion(calculateUpdateVersion(request, getResult), request.versionType()); } return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType); } @@ -292,25 +290,21 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes final IndexRequest indexRequest = Requests.indexRequest(request.index()) .type(request.type()).id(request.id()).routing(routing).parent(parent) .source(updatedSourceAsMap, updateSourceContentType) + .setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()) .waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout()) .setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean()) { - indexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); - } else { - indexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); - indexRequest.ignoreCASUsingVersionDeprecation(); + if (canUseIfSeqNo.getAsBoolean() == false) { + indexRequest.setFallbackCASUsingVersion(calculateUpdateVersion(request, getResult), request.versionType()); } return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType); case DELETE: DeleteRequest deleteRequest = Requests.deleteRequest(request.index()) .type(request.type()).id(request.id()).routing(routing).parent(parent) + .setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()) .waitForActiveShards(request.waitForActiveShards()) .timeout(request.timeout()).setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean()) { - deleteRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); - } else { - deleteRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); - deleteRequest.ignoreCASUsingVersionDeprecation(); + if (canUseIfSeqNo.getAsBoolean() == false) { + deleteRequest.setFallbackCASUsingVersion(calculateUpdateVersion(request, getResult), request.versionType()); } return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType); default: diff --git a/server/src/test/java/org/elasticsearch/action/delete/DeleteRequestTests.java b/server/src/test/java/org/elasticsearch/action/delete/DeleteRequestTests.java index 5f897d0b8349b..64490cf973603 100644 --- a/server/src/test/java/org/elasticsearch/action/delete/DeleteRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/delete/DeleteRequestTests.java @@ -18,12 +18,20 @@ */ package org.elasticsearch.action.delete; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.Matchers.equalTo; public class DeleteRequestTests extends ESTestCase { @@ -43,4 +51,42 @@ public void testValidation() { assertThat(validate.validationErrors(), hasItems("type is missing", "id is missing")); } } + + public void testSerializeWithFallbackCAS() throws Exception { + long seqNo = randomNonNegativeLong(); + long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + long version = randomNonNegativeLong(); + VersionType versionType = randomFrom(VersionType.values()); + DeleteRequest request = new DeleteRequest("test", "_doc", "1"); + request.setIfSeqNo(seqNo); + request.setIfPrimaryTerm(primaryTerm); + request.setFallbackCASUsingVersion(version, versionType); + assertNull(request.validate()); + try (BytesStreamOutput out = new BytesStreamOutput()) { + Version channelVersion = VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_5_4); + out.setVersion(channelVersion); + request.writeTo(out); + DeleteRequest fallbackRequest = new DeleteRequest(); + StreamInput in = out.bytes().streamInput(); + in.setVersion(channelVersion); + fallbackRequest.readFrom(in); + assertThat(fallbackRequest.version(), equalTo(version)); + assertThat(fallbackRequest.versionType(), equalTo(versionType)); + assertThat(fallbackRequest.ifSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); + assertThat(fallbackRequest.ifPrimaryTerm(), equalTo(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)); + } + try (BytesStreamOutput out = new BytesStreamOutput()) { + Version channelVersion = VersionUtils.randomVersionBetween(random(), Version.V_6_6_0, Version.CURRENT); + out.setVersion(channelVersion); + request.writeTo(out); + DeleteRequest fallbackRequest = new DeleteRequest(); + StreamInput in = out.bytes().streamInput(); + in.setVersion(channelVersion); + fallbackRequest.readFrom(in); + assertThat(fallbackRequest.version(), equalTo(Versions.MATCH_ANY)); + assertThat(fallbackRequest.versionType(), equalTo(VersionType.INTERNAL)); + assertThat(fallbackRequest.ifSeqNo(), equalTo(seqNo)); + assertThat(fallbackRequest.ifPrimaryTerm(), equalTo(primaryTerm)); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index 8ffc477c9e533..95e602f5b8d56 100644 --- a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; @@ -33,6 +34,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -214,4 +216,44 @@ public void testRejectsEmptyStringPipeline() { assertThat(validate.getMessage(), containsString("pipeline cannot be an empty string")); } + + public void testSerializeWithFallbackCAS() throws Exception { + long seqNo = randomNonNegativeLong(); + long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + long version = randomNonNegativeLong(); + VersionType versionType = randomFrom(VersionType.values()); + IndexRequest request = new IndexRequest("test", "_doc"); + request.source("{}", XContentType.JSON); + request.setIfSeqNo(seqNo); + request.setIfPrimaryTerm(primaryTerm); + request.setFallbackCASUsingVersion(version, versionType); + assertNull(request.validate()); + try (BytesStreamOutput out = new BytesStreamOutput()) { + Version channelVersion = VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_5_4); + out.setVersion(channelVersion); + request.writeTo(out); + IndexRequest fallbackRequest = new IndexRequest(); + StreamInput in = out.bytes().streamInput(); + in.setVersion(channelVersion); + fallbackRequest.readFrom(in); + assertThat(fallbackRequest.version(), equalTo(version)); + assertThat(fallbackRequest.versionType(), equalTo(versionType)); + assertThat(fallbackRequest.ifSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); + assertThat(fallbackRequest.ifPrimaryTerm(), equalTo(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)); + } + try (BytesStreamOutput out = new BytesStreamOutput()) { + Version channelVersion = VersionUtils.randomVersionBetween(random(), Version.V_6_6_0, Version.CURRENT); + out.setVersion(channelVersion); + request.writeTo(out); + IndexRequest fallbackRequest = new IndexRequest(); + StreamInput in = out.bytes().streamInput(); + in.setVersion(channelVersion); + fallbackRequest.readFrom(in); + assertThat(fallbackRequest.version(), equalTo(Versions.MATCH_ANY)); + assertThat(fallbackRequest.versionType(), equalTo(VersionType.INTERNAL)); + assertThat(fallbackRequest.ifSeqNo(), equalTo(seqNo)); + assertThat(fallbackRequest.ifPrimaryTerm(), equalTo(primaryTerm)); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index d912e11c89556..cdbc2b740bd8e 100644 --- a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -60,7 +60,6 @@ import static java.util.Collections.emptyMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; -import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.script.MockScriptEngine.mockInlineScript; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; @@ -711,18 +710,18 @@ public void testOldClusterFallbackToUseVersion() throws Exception { createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}"))); canUseIfSeqNo.set(false); - IndexRequest updateUsingVersion = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); - assertThat(updateUsingVersion.ifSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); - assertThat(updateUsingVersion.ifPrimaryTerm(), equalTo(UNASSIGNED_PRIMARY_TERM)); - assertThat(updateUsingVersion.version(), equalTo(version)); - assertNull(updateUsingVersion.validate()); + IndexRequest updateWithFallback = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); + assertThat(updateWithFallback.ifSeqNo(), equalTo(seqNo)); + assertThat(updateWithFallback.ifPrimaryTerm(), equalTo(primaryTerm)); + assertThat(updateWithFallback.version(), equalTo(Versions.MATCH_ANY)); + assertNull(updateWithFallback.validate()); canUseIfSeqNo.set(true); - IndexRequest updateUsingSeqNo = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); - assertThat(updateUsingSeqNo.ifSeqNo(), equalTo(seqNo)); - assertThat(updateUsingSeqNo.ifPrimaryTerm(), equalTo(primaryTerm)); - assertThat(updateUsingSeqNo.version(), equalTo(Versions.MATCH_ANY)); - assertNull(updateUsingSeqNo.validate()); + IndexRequest updateWithoutFallback = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); + assertThat(updateWithoutFallback.ifSeqNo(), equalTo(seqNo)); + assertThat(updateWithoutFallback.ifPrimaryTerm(), equalTo(primaryTerm)); + assertThat(updateWithoutFallback.version(), equalTo(Versions.MATCH_ANY)); + assertNull(updateWithoutFallback.validate()); } public void testOldClusterRejectIfSeqNo() { From 2280d9ae4382d2056de97f13edf1d810171c7b9b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 3 Jun 2019 09:59:44 -0400 Subject: [PATCH 3/8] Revert "serialize the fallback if needed" This reverts commit b3841acc43ef4e5b0918539e2be6e313c8e91b73. --- .../action/delete/DeleteRequest.java | 29 ++++++------ .../action/index/IndexRequest.java | 28 ++++++----- .../action/update/UpdateHelper.java | 24 ++++++---- .../action/delete/DeleteRequestTests.java | 46 ------------------- .../action/index/IndexRequestTests.java | 42 ----------------- .../action/update/UpdateRequestTests.java | 21 +++++---- 6 files changed, 52 insertions(+), 138 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 594b4918d4bef..905225615753b 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.DeprecationLogger; @@ -36,7 +35,6 @@ import org.elasticsearch.index.shard.ShardId; import java.io.IOException; -import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; @@ -67,7 +65,7 @@ public class DeleteRequest extends ReplicatedWriteRequest private VersionType versionType = VersionType.INTERNAL; private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - private Tuple fallbackCASUsingVersion; + private boolean ignoreCASUsingVersionDeprecation = false; public DeleteRequest() { @@ -106,7 +104,7 @@ public ActionRequestValidationException validate() { validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); - if (fallbackCASUsingVersion == null) { + if (ignoreCASUsingVersionDeprecation == false) { DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); } @@ -286,17 +284,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(id); out.writeOptionalString(routing()); out.writeOptionalString(parent()); - if (out.getVersion().before(Version.V_6_6_0) && fallbackCASUsingVersion != null) { - out.writeLong(fallbackCASUsingVersion.v1()); - out.writeByte(fallbackCASUsingVersion.v2().getValue()); - } else { - out.writeLong(version); - out.writeByte(versionType.getValue()); - } + out.writeLong(version); + out.writeByte(versionType.getValue()); if (out.getVersion().onOrAfter(Version.V_6_6_0)) { out.writeZLong(ifSeqNo); out.writeVLong(ifPrimaryTerm); - } else if (fallbackCASUsingVersion == null && (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM)) { + } else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]"; throw new IllegalStateException( "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " + @@ -322,10 +315,14 @@ public DeleteRequest setShardId(ShardId shardId) { /** * If the primary on 6.6+ but replicas on older versions, we can not use CAS using ifSeqNo since it requires all nodes on 6.6+. * In this case, we have to fall back to use CAS with _version and should not issue a deprecation warning log during validation. + * This flag is merely used to forgo the deprecation log when the cluster is not ready for ifSeqNo. */ - public void setFallbackCASUsingVersion(long version, VersionType versionType) { - assert ifSeqNo != UNASSIGNED_SEQ_NO && ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM : "ifSeqNo or ifPrimaryTerm is not set"; - assert this.version == Versions.MATCH_ANY && this.versionType == VersionType.INTERNAL : "version and versionType are set already"; - this.fallbackCASUsingVersion = Tuple.tuple(version, Objects.requireNonNull(versionType)); + public void ignoreCASUsingVersionDeprecation() { + if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { + assert false : ifSeqNo + "[" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"; + throw new IllegalStateException("request already uses sequence number based compare and write; " + + "ifSeqNo [" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"); + } + this.ignoreCASUsingVersionDeprecation = true; } } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index f9ef4d1bdeba8..8f0f3e68cb8ac 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.DeprecationLogger; @@ -114,7 +113,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement private boolean isRetry = false; private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - private Tuple fallbackCASUsingVersion; + private boolean ignoreCASUsingVersionDeprecation = false; public IndexRequest() { } @@ -201,7 +200,7 @@ public ActionRequestValidationException validate() { validationException = addValidationError("pipeline cannot be an empty string", validationException); } - if (fallbackCASUsingVersion == null) { + if (ignoreCASUsingVersionDeprecation == false) { DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); } @@ -641,13 +640,8 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeBytesReference(source); out.writeByte(opType.getId()); - if (out.getVersion().before(Version.V_6_6_0) && fallbackCASUsingVersion != null) { - out.writeLong(fallbackCASUsingVersion.v1()); - out.writeByte(fallbackCASUsingVersion.v2().getValue()); - } else { - out.writeLong(version); - out.writeByte(versionType.getValue()); - } + out.writeLong(version); + out.writeByte(versionType.getValue()); out.writeOptionalString(pipeline); out.writeBoolean(isRetry); out.writeLong(autoGeneratedTimestamp); @@ -660,7 +654,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_6_0)) { out.writeZLong(ifSeqNo); out.writeVLong(ifPrimaryTerm); - } else if (fallbackCASUsingVersion == null && (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM)) { + } else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]"; throw new IllegalStateException( "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " + @@ -718,10 +712,14 @@ public IndexRequest setShardId(ShardId shardId) { /** * If the primary on 6.6+ but replicas on older versions, we can not use CAS using ifSeqNo since it requires all nodes on 6.6+. * In this case, we have to fall back to use CAS with _version and should not issue a deprecation warning log during validation. + * This flag is merely used to forgo the deprecation log when the cluster is not ready for ifSeqNo. */ - public void setFallbackCASUsingVersion(long version, VersionType versionType) { - assert ifSeqNo != UNASSIGNED_SEQ_NO && ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM : "ifSeqNo or ifPrimaryTerm is not set"; - assert this.version == Versions.MATCH_ANY && this.versionType == VersionType.INTERNAL : "version and versionType are set already"; - this.fallbackCASUsingVersion = Tuple.tuple(version, Objects.requireNonNull(versionType)); + public void ignoreCASUsingVersionDeprecation() { + if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { + assert false : ifSeqNo + "[" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"; + throw new IllegalStateException("request already uses sequence number based compare and write; " + + "ifSeqNo [" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"); + } + this.ignoreCASUsingVersionDeprecation = true; } } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 44f0436613341..8ccd7a17cd829 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -245,11 +245,13 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu final IndexRequest finalIndexRequest = Requests.indexRequest(request.index()) .type(request.type()).id(request.id()).routing(routing).parent(parent) .source(updatedSourceAsMap, updateSourceContentType) - .setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()) .waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout()) .setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean() == false) { - finalIndexRequest.setFallbackCASUsingVersion(calculateUpdateVersion(request, getResult), request.versionType()); + if (canUseIfSeqNo.getAsBoolean()) { + finalIndexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); + } else { + finalIndexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); + finalIndexRequest.ignoreCASUsingVersionDeprecation(); } return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType); } @@ -290,21 +292,25 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes final IndexRequest indexRequest = Requests.indexRequest(request.index()) .type(request.type()).id(request.id()).routing(routing).parent(parent) .source(updatedSourceAsMap, updateSourceContentType) - .setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()) .waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout()) .setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean() == false) { - indexRequest.setFallbackCASUsingVersion(calculateUpdateVersion(request, getResult), request.versionType()); + if (canUseIfSeqNo.getAsBoolean()) { + indexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); + } else { + indexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); + indexRequest.ignoreCASUsingVersionDeprecation(); } return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType); case DELETE: DeleteRequest deleteRequest = Requests.deleteRequest(request.index()) .type(request.type()).id(request.id()).routing(routing).parent(parent) - .setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()) .waitForActiveShards(request.waitForActiveShards()) .timeout(request.timeout()).setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean() == false) { - deleteRequest.setFallbackCASUsingVersion(calculateUpdateVersion(request, getResult), request.versionType()); + if (canUseIfSeqNo.getAsBoolean()) { + deleteRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); + } else { + deleteRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); + deleteRequest.ignoreCASUsingVersionDeprecation(); } return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType); default: diff --git a/server/src/test/java/org/elasticsearch/action/delete/DeleteRequestTests.java b/server/src/test/java/org/elasticsearch/action/delete/DeleteRequestTests.java index 64490cf973603..5f897d0b8349b 100644 --- a/server/src/test/java/org/elasticsearch/action/delete/DeleteRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/delete/DeleteRequestTests.java @@ -18,20 +18,12 @@ */ package org.elasticsearch.action.delete; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.Matchers.equalTo; public class DeleteRequestTests extends ESTestCase { @@ -51,42 +43,4 @@ public void testValidation() { assertThat(validate.validationErrors(), hasItems("type is missing", "id is missing")); } } - - public void testSerializeWithFallbackCAS() throws Exception { - long seqNo = randomNonNegativeLong(); - long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); - long version = randomNonNegativeLong(); - VersionType versionType = randomFrom(VersionType.values()); - DeleteRequest request = new DeleteRequest("test", "_doc", "1"); - request.setIfSeqNo(seqNo); - request.setIfPrimaryTerm(primaryTerm); - request.setFallbackCASUsingVersion(version, versionType); - assertNull(request.validate()); - try (BytesStreamOutput out = new BytesStreamOutput()) { - Version channelVersion = VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_5_4); - out.setVersion(channelVersion); - request.writeTo(out); - DeleteRequest fallbackRequest = new DeleteRequest(); - StreamInput in = out.bytes().streamInput(); - in.setVersion(channelVersion); - fallbackRequest.readFrom(in); - assertThat(fallbackRequest.version(), equalTo(version)); - assertThat(fallbackRequest.versionType(), equalTo(versionType)); - assertThat(fallbackRequest.ifSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); - assertThat(fallbackRequest.ifPrimaryTerm(), equalTo(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)); - } - try (BytesStreamOutput out = new BytesStreamOutput()) { - Version channelVersion = VersionUtils.randomVersionBetween(random(), Version.V_6_6_0, Version.CURRENT); - out.setVersion(channelVersion); - request.writeTo(out); - DeleteRequest fallbackRequest = new DeleteRequest(); - StreamInput in = out.bytes().streamInput(); - in.setVersion(channelVersion); - fallbackRequest.readFrom(in); - assertThat(fallbackRequest.version(), equalTo(Versions.MATCH_ANY)); - assertThat(fallbackRequest.versionType(), equalTo(VersionType.INTERNAL)); - assertThat(fallbackRequest.ifSeqNo(), equalTo(seqNo)); - assertThat(fallbackRequest.ifPrimaryTerm(), equalTo(primaryTerm)); - } - } } diff --git a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index 95e602f5b8d56..8ffc477c9e533 100644 --- a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; @@ -34,7 +33,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -216,44 +214,4 @@ public void testRejectsEmptyStringPipeline() { assertThat(validate.getMessage(), containsString("pipeline cannot be an empty string")); } - - public void testSerializeWithFallbackCAS() throws Exception { - long seqNo = randomNonNegativeLong(); - long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); - long version = randomNonNegativeLong(); - VersionType versionType = randomFrom(VersionType.values()); - IndexRequest request = new IndexRequest("test", "_doc"); - request.source("{}", XContentType.JSON); - request.setIfSeqNo(seqNo); - request.setIfPrimaryTerm(primaryTerm); - request.setFallbackCASUsingVersion(version, versionType); - assertNull(request.validate()); - try (BytesStreamOutput out = new BytesStreamOutput()) { - Version channelVersion = VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_5_4); - out.setVersion(channelVersion); - request.writeTo(out); - IndexRequest fallbackRequest = new IndexRequest(); - StreamInput in = out.bytes().streamInput(); - in.setVersion(channelVersion); - fallbackRequest.readFrom(in); - assertThat(fallbackRequest.version(), equalTo(version)); - assertThat(fallbackRequest.versionType(), equalTo(versionType)); - assertThat(fallbackRequest.ifSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); - assertThat(fallbackRequest.ifPrimaryTerm(), equalTo(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)); - } - try (BytesStreamOutput out = new BytesStreamOutput()) { - Version channelVersion = VersionUtils.randomVersionBetween(random(), Version.V_6_6_0, Version.CURRENT); - out.setVersion(channelVersion); - request.writeTo(out); - IndexRequest fallbackRequest = new IndexRequest(); - StreamInput in = out.bytes().streamInput(); - in.setVersion(channelVersion); - fallbackRequest.readFrom(in); - assertThat(fallbackRequest.version(), equalTo(Versions.MATCH_ANY)); - assertThat(fallbackRequest.versionType(), equalTo(VersionType.INTERNAL)); - assertThat(fallbackRequest.ifSeqNo(), equalTo(seqNo)); - assertThat(fallbackRequest.ifPrimaryTerm(), equalTo(primaryTerm)); - } - } - } diff --git a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index cdbc2b740bd8e..d912e11c89556 100644 --- a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -60,6 +60,7 @@ import static java.util.Collections.emptyMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.script.MockScriptEngine.mockInlineScript; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; @@ -710,18 +711,18 @@ public void testOldClusterFallbackToUseVersion() throws Exception { createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}"))); canUseIfSeqNo.set(false); - IndexRequest updateWithFallback = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); - assertThat(updateWithFallback.ifSeqNo(), equalTo(seqNo)); - assertThat(updateWithFallback.ifPrimaryTerm(), equalTo(primaryTerm)); - assertThat(updateWithFallback.version(), equalTo(Versions.MATCH_ANY)); - assertNull(updateWithFallback.validate()); + IndexRequest updateUsingVersion = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); + assertThat(updateUsingVersion.ifSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(updateUsingVersion.ifPrimaryTerm(), equalTo(UNASSIGNED_PRIMARY_TERM)); + assertThat(updateUsingVersion.version(), equalTo(version)); + assertNull(updateUsingVersion.validate()); canUseIfSeqNo.set(true); - IndexRequest updateWithoutFallback = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); - assertThat(updateWithoutFallback.ifSeqNo(), equalTo(seqNo)); - assertThat(updateWithoutFallback.ifPrimaryTerm(), equalTo(primaryTerm)); - assertThat(updateWithoutFallback.version(), equalTo(Versions.MATCH_ANY)); - assertNull(updateWithoutFallback.validate()); + IndexRequest updateUsingSeqNo = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); + assertThat(updateUsingSeqNo.ifSeqNo(), equalTo(seqNo)); + assertThat(updateUsingSeqNo.ifPrimaryTerm(), equalTo(primaryTerm)); + assertThat(updateUsingSeqNo.version(), equalTo(Versions.MATCH_ANY)); + assertNull(updateUsingSeqNo.validate()); } public void testOldClusterRejectIfSeqNo() { From 9d67c6e37eb9753a1e6db3a2ae3343804c2dd6c1 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 3 Jun 2019 14:39:18 -0400 Subject: [PATCH 4/8] Revert "Forgo deprecation when cluster not ready for ifSeqNo" This reverts commit 0260e8f4239ba0d5a04caf0e4fc2fcb08edb497b. --- .../action/delete/DeleteRequest.java | 20 +------------------ .../action/index/IndexRequest.java | 20 +++---------------- .../action/update/UpdateHelper.java | 3 --- .../action/update/UpdateRequestTests.java | 2 -- 4 files changed, 4 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 905225615753b..48a81f60ee6fe 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -65,8 +65,6 @@ public class DeleteRequest extends ReplicatedWriteRequest private VersionType versionType = VersionType.INTERNAL; private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - private boolean ignoreCASUsingVersionDeprecation = false; - public DeleteRequest() { } @@ -104,9 +102,7 @@ public ActionRequestValidationException validate() { validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); - if (ignoreCASUsingVersionDeprecation == false) { - DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); - } + DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); return validationException; } @@ -311,18 +307,4 @@ public String toString() { public DeleteRequest setShardId(ShardId shardId) { throw new UnsupportedOperationException("shard id should never be set on DeleteRequest"); } - - /** - * If the primary on 6.6+ but replicas on older versions, we can not use CAS using ifSeqNo since it requires all nodes on 6.6+. - * In this case, we have to fall back to use CAS with _version and should not issue a deprecation warning log during validation. - * This flag is merely used to forgo the deprecation log when the cluster is not ready for ifSeqNo. - */ - public void ignoreCASUsingVersionDeprecation() { - if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { - assert false : ifSeqNo + "[" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"; - throw new IllegalStateException("request already uses sequence number based compare and write; " + - "ifSeqNo [" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"); - } - this.ignoreCASUsingVersionDeprecation = true; - } } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 8f0f3e68cb8ac..75a8f17a64bce 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -113,7 +113,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement private boolean isRetry = false; private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - private boolean ignoreCASUsingVersionDeprecation = false; + public IndexRequest() { } @@ -200,9 +200,8 @@ public ActionRequestValidationException validate() { validationException = addValidationError("pipeline cannot be an empty string", validationException); } - if (ignoreCASUsingVersionDeprecation == false) { - DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); - } + + DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); return validationException; } @@ -709,17 +708,4 @@ public IndexRequest setShardId(ShardId shardId) { throw new UnsupportedOperationException("shard id should never be set on IndexRequest"); } - /** - * If the primary on 6.6+ but replicas on older versions, we can not use CAS using ifSeqNo since it requires all nodes on 6.6+. - * In this case, we have to fall back to use CAS with _version and should not issue a deprecation warning log during validation. - * This flag is merely used to forgo the deprecation log when the cluster is not ready for ifSeqNo. - */ - public void ignoreCASUsingVersionDeprecation() { - if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { - assert false : ifSeqNo + "[" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"; - throw new IllegalStateException("request already uses sequence number based compare and write; " + - "ifSeqNo [" + ifSeqNo + "] ifPrimaryTerm[" + ifPrimaryTerm + "]"); - } - this.ignoreCASUsingVersionDeprecation = true; - } } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 8ccd7a17cd829..5a4e50b4d7375 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -251,7 +251,6 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu finalIndexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { finalIndexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); - finalIndexRequest.ignoreCASUsingVersionDeprecation(); } return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType); } @@ -298,7 +297,6 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes indexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { indexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); - indexRequest.ignoreCASUsingVersionDeprecation(); } return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType); case DELETE: @@ -310,7 +308,6 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes deleteRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { deleteRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); - deleteRequest.ignoreCASUsingVersionDeprecation(); } return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType); default: diff --git a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index d912e11c89556..a29348caea6aa 100644 --- a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -715,14 +715,12 @@ public void testOldClusterFallbackToUseVersion() throws Exception { assertThat(updateUsingVersion.ifSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); assertThat(updateUsingVersion.ifPrimaryTerm(), equalTo(UNASSIGNED_PRIMARY_TERM)); assertThat(updateUsingVersion.version(), equalTo(version)); - assertNull(updateUsingVersion.validate()); canUseIfSeqNo.set(true); IndexRequest updateUsingSeqNo = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); assertThat(updateUsingSeqNo.ifSeqNo(), equalTo(seqNo)); assertThat(updateUsingSeqNo.ifPrimaryTerm(), equalTo(primaryTerm)); assertThat(updateUsingSeqNo.version(), equalTo(Versions.MATCH_ANY)); - assertNull(updateUsingSeqNo.validate()); } public void testOldClusterRejectIfSeqNo() { From 983a4f66ee6da364132e7a395f189623a18983e9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 3 Jun 2019 17:00:13 -0400 Subject: [PATCH 5/8] Move CAS deprecation log to BulkShard request --- .../elasticsearch/action/DocWriteRequest.java | 15 +-- .../action/bulk/TransportShardBulkAction.java | 42 +++++-- .../action/delete/DeleteRequest.java | 7 +- .../action/index/IndexRequest.java | 8 +- .../action/update/TransportUpdateAction.java | 4 +- .../action/update/UpdateHelper.java | 37 +++--- .../action/update/UpdateRequest.java | 2 - .../java/org/elasticsearch/node/Node.java | 2 +- .../bulk/TransportShardBulkActionTests.java | 107 ++++++++++++++---- .../action/update/UpdateRequestTests.java | 52 ++++----- .../ESIndexLevelReplicationTestCase.java | 2 +- 11 files changed, 169 insertions(+), 109 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index 807ed1a387009..11638a742a024 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -18,13 +18,14 @@ */ package org.elasticsearch.action; +import org.elasticsearch.Version; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.VersionType; @@ -253,15 +254,9 @@ static void writeDocumentRequest(StreamOutput out, DocWriteRequest request) thr } } - static void logDeprecationWarnings(DocWriteRequest request, DeprecationLogger logger) { - if (request.versionType() == VersionType.INTERNAL && - request.version() != Versions.MATCH_ANY && - request.version() != Versions.MATCH_DELETED) { - logger.deprecatedAndMaybeLog("occ_internal_version", - "Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. Please use" + - " the `if_seq_no` and `if_primary_term` parameters instead. (request for index [{}], type [{}], id [{}])", - request.index(), request.type(), request.id()); - } + /** Tests if the cluster is ready for compare and write using sequence numbers. */ + static boolean canUseIfSeqNo(ClusterState clusterState) { + return clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_6_0); } static ActionRequestValidationException validateSeqNoBasedCASParams( diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 09fdcd652dedf..3ded2df94e51f 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -49,10 +49,13 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.DeprecationLogger; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; @@ -81,6 +84,7 @@ public class TransportShardBulkAction extends TransportWriteAction performOnPrimary( BulkShardRequest request, IndexShard primary, UpdateHelper updateHelper, + boolean canUseIfSeqNo, LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary); - return performOnPrimary(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); + return performOnPrimary(context, updateHelper, canUseIfSeqNo, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); } private static WritePrimaryResult performOnPrimary( - BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, + BulkPrimaryExecutionContext context, UpdateHelper updateHelper, boolean canUseIfSeqNo, LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) throws Exception { while (context.hasMoreOperationsToExecute()) { - executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); + executeBulkItemRequest(context, updateHelper, canUseIfSeqNo, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); assert context.isInitial(); // either completed and moved to next or reset } return new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), @@ -164,7 +169,8 @@ private static WritePrimaryResult performOn } /** Executes bulk item requests and handles request execution exceptions */ - static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, + static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, + boolean canUseIfSeqNo, LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) throws Exception { final DocWriteRequest.OpType opType = context.getCurrent().opType(); @@ -173,7 +179,7 @@ static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHe if (opType == DocWriteRequest.OpType.UPDATE) { final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); try { - updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier); + updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), canUseIfSeqNo, nowInMillisSupplier); } catch (Exception failure) { // we may fail translating a update to index or delete operation // we use index result to communicate failure while translating update request @@ -209,7 +215,7 @@ static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHe } assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state - + validateDocWriteRequest(context.getRequestToExecute(), canUseIfSeqNo); if (context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE) { executeDeleteRequestOnPrimary(context, mappingUpdater); } else { @@ -501,4 +507,24 @@ public void updateMappings(final Mapping update, final ShardId shardId, final St mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update); } } + + private static void validateDocWriteRequest(DocWriteRequest request, boolean canUseIfSeqNo) { + if (canUseIfSeqNo) { + if (request.versionType() == VersionType.INTERNAL + && request.version() != Versions.MATCH_ANY + && request.version() != Versions.MATCH_DELETED) { + DEPRECATION_LOGGER.deprecatedAndMaybeLog("occ_internal_version", + "Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. Please use" + + " the `if_seq_no` and `if_primary_term` parameters instead. (request for index [{}], type [{}], id [{}])", + request.index(), request.type(), request.id()); + } + } else { + if (request.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + || request.ifPrimaryTerm() != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { + assert false : "ifSeqNo [" + request.ifSeqNo() + "], ifPrimaryTerm [" + request.ifPrimaryTerm() + "]"; + throw new IllegalStateException( + "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher."); + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 48a81f60ee6fe..652e0b5a9c282 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.delete; -import org.apache.logging.log4j.LogManager; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; @@ -29,7 +28,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.ShardId; @@ -53,7 +51,6 @@ */ public class DeleteRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { - private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(LogManager.getLogger(DeleteRequest.class)); private String type; private String id; @@ -102,8 +99,6 @@ public ActionRequestValidationException validate() { validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); - DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); - return validationException; } @@ -286,7 +281,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeZLong(ifSeqNo); out.writeVLong(ifPrimaryTerm); } else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { - assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]"; + assert false : "ifSeqNo [" + ifSeqNo + "], ifPrimaryTerm [" + ifPrimaryTerm + "]"; throw new IllegalStateException( "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " + "Stream version [" + out.getVersion() + "]"); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 75a8f17a64bce..65a42df04ba60 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.index; -import org.apache.logging.log4j.LogManager; import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; @@ -37,7 +36,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -75,7 +73,6 @@ * @see org.elasticsearch.client.Client#index(IndexRequest) */ public class IndexRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { - private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(LogManager.getLogger(IndexRequest.class)); /** * Max length of the source document to include into string() @@ -200,9 +197,6 @@ public ActionRequestValidationException validate() { validationException = addValidationError("pipeline cannot be an empty string", validationException); } - - DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); - return validationException; } @@ -654,7 +648,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeZLong(ifSeqNo); out.writeVLong(ifPrimaryTerm); } else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) { - assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]"; + assert false : "ifSeqNo [" + ifSeqNo + "], ifPrimaryTerm [" + ifPrimaryTerm + "]"; throw new IllegalStateException( "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " + "Stream version [" + out.getVersion() + "]"); diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index f9747ca2e5bd0..9b8dc4033c4e2 100644 --- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -174,7 +175,8 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< final ShardId shardId = request.getShardId(); final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.getId()); - final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis); + final UpdateHelper.Result result = updateHelper.prepare( + request, indexShard, DocWriteRequest.canUseIfSeqNo(clusterService.state()), threadPool::absoluteTimeInMillis); switch (result.getResponseResult()) { case CREATED: IndexRequest upsertRequest = result.action(); diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 5a4e50b4d7375..5155bc6d40229 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -22,12 +22,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; @@ -55,7 +53,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import java.util.function.BooleanSupplier; import java.util.function.LongSupplier; /** @@ -66,27 +63,21 @@ public class UpdateHelper { private static final Logger logger = LogManager.getLogger(UpdateHelper.class); private final ScriptService scriptService; - private final BooleanSupplier canUseIfSeqNo; - public UpdateHelper(ScriptService scriptService, ClusterService clusterService) { - this(scriptService, () -> clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_6_0)); - } - - UpdateHelper(ScriptService scriptService, BooleanSupplier canUseIfSeqNo) { + public UpdateHelper(ScriptService scriptService) { this.scriptService = scriptService; - this.canUseIfSeqNo = canUseIfSeqNo; } /** * Prepares an update request by converting it into an index or delete request or an update response (no action). */ - public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) { - if (canUseIfSeqNo.getAsBoolean() == false) { + public Result prepare(UpdateRequest request, IndexShard indexShard, boolean canUseIfSeqNo, LongSupplier nowInMillis) { + if (canUseIfSeqNo == false) { ensureIfSeqNoNotProvided(request.ifSeqNo(), request.ifPrimaryTerm()); } final GetResult getResult = indexShard.getService().getForUpdate( request.type(), request.id(), request.version(), request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm()); - return prepare(indexShard.shardId(), request, getResult, nowInMillis); + return prepare(indexShard.shardId(), request, canUseIfSeqNo, getResult, nowInMillis); } /** @@ -94,7 +85,7 @@ public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier * noop). */ @SuppressWarnings("unchecked") - protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) { + protected Result prepare(ShardId shardId, UpdateRequest request, boolean canUseIfSeqNo, GetResult getResult, LongSupplier nowInMillis) { if (getResult.isExists() == false) { // If the document didn't exist, execute the update request as an upsert return prepareUpsert(shardId, request, getResult, nowInMillis); @@ -103,10 +94,10 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult throw new DocumentSourceMissingException(shardId, request.type(), request.id()); } else if (request.script() == null && request.doc() != null) { // The request has no script, it is a new doc that should be merged with the old document - return prepareUpdateIndexRequest(shardId, request, getResult, request.detectNoop()); + return prepareUpdateIndexRequest(shardId, request, canUseIfSeqNo, getResult, request.detectNoop()); } else { // The request has a script (or empty script), execute the script and prepare a new index request - return prepareUpdateScriptRequest(shardId, request, getResult, nowInMillis); + return prepareUpdateScriptRequest(shardId, request, canUseIfSeqNo, getResult, nowInMillis); } } @@ -223,7 +214,8 @@ static String calculateParent(GetResult getResult, @Nullable IndexRequest update * Prepare the request for merging the existing document with a new one, can optionally detect a noop change. Returns a {@code Result} * containing a new {@code IndexRequest} to be executed on the primary and replicas. */ - Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) { + Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, boolean canUseIfSeqNo, + GetResult getResult, boolean detectNoop) { final IndexRequest currentRequest = request.doc(); final String routing = calculateRouting(getResult, currentRequest); final String parent = calculateParent(getResult, currentRequest); @@ -247,7 +239,7 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu .source(updatedSourceAsMap, updateSourceContentType) .waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout()) .setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean()) { + if (canUseIfSeqNo) { finalIndexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { finalIndexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); @@ -261,7 +253,8 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu * either a new {@code IndexRequest} or {@code DeleteRequest} (depending on the script's returned "op" value) to be executed on the * primary and replicas. */ - Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetResult getResult, LongSupplier nowInMillis) { + Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, boolean canUseIfSeqNo, + GetResult getResult, LongSupplier nowInMillis) { final IndexRequest currentRequest = request.doc(); final String routing = calculateRouting(getResult, currentRequest); final String parent = calculateParent(getResult, currentRequest); @@ -293,7 +286,7 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes .source(updatedSourceAsMap, updateSourceContentType) .waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout()) .setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean()) { + if (canUseIfSeqNo) { indexRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { indexRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); @@ -304,7 +297,7 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes .type(request.type()).id(request.id()).routing(routing).parent(parent) .waitForActiveShards(request.waitForActiveShards()) .timeout(request.timeout()).setRefreshPolicy(request.getRefreshPolicy()); - if (canUseIfSeqNo.getAsBoolean()) { + if (canUseIfSeqNo) { deleteRequest.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm()); } else { deleteRequest.version(calculateUpdateVersion(request, getResult)).versionType(request.versionType()); @@ -394,7 +387,7 @@ public static GetResult extractGetResult(final UpdateRequest request, String con private void ensureIfSeqNoNotProvided(long ifSeqNo, long ifPrimaryTerm) { if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { - assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]"; + assert false : "ifSeqNo [" + ifSeqNo + "], ifPrimaryTerm [" + ifPrimaryTerm + "]"; throw new IllegalStateException( "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher."); } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index bd47b89b52f12..f90267eddf35d 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -161,8 +161,6 @@ public ActionRequestValidationException validate() { validationException = addValidationError("doc must be specified if doc_as_upsert is enabled", validationException); } - DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER); - return validationException; } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 1ecf385b3e3ec..cb9f211e2c0af 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -582,7 +582,7 @@ protected Node( b.bind(Transport.class).toInstance(transport); b.bind(TransportService.class).toInstance(transportService); b.bind(NetworkService.class).toInstance(networkService); - b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptModule.getScriptService(), clusterService)); + b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptModule.getScriptService())); b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService); b.bind(ClusterInfoService.class).toInstance(clusterInfoService); b.bind(GatewayMetaState.class).toInstance(gatewayMetaState); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 4cb496c96f12f..9ef584ab0ee61 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; @@ -53,6 +54,8 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -148,7 +151,7 @@ public void testExecuteBulkIndexRequest() throws Exception { UpdateHelper updateHelper = null; BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, randomBoolean(), threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -175,7 +178,7 @@ public void testExecuteBulkIndexRequest() throws Exception { BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper, - threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {}); + randomBoolean(), threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); assertNull(secondContext.getLocationToSync()); @@ -226,7 +229,7 @@ public void testSkipBulkIndexRequestIfAborted() throws Exception { UpdateHelper updateHelper = null; WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( - bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), + bulkShardRequest, shard, updateHelper, randomBoolean(), threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); // since at least 1 item passed, the tran log location should exist, @@ -280,7 +283,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { // Pretend the mappings haven't made it to the node yet BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); AtomicInteger updateCalled = new AtomicInteger(); - TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, null, randomBoolean(), threadPool::absoluteTimeInMillis, (update, shardId, type) -> { // There should indeed be a mapping update assertNotNull(update); @@ -297,7 +300,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) .thenReturn(success); - TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, null, randomBoolean(), threadPool::absoluteTimeInMillis, (update, shardId, type) -> fail("should not have had to update the mappings"), () -> {}); @@ -335,7 +338,7 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, randomBoolean(), threadPool::absoluteTimeInMillis, errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(), errorOnWait ? () -> { throw err; } : () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -376,7 +379,7 @@ public void testExecuteBulkDeleteRequest() throws Exception { randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, randomBoolean(), threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -418,7 +421,7 @@ public void testExecuteBulkDeleteRequest() throws Exception { randomlySetIgnoredPrimaryResponse(items[0]); context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, randomBoolean(), threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -463,7 +466,7 @@ public void testNoopUpdateRequest() throws Exception { IndexShard shard = mock(IndexShard.class); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(noopUpdateResponse, DocWriteResponse.Result.NOOP, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -474,7 +477,7 @@ public void testNoopUpdateRequest() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -509,7 +512,7 @@ public void testUpdateRequestWithFailure() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(updateResponse, randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -520,7 +523,7 @@ public void testUpdateRequestWithFailure() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -558,7 +561,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(updateResponse, randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -569,7 +572,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -605,7 +608,7 @@ public void testUpdateRequestWithSuccess() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(updateResponse, created ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -616,7 +619,7 @@ public void testUpdateRequestWithSuccess() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -651,7 +654,7 @@ public void testUpdateWithDelete() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(updateResponse, DocWriteResponse.Result.DELETED, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -662,7 +665,7 @@ public void testUpdateWithDelete() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -688,7 +691,7 @@ public void testFailureDuringUpdateProcessing() throws Exception { UpdateHelper updateHelper = mock(UpdateHelper.class); final ElasticsearchException err = new ElasticsearchException("oops"); - when(updateHelper.prepare(any(), eq(shard), any())).thenThrow(err); + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenThrow(err); BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); @@ -696,7 +699,7 @@ public void testFailureDuringUpdateProcessing() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertFalse(context.hasMoreOperationsToExecute()); @@ -729,7 +732,7 @@ public void testTranslogPositionToSync() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); while (context.hasMoreOperationsToExecute()) { - TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + TransportShardBulkAction.executeBulkItemRequest(context, null, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); } @@ -805,7 +808,7 @@ public void testRetries() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn( new UpdateHelper.Result(updateResponse, randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); @@ -814,7 +817,7 @@ public void testRetries() throws Exception { new BulkShardRequest(shardId, RefreshPolicy.NONE, items); WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( - bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), + bulkShardRequest, shard, updateHelper, true, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); assertThat(result.location, equalTo(resultLocation)); @@ -827,6 +830,64 @@ public void testRetries() throws Exception { assertThat(response.getSeqNo(), equalTo(13L)); } + public void testDeprecationCASUsingVersion() throws Exception { + IndexShard shard = newStartedShard(true); + BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(1, 10)]; + Set deprecatedRequestIds = new HashSet<>(); + for (int i = 0; i < items.length; i++) { + DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id_" + i) + .source(Requests.INDEX_CONTENT_TYPE) + .opType(DocWriteRequest.OpType.INDEX); + if (randomBoolean()) { + writeRequest.version(randomNonNegativeLong()); + deprecatedRequestIds.add(writeRequest.id()); + } + items[i] = new BulkItemRequest(i, writeRequest); + } + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + TransportShardBulkAction.performOnPrimary(bulkShardRequest, shard, null, true, + threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); + closeShards(shard); + if (deprecatedRequestIds.isEmpty() == false) { + assertWarnings(deprecatedRequestIds.stream().map(id -> + "Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. Please use the " + + "`if_seq_no` and `if_primary_term` parameters instead. (request for index [index], type [_doc], id [" + id + "])") + .toArray(String[]::new)); + } + } + + public void testRejectCASUsingSeqNo() throws Exception { + IndexShard shard = newStartedShard(true); + BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(1, 10)]; + Tuple casWithSeqNo = null; + for (int i = 0; i < items.length; i++) { + DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id_" + i) + .source(Requests.INDEX_CONTENT_TYPE) + .opType(DocWriteRequest.OpType.INDEX); + if (randomBoolean()) { + writeRequest.version(randomNonNegativeLong()); + } else { + writeRequest.setIfSeqNo(randomLongBetween(0, Long.MAX_VALUE)); + writeRequest.setIfPrimaryTerm(randomNonNegativeLong()); + if (casWithSeqNo == null) { + casWithSeqNo = Tuple.tuple(writeRequest.ifSeqNo(), writeRequest.ifPrimaryTerm()); + } + } + items[i] = new BulkItemRequest(i, writeRequest); + } + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + if (casWithSeqNo != null) { + AssertionError error = expectThrows(AssertionError.class, () -> + TransportShardBulkAction.performOnPrimary(bulkShardRequest, shard, null, false, + threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {})); + assertThat(error.getMessage(), equalTo("ifSeqNo [" + casWithSeqNo.v1() + "], ifPrimaryTerm [" + casWithSeqNo.v2() + "]")); + } else { + TransportShardBulkAction.performOnPrimary(bulkShardRequest, shard, null, false, + threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), () -> {}); + } + closeShards(shard); + } + private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) { if (randomBoolean()) { // add a response to the request and thereby check that it is ignored for the primary. diff --git a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index a29348caea6aa..00676a614f98b 100644 --- a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -54,7 +54,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import static java.util.Collections.emptyMap; @@ -77,7 +76,6 @@ public class UpdateRequestTests extends ESTestCase { private UpdateHelper updateHelper; - private final AtomicBoolean canUseIfSeqNo = new AtomicBoolean(true); @Override @Before @@ -143,7 +141,7 @@ public void setUp() throws Exception { final MockScriptEngine engine = new MockScriptEngine("mock", scripts, Collections.emptyMap()); Map engines = Collections.singletonMap(engine.getType(), engine); ScriptService scriptService = new ScriptService(baseSettings, engines, ScriptModule.CORE_CONTEXTS); - updateHelper = new UpdateHelper(scriptService, canUseIfSeqNo::get); + updateHelper = new UpdateHelper(scriptService); } public void testFromXContent() throws Exception { @@ -400,7 +398,8 @@ public void testNowInScript() throws IOException { long nowInMillis = randomNonNegativeLong(); // We simulate that the document is not existing yet GetResult getResult = new GetResult("test", "type1", "2", UNASSIGNED_SEQ_NO, 0, 0, false, null, null); - UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis); + UpdateHelper.Result result = updateHelper.prepare( + new ShardId("test", "_na_", 0), updateRequest, randomBoolean(), getResult, () -> nowInMillis); Streamable action = result.action(); assertThat(action, instanceOf(IndexRequest.class)); IndexRequest indexAction = (IndexRequest) action; @@ -413,7 +412,8 @@ public void testNowInScript() throws IOException { .scriptedUpsert(true); // We simulate that the document is not existing yet GetResult getResult = new GetResult("test", "type1", "2", 0, 1, 0, true, new BytesArray("{}"), null); - UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> 42L); + UpdateHelper.Result result = updateHelper.prepare( + new ShardId("test", "_na_", 0), updateRequest, randomBoolean(), getResult, () -> 42L); Streamable action = result.action(); assertThat(action, instanceOf(IndexRequest.class)); } @@ -462,6 +462,7 @@ private void runTimeoutTest(final GetResult getResult, final UpdateRequest updat final UpdateHelper.Result result = updateHelper.prepare( new ShardId("test", "", 0), updateRequest, + randomBoolean(), getResult, ESTestCase::randomNonNegativeLong); final Streamable action = result.action(); @@ -561,8 +562,6 @@ public void testToValidateUpsertRequestAndVersion() { updateRequest.doc("{}", XContentType.JSON); updateRequest.upsert(new IndexRequest("index","type", "id")); assertThat(updateRequest.validate().validationErrors(), contains("can't provide both upsert request and a version")); - assertWarnings("Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. " + - "Please use the `if_seq_no` and `if_primary_term` parameters instead. (request for index [index], type [type], id [id])"); } public void testToValidateUpsertRequestWithVersion() { @@ -638,13 +637,13 @@ public void testNoopDetection() throws Exception { UpdateRequest request = new UpdateRequest("test", "type1", "1").fromXContent( createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}"))); - UpdateHelper.Result result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true); + UpdateHelper.Result result = updateHelper.prepareUpdateIndexRequest(shardId, request, randomBoolean(), getResult, true); assertThat(result.action(), instanceOf(UpdateResponse.class)); assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.NOOP)); // Try again, with detectNoop turned off - result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, false); + result = updateHelper.prepareUpdateIndexRequest(shardId, request, randomBoolean(), getResult, false); assertThat(result.action(), instanceOf(IndexRequest.class)); assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED)); assertThat(result.updatedSourceAsMap().get("body").toString(), equalTo("foo")); @@ -652,7 +651,7 @@ public void testNoopDetection() throws Exception { // Change the request to be a different doc request = new UpdateRequest("test", "type1", "1").fromXContent( createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"bar\"}}"))); - result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true); + result = updateHelper.prepareUpdateIndexRequest(shardId, request, randomBoolean(), getResult, true); assertThat(result.action(), instanceOf(IndexRequest.class)); assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED)); @@ -669,8 +668,8 @@ public void testUpdateScript() throws Exception { UpdateRequest request = new UpdateRequest("test", "type1", "1") .script(mockInlineScript("ctx._source.body = \"foo\"")); - UpdateHelper.Result result = updateHelper.prepareUpdateScriptRequest(shardId, request, getResult, - ESTestCase::randomNonNegativeLong); + UpdateHelper.Result result = updateHelper.prepareUpdateScriptRequest( + shardId, request, randomBoolean(), getResult, ESTestCase::randomNonNegativeLong); assertThat(result.action(), instanceOf(IndexRequest.class)); assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED)); @@ -679,7 +678,7 @@ public void testUpdateScript() throws Exception { // Now where the script changes the op to "delete" request = new UpdateRequest("test", "type1", "1").script(mockInlineScript("ctx.op = delete")); - result = updateHelper.prepareUpdateScriptRequest(shardId, request, getResult, + result = updateHelper.prepareUpdateScriptRequest(shardId, request, randomBoolean(), getResult, ESTestCase::randomNonNegativeLong); assertThat(result.action(), instanceOf(DeleteRequest.class)); @@ -693,7 +692,7 @@ public void testUpdateScript() throws Exception { request = new UpdateRequest("test", "type1", "1").script(mockInlineScript("ctx.op = bad")); } - result = updateHelper.prepareUpdateScriptRequest(shardId, request, getResult, + result = updateHelper.prepareUpdateScriptRequest(shardId, request, randomBoolean(), getResult, ESTestCase::randomNonNegativeLong); assertThat(result.action(), instanceOf(UpdateResponse.class)); @@ -710,29 +709,17 @@ public void testOldClusterFallbackToUseVersion() throws Exception { UpdateRequest request = new UpdateRequest("test", "type1", "1").fromXContent( createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}"))); - canUseIfSeqNo.set(false); - IndexRequest updateUsingVersion = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); + IndexRequest updateUsingVersion = updateHelper.prepare(shardId, request, false, getResult, () -> randomNonNegativeLong()).action(); assertThat(updateUsingVersion.ifSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); assertThat(updateUsingVersion.ifPrimaryTerm(), equalTo(UNASSIGNED_PRIMARY_TERM)); assertThat(updateUsingVersion.version(), equalTo(version)); - canUseIfSeqNo.set(true); - IndexRequest updateUsingSeqNo = updateHelper.prepare(shardId, request, getResult, ESTestCase::randomNonNegativeLong).action(); + IndexRequest updateUsingSeqNo = updateHelper.prepare(shardId, request, true, getResult, () -> randomNonNegativeLong()).action(); assertThat(updateUsingSeqNo.ifSeqNo(), equalTo(seqNo)); assertThat(updateUsingSeqNo.ifPrimaryTerm(), equalTo(primaryTerm)); assertThat(updateUsingSeqNo.version(), equalTo(Versions.MATCH_ANY)); } - public void testOldClusterRejectIfSeqNo() { - canUseIfSeqNo.set(false); - long ifSeqNo = randomNonNegativeLong(); - long ifPrimaryTerm = randomNonNegativeLong(); - UpdateRequest request = new UpdateRequest("test", "type1", "1").setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm); - AssertionError error = expectThrows(AssertionError.class, - () -> updateHelper.prepare(request, null, ESTestCase::randomNonNegativeLong)); - assertThat(error.getMessage(), equalTo("setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]")); - } - public void testToString() throws IOException { UpdateRequest request = new UpdateRequest("test", "type1", "1") .script(mockInlineScript("ctx._source.body = \"foo\"")); @@ -744,4 +731,13 @@ public void testToString() throws IOException { assertThat(request.toString(), equalTo("update {[test][type1][1], doc_as_upsert[false], " + "doc[index {[null][null][null], source[{\"body\":\"bar\"}]}], scripted_upsert[false], detect_noop[true]}")); } + + public void testOldClusterRejectIfSeqNo() { + long ifSeqNo = randomNonNegativeLong(); + long ifPrimaryTerm = randomNonNegativeLong(); + UpdateRequest request = new UpdateRequest("test", "type1", "1").setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm); + AssertionError error = expectThrows(AssertionError.class, + () -> updateHelper.prepare(request, null, false, ESTestCase::randomNonNegativeLong)); + assertThat(error.getMessage(), equalTo("ifSeqNo [" + ifSeqNo + "], ifPrimaryTerm [" + ifPrimaryTerm + "]")); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index e6357c8a56dac..c06d8598a99cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -788,7 +788,7 @@ private TransportWriteAction.WritePrimaryResult result; try (Releasable ignored = permitAcquiredFuture.actionGet()) { MappingUpdatePerformer noopMappingUpdater = (update, shardId, type) -> { }; - result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater, + result = TransportShardBulkAction.performOnPrimary(request, primary, null, true, System::currentTimeMillis, noopMappingUpdater, null); } TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger); From 3f5734c9c04c6f1a6046e0afec21c7dfb5bc4efe Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 4 Jun 2019 15:17:40 -0400 Subject: [PATCH 6/8] fix tests --- .../java/org/elasticsearch/action/bulk/BulkRequestTests.java | 2 -- .../index/replication/IndexLevelReplicationTests.java | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 2bb64d62e739c..dc1532b8301eb 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -316,8 +316,6 @@ public void testToValidateUpsertRequestAndVersionInBulkRequest() throws IOExcept bulkRequest.add(data, null, null, xContentType); assertThat(bulkRequest.validate().validationErrors(), contains("can't provide both upsert request and a version", "can't provide version in upsert request")); - assertWarnings("Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. " + - "Please use the `if_seq_no` and `if_primary_term` parameters instead. (request for index [index], type [type], id [id])"); } public void testBulkTerminatedByNewline() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 3bd6d452b2519..a431e60f10b1b 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -544,6 +544,8 @@ public void testRequestFailureReplication() throws Exception { assertThat(indexShard.routingEntry() + " has the wrong number of ops in the translog", indexShard.translogStats().estimatedNumberOfOperations(), equalTo(0)); } + assertWarnings("Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. " + + "Please use the `if_seq_no` and `if_primary_term` parameters instead. (request for index [test], type [type], id [1])"); } } From 3c8a4b7e1b6517763dce1fe95afdf524e71dc158 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 4 Jun 2019 15:40:35 -0400 Subject: [PATCH 7/8] fix more test --- .../main/resources/rest-api-spec/test/bulk/60_deprecated.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/60_deprecated.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/60_deprecated.yml index 06d1dd1ceb92b..9b2f263ddcbfb 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/60_deprecated.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/60_deprecated.yml @@ -3,8 +3,8 @@ "Deprecated parameters should produce warning in Bulk query": - skip: - version: " - 6.6.99" - reason: versioned operations were deprecated in 6.7 + version: " - 6.8.0" + reason: versioned operations were deprecated in 6.7 but we won't issue deprecation unless all nodes are upgraded features: "warnings" - do: From b0b3ccbc1b4226462197d560fe757112e97bad4a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 4 Jun 2019 20:26:01 -0400 Subject: [PATCH 8/8] validate request first --- .../action/bulk/TransportShardBulkAction.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 3ded2df94e51f..468e061f4e662 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -170,11 +170,10 @@ private static WritePrimaryResult performOn /** Executes bulk item requests and handles request execution exceptions */ static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, - boolean canUseIfSeqNo, LongSupplier nowInMillisSupplier, - MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) - throws Exception { + boolean canUseIfSeqNo, LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater, + CheckedRunnable waitForMappingUpdate) throws Exception { + validateDocWriteRequest(context.getCurrent(), canUseIfSeqNo); final DocWriteRequest.OpType opType = context.getCurrent().opType(); - final UpdateHelper.Result updateResult; if (opType == DocWriteRequest.OpType.UPDATE) { final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); @@ -215,7 +214,6 @@ static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHe } assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state - validateDocWriteRequest(context.getRequestToExecute(), canUseIfSeqNo); if (context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE) { executeDeleteRequestOnPrimary(context, mappingUpdater); } else {