diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index a69e901691de8..14f3a0e749bbb 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -693,14 +693,23 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
         // this allows to ignore the case where a document was found in the live version maps in
         // a delete state and return false for the created flag in favor of code simplicity
         final OpVsLuceneDocStatus opVsLucene;
-        if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
-            opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
-        } else {
+        if (index.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
             // This can happen if the primary is still on an old node and send traffic without seq# or we recover from translog
             // created by an old version.
             assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
                 "index is newly created but op has no sequence numbers. op: " + index;
             opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
+        } else if (index.seqNo() <= seqNoService.getLocalCheckpoint()) {
+            // the operation seq# is lower than the current local checkpoint and thus was already put into lucene
+            // this can happen during recovery where older operations are sent from the translog that are already
+            // part of the lucene commit (either from a peer recovery or a local translog)
+            // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
+            // question may have been deleted by an out-of-order op that is not replayed.
+            // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
+            // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
+            opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+        } else {
+            opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
         }
         if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
             plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
@@ -979,12 +988,21 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept
         // this allows to ignore the case where a document was found in the live version maps in
         // a delete state and return true for the found flag in favor of code simplicity
         final OpVsLuceneDocStatus opVsLucene;
-        if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
-            opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
-        } else {
+        if (delete.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
             assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
                 "index is newly created but op has no sequence numbers. op: " + delete;
             opVsLucene = compareOpToLuceneDocBasedOnVersions(delete);
+        } else if (delete.seqNo() <= seqNoService.getLocalCheckpoint()) {
+            // the operation seq# is lower than the current local checkpoint and thus was already put into lucene
+            // this can happen during recovery where older operations are sent from the translog that are already
+            // part of the lucene commit (either from a peer recovery or a local translog)
+            // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
+            // question may have been deleted by an out-of-order op that is not replayed.
+            // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
+            // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
+            opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+        } else {
+            opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
         }
         final DeletionStrategy plan;
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 9749798b91172..6a896b0f3ccb0 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -125,12 +125,14 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
 import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -339,7 +341,7 @@ public void onFailure(Exception e) {
         // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
         final ShardRouting primaryRouting =
-                TestShardRouting.newShardRouting(
+                newShardRouting(
                         replicaRouting.shardId(),
                         replicaRouting.currentNodeId(),
                         null,
@@ -416,7 +418,7 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
         // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
         final ShardRouting primaryRouting =
-                TestShardRouting.newShardRouting(
+                newShardRouting(
                         replicaRouting.shardId(),
                         replicaRouting.currentNodeId(),
                         null,
@@ -458,13 +460,13 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E
         if (randomBoolean()) {
             // relocation target
-            indexShard = newShard(TestShardRouting.newShardRouting(shardId, "local_node", "other node",
+            indexShard = newShard(newShardRouting(shardId, "local_node", "other node",
                 true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing())));
         } else if (randomBoolean()) {
             // simulate promotion
             indexShard = newStartedShard(false);
             ShardRouting replicaRouting = indexShard.routingEntry();
-            ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
+            ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
                 true, ShardRoutingState.STARTED, replicaRouting.allocationId());
             indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, 0L,
                 Collections.singleton(indexShard.routingEntry().allocationId().getId()),
@@ -520,7 +522,7 @@ public void testOperationPermitOnReplicaShards() throws Exception {
             case 1: {
                 // initializing replica / primary
                 final boolean relocating = randomBoolean();
-                ShardRouting routing = TestShardRouting.newShardRouting(shardId, "local_node",
+                ShardRouting routing = newShardRouting(shardId, "local_node",
                     relocating ? "sourceNode" : null,
                     relocating ? randomBoolean() : false,
                     ShardRoutingState.INITIALIZING,
@@ -533,7 +535,7 @@ public void testOperationPermitOnReplicaShards() throws Exception {
                 // relocation source
                 indexShard = newStartedShard(true);
                 ShardRouting routing = indexShard.routingEntry();
-                routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
+                routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
                     true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
                 IndexShardTestCase.updateRoutingEntry(indexShard, routing);
                 indexShard.relocated("test", primaryContext -> {});
@@ -1377,6 +1379,47 @@ protected void doRun() throws Exception {
         closeShards(shard);
     }
 
+    public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
+        final IndexShard shard = newStartedShard(false);
+        final Consumer<Mapping> mappingConsumer = getMappingUpdater(shard, "test");
+        shard.applyDeleteOperationOnReplica(1, 1, 2, "test", "id", VersionType.EXTERNAL, mappingConsumer);
+        shard.getEngine().rollTranslogGeneration(); // isolate the delete in its own generation
+        shard.applyIndexOperationOnReplica(0, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+            SourceToParse.source(shard.shardId().getIndexName(), "test", "id", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
+
+        // index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this
+        // generation stick around
+        shard.applyIndexOperationOnReplica(3, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+            SourceToParse.source(shard.shardId().getIndexName(), "test", "id2", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
+
+        final int translogOps;
+        if (randomBoolean()) {
+            logger.info("--> flushing shard");
+            flushShard(shard);
+            translogOps = 2;
+        } else if (randomBoolean()) {
+            shard.getEngine().rollTranslogGeneration();
+            translogOps = 3;
+        } else {
+            translogOps = 3;
+        }
+
+        final ShardRouting replicaRouting = shard.routingEntry();
+        IndexShard newShard = reinitShard(shard,
+            newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
+                RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE));
+        DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+        newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
+        assertTrue(newShard.recoverFromStore());
+        assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
+        assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
+        assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
+        assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
+        updateRoutingEntry(newShard, ShardRoutingHelper.moveToStarted(newShard.routingEntry()));
+        assertDocCount(newShard, 1);
+        closeShards(newShard);
+    }
+
     public void testRecoverFromStore() throws IOException {
         final IndexShard shard = newStartedShard(true);
         int totalOps = randomInt(10);
@@ -1939,7 +1982,7 @@ public void testRecoverFromLocalShard() throws IOException {
         sourceShard.refresh("test");
 
-        ShardRouting targetRouting = TestShardRouting.newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
+        ShardRouting targetRouting = newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
             ShardRoutingState.INITIALIZING, RecoverySource.LocalShardsRecoverySource.INSTANCE);
 
         final IndexShard targetShard;
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index 2a95bf33908d0..4c592d1c47245 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -19,9 +19,14 @@
 
 package org.elasticsearch.indices.recovery;
 
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
 import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
 import org.elasticsearch.index.shard.IndexShard;
@@ -79,4 +84,52 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception {
             assertBusy(() -> assertThat(replica.getTranslog().totalOperations(), equalTo(0)));
         }
     }
+
+    public void testRecoveryWithOutOfOrderDelete() throws Exception {
+        try (ReplicationGroup shards = createGroup(1)) {
+            shards.startAll();
+            // create out of order delete and index op on replica
+            final IndexShard orgReplica = shards.getReplicas().get(0);
+            orgReplica.applyDeleteOperationOnReplica(1, 1, 2, "type", "id", VersionType.EXTERNAL, u -> {});
+            orgReplica.getTranslog().rollGeneration(); // isolate the delete in its own generation
+            orgReplica.applyIndexOperationOnReplica(0, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id", new BytesArray("{}"), XContentType.JSON),
+                u -> {});
+
+            // index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this
+            // generation stick around
+            orgReplica.applyIndexOperationOnReplica(3, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id2", new BytesArray("{}"), XContentType.JSON), u -> {});
+
+            final int translogOps;
+            if (randomBoolean()) {
+                if (randomBoolean()) {
+                    logger.info("--> flushing shard (translog will be trimmed)");
+                    IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
+                    builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
+                        .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
+                        .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
+                    );
+                    orgReplica.indexSettings().updateIndexMetaData(builder.build());
+                    orgReplica.onSettingsChanged();
+                    translogOps = 3; // 2 ops + seqno gaps
+                } else {
+                    logger.info("--> flushing shard (translog will be retained)");
+                    translogOps = 4; // 3 ops + seqno gaps
+                }
+                flushShard(orgReplica);
+            } else {
+                translogOps = 4; // 3 ops + seqno gaps
+            }
+
+            final IndexShard orgPrimary = shards.getPrimary();
+            shards.promoteReplicaToPrimary(orgReplica).get(); // wait for primary/replica sync to make sure seq# gap is closed.
+
+            IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
+            shards.recoverReplica(newReplica);
+            shards.assertAllEqual(1);
+
+            assertThat(newReplica.getTranslog().totalOperations(), equalTo(translogOps));
+        }
+    }
 }
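
Reviewer note: the heart of this change is the new middle branch in both planIndexingAsNonPrimary and planDeletionAsNonPrimary. A non-primary now resolves an incoming operation in three steps: an op with no seq# at all (legacy 5.x traffic or an old translog) falls back to version-based comparison; an op whose seq# is at or below the local checkpoint is declared stale and bypasses Lucene; everything else is compared against the Lucene doc by seq#. The standalone sketch below is a minimal model of that decision under stated assumptions, not Elasticsearch code: the class and enum names are invented, and the -2 sentinel is only assumed to match the engine's UNASSIGNED_SEQ_NO.

// Hypothetical, simplified model of the three-way planning decision introduced by
// this patch. None of these names exist in Elasticsearch.
public class ReplicaOpPlanner {

    static final long UNASSIGNED_SEQ_NO = -2; // assumed sentinel, mirroring the engine's constant

    enum Plan { COMPARE_BY_VERSION, SKIP_LUCENE_STALE, COMPARE_BY_SEQ_NO }

    /**
     * Decide how a non-primary should handle an incoming index/delete operation.
     *
     * @param opSeqNo         seq# carried by the op, or UNASSIGNED_SEQ_NO for legacy ops
     * @param localCheckpoint highest seq# up to which all operations have been processed
     */
    static Plan plan(long opSeqNo, long localCheckpoint) {
        if (opSeqNo == UNASSIGNED_SEQ_NO) {
            // op comes from a pre-6.0 primary or an old translog: only versions exist
            return Plan.COMPARE_BY_VERSION;
        } else if (opSeqNo <= localCheckpoint) {
            // op was already processed, and a later delete covering it may never be
            // replayed, so the op must skip Lucene instead of being re-applied
            return Plan.SKIP_LUCENE_STALE;
        } else {
            return Plan.COMPARE_BY_SEQ_NO;
        }
    }

    public static void main(String[] args) {
        // Replay of the scenario in the new tests: the delete at seq# 1 was applied
        // first and the local checkpoint reached 1; the older index op at seq# 0 then
        // arrives from the translog and must not resurrect the deleted document.
        System.out.println(plan(0, 1));                 // SKIP_LUCENE_STALE
        System.out.println(plan(3, 1));                 // COMPARE_BY_SEQ_NO
        System.out.println(plan(UNASSIGNED_SEQ_NO, 1)); // COMPARE_BY_VERSION
    }
}

The same ordering trick explains the translog-op counts the tests assert: the delete sits alone in generation 1 and the two index ops in generation 2, so a flush can retire generation 1 (2 ops replayed in the store-recovery test) while without it all 3 ops replay; in the replication test, promoting the replica additionally fills the seq# 2 gap, which is why its comments count "ops + seqno gaps" (3 or 4).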