From 60aa1248043673af9e17c12f294aedb8f3bb1ad2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 13 Sep 2018 14:09:51 -0400 Subject: [PATCH 1/7] Propagate max_auto_id_timestamp in peer recovery Today we don't store the auto-generated timestamp of append-only operations in Lucene, and we assign -1 to every index operation constructed from LuceneChangesSnapshot. This looks innocent, but it generates duplicate documents on a replica if a retry append-only request arrives first via peer recovery and the original append-only request then arrives via replication. Since the retry append-only request (delivered via recovery) does not have a timestamp, the replica will happily optimize the original request when it should not. This change transmits the max auto-generated timestamp from the primary to replicas before the translog phase of peer recovery. This timestamp prevents replicas from optimizing append-only requests whose retry counterparts have already been processed. --- .../elasticsearch/index/engine/Engine.java | 11 +++++++++ .../index/engine/InternalEngine.java | 15 ++++++++++-- .../index/engine/ReadOnlyEngine.java | 11 +++++++++ .../elasticsearch/index/shard/IndexShard.java | 15 ++++++++++++ .../recovery/PeerRecoveryTargetService.java | 2 +- .../recovery/RecoverySourceHandler.java | 6 +++-- .../indices/recovery/RecoveryTarget.java | 4 +++- .../recovery/RecoveryTargetHandler.java | 8 +++---- .../RecoveryTranslogOperationsRequest.java | 19 ++++++++++++++- .../recovery/RemoteRecoveryTargetHandler.java | 4 ++-- .../IndexLevelReplicationTests.java | 24 ++++++++++++++++--- .../RecoveryDuringReplicationTests.java | 15 +++++++----- .../index/shard/IndexShardTests.java | 15 +++++++----- .../recovery/RecoverySourceHandlerTests.java | 2 +- .../ESIndexLevelReplicationTestCase.java | 13 ++++++++++ 15 files changed, 135 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 
fc693113fee53..7d07477c61000 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1690,6 +1690,17 @@ public boolean isRecovering() { */ public abstract void maybePruneDeletes(); + /** + * Returns the maximum auto-generated timestamp of append-only requests has been processed by this engine. + */ + public abstract long getMaxAutoIdTimestamp(); + + /** + * Sets the maximum auto-generated timestamp of append-only requests tracked by this engine to {@code newTimestamp}. + * The update only takes effect if the current timestamp is smaller the new given parameter. + */ + public abstract void updateMaxAutoIdTimestamp(long newTimestamp); + @FunctionalInterface public interface TranslogRecoveryRunner { int run(Engine engine, Translog.Snapshot snapshot) throws IOException; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 52dd4d3fcd09e..e3d9c9a124a60 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1014,8 +1014,7 @@ private boolean mayHaveBeenIndexedBefore(Index index) { final boolean mayHaveBeenIndexBefore; if (index.isRetry()) { mayHaveBeenIndexBefore = true; - maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(index.getAutoGeneratedIdTimestamp(), curr)); - assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); + updateMaxAutoIdTimestamp(index.getAutoGeneratedIdTimestamp()); } else { // in this case we force mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); @@ -2531,4 +2530,16 @@ void updateRefreshedCheckpoint(long checkpoint) { assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint; } } + + @Override + public long 
getMaxAutoIdTimestamp() { + return maxUnsafeAutoIdTimestamp.get(); + } + + @Override + public void updateMaxAutoIdTimestamp(long newTimestamp) { + assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]"; + maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); + assert newTimestamp <= maxUnsafeAutoIdTimestamp.get(); + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index b958bd84b76a6..c87ab0bcd8e78 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -28,6 +28,7 @@ import org.apache.lucene.search.SearcherManager; import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.core.internal.io.IOUtils; @@ -365,4 +366,14 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) { @Override public void maybePruneDeletes() { } + + @Override + public long getMaxAutoIdTimestamp() { + return IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + } + + @Override + public void updateMaxAutoIdTimestamp(long newTimestamp) { + + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 91d87b0008214..8671fdd567e98 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1244,6 +1244,21 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { getEngine().trimOperationsFromTranslog(operationPrimaryTerm, aboveSeqNo); } + /** + * Sets the maximum auto-generated timestamp of append-only 
requests tracked by this shard to {@code newTimestamp}. + * The update only takes effect if the current timestamp is smaller the new given parameter. + */ + public void updateMaxAutoIdTimestamp(long newTimestamp) { + getEngine().updateMaxAutoIdTimestamp(newTimestamp); + } + + /** + * Returns the maximum auto-generated timestamp of append-only requests has been processed by this shard. + */ + public long getMaxAutoIdTimestamp() { + return getEngine().getMaxAutoIdTimestamp(); + } + public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException { // If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type. final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null; diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index aaa4697e5cbb5..144b0dec3af31 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -455,7 +455,7 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); final RecoveryTarget recoveryTarget = recoveryRef.target(); try { - recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps()); + recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(), request.maxAutoIdTimestamp()); channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint())); } catch (MapperException exception) { // in very rare cases a translog replay from primary is processed before a mapping 
update on this node diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 220abf43124ab..da35bb157c75f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -201,6 +201,8 @@ public RecoveryResponse recoverToTarget() throws IOException { runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()), shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger); + // DISCUSS: Is it possible to have an operation gets delivered via recovery first, then delivered via replication? + // If this is the case, we need to propagate the max_timestamp of all append-only, not only retry requests. final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); /* * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all @@ -551,8 +553,8 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require logger.trace("no translog operations to send"); } - final CancellableThreads.IOInterruptable sendBatch = - () -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps)); + final CancellableThreads.IOInterruptable sendBatch = () -> + targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, shard.getMaxAutoIdTimestamp())); // send operations in batches Translog.Operation operation; diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index e28b01c8a6187..aeaca69d5d78b 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ 
b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -386,13 +386,15 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxAutoIdTimestamp) throws IOException { final RecoveryState.Translog translog = state().getTranslog(); translog.totalOperations(totalTranslogOps); assert indexShard().recoveryState() == state(); if (indexShard().state() != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, indexShard().state()); } + indexShard().updateMaxAutoIdTimestamp(maxAutoIdTimestamp); for (Translog.Operation operation : operations) { Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 4e728a72b300f..cc2ff71a973b1 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -59,12 +59,12 @@ public interface RecoveryTargetHandler { /** * Index a set of translog operations on the target - * @param operations operations to index - * @param totalTranslogOps current number of total operations expected to be indexed - * + * @param operations operations to index + * @param totalTranslogOps current number of total operations expected to be indexed + * @param maxAutoIdTimestamp the maximum auto-generated timestamp from the primary shard * @return the local checkpoint on the target shard */ - long indexTranslogOperations(List operations, int totalTranslogOps) 
throws IOException; + long indexTranslogOperations(List operations, int totalTranslogOps, long maxAutoIdTimestamp) throws IOException; /** * Notifies the target of the files it is going to receive diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index be399e0f81fd0..c87d3e53b856e 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.indices.recovery; +import org.elasticsearch.Version; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -34,15 +36,18 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest { private ShardId shardId; private List operations; private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; + private long maxAutoIdTimestamp; public RecoveryTranslogOperationsRequest() { } - RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List operations, int totalTranslogOps) { + RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List operations, + int totalTranslogOps, long maxAutoIdTimestamp) { this.recoveryId = recoveryId; this.shardId = shardId; this.operations = operations; this.totalTranslogOps = totalTranslogOps; + this.maxAutoIdTimestamp = maxAutoIdTimestamp; } public long recoveryId() { @@ -61,6 +66,10 @@ public int totalTranslogOps() { return totalTranslogOps; } + public long maxAutoIdTimestamp() { + return maxAutoIdTimestamp; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -68,6 +77,11 @@ public void readFrom(StreamInput in) throws 
IOException { shardId = ShardId.readShardId(in); operations = Translog.readOperations(in, "recovery"); totalTranslogOps = in.readVInt(); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + maxAutoIdTimestamp = in.readZLong(); + } else { + maxAutoIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + } } @Override @@ -77,5 +91,8 @@ public void writeTo(StreamOutput out) throws IOException { shardId.writeTo(out); Translog.writeOperations(out, operations); out.writeVInt(totalTranslogOps); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeZLong(maxAutoIdTimestamp); + } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index edf17595350c4..2c50117c1cd30 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -110,9 +110,9 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) { + public long indexTranslogOperations(List operations, int totalTranslogOps, long maxAutoIdTimestamp) { final RecoveryTranslogOperationsRequest translogOperationsRequest = - new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps); + new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps, maxAutoIdTimestamp); final TransportFuture future = transportService.submitRequest( targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index f2cdfbf8fc566..a3491ba1f8782 100644 --- 
a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; +import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkShardRequest; @@ -141,10 +142,26 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa } } + public void testRetryAppendOnlyWhileRecovering() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + shards.startAll(); + final IndexRequest originalRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); + originalRequest.process(Version.CURRENT, null, index.getName()); + final IndexRequest retryRequest = copyIndexRequest(originalRequest); + retryRequest.onRetry(); + shards.index(retryRequest); + IndexShard replica = shards.addReplica(); + shards.recoverReplica(replica); + shards.assertAllEqual(1); + shards.index(originalRequest); + shards.assertAllEqual(1); + assertThat(replica.getMaxAutoIdTimestamp(), equalTo(originalRequest.getAutoGeneratedTimestamp())); + assertThat(replica.getMaxAutoIdTimestamp(), equalTo(shards.getPrimary().getMaxAutoIdTimestamp())); + } + } + public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception { - //TODO: Enables this test with soft-deletes once we have timestamp - Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build(); - try (ReplicationGroup shards = createGroup(0, settings)) { + try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); indexRequest.onRetry(); // force an update 
of the timestamp @@ -161,6 +178,7 @@ public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception { assertNotEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, primarySegmentStats.getMaxUnsafeAutoIdTimestamp()); assertEquals(primarySegmentStats.getMaxUnsafeAutoIdTimestamp(), segmentsStats.getMaxUnsafeAutoIdTimestamp()); assertNotEquals(Long.MAX_VALUE, segmentsStats.getMaxUnsafeAutoIdTimestamp()); + assertThat(replica.getMaxAutoIdTimestamp(), equalTo(shards.getPrimary().getMaxAutoIdTimestamp())); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index a73d7385d9d4d..dbb9c2ed7bb34 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -488,9 +488,10 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { return new RecoveryTarget(indexShard, node, recoveryListener, l -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxAutoIdTimestamp) throws IOException { opsSent.set(true); - return super.indexTranslogOperations(operations, totalTranslogOps); + return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); } }; }); @@ -557,7 +558,8 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) { replica, (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { @Override - public long indexTranslogOperations(final List operations, final int totalTranslogOps) + public long indexTranslogOperations(final List operations, final int totalTranslogOps, + final long maxAutoIdTimestamp) throws IOException { // index a doc which is 
not part of the snapshot, but also does not complete on replica replicaEngineFactory.latchIndexers(1); @@ -585,7 +587,7 @@ public long indexTranslogOperations(final List operations, f } catch (InterruptedException e) { throw new AssertionError(e); } - return super.indexTranslogOperations(operations, totalTranslogOps); + return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); } }); pendingDocActiveWithExtraDocIndexed.await(); @@ -671,11 +673,12 @@ private void blockIfNeeded(RecoveryState.Stage currentStage) { } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxAutoIdTimestamp) throws IOException { if (hasBlocked() == false) { blockIfNeeded(RecoveryState.Stage.TRANSLOG); } - return super.indexTranslogOperations(operations, totalTranslogOps); + return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9a5df39a970a9..12d54aea2dcb4 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2197,8 +2197,9 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { - final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxAutoIdTimestamp) throws IOException { + final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); 
assertFalse(replica.isSyncNeeded()); return localCheckpoint; } @@ -2304,8 +2305,9 @@ public void testShardActiveDuringPeerRecovery() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { - final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxAutoIdTimestamp) throws IOException { + final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); // Shard should now be active since we did recover: assertTrue(replica.isActive()); return localCheckpoint; @@ -2351,8 +2353,9 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { - final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxAutoIdTimestamp) throws IOException { + final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); assertListenerCalled.accept(replica); return localCheckpoint; } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 0f7a72aacf3f0..03f06ef48b6d8 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -211,7 +211,7 @@ public Translog.Operation next() throws IOException { final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); assertThat(result.totalOperations, equalTo(expectedOps)); 
final ArgumentCaptor shippedOpsCaptor = ArgumentCaptor.forClass(List.class); - verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture()); + verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture(), ArgumentCaptor.forClass(Long.class).capture()); List shippedOps = new ArrayList<>(); for (List list: shippedOpsCaptor.getAllValues()) { shippedOps.addAll(list); diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 5f0909db0d3f0..f3e8e4e57a53f 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -55,6 +55,8 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; @@ -137,6 +139,17 @@ protected IndexMetaData buildIndexMetaData(int replicas, Settings indexSettings, return metaData.build(); } + protected IndexRequest copyIndexRequest(IndexRequest inRequest) throws IOException { + final IndexRequest outRequest = new IndexRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + inRequest.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + outRequest.readFrom(in); + } + } + return outRequest; + } + protected DiscoveryNode getDiscoveryNode(String id) { return new DiscoveryNode(id, id, 
buildNewFakeTransportAddress(), Collections.emptyMap(), Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT); From 27cf199b9fb0079857d9d308b5f0ddc18e75c062 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Sep 2018 01:03:13 -0400 Subject: [PATCH 2/7] add testAppendOnlyRecoveryThenReplication --- .../IndexLevelReplicationTests.java | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index a3491ba1f8782..d49ff941e8942 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -160,6 +160,61 @@ public void testRetryAppendOnlyWhileRecovering() throws Exception { } } + public void testAppendOnlyRecoveryThenReplication() throws Exception { + CountDownLatch indexedOnPrimary = new CountDownLatch(1); + CountDownLatch recoveryDone = new CountDownLatch(1); + try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(1)) { + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + return config -> new InternalEngine(config) { + @Override + public IndexResult index(Index op) throws IOException { + IndexResult result = super.index(op); + if (op.origin() == Operation.Origin.PRIMARY) { + indexedOnPrimary.countDown(); + // prevent the indexing on the primary from returning (it was added to Lucene and translog already) + // to make sure that this operation is replicated to the replica via recovery, then via replication. 
+ try { + recoveryDone.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + return result; + } + }; + } + }) { + shards.startAll(); + Thread thread = new Thread(() -> { + IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); + try { + shards.index(indexRequest); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + thread.start(); + IndexShard replica = shards.addReplica(); + Future fut = shards.asyncRecoverReplica(replica, + (shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){ + @Override + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { + try { + indexedOnPrimary.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps); + } + }); + fut.get(); + recoveryDone.countDown(); + thread.join(); + shards.assertAllEqual(1); + } + } + public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); From d08cdd053269b4b84bc8ef655c87edd48eddd0c5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Sep 2018 21:14:42 -0400 Subject: [PATCH 3/7] use max_seen_timestamp --- .../index/engine/AutoIdTimestamp.java | 49 +++++++++++++++++++ .../elasticsearch/index/engine/Engine.java | 15 ++++-- .../index/engine/InternalEngine.java | 33 +++++++------ .../index/engine/ReadOnlyEngine.java | 8 +-- .../elasticsearch/index/shard/IndexShard.java | 22 ++++++--- .../recovery/PeerRecoveryTargetService.java | 3 +- .../recovery/RecoverySourceHandler.java | 19 ++++--- .../indices/recovery/RecoveryTarget.java | 4 +- .../recovery/RecoveryTargetHandler.java | 9 ++-- .../RecoveryTranslogOperationsRequest.java | 16 +++--- .../recovery/RemoteRecoveryTargetHandler.java | 4 +- .../index/engine/InternalEngineTests.java | 2 + 
.../IndexLevelReplicationTests.java | 4 +- .../RecoveryDuringReplicationTests.java | 4 +- .../index/shard/IndexShardTests.java | 4 +- .../recovery/RecoverySourceHandlerTests.java | 7 +-- .../ESIndexLevelReplicationTestCase.java | 3 ++ 17 files changed, 137 insertions(+), 69 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/AutoIdTimestamp.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/AutoIdTimestamp.java b/server/src/main/java/org/elasticsearch/index/engine/AutoIdTimestamp.java new file mode 100644 index 0000000000000..f524c77b9c521 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/AutoIdTimestamp.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Tracks auto_id_timestamp of append-only index requests have been processed in an {@link Engine}. 
+ */ +final class AutoIdTimestamp { + private final AtomicLong maxUnsafeTimestamp = new AtomicLong(-1); + private final AtomicLong maxSeenTimestamp = new AtomicLong(-1); + + void onNewTimestamp(long newTimestamp, boolean unsafe) { + assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]"; + maxSeenTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); + assert maxSeenTimestamp.get() >= newTimestamp; + if (unsafe) { + maxUnsafeTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); + assert maxUnsafeTimestamp.get() >= newTimestamp; + } + assert maxUnsafeTimestamp.get() <= maxSeenTimestamp.get(); + } + + long maxUnsafeTimestamp() { + return maxUnsafeTimestamp.get(); + } + + long maxSeenTimestamp() { + return maxSeenTimestamp.get(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 7d07477c61000..2ee04cfe0c1ea 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -41,6 +41,7 @@ import org.apache.lucene.util.Accountables; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; @@ -1691,15 +1692,19 @@ public boolean isRecovering() { public abstract void maybePruneDeletes(); /** - * Returns the maximum auto-generated timestamp of append-only requests has been processed by this engine. + * Returns the maximum auto_id_timestamp of all append-only have been processed (or force-updated) by this engine. + * Notes this method returns the auto_id_timestamp of all append-only requests, not max_unsafe_auto_id_timestamp. 
+ * @see #forceUpdateMaxUnsafeAutoIdTimestamp(long) */ - public abstract long getMaxAutoIdTimestamp(); + public long getMaxSeenAutoIdTimestamp() { + return IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + } /** - * Sets the maximum auto-generated timestamp of append-only requests tracked by this engine to {@code newTimestamp}. - * The update only takes effect if the current timestamp is smaller the new given parameter. + * Forces this engine to advance its max_unsafe_auto_id_timestamp marker to at least the given timestamp. + * The engine will disable optimization for all append-only whose timestamp at most {@code newTimestamp}. */ - public abstract void updateMaxAutoIdTimestamp(long newTimestamp); + public abstract void forceUpdateMaxUnsafeAutoIdTimestamp(long newTimestamp); @FunctionalInterface public interface TranslogRecoveryRunner { diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index e3d9c9a124a60..a5912ff620814 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -139,7 +139,7 @@ public class InternalEngine extends Engine { private final AtomicInteger throttleRequestCount = new AtomicInteger(); private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; - private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); + private final AutoIdTimestamp autoIdTimestamp = new AutoIdTimestamp(); private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); private final CounterMetric numVersionLookups = new CounterMetric(); private final CounterMetric numIndexVersionsLookups = new CounterMetric(); @@ -166,7 +166,7 @@ public InternalEngine(EngineConfig engineConfig) { 
final BiFunction localCheckpointTrackerSupplier) { super(engineConfig); if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { - maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); + autoIdTimestamp.onNewTimestamp(Long.MAX_VALUE, true); } final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), @@ -372,9 +372,9 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { for (Map.Entry entry : writer.getLiveCommitData()) { final String key = entry.getKey(); if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { - assert maxUnsafeAutoIdTimestamp.get() == -1 : - "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]"; - maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue())); + assert autoIdTimestamp.maxSeenTimestamp() == -1 : + "max unsafe timestamp was assigned already [" + autoIdTimestamp.maxSeenTimestamp() + "]"; + autoIdTimestamp.onNewTimestamp(Long.parseLong(entry.getValue()), true); } if (key.equals(SequenceNumbers.MAX_SEQ_NO)) { assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 : @@ -1014,10 +1014,12 @@ private boolean mayHaveBeenIndexedBefore(Index index) { final boolean mayHaveBeenIndexBefore; if (index.isRetry()) { mayHaveBeenIndexBefore = true; - updateMaxAutoIdTimestamp(index.getAutoGeneratedIdTimestamp()); + autoIdTimestamp.onNewTimestamp(index.getAutoGeneratedIdTimestamp(), true); + assert autoIdTimestamp.maxUnsafeTimestamp() >= index.getAutoGeneratedIdTimestamp(); } else { // in this case we force - mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); + mayHaveBeenIndexBefore = autoIdTimestamp.maxUnsafeTimestamp() >= index.getAutoGeneratedIdTimestamp(); + autoIdTimestamp.onNewTimestamp(index.getAutoGeneratedIdTimestamp(), false); } return mayHaveBeenIndexBefore; } @@ -1907,7 +1909,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { protected 
final void writerSegmentStats(SegmentsStats stats) { stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed()); stats.addIndexWriterMemoryInBytes(indexWriter.ramBytesUsed()); - stats.updateMaxUnsafeAutoIdTimestamp(maxUnsafeAutoIdTimestamp.get()); + stats.updateMaxUnsafeAutoIdTimestamp(autoIdTimestamp.maxUnsafeTimestamp()); } @Override @@ -2237,7 +2239,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl commitData.put(Engine.SYNC_COMMIT_ID, syncId); } commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); - commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); + commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(autoIdTimestamp.maxUnsafeTimestamp())); commitData.put(HISTORY_UUID_KEY, historyUUID); if (softDeleteEnabled) { commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); @@ -2291,7 +2293,7 @@ public void onSettingsChanged() { // this is an anti-viral settings you can only opt out for the entire index // only if a shard starts up again due to relocation or if the index is closed // the setting will be re-interpreted if it's set to true - this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); + autoIdTimestamp.onNewTimestamp(Long.MAX_VALUE, true); } final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); final IndexSettings indexSettings = engineConfig.getIndexSettings(); @@ -2532,14 +2534,13 @@ void updateRefreshedCheckpoint(long checkpoint) { } @Override - public long getMaxAutoIdTimestamp() { - return maxUnsafeAutoIdTimestamp.get(); + public final long getMaxSeenAutoIdTimestamp() { + return autoIdTimestamp.maxSeenTimestamp(); } @Override - public void updateMaxAutoIdTimestamp(long newTimestamp) { - assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]"; - maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); - 
assert newTimestamp <= maxUnsafeAutoIdTimestamp.get(); + public final void forceUpdateMaxUnsafeAutoIdTimestamp(long newTimestamp) { + autoIdTimestamp.onNewTimestamp(newTimestamp, true); } + } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index c87ab0bcd8e78..e3642cc91f8c5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -28,7 +28,6 @@ import org.apache.lucene.search.SearcherManager; import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.core.internal.io.IOUtils; @@ -368,12 +367,7 @@ public void maybePruneDeletes() { } @Override - public long getMaxAutoIdTimestamp() { - return IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; - } - - @Override - public void updateMaxAutoIdTimestamp(long newTimestamp) { + public void forceUpdateMaxUnsafeAutoIdTimestamp(long newTimestamp) { } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8671fdd567e98..6cb107040d806 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1245,18 +1245,26 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { } /** - * Sets the maximum auto-generated timestamp of append-only requests tracked by this shard to {@code newTimestamp}. - * The update only takes effect if the current timestamp is smaller the new given parameter. 
+ * Returns the maximum auto_id_timestamp of all append-only requests have been processed (or force-updated) by this shard. + * A primary propagates this timestamp to replicas at the beginning of a peer-recovery or a primary-replica resync. + * + * @see #forceUpdateMaxUnsafeAutoIdTimestamp(long) */ - public void updateMaxAutoIdTimestamp(long newTimestamp) { - getEngine().updateMaxAutoIdTimestamp(newTimestamp); + public long getMaxSeenAutoIdTimestamp() { + return getEngine().getMaxSeenAutoIdTimestamp(); } /** - * Returns the maximum auto-generated timestamp of append-only requests has been processed by this shard. + * Since operations stored in soft-deletes do not have max_auto_id_timestamp, the primary has to propagate its max_auto_id_timestamp + * (via {@link #getMaxSeenAutoIdTimestamp()} of all processed append-only requests to replicas at the beginning of a peer-recovery + * or a primary-replica resync to force a replica to disable optimization for all append-only requests which are replicated via + * replication while its retry variants are replicated via recovery without auto_id_timestamp. + *

+ * Without this force-update, a replica can generate duplicate documents (for the same id) if it first receives + * a retry append-only (without timestamp) via recovery, then an original append-only (with timestamp) via replication. */ - public long getMaxAutoIdTimestamp() { - return getEngine().getMaxAutoIdTimestamp(); + public void forceUpdateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampFromPrimary) { + getEngine().forceUpdateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampFromPrimary); } public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 144b0dec3af31..ba88e30727d61 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -455,7 +455,8 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); final RecoveryTarget recoveryTarget = recoveryRef.target(); try { - recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(), request.maxAutoIdTimestamp()); + recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(), + request.maxSeenAutoIdTimestampOnPrimary()); channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint())); } catch (MapperException exception) { // in very rare cases a translog replay from primary is processed before a mapping update on this node diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java 
b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index da35bb157c75f..20e6d8578732d 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -201,8 +201,6 @@ public RecoveryResponse recoverToTarget() throws IOException { runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()), shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger); - // DISCUSS: Is it possible to have an operation gets delivered via recovery first, then delivered via replication? - // If this is the case, we need to propagate the max_timestamp of all append-only, not only retry requests. final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); /* * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all @@ -217,7 +215,10 @@ public RecoveryResponse recoverToTarget() throws IOException { } final long targetLocalCheckpoint; try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) { - targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); + // We have to capture the max auto_id_timestamp after taking a snapshot of operations to guarantee + // that the auto_id_timestamp of every operation in the snapshot is at most this timestamp value. 
+ final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); + targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } @@ -449,9 +450,11 @@ void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTr * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) * @param endingSeqNo the highest sequence number that should be sent * @param snapshot a snapshot of the translog + * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary * @return the local checkpoint on the target */ - long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) + long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot, + final long maxSeenAutoIdTimestamp) throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); @@ -464,7 +467,8 @@ long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingS "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); // send all the snapshot's translog operations to the target - final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); + final SendSnapshotResult result = sendSnapshot( + startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp); stopWatch.stop(); logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime()); @@ -532,10 +536,11 @@ static class SendSnapshotResult { * @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive) * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and 
the * total number of operations sent + * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary * @throws IOException if an I/O exception occurred reading the translog snapshot */ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, - final Translog.Snapshot snapshot) throws IOException { + final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp) throws IOException { assert requiredSeqNoRangeStart <= endingSeqNo + 1: "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; assert startingSeqNo <= requiredSeqNoRangeStart : @@ -554,7 +559,7 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require } final CancellableThreads.IOInterruptable sendBatch = () -> - targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, shard.getMaxAutoIdTimestamp())); + targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, maxSeenAutoIdTimestamp)); // send operations in batches Translog.Operation operation; diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index aeaca69d5d78b..eba50b66ab0e2 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -387,14 +387,14 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar @Override public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxAutoIdTimestamp) throws IOException { + long maxSeenAutoIdTimestampOnPrimary) throws IOException { final RecoveryState.Translog translog = state().getTranslog(); translog.totalOperations(totalTranslogOps); assert indexShard().recoveryState() == state(); if 
(indexShard().state() != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, indexShard().state()); } - indexShard().updateMaxAutoIdTimestamp(maxAutoIdTimestamp); + indexShard().forceUpdateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary); for (Translog.Operation operation : operations) { Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index cc2ff71a973b1..53220c5860949 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -59,12 +59,13 @@ public interface RecoveryTargetHandler { /** * Index a set of translog operations on the target - * @param operations operations to index - * @param totalTranslogOps current number of total operations expected to be indexed - * @param maxAutoIdTimestamp the maximum auto-generated timestamp from the primary shard + * @param operations operations to index + * @param totalTranslogOps current number of total operations expected to be indexed + * @param maxSeenAutoIdTimestampOnPrimary the maximum auto_id_timestamp of all append-only requests processed by the primary shard * @return the local checkpoint on the target shard */ - long indexTranslogOperations(List operations, int totalTranslogOps, long maxAutoIdTimestamp) throws IOException; + long indexTranslogOperations(List operations, int totalTranslogOps, + long maxSeenAutoIdTimestampOnPrimary) throws IOException; /** * Notifies the target of the files it is going to receive diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java 
b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index c87d3e53b856e..3adb5695e0290 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -36,18 +36,18 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest { private ShardId shardId; private List operations; private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; - private long maxAutoIdTimestamp; + private long maxSeenAutoIdTimestampOnPrimary; public RecoveryTranslogOperationsRequest() { } RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List operations, - int totalTranslogOps, long maxAutoIdTimestamp) { + int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) { this.recoveryId = recoveryId; this.shardId = shardId; this.operations = operations; this.totalTranslogOps = totalTranslogOps; - this.maxAutoIdTimestamp = maxAutoIdTimestamp; + this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary; } public long recoveryId() { @@ -66,8 +66,8 @@ public int totalTranslogOps() { return totalTranslogOps; } - public long maxAutoIdTimestamp() { - return maxAutoIdTimestamp; + public long maxSeenAutoIdTimestampOnPrimary() { + return maxSeenAutoIdTimestampOnPrimary; } @Override @@ -78,9 +78,9 @@ public void readFrom(StreamInput in) throws IOException { operations = Translog.readOperations(in, "recovery"); totalTranslogOps = in.readVInt(); if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { - maxAutoIdTimestamp = in.readZLong(); + maxSeenAutoIdTimestampOnPrimary = in.readZLong(); } else { - maxAutoIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; } } @@ -92,7 +92,7 @@ public void writeTo(StreamOutput out) throws IOException { Translog.writeOperations(out, operations); 
out.writeVInt(totalTranslogOps); if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { - out.writeZLong(maxAutoIdTimestamp); + out.writeZLong(maxSeenAutoIdTimestampOnPrimary); } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 2c50117c1cd30..3a7f28e8eb7e2 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -110,9 +110,9 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps, long maxAutoIdTimestamp) { + public long indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) { final RecoveryTranslogOperationsRequest translogOperationsRequest = - new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps, maxAutoIdTimestamp); + new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary); final TransportFuture future = transportService.submitRequest( targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8f9d90154f8f4..71fe879ea84d8 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3536,6 +3536,8 @@ public void run() { } assertEquals(0, engine.getNumVersionLookups()); assertEquals(0, engine.getNumIndexVersionsLookups()); + assertThat(engine.getMaxSeenAutoIdTimestamp(), + 
equalTo(docs.stream().mapToLong(Engine.Index::getAutoGeneratedIdTimestamp).max().getAsLong())); assertLuceneOperations(engine, numDocs, 0, 0); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index d49ff941e8942..f5acf5c197443 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -155,8 +155,7 @@ public void testRetryAppendOnlyWhileRecovering() throws Exception { shards.assertAllEqual(1); shards.index(originalRequest); shards.assertAllEqual(1); - assertThat(replica.getMaxAutoIdTimestamp(), equalTo(originalRequest.getAutoGeneratedTimestamp())); - assertThat(replica.getMaxAutoIdTimestamp(), equalTo(shards.getPrimary().getMaxAutoIdTimestamp())); + assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(originalRequest.getAutoGeneratedTimestamp())); } } @@ -233,7 +232,6 @@ public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception { assertNotEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, primarySegmentStats.getMaxUnsafeAutoIdTimestamp()); assertEquals(primarySegmentStats.getMaxUnsafeAutoIdTimestamp(), segmentsStats.getMaxUnsafeAutoIdTimestamp()); assertNotEquals(Long.MAX_VALUE, segmentsStats.getMaxUnsafeAutoIdTimestamp()); - assertThat(replica.getMaxAutoIdTimestamp(), equalTo(shards.getPrimary().getMaxAutoIdTimestamp())); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index dbb9c2ed7bb34..3883554acc04c 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ 
-489,9 +489,9 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { }) { @Override public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxAutoIdTimestamp) throws IOException { + long maxSeenAutoIdTimestampOnPrimary) throws IOException { opsSent.set(true); - return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); + return super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary); } }; }); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 12d54aea2dcb4..b7fa9448a0fc1 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2198,8 +2198,8 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { }) { @Override public long indexTranslogOperations(List operations, int totalTranslogOps, - long maxAutoIdTimestamp) throws IOException { - final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); + long maxSeenAutoIdTimestamp) throws IOException { + final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp); assertFalse(replica.isSyncNeeded()); return localCheckpoint; } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 03f06ef48b6d8..9b17962f91b1f 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -207,7 +207,7 @@ public int totalOperations() { public Translog.Operation next() throws IOException { return operations.get(counter++); } - }); 
+ }, randomNonNegativeLong()); final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); assertThat(result.totalOperations, equalTo(expectedOps)); final ArgumentCaptor shippedOpsCaptor = ArgumentCaptor.forClass(List.class); @@ -249,7 +249,7 @@ public Translog.Operation next() throws IOException { } while (op != null && opsToSkip.contains(op)); return op; } - })); + }, randomNonNegativeLong())); } } @@ -420,7 +420,8 @@ void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTr } @Override - long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot) throws IOException { + long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, + long maxSeenAutoIdTimestamp) { phase2Called.set(true); return SequenceNumbers.UNASSIGNED_SEQ_NO; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index f3e8e4e57a53f..1da8311d81f2f 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -441,6 +441,9 @@ public synchronized List shardRoutings() { public synchronized void close() throws Exception { if (closed == false) { closed = true; + for (IndexShard replica : replicas) { + assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp())); + } closeShards(this); } else { throw new AlreadyClosedException("too bad"); From e0c48dab091dd55417420a5d8d49557f300209b6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 15 Sep 2018 02:24:32 -0400 Subject: [PATCH 4/7] ignore AlreadyClosedException --- .../index/replication/ESIndexLevelReplicationTestCase.java | 5 ++++- 1 file changed, 4 insertions(+), 1 
deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 1da8311d81f2f..f590b99b48172 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -442,7 +442,10 @@ public synchronized void close() throws Exception { if (closed == false) { closed = true; for (IndexShard replica : replicas) { - assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp())); + try { + assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp())); + } catch (AlreadyClosedException ignored) { + } } closeShards(this); } else { From 377267a67eb9e91ace996e8f20c2da5404aaf6de Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 19 Sep 2018 08:20:44 -0400 Subject: [PATCH 5/7] =?UTF-8?q?boaz=E2=80=99s=20feedback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/elasticsearch/index/engine/Engine.java | 6 +++--- .../org/elasticsearch/index/engine/InternalEngine.java | 2 +- .../org/elasticsearch/index/engine/ReadOnlyEngine.java | 2 +- .../java/org/elasticsearch/index/shard/IndexShard.java | 10 +++++----- .../elasticsearch/indices/recovery/RecoveryTarget.java | 2 +- .../index/replication/IndexLevelReplicationTests.java | 4 ++-- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 0fc72bb710bfa..508cd09a4b3e7 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1728,9 +1728,9 @@ public boolean 
isRecovering() { public abstract void maybePruneDeletes(); /** - * Returns the maximum auto_id_timestamp of all append-only have been processed (or force-updated) by this engine. + * Returns the maximum auto_id_timestamp of all append-only index requests have been processed by this engine + * or the auto_id_timestamp received from its primary shard via {@link #updateMaxUnsafeAutoIdTimestamp(long)}. * Notes this method returns the auto_id_timestamp of all append-only requests, not max_unsafe_auto_id_timestamp. - * @see #forceUpdateMaxUnsafeAutoIdTimestamp(long) */ public long getMaxSeenAutoIdTimestamp() { return IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; @@ -1740,7 +1740,7 @@ public long getMaxSeenAutoIdTimestamp() { * Forces this engine to advance its max_unsafe_auto_id_timestamp marker to at least the given timestamp. * The engine will disable optimization for all append-only whose timestamp at most {@code newTimestamp}. */ - public abstract void forceUpdateMaxUnsafeAutoIdTimestamp(long newTimestamp); + public abstract void updateMaxUnsafeAutoIdTimestamp(long newTimestamp); @FunctionalInterface public interface TranslogRecoveryRunner { diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a5912ff620814..271bfe5d0afd3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2539,7 +2539,7 @@ public final long getMaxSeenAutoIdTimestamp() { } @Override - public final void forceUpdateMaxUnsafeAutoIdTimestamp(long newTimestamp) { + public final void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { autoIdTimestamp.onNewTimestamp(newTimestamp, true); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 45d0768bc4a5d..4175ff2a36095 100644 
--- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -375,7 +375,7 @@ public DocsStats docStats() { } @Override - public void forceUpdateMaxUnsafeAutoIdTimestamp(long newTimestamp) { + public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 20e5029eb8c4d..adc219bd87e49 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1218,10 +1218,10 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { } /** - * Returns the maximum auto_id_timestamp of all append-only requests have been processed (or force-updated) by this shard. - * A primary propagates this timestamp to replicas at the beginning of a peer-recovery or a primary-replica resync. + * Returns the maximum auto_id_timestamp of all append-only requests have been processed by this shard or the auto_id_timestamp received + * from the primary via {@link #updateMaxUnsafeAutoIdTimestamp(long)} at the beginning of a peer-recovery or a primary-replica resync. * - * @see #forceUpdateMaxUnsafeAutoIdTimestamp(long) + * @see #updateMaxUnsafeAutoIdTimestamp(long) */ public long getMaxSeenAutoIdTimestamp() { return getEngine().getMaxSeenAutoIdTimestamp(); @@ -1236,8 +1236,8 @@ public long getMaxSeenAutoIdTimestamp() { * Without this force-update, a replica can generate duplicate documents (for the same id) if it first receives * a retry append-only (without timestamp) via recovery, then an original append-only (with timestamp) via replication. 
*/ - public void forceUpdateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampFromPrimary) { - getEngine().forceUpdateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampFromPrimary); + public void updateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampFromPrimary) { + getEngine().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampFromPrimary); } public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index eba50b66ab0e2..6feeab175c6bc 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -394,7 +394,7 @@ public long indexTranslogOperations(List operations, int tot if (indexShard().state() != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, indexShard().state()); } - indexShard().forceUpdateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary); + indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary); for (Translog.Operation operation : operations) { Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index f5acf5c197443..2f38ef709d1c5 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -142,7 +142,7 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa } } - 
public void testRetryAppendOnlyWhileRecovering() throws Exception { + public void testRetryAppendOnlyAfterRecovering() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); final IndexRequest originalRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); @@ -153,7 +153,7 @@ public void testRetryAppendOnlyWhileRecovering() throws Exception { IndexShard replica = shards.addReplica(); shards.recoverReplica(replica); shards.assertAllEqual(1); - shards.index(originalRequest); + shards.index(originalRequest); // original append-only arrives after recovery completed shards.assertAllEqual(1); assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(originalRequest.getAutoGeneratedTimestamp())); } From e6a929a432d56838b593ed05c2f4397dd5a68708 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 19 Sep 2018 08:45:12 -0400 Subject: [PATCH 6/7] remove class --- .../index/engine/AutoIdTimestamp.java | 49 ------------------- .../index/engine/InternalEngine.java | 40 +++++++++------ 2 files changed, 26 insertions(+), 63 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/index/engine/AutoIdTimestamp.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/AutoIdTimestamp.java b/server/src/main/java/org/elasticsearch/index/engine/AutoIdTimestamp.java deleted file mode 100644 index f524c77b9c521..0000000000000 --- a/server/src/main/java/org/elasticsearch/index/engine/AutoIdTimestamp.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.engine; - -import java.util.concurrent.atomic.AtomicLong; - -/** - * Tracks auto_id_timestamp of append-only index requests have been processed in an {@link Engine}. - */ -final class AutoIdTimestamp { - private final AtomicLong maxUnsafeTimestamp = new AtomicLong(-1); - private final AtomicLong maxSeenTimestamp = new AtomicLong(-1); - - void onNewTimestamp(long newTimestamp, boolean unsafe) { - assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]"; - maxSeenTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); - assert maxSeenTimestamp.get() >= newTimestamp; - if (unsafe) { - maxUnsafeTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); - assert maxUnsafeTimestamp.get() >= newTimestamp; - } - assert maxUnsafeTimestamp.get() <= maxSeenTimestamp.get(); - } - - long maxUnsafeTimestamp() { - return maxUnsafeTimestamp.get(); - } - - long maxSeenTimestamp() { - return maxSeenTimestamp.get(); - } -} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 271bfe5d0afd3..28a8fc81dcd54 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -139,7 +139,8 @@ public class InternalEngine extends Engine { private final AtomicInteger throttleRequestCount = new AtomicInteger(); private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); public static 
final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; - private final AutoIdTimestamp autoIdTimestamp = new AutoIdTimestamp(); + private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); + private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); private final CounterMetric numVersionLookups = new CounterMetric(); private final CounterMetric numIndexVersionsLookups = new CounterMetric(); @@ -166,7 +167,7 @@ public InternalEngine(EngineConfig engineConfig) { final BiFunction localCheckpointTrackerSupplier) { super(engineConfig); if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { - autoIdTimestamp.onNewTimestamp(Long.MAX_VALUE, true); + updateAutoIdTimestamp(Long.MAX_VALUE, true); } final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), @@ -372,9 +373,9 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { for (Map.Entry entry : writer.getLiveCommitData()) { final String key = entry.getKey(); if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { - assert autoIdTimestamp.maxSeenTimestamp() == -1 : - "max unsafe timestamp was assigned already [" + autoIdTimestamp.maxSeenTimestamp() + "]"; - autoIdTimestamp.onNewTimestamp(Long.parseLong(entry.getValue()), true); + assert maxUnsafeAutoIdTimestamp.get() == -1 : + "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]"; + updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true); } if (key.equals(SequenceNumbers.MAX_SEQ_NO)) { assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 : @@ -1014,12 +1015,12 @@ private boolean mayHaveBeenIndexedBefore(Index index) { final boolean mayHaveBeenIndexBefore; if (index.isRetry()) { mayHaveBeenIndexBefore = true; - 
autoIdTimestamp.onNewTimestamp(index.getAutoGeneratedIdTimestamp(), true); - assert autoIdTimestamp.maxUnsafeTimestamp() >= index.getAutoGeneratedIdTimestamp(); + updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), true); + assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); } else { // in this case we force - mayHaveBeenIndexBefore = autoIdTimestamp.maxUnsafeTimestamp() >= index.getAutoGeneratedIdTimestamp(); - autoIdTimestamp.onNewTimestamp(index.getAutoGeneratedIdTimestamp(), false); + mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); + updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), false); } return mayHaveBeenIndexBefore; } @@ -1909,7 +1910,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { protected final void writerSegmentStats(SegmentsStats stats) { stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed()); stats.addIndexWriterMemoryInBytes(indexWriter.ramBytesUsed()); - stats.updateMaxUnsafeAutoIdTimestamp(autoIdTimestamp.maxUnsafeTimestamp()); + stats.updateMaxUnsafeAutoIdTimestamp(maxUnsafeAutoIdTimestamp.get()); } @Override @@ -2239,7 +2240,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl commitData.put(Engine.SYNC_COMMIT_ID, syncId); } commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); - commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(autoIdTimestamp.maxUnsafeTimestamp())); + commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); if (softDeleteEnabled) { commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); @@ -2293,7 +2294,7 @@ public void onSettingsChanged() { // this is an anti-viral settings you can only opt out for the entire index // only if a shard starts up again due to relocation or if the index is 
closed // the setting will be re-interpreted if it's set to true - autoIdTimestamp.onNewTimestamp(Long.MAX_VALUE, true); + updateAutoIdTimestamp(Long.MAX_VALUE, true); } final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); final IndexSettings indexSettings = engineConfig.getIndexSettings(); @@ -2535,12 +2536,23 @@ void updateRefreshedCheckpoint(long checkpoint) { @Override public final long getMaxSeenAutoIdTimestamp() { - return autoIdTimestamp.maxSeenTimestamp(); + return maxSeenAutoIdTimestamp.get(); } @Override public final void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { - autoIdTimestamp.onNewTimestamp(newTimestamp, true); + updateAutoIdTimestamp(newTimestamp, true); + } + + private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) { + assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]"; + maxSeenAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); + assert maxSeenAutoIdTimestamp.get() >= newTimestamp; + if (unsafe) { + maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); + assert maxUnsafeAutoIdTimestamp.get() >= newTimestamp; + } + assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get(); } } From 07e1621e7ca3a7fe2366f4ad3673d3bb2dca7694 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 20 Sep 2018 16:00:53 -0400 Subject: [PATCH 7/7] feedback --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 -- .../org/elasticsearch/indices/recovery/RecoveryTarget.java | 6 ++++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 518c34a331bfb..1043e514fd737 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2542,10 +2542,8 @@ public final void 
updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) { assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]"; maxSeenAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); - assert maxSeenAutoIdTimestamp.get() >= newTimestamp; if (unsafe) { maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); - assert maxUnsafeAutoIdTimestamp.get() >= newTimestamp; } assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get(); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 6feeab175c6bc..e2f21fe8edd2e 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -394,6 +394,12 @@ public long indexTranslogOperations(List operations, int tot if (indexShard().state() != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, indexShard().state()); } + /* + * The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation + * that will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests + * (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we + * replay these operations first (without timestamp), then optimize append-only requests (with timestamp). + */ indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary); for (Translog.Operation operation : operations) { Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);