From 46720f0e6a674e5b5e8fc91a6f51e5e071e46add Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 10 Jul 2018 19:10:42 -0400 Subject: [PATCH 1/9] Relax versionType check in translog With the presence of sequence number, we no longer use versionType to resolve out of order collision in replication and recovery requests. However, we can only remove the versionType from translog in 7.0+ as it is required in a mixed cluster between 6.x and 5.x. This change relaxes the versionType check when comparing two translog operations so that a 6.x can work with a 7.x (without versionType). --- .../index/engine/InternalEngine.java | 16 ------------ .../index/translog/TranslogWriter.java | 25 ++++++++++++++++--- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 92c64d415ad0b..dae9e6ff398c1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -704,20 +704,6 @@ private boolean canOptimizeAddDocument(Index index) { return false; } - private boolean assertVersionType(final Engine.Operation operation) { - if (operation.origin() == Operation.Origin.REPLICA || - operation.origin() == Operation.Origin.PEER_RECOVERY || - operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - // ensure that replica operation has expected version type for replication - // ensure that versionTypeForReplicationAndRecovery is idempotent - assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery() - : "unexpected version type in request from [" + operation.origin().name() + "] " + - "found [" + operation.versionType().name() + "] " + - "expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]"; - } - return true; - } - private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) { if (origin == Operation.Origin.PRIMARY) { assert assertOriginPrimarySequenceNumber(seqNo); @@ -757,7 +743,6 @@ public IndexResult index(Index index) throws IOException { try (ReleasableLock releasableLock = readLock.acquire()) { ensureOpen(); assert assertIncomingSequenceNumber(index.origin(), index.seqNo()); - assert assertVersionType(index); try (Releasable ignored = versionMap.acquireLock(index.uid().bytes()); Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) { lastWriteNanos = index.startTime(); @@ -1096,7 +1081,6 @@ private void updateDocs(final Term uid, final List docs, public DeleteResult delete(Delete delete) throws IOException { versionMap.enforceSafeAccess(); assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field(); - assert assertVersionType(delete); assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo()); final DeleteResult deleteResult; // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index b89b21c52588a..a04bf58afa99f 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -40,6 +40,7 @@ import java.nio.file.StandardOpenOption; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; @@ -202,9 +203,27 @@ private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReferenc if (previous.v1().equals(data) == false) { Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput())); Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput())); - throw new AssertionError( - "seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " + - "prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2()); + // we need to relax versionType from this check as versionType is removed in 7.x + final boolean sameOp; + if (prvOp instanceof Translog.Index && newOp instanceof Translog.Index) { + final Translog.Index o1 = (Translog.Index) prvOp; + final Translog.Index o2 = (Translog.Index) newOp; + sameOp = Objects.equals(o1.id(), o2.id()) && Objects.equals(o1.type(), o2.type()) + && Objects.equals(o1.source(), o2.source()) && Objects.equals(o1.routing(), o2.routing()) + && o1.primaryTerm() == o2.primaryTerm() && o1.seqNo() == o2.seqNo() && o1.version() == o2.version(); + } else if (prvOp instanceof Translog.Delete && newOp instanceof Translog.Delete) { + final Translog.Delete o1 = (Translog.Delete) prvOp; + final Translog.Delete o2 = (Translog.Delete) newOp; + sameOp = Objects.equals(o1.id(), o2.id()) && Objects.equals(o1.type(), o2.type()) + && o1.primaryTerm() == o2.primaryTerm() && o1.seqNo() == o2.seqNo() && o1.version() == o2.version(); + } else { + sameOp = false; + } + if (sameOp == false) { + throw new AssertionError( + "seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " + + "prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2()); + } } } else { seenSequenceNumbers.put(seqNo, From 700eee48a46a6623c4fc4456266b0d49b7c75e91 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 11 Jul 2018 10:57:26 -0400 Subject: [PATCH 2/9] Let remove this in 7.0 --- .../index/engine/InternalEngine.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index dae9e6ff398c1..92c64d415ad0b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -704,6 +704,20 @@ private boolean canOptimizeAddDocument(Index index) { return false; } + private boolean assertVersionType(final Engine.Operation operation) { + if (operation.origin() == Operation.Origin.REPLICA || + operation.origin() == Operation.Origin.PEER_RECOVERY || + operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + // ensure that replica operation has expected version type for replication + // ensure that versionTypeForReplicationAndRecovery is idempotent + assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery() + : "unexpected version type in request from [" + operation.origin().name() + "] " + + "found [" + operation.versionType().name() + "] " + + "expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]"; + } + return true; + } + private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) { if (origin == Operation.Origin.PRIMARY) { assert assertOriginPrimarySequenceNumber(seqNo); @@ -743,6 +757,7 @@ public IndexResult index(Index index) throws IOException { try (ReleasableLock releasableLock = readLock.acquire()) { ensureOpen(); assert assertIncomingSequenceNumber(index.origin(), index.seqNo()); + assert assertVersionType(index); try (Releasable ignored = versionMap.acquireLock(index.uid().bytes()); Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) { lastWriteNanos = index.startTime(); @@ -1081,6 +1096,7 @@ private void updateDocs(final Term uid, final List docs, public DeleteResult delete(Delete delete) throws IOException { versionMap.enforceSafeAccess(); assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field(); + assert assertVersionType(delete); assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo()); final DeleteResult deleteResult; // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: From e58c69eed016525e66ef931df011355c3878f0b6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 11 Jul 2018 10:36:16 -0400 Subject: [PATCH 3/9] remove versionType --- .../action/bulk/TransportShardBulkAction.java | 5 +- .../org/elasticsearch/index/VersionType.java | 15 ------ .../index/engine/InternalEngine.java | 23 --------- .../elasticsearch/index/shard/IndexShard.java | 15 +++--- .../index/translog/Translog.java | 49 ++++++------------- .../index/translog/TranslogWriter.java | 2 +- .../resync/ResyncReplicationRequestTests.java | 4 +- .../index/engine/InternalEngineTests.java | 20 ++++---- .../RecoveryDuringReplicationTests.java | 1 - .../index/shard/IndexShardTests.java | 27 +++++----- .../index/translog/TranslogTests.java | 25 +++++----- .../indices/recovery/RecoveryTests.java | 10 ++-- .../index/engine/TranslogHandler.java | 6 +-- .../index/shard/IndexShardTestCase.java | 4 +- 14 files changed, 70 insertions(+), 136 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index a78421a2328cb..15a98077eac4a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -523,13 +523,12 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType()) .routing(indexRequest.routing()); result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(), - indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(), - indexRequest.isRetry(), sourceToParse); + indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse); break; case DELETE: DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest; result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(), - deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery()); + deleteRequest.type(), deleteRequest.id()); break; default: throw new IllegalStateException("Unexpected request operation type on replica: " diff --git a/server/src/main/java/org/elasticsearch/index/VersionType.java b/server/src/main/java/org/elasticsearch/index/VersionType.java index b350252dc9c41..eb05ff809076c 100644 --- a/server/src/main/java/org/elasticsearch/index/VersionType.java +++ b/server/src/main/java/org/elasticsearch/index/VersionType.java @@ -85,13 +85,6 @@ public boolean validateVersionForReads(long version) { // not allowing Versions.NOT_FOUND as it is not a valid input value. return version > 0L || version == Versions.MATCH_ANY; } - - @Override - public VersionType versionTypeForReplicationAndRecovery() { - // replicas get the version from the primary after increment. The same version is stored in - // the transaction log. -> the should use the external semantics. - return EXTERNAL; - } }, EXTERNAL((byte) 1) { @Override @@ -333,14 +326,6 @@ public byte getValue() { */ public abstract boolean validateVersionForReads(long version); - /** - * Some version types require different semantics for primary and replicas. This version allows - * the type to override the default behavior. - */ - public VersionType versionTypeForReplicationAndRecovery() { - return this; - } - public static VersionType fromString(String versionType) { if ("internal".equals(versionType)) { return INTERNAL; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 92c64d415ad0b..6f06f881791f2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -704,20 +704,6 @@ private boolean canOptimizeAddDocument(Index index) { return false; } - private boolean assertVersionType(final Engine.Operation operation) { - if (operation.origin() == Operation.Origin.REPLICA || - operation.origin() == Operation.Origin.PEER_RECOVERY || - operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - // ensure that replica operation has expected version type for replication - // ensure that versionTypeForReplicationAndRecovery is idempotent - assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery() - : "unexpected version type in request from [" + operation.origin().name() + "] " + - "found [" + operation.versionType().name() + "] " + - "expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]"; - } - return true; - } - private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) { if (origin == Operation.Origin.PRIMARY) { assert assertOriginPrimarySequenceNumber(seqNo); @@ -757,7 +743,6 @@ public IndexResult index(Index index) throws IOException { try (ReleasableLock releasableLock = readLock.acquire()) { ensureOpen(); assert assertIncomingSequenceNumber(index.origin(), index.seqNo()); - assert assertVersionType(index); try (Releasable ignored = versionMap.acquireLock(index.uid().bytes()); Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) { lastWriteNanos = index.startTime(); @@ -860,9 +845,6 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]"; } versionMap.enforceSafeAccess(); - // drop out of order operations - assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() : - "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]"; // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity @@ -1096,7 +1078,6 @@ private void updateDocs(final Term uid, final List docs, public DeleteResult delete(Delete delete) throws IOException { versionMap.enforceSafeAccess(); assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field(); - assert assertVersionType(delete); assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo()); final DeleteResult deleteResult; // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: @@ -1149,10 +1130,6 @@ public DeleteResult delete(Delete delete) throws IOException { private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); - // drop out of order operations - assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() : - "resolving out of order delivery based on versioning but version type isn't fit for it. got [" - + delete.versionType() + "]"; maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr)); assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" + "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]"; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5bd8f9abc6e04..b6a9a93723c17 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -649,10 +649,10 @@ public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse); } - public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, VersionType versionType, - long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse) + public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp, + boolean isRetry, SourceToParse sourceToParse) throws IOException { - return applyIndexOperation(seqNo, primaryTerm, version, versionType, autoGeneratedTimeStamp, isRetry, + return applyIndexOperation(seqNo, primaryTerm, version, VersionType.EXTERNAL, autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse); } @@ -740,9 +740,8 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String ty Engine.Operation.Origin.PRIMARY); } - public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id, - VersionType versionType) throws IOException { - return applyDeleteOperation(seqNo, primaryTerm, version, type, id, versionType, Engine.Operation.Origin.REPLICA); + public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException { + return applyDeleteOperation(seqNo, primaryTerm, version, type, id, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA); } private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id, @@ -1211,14 +1210,14 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all // autoGeneratedID docs that are coming from the primary are updated correctly. result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(), - index.versionType().versionTypeForReplicationAndRecovery(), index.getAutoGeneratedIdTimestamp(), true, origin, + VersionType.EXTERNAL, index.getAutoGeneratedIdTimestamp(), true, origin, source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source())).routing(index.routing())); break; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(), - delete.versionType().versionTypeForReplicationAndRecovery(), origin); + VersionType.EXTERNAL, origin); break; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 63055d933e43e..680bf3768c3aa 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -38,7 +38,6 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.AbstractIndexShardComponent; @@ -1011,7 +1010,8 @@ public static class Index implements Operation { public static final int FORMAT_6_0 = 8; // since 6.0.0 public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0 - public static final int SERIALIZATION_FORMAT = FORMAT_NO_PARENT; + public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; // since 7.0 + public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE; private final String id; private final long autoGeneratedIdTimestamp; @@ -1019,7 +1019,6 @@ public static class Index implements Operation { private final long seqNo; private final long primaryTerm; private final long version; - private final VersionType versionType; private final BytesReference source; private final String routing; @@ -1034,8 +1033,9 @@ private Index(final StreamInput in) throws IOException { in.readOptionalString(); // _parent } this.version = in.readLong(); - this.versionType = VersionType.fromValue(in.readByte()); - assert versionType.validateVersionForWrites(this.version) : "invalid version for writes: " + this.version; + if (format < FORMAT_NO_VERSION_TYPE) { + in.readByte(); // _version_type + } this.autoGeneratedIdTimestamp = in.readLong(); seqNo = in.readLong(); primaryTerm = in.readLong(); @@ -1049,15 +1049,14 @@ public Index(Engine.Index index, Engine.IndexResult indexResult) { this.seqNo = indexResult.getSeqNo(); this.primaryTerm = index.primaryTerm(); this.version = indexResult.getVersion(); - this.versionType = index.versionType(); this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp(); } public Index(String type, String id, long seqNo, long primaryTerm, byte[] source) { - this(type, id, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL, source, null, -1); + this(type, id, seqNo, primaryTerm, Versions.MATCH_ANY, source, null, -1); } - public Index(String type, String id, long seqNo, long primaryTerm, long version, VersionType versionType, + public Index(String type, String id, long seqNo, long primaryTerm, long version, byte[] source, String routing, long autoGeneratedIdTimestamp) { this.type = type; this.id = id; @@ -1065,7 +1064,6 @@ public Index(String type, String id, long seqNo, long primaryTerm, long version, this.seqNo = seqNo; this.primaryTerm = primaryTerm; this.version = version; - this.versionType = versionType; this.routing = routing; this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp; } @@ -1110,10 +1108,6 @@ public long version() { return this.version; } - public VersionType versionType() { - return versionType; - } - @Override public Source getSource() { return new Source(source, routing); @@ -1126,8 +1120,6 @@ private void write(final StreamOutput out) throws IOException { out.writeBytesReference(source); out.writeOptionalString(routing); out.writeLong(version); - - out.writeByte(versionType.getValue()); out.writeLong(autoGeneratedIdTimestamp); out.writeLong(seqNo); out.writeLong(primaryTerm); @@ -1149,7 +1141,6 @@ public boolean equals(Object o) { primaryTerm != index.primaryTerm || id.equals(index.id) == false || type.equals(index.type) == false || - versionType != index.versionType || autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp || source.equals(index.source) == false) { return false; @@ -1168,7 +1159,6 @@ public int hashCode() { result = 31 * result + Long.hashCode(seqNo); result = 31 * result + Long.hashCode(primaryTerm); result = 31 * result + Long.hashCode(version); - result = 31 * result + versionType.hashCode(); result = 31 * result + source.hashCode(); result = 31 * result + (routing != null ? routing.hashCode() : 0); result = 31 * result + Long.hashCode(autoGeneratedIdTimestamp); @@ -1194,14 +1184,14 @@ public long getAutoGeneratedIdTimestamp() { public static class Delete implements Operation { private static final int FORMAT_6_0 = 4; // 6.0 - * - public static final int SERIALIZATION_FORMAT = FORMAT_6_0; + public static final int FORMAT_NO_VERSION_TYPE = FORMAT_6_0 + 1; // since 7.0 + public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE; private final String type, id; private final Term uid; private final long seqNo; private final long primaryTerm; private final long version; - private final VersionType versionType; private Delete(final StreamInput in) throws IOException { final int format = in.readVInt();// SERIALIZATION_FORMAT @@ -1210,29 +1200,29 @@ private Delete(final StreamInput in) throws IOException { id = in.readString(); uid = new Term(in.readString(), in.readBytesRef()); this.version = in.readLong(); - this.versionType = VersionType.fromValue(in.readByte()); - assert versionType.validateVersionForWrites(this.version); + if (format < FORMAT_NO_VERSION_TYPE) { + in.readByte(); // versionType + } seqNo = in.readLong(); primaryTerm = in.readLong(); } public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) { - this(delete.type(), delete.id(), delete.uid(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion(), delete.versionType()); + this(delete.type(), delete.id(), delete.uid(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion()); } /** utility for testing */ public Delete(String type, String id, long seqNo, long primaryTerm, Term uid) { - this(type, id, uid, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL); + this(type, id, uid, seqNo, primaryTerm, Versions.MATCH_ANY); } - public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) { + public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version) { this.type = Objects.requireNonNull(type); this.id = Objects.requireNonNull(id); this.uid = uid; this.seqNo = seqNo; this.primaryTerm = primaryTerm; this.version = version; - this.versionType = versionType; } @Override @@ -1271,10 +1261,6 @@ public long version() { return this.version; } - public VersionType versionType() { - return this.versionType; - } - @Override public Source getSource() { throw new IllegalStateException("trying to read doc source from delete operation"); @@ -1287,7 +1273,6 @@ private void write(final StreamOutput out) throws IOException { out.writeString(uid.field()); out.writeBytesRef(uid.bytes()); out.writeLong(version); - out.writeByte(versionType.getValue()); out.writeLong(seqNo); out.writeLong(primaryTerm); } @@ -1306,8 +1291,7 @@ public boolean equals(Object o) { return version == delete.version && seqNo == delete.seqNo && primaryTerm == delete.primaryTerm && - uid.equals(delete.uid) && - versionType == delete.versionType; + uid.equals(delete.uid); } @Override @@ -1316,7 +1300,6 @@ public int hashCode() { result = 31 * result + Long.hashCode(seqNo); result = 31 * result + Long.hashCode(primaryTerm); result = 31 * result + Long.hashCode(version); - result = 31 * result + versionType.hashCode(); return result; } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index a04bf58afa99f..8bafb0bb51c31 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -203,7 +203,7 @@ private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReferenc if (previous.v1().equals(data) == false) { Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput())); Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput())); - // we need to relax versionType from this check as versionType is removed in 7.x + // we need to exclude versionType from this check because it's removed in 7.0 final boolean sameOp; if (prvOp instanceof Translog.Index && newOp instanceof Translog.Index) { final Translog.Index o1 = (Translog.Index) prvOp; diff --git a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java index 914c2b87422db..15b8e1c99d266 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java @@ -21,9 +21,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.Index; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; @@ -38,7 +36,7 @@ public class ResyncReplicationRequestTests extends ESTestCase { public void testSerialization() throws IOException { final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8")); final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(), - Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, -1); + randomNonNegativeLong(), bytes, null, -1); final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index}); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2e89a66805ce1..f2dd5e89f9e74 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1183,7 +1183,7 @@ public void testVersioningNewCreate() throws IOException { assertThat(indexResult.getVersion(), equalTo(1L)); create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), - create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + VersionType.EXTERNAL, REPLICA, 0, -1, false); indexResult = replicaEngine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); } @@ -1197,7 +1197,7 @@ public void testReplicatedVersioningWithFlush() throws IOException { create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), - create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + VersionType.EXTERNAL, REPLICA, 0, -1, false); indexResult = replicaEngine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); assertTrue(indexResult.isCreated()); @@ -1216,7 +1216,7 @@ public void testReplicatedVersioningWithFlush() throws IOException { update = new Engine.Index(newUid(doc), doc, updateResult.getSeqNo(), update.primaryTerm(), updateResult.getVersion(), - update.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + VersionType.EXTERNAL, REPLICA, 0, -1, false); updateResult = replicaEngine.index(update); assertThat(updateResult.getVersion(), equalTo(2L)); assertFalse(updateResult.isCreated()); @@ -1269,7 +1269,7 @@ public void testVersioningNewIndex() throws IOException { Engine.IndexResult indexResult = engine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); - index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), VersionType.EXTERNAL, REPLICA, 0, -1, false); indexResult = replicaEngine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); } @@ -1418,7 +1418,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, - forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, + forReplica ? VersionType.EXTERNAL : versionType, forReplica ? REPLICA : PRIMARY, System.currentTimeMillis(), -1, false ); @@ -1427,7 +1427,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, - forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, + forReplica ? VersionType.EXTERNAL : versionType, forReplica ? REPLICA : PRIMARY, System.currentTimeMillis()); } @@ -3221,7 +3221,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep Engine.IndexResult indexResult = engine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); - index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), VersionType.EXTERNAL, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); indexResult = replicaEngine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -3235,7 +3235,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep assertEquals(1, topDocs.totalHits); } - index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), VersionType.EXTERNAL, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); indexResult = replicaEngine.index(index); assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); replicaEngine.refresh("test"); @@ -3255,7 +3255,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() Engine.IndexResult result = engine.index(firstIndexRequest); assertThat(result.getVersion(), equalTo(1L)); - Engine.Index firstIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), firstIndexRequest.primaryTerm(), result.getVersion(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + Engine.Index firstIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), firstIndexRequest.primaryTerm(), result.getVersion(), VersionType.EXTERNAL, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); Engine.IndexResult indexReplicaResult = replicaEngine.index(firstIndexRequestReplica); assertThat(indexReplicaResult.getVersion(), equalTo(1L)); @@ -3269,7 +3269,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() assertEquals(1, topDocs.totalHits); } - Engine.Index secondIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), secondIndexRequest.primaryTerm(), result.getVersion(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + Engine.Index secondIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), secondIndexRequest.primaryTerm(), result.getVersion(), VersionType.EXTERNAL, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); replicaEngine.index(secondIndexRequestReplica); replicaEngine.refresh("test"); try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) { diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index ee97ba14fe09e..f01d5e54a2e16 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -186,7 +186,6 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { remainingReplica.applyIndexOperationOnReplica( remainingReplica.getLocalCheckpoint() + 1, 1, - VersionType.EXTERNAL, randomNonNegativeLong(), false, SourceToParse.source("index", "type", "replica", new BytesArray("{}"), XContentType.JSON)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index ac52378fc6b9d..2e07ec950a572 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -72,7 +72,6 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; @@ -1545,17 +1544,17 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { * - If flush and then recover from the existing store, delete #1 will be removed while index #0 is still retained and replayed. */ final IndexShard shard = newStartedShard(false); - shard.applyDeleteOperationOnReplica(1, 2, "_doc", "id", VersionType.EXTERNAL); + shard.applyDeleteOperationOnReplica(1, 2, "_doc", "id"); shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation - shard.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id", new BytesArray("{}"), XContentType.JSON)); - shard.applyIndexOperationOnReplica(3, 3, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(3, 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id-3", new BytesArray("{}"), XContentType.JSON)); // Flushing a new commit with local checkpoint=1 allows to skip the translog gen #1 in recovery. shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); - shard.applyIndexOperationOnReplica(2, 3, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(2, 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id-2", new BytesArray("{}"), XContentType.JSON)); - shard.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(shard.shardId().getIndexName(), "_doc", "id-5", new BytesArray("{}"), XContentType.JSON)); final int translogOps; @@ -1646,8 +1645,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException { updateMappings(otherShard, shard.indexSettings().getIndexMetaData()); SourceToParse sourceToParse = SourceToParse.source(shard.shardId().getIndexName(), "_doc", "1", new BytesArray("{}"), XContentType.JSON); - otherShard.applyIndexOperationOnReplica(1, 1, - VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); + otherShard.applyIndexOperationOnReplica(1, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); final ShardRouting primaryShardRouting = shard.routingEntry(); IndexShard newShard = reinitShard(otherShard, ShardRoutingHelper.initWithSameId(primaryShardRouting, @@ -1763,18 +1761,18 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { final IndexShard shard = newStartedShard(false); final String indexName = shard.shardId().getIndexName(); // Index #0, index #1 - shard.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "_doc", "doc-0", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here. - shard.applyIndexOperationOnReplica(1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(1, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); // Simulate resync (without rollback): Noop #1, index #2 acquireReplicaOperationPermitBlockingly(shard, shard.primaryTerm + 1); shard.markSeqNoAsNoop(1, "test"); - shard.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + shard.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "_doc", "doc-2", new BytesArray("{}"), XContentType.JSON)); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1", "doc-2")); @@ -2104,11 +2102,11 @@ public void testRecoverFromTranslog() throws IOException { int numCorruptEntries = 0; for (int i = 0; i < numTotalEntries; i++) { if (randomBoolean()) { - operations.add(new Translog.Index("_doc", "1", 0, primary.getPrimaryTerm(), 1, VersionType.INTERNAL, + operations.add(new Translog.Index("_doc", "1", 0, primary.getPrimaryTerm(), 1, "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")), null, -1)); } else { // corrupt entry - operations.add(new Translog.Index("_doc", "2", 1, primary.getPrimaryTerm(), 1, VersionType.INTERNAL, + operations.add(new Translog.Index("_doc", "2", 1, primary.getPrimaryTerm(), 1, "{\"foo\" : \"bar}".getBytes(Charset.forName("UTF-8")), null, -1)); numCorruptEntries++; } @@ -2603,8 +2601,7 @@ private Result indexOnReplicaWithGaps( final String id = Integer.toString(i); SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", id, new BytesArray("{}"), XContentType.JSON); - indexShard.applyIndexOperationOnReplica(i, - 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); + indexShard.applyIndexOperationOnReplica(i, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); if (!gap && i == localCheckpoint + 1) { localCheckpoint++; } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index cf6e753684676..79b20f21be0d0 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -416,9 +416,9 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(1)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(163L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(162L)); assertThat(stats.getUncommittedOperations(), equalTo(1)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(163L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(162L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } @@ -426,9 +426,9 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(2)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(212L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(210L)); assertThat(stats.getUncommittedOperations(), equalTo(2)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(212L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(210L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } @@ -436,9 +436,9 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(3)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(261L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(258L)); assertThat(stats.getUncommittedOperations(), equalTo(3)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(261L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(258L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } @@ -446,13 +446,13 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(303L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(300L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(303L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(300L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } - final long expectedSizeInBytes = 358L; + final long expectedSizeInBytes = 355L; translog.rollGeneration(); { final TranslogStats stats = stats(); @@ -725,14 +725,12 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable { assertEquals(expIndexOp.type(), indexOp.type()); assertEquals(expIndexOp.source(), indexOp.source()); assertEquals(expIndexOp.version(), indexOp.version()); - assertEquals(expIndexOp.versionType(), indexOp.versionType()); break; case DELETE: Translog.Delete delOp = (Translog.Delete) op; Translog.Delete expDelOp = (Translog.Delete) expectedOp; assertEquals(expDelOp.uid(), delOp.uid()); assertEquals(expDelOp.version(), delOp.version()); - assertEquals(expDelOp.versionType(), delOp.versionType()); break; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) op; @@ -1478,7 +1476,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { fail("corrupted"); } catch (IllegalStateException ex) { - assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3080, " + + assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3025, " + "numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1, trimmedAboveSeqNo=-2} but got: Checkpoint{offset=0, numOps=0, " + "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0, trimmedAboveSeqNo=-2}", ex.getMessage()); } @@ -1842,8 +1840,7 @@ public void run() { new Term("_uid", threadId + "_" + opCount), seqNoGenerator.getAndIncrement(), primaryTerm.get(), - 1 + randomInt(100000), - randomFrom(VersionType.values())); + 1 + randomInt(100000)); break; case NO_OP: op = new Translog.NoOp(seqNoGenerator.getAndIncrement(), primaryTerm.get(), randomAlphaOfLength(16)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 537409f35d175..e7606328c7665 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -122,22 +122,22 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception { final String indexName = orgReplica.shardId().getIndexName(); // delete #1 - orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id", VersionType.EXTERNAL); + orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id"); getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation // index #0 - orgReplica.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + orgReplica.applyIndexOperationOnReplica(0, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON)); // index #3 - orgReplica.applyIndexOperationOnReplica(3, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + orgReplica.applyIndexOperationOnReplica(3, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON)); // Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1. orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true)); // index #2 - orgReplica.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + orgReplica.applyIndexOperationOnReplica(2, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON)); orgReplica.updateGlobalCheckpointOnReplica(3L, "test"); // index #5 -> force NoOp #4. - orgReplica.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + orgReplica.applyIndexOperationOnReplica(5, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON)); final int translogOps; diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java index 53fe89ac17ea5..26f95eeb4064e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.IndexAnalyzers; import org.elasticsearch.index.analysis.NamedAnalyzer; @@ -124,14 +125,13 @@ private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine. source(indexName, index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source())) .routing(index.routing()), index.seqNo(), index.primaryTerm(), - index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, + index.version(), VersionType.EXTERNAL, origin, index.getAutoGeneratedIdTimestamp(), true); return engineIndex; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), - origin, System.nanoTime()); + delete.primaryTerm(), delete.version(), VersionType.EXTERNAL, origin, System.nanoTime()); return engineDelete; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 5a8e91841c5a7..e4849be20e16e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -573,7 +573,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, shard.getLocalCheckpoint()); } else { result = shard.applyIndexOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0, - VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { throw new TransportReplicationAction.RetryOnReplicaException(shard.shardId, "Mappings are not available on the replica yet, triggered update: " + result.getRequiredMappingUpdate()); @@ -591,7 +591,7 @@ protected Engine.DeleteResult deleteDoc(IndexShard shard, String type, String id if (shard.routingEntry().primary()) { return shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL); } else { - return shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L, type, id, VersionType.EXTERNAL); + return shard.applyDeleteOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0L, type, id); } } From badbf44fc0b507762d669fb99e58fffa01037226 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 16 Jul 2018 17:06:51 -0400 Subject: [PATCH 4/9] non-primary should not have version_type --- .../elasticsearch/index/engine/Engine.java | 4 +++ .../index/engine/InternalEngine.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 12 ++++---- .../index/engine/InternalEngineTests.java | 30 +++++++++---------- .../index/engine/EngineTestCase.java | 9 ++---- .../index/engine/TranslogHandler.java | 6 ++-- 6 files changed, 31 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8a560e02fe449..0fc47f15e38cd 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1168,6 +1168,8 @@ public static class Index extends Operation { public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime, long autoGeneratedIdTimestamp, boolean isRetry) { super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); + assert (origin == Origin.PRIMARY && versionType != null) || (origin != Origin.PRIMARY && versionType == null) : + "invalid version_type=" + versionType + " for origin=" + origin; this.doc = doc; this.isRetry = isRetry; this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp; @@ -1245,6 +1247,8 @@ public static class Delete extends Operation { public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime) { super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); + assert (origin == Origin.PRIMARY && versionType != null) || (origin != Origin.PRIMARY && versionType == null) : + "invalid version_type=" + versionType + " for origin=" + origin; this.type = Objects.requireNonNull(type); this.id = Objects.requireNonNull(id); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6f06f881791f2..bdcfb2fc7313f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -691,7 +691,7 @@ private boolean canOptimizeAddDocument(Index index) { return true; case PEER_RECOVERY: case REPLICA: - assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL + assert index.version() == 1 && index.versionType() == null : "version: " + index.version() + " type: " + index.versionType(); return true; case LOCAL_TRANSLOG_RECOVERY: diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b6a9a93723c17..0e28105f778cf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -652,11 +652,11 @@ public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse) throws IOException { - return applyIndexOperation(seqNo, primaryTerm, version, VersionType.EXTERNAL, autoGeneratedTimeStamp, isRetry, + return applyIndexOperation(seqNo, primaryTerm, version, null, autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse); } - private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, VersionType versionType, + private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, @Nullable VersionType versionType, long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException { assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; @@ -741,11 +741,11 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String ty } public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException { - return applyDeleteOperation(seqNo, primaryTerm, version, type, id, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA); + return applyDeleteOperation(seqNo, primaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA); } private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id, - VersionType versionType, Engine.Operation.Origin origin) throws IOException { + @Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException { assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; assert versionType.validateVersionForWrites(version); ensureWriteAllowed(origin); @@ -1210,14 +1210,14 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all // autoGeneratedID docs that are coming from the primary are updated correctly. result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(), - VersionType.EXTERNAL, index.getAutoGeneratedIdTimestamp(), true, origin, + null, index.getAutoGeneratedIdTimestamp(), true, origin, source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source())).routing(index.routing())); break; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(), - VersionType.EXTERNAL, origin); + null, origin); break; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f2dd5e89f9e74..87b63dfdef832 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1183,7 +1183,7 @@ public void testVersioningNewCreate() throws IOException { assertThat(indexResult.getVersion(), equalTo(1L)); create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), - VersionType.EXTERNAL, REPLICA, 0, -1, false); + null, REPLICA, 0, -1, false); indexResult = replicaEngine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); } @@ -1197,7 +1197,7 @@ public void testReplicatedVersioningWithFlush() throws IOException { create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), - VersionType.EXTERNAL, REPLICA, 0, -1, false); + null, REPLICA, 0, -1, false); indexResult = replicaEngine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); assertTrue(indexResult.isCreated()); @@ -1216,7 +1216,7 @@ public void testReplicatedVersioningWithFlush() throws IOException { update = new Engine.Index(newUid(doc), doc, updateResult.getSeqNo(), update.primaryTerm(), updateResult.getVersion(), - VersionType.EXTERNAL, REPLICA, 0, -1, false); + null, REPLICA, 0, -1, false); updateResult = replicaEngine.index(update); assertThat(updateResult.getVersion(), equalTo(2L)); assertFalse(updateResult.isCreated()); @@ -1269,7 +1269,7 @@ public void testVersioningNewIndex() throws IOException { Engine.IndexResult indexResult = engine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); - index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), VersionType.EXTERNAL, REPLICA, 0, -1, false); + index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), null, REPLICA, 0, -1, false); indexResult = replicaEngine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); } @@ -1418,7 +1418,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, - forReplica ? VersionType.EXTERNAL : versionType, + forReplica ? null : versionType, forReplica ? REPLICA : PRIMARY, System.currentTimeMillis(), -1, false ); @@ -1427,7 +1427,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, - forReplica ? VersionType.EXTERNAL : versionType, + forReplica ? null : versionType, forReplica ? REPLICA : PRIMARY, System.currentTimeMillis()); } @@ -3221,7 +3221,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep Engine.IndexResult indexResult = engine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); - index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), VersionType.EXTERNAL, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); indexResult = replicaEngine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -3235,7 +3235,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep assertEquals(1, topDocs.totalHits); } - index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), VersionType.EXTERNAL, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); indexResult = replicaEngine.index(index); assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); replicaEngine.refresh("test"); @@ -3255,7 +3255,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() Engine.IndexResult result = engine.index(firstIndexRequest); assertThat(result.getVersion(), equalTo(1L)); - Engine.Index firstIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), firstIndexRequest.primaryTerm(), result.getVersion(), VersionType.EXTERNAL, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + Engine.Index firstIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), firstIndexRequest.primaryTerm(), result.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); Engine.IndexResult indexReplicaResult = replicaEngine.index(firstIndexRequestReplica); assertThat(indexReplicaResult.getVersion(), equalTo(1L)); @@ -3269,7 +3269,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() assertEquals(1, topDocs.totalHits); } - Engine.Index secondIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), secondIndexRequest.primaryTerm(), result.getVersion(), VersionType.EXTERNAL, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); + Engine.Index secondIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), secondIndexRequest.primaryTerm(), result.getVersion(), null, REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); replicaEngine.index(secondIndexRequestReplica); replicaEngine.refresh("test"); try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) { @@ -3292,7 +3292,7 @@ public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final l } public Engine.Index appendOnlyReplica(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, final long seqNo) { - return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, VersionType.EXTERNAL, + return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, null, Engine.Operation.Origin.REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, retry); } @@ -3694,7 +3694,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio sequenceNumberSupplier.getAsLong(), 1, i, - VersionType.EXTERNAL, + origin == PRIMARY ? VersionType.EXTERNAL : null, origin, System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, @@ -3708,7 +3708,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio sequenceNumberSupplier.getAsLong(), 1, i, - VersionType.EXTERNAL, + origin == PRIMARY ? VersionType.EXTERNAL : null, origin, System.nanoTime()); operations.add(delete); @@ -3928,7 +3928,7 @@ public void markSeqNoAsCompleted(long seqNo) { final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); final Term uid = newUid(doc); final long time = System.nanoTime(); - actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, VersionType.EXTERNAL, REPLICA, time, time, false)); + actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, null, REPLICA, time, time, false)); if (rarely()) { actualEngine.rollTranslogGeneration(); } @@ -4686,7 +4686,7 @@ public void testTrimUnsafeCommits() throws Exception { for (int i = 0; i < seqNos.size(); i++) { ParsedDocument doc = testParsedDocument(Long.toString(seqNos.get(i)), null, testDocument(), new BytesArray("{}"), null); Engine.Index index = new Engine.Index(newUid(doc), doc, seqNos.get(i), 0, - 1, VersionType.EXTERNAL, REPLICA, System.nanoTime(), -1, false); + 1, null, REPLICA, System.nanoTime(), -1, false); engine.index(index); if (randomBoolean()) { engine.flush(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index a23e29b0bcd6b..f26522245493f 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -52,7 +52,6 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.Mapping; @@ -493,14 +492,12 @@ protected Engine.Index indexForDoc(ParsedDocument doc) { protected Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo, boolean isRetry) { - return new Engine.Index(newUid(doc), doc, seqNo, primaryTerm.get(), version, VersionType.EXTERNAL, - Engine.Operation.Origin.REPLICA, System.nanoTime(), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); + return new Engine.Index(newUid(doc), doc, seqNo, primaryTerm.get(), version, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); } protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, long startTime) { - return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, VersionType.EXTERNAL, - Engine.Operation.Origin.REPLICA, startTime); + return new Engine.Delete("test", id, newUid(id), seqNo, 1, version, null, Engine.Operation.Origin.REPLICA, startTime); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java index 26f95eeb4064e..9999a3b3748f1 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.IndexAnalyzers; import org.elasticsearch.index.analysis.NamedAnalyzer; @@ -125,13 +124,12 @@ private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine. source(indexName, index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source())) .routing(index.routing()), index.seqNo(), index.primaryTerm(), - index.version(), VersionType.EXTERNAL, origin, - index.getAutoGeneratedIdTimestamp(), true); + index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true); return engineIndex; case DELETE: final Translog.Delete delete = (Translog.Delete) operation; final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.primaryTerm(), delete.version(), VersionType.EXTERNAL, origin, System.nanoTime()); + delete.primaryTerm(), delete.version(), null, origin, System.nanoTime()); return engineDelete; case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; From 5b07b1a26f93a4a124386b29b83609149e0c8956 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 16 Jul 2018 17:22:07 -0400 Subject: [PATCH 5/9] assert write version on primary --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0e28105f778cf..b07e22875e81f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -645,6 +645,7 @@ private IndexShardState changeState(IndexShardState newState, String reason) { public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse, long autoGeneratedTimestamp, boolean isRetry) throws IOException { + assert versionType.validateVersionForWrites(version); return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse); } @@ -660,7 +661,6 @@ private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, l long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException { assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; - assert versionType.validateVersionForWrites(version); ensureWriteAllowed(origin); Engine.Index operation; try { @@ -736,6 +736,7 @@ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) { public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType) throws IOException { + assert versionType.validateVersionForWrites(version); return applyDeleteOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, type, id, versionType, Engine.Operation.Origin.PRIMARY); } @@ -747,7 +748,6 @@ public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long versio private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id, @Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException { assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; - assert versionType.validateVersionForWrites(version); ensureWriteAllowed(origin); // When there is a single type, the unique identifier is only composed of the _id, // so there is no way to differenciate foo#1 from bar#1. This is especially an issue From 15e3c4d521c72aa2be07c25b3d147f4efaa9323c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 16 Jul 2018 17:31:18 -0400 Subject: [PATCH 6/9] use operation equals method --- .../index/translog/TranslogWriter.java | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 8bafb0bb51c31..c135facc67f5f 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -40,7 +40,6 @@ import java.nio.file.StandardOpenOption; import java.util.HashMap; import java.util.Map; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; @@ -203,23 +202,7 @@ private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReferenc if (previous.v1().equals(data) == false) { Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput())); Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput())); - // we need to exclude versionType from this check because it's removed in 7.0 - final boolean sameOp; - if (prvOp instanceof Translog.Index && newOp instanceof Translog.Index) { - final Translog.Index o1 = (Translog.Index) prvOp; - final Translog.Index o2 = (Translog.Index) newOp; - sameOp = Objects.equals(o1.id(), o2.id()) && Objects.equals(o1.type(), o2.type()) - && Objects.equals(o1.source(), o2.source()) && Objects.equals(o1.routing(), o2.routing()) - && o1.primaryTerm() == o2.primaryTerm() && o1.seqNo() == o2.seqNo() && o1.version() == o2.version(); - } else if (prvOp instanceof Translog.Delete && newOp instanceof Translog.Delete) { - final Translog.Delete o1 = (Translog.Delete) prvOp; - final Translog.Delete o2 = (Translog.Delete) newOp; - sameOp = Objects.equals(o1.id(), o2.id()) && Objects.equals(o1.type(), o2.type()) - && o1.primaryTerm() == o2.primaryTerm() && o1.seqNo() == o2.seqNo() && o1.version() == o2.version(); - } else { - sameOp = false; - } - if (sameOp == false) { + if (newOp.equals(prvOp) == false) { throw new AssertionError( "seqNo [" + seqNo + "] was processed twice in generation [" + generation + "], with different data. " + "prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2()); From d3780eeee5cd35121ced39bf243e8e1d46fe7ef3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 16 Jul 2018 20:38:48 -0400 Subject: [PATCH 7/9] reserve some versions for 6.x --- .../java/org/elasticsearch/index/translog/Translog.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 680bf3768c3aa..e0df5cd8da83b 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1009,8 +1009,9 @@ public Source(BytesReference source, String routing) { public static class Index implements Operation { public static final int FORMAT_6_0 = 8; // since 6.0.0 - public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0 - public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; // since 7.0 + public static final int FORMAT_7_0 = FORMAT_6_0 + 5; // reserve some versions for 6.x + public static final int FORMAT_NO_PARENT = FORMAT_7_0 + 1; + public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE; private final String id; @@ -1184,7 +1185,8 @@ public long getAutoGeneratedIdTimestamp() { public static class Delete implements Operation { private static final int FORMAT_6_0 = 4; // 6.0 - * - public static final int FORMAT_NO_VERSION_TYPE = FORMAT_6_0 + 1; // since 7.0 + private static final int FORMAT_7_0 = FORMAT_6_0 + 5; // reserve some versions for 6.x + public static final int FORMAT_NO_VERSION_TYPE = FORMAT_7_0 + 1; public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE; private final String type, id; From b891b6bed3db6c09fdc87b6569c2534b32aa7da0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 17 Jul 2018 09:19:49 -0400 Subject: [PATCH 8/9] bwc format --- .../index/translog/Translog.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index e0df5cd8da83b..31404b7874a92 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.AbstractIndexShardComponent; @@ -1009,8 +1010,7 @@ public Source(BytesReference source, String routing) { public static class Index implements Operation { public static final int FORMAT_6_0 = 8; // since 6.0.0 - public static final int FORMAT_7_0 = FORMAT_6_0 + 5; // reserve some versions for 6.x - public static final int FORMAT_NO_PARENT = FORMAT_7_0 + 1; + public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0 public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE; @@ -1115,12 +1115,16 @@ public Source getSource() { } private void write(final StreamOutput out) throws IOException { - out.writeVInt(SERIALIZATION_FORMAT); + final int format = out.getVersion().onOrAfter(Version.V_7_0_0_alpha1) ? SERIALIZATION_FORMAT : FORMAT_6_0; + out.writeVInt(format); out.writeString(id); out.writeString(type); out.writeBytesReference(source); out.writeOptionalString(routing); out.writeLong(version); + if (format < FORMAT_NO_VERSION_TYPE) { + out.writeByte(VersionType.EXTERNAL.getValue()); + } out.writeLong(autoGeneratedIdTimestamp); out.writeLong(seqNo); out.writeLong(primaryTerm); @@ -1185,8 +1189,8 @@ public long getAutoGeneratedIdTimestamp() { public static class Delete implements Operation { private static final int FORMAT_6_0 = 4; // 6.0 - * - private static final int FORMAT_7_0 = FORMAT_6_0 + 5; // reserve some versions for 6.x - public static final int FORMAT_NO_VERSION_TYPE = FORMAT_7_0 + 1; + public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0 + public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; public static final int SERIALIZATION_FORMAT = FORMAT_NO_VERSION_TYPE; private final String type, id; @@ -1269,12 +1273,16 @@ public Source getSource() { } private void write(final StreamOutput out) throws IOException { - out.writeVInt(SERIALIZATION_FORMAT); + final int format = out.getVersion().onOrAfter(Version.V_7_0_0_alpha1) ? SERIALIZATION_FORMAT : FORMAT_6_0; + out.writeVInt(format); out.writeString(type); out.writeString(id); out.writeString(uid.field()); out.writeBytesRef(uid.bytes()); out.writeLong(version); + if (format < FORMAT_NO_VERSION_TYPE) { + out.writeByte(VersionType.EXTERNAL.getValue()); + } out.writeLong(seqNo); out.writeLong(primaryTerm); } From 5ac734f86970f8a7bf70bb69aee64ca097fe73c3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 17 Jul 2018 09:38:20 -0400 Subject: [PATCH 9/9] simplify assertion --- .../main/java/org/elasticsearch/index/engine/Engine.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 0fc47f15e38cd..53a7baa60f6ca 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1168,8 +1168,7 @@ public static class Index extends Operation { public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime, long autoGeneratedIdTimestamp, boolean isRetry) { super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); - assert (origin == Origin.PRIMARY && versionType != null) || (origin != Origin.PRIMARY && versionType == null) : - "invalid version_type=" + versionType + " for origin=" + origin; + assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; this.doc = doc; this.isRetry = isRetry; this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp; @@ -1247,8 +1246,7 @@ public static class Delete extends Operation { public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime) { super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); - assert (origin == Origin.PRIMARY && versionType != null) || (origin != Origin.PRIMARY && versionType == null) : - "invalid version_type=" + versionType + " for origin=" + origin; + assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; this.type = Objects.requireNonNull(type); this.id = Objects.requireNonNull(id); }