From f6621b52d3e3545d3c14af8f80183ab7b5e89a9a Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Wed, 19 Jul 2017 10:09:59 +0200
Subject: [PATCH 1/4] failing test

---
 .../index/shard/IndexShardTests.java | 57 ++++++++++++++++---
 1 file changed, 50 insertions(+), 7 deletions(-)

diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 9749798b91172..6a896b0f3ccb0 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -125,12 +125,14 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
 import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -339,7 +341,7 @@ public void onFailure(Exception e) {
         // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
         final ShardRouting primaryRouting =
-                TestShardRouting.newShardRouting(
+                newShardRouting(
                         replicaRouting.shardId(),
                         replicaRouting.currentNodeId(),
                         null,
@@ -416,7 +418,7 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
         // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
         final ShardRouting primaryRouting =
-                TestShardRouting.newShardRouting(
+                newShardRouting(
                         replicaRouting.shardId(),
                         replicaRouting.currentNodeId(),
                         null,
@@ -458,13 +460,13 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E
 
         if (randomBoolean()) {
             // relocation target
-            indexShard = newShard(TestShardRouting.newShardRouting(shardId, "local_node", "other node",
+            indexShard = newShard(newShardRouting(shardId, "local_node", "other node",
                 true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing())));
         } else if (randomBoolean()) {
             // simulate promotion
             indexShard = newStartedShard(false);
             ShardRouting replicaRouting = indexShard.routingEntry();
-            ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
+            ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
                 true, ShardRoutingState.STARTED, replicaRouting.allocationId());
             indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, 0L,
                 Collections.singleton(indexShard.routingEntry().allocationId().getId()),
@@ -520,7 +522,7 @@ public void testOperationPermitOnReplicaShards() throws Exception {
             case 1: {
                 // initializing replica / primary
                 final boolean relocating = randomBoolean();
-                ShardRouting routing = TestShardRouting.newShardRouting(shardId, "local_node",
+                ShardRouting routing = newShardRouting(shardId, "local_node",
                     relocating ? "sourceNode" : null,
                     relocating ? randomBoolean() : false,
                     ShardRoutingState.INITIALIZING,
@@ -533,7 +535,7 @@ public void testOperationPermitOnReplicaShards() throws Exception {
                 // relocation source
                 indexShard = newStartedShard(true);
                 ShardRouting routing = indexShard.routingEntry();
-                routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
+                routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
                     true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
                 IndexShardTestCase.updateRoutingEntry(indexShard, routing);
                 indexShard.relocated("test", primaryContext -> {});
@@ -1377,6 +1379,47 @@ protected void doRun() throws Exception {
         closeShards(shard);
     }
 
+    public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
+        final IndexShard shard = newStartedShard(false);
+        final Consumer<Mapping> mappingConsumer = getMappingUpdater(shard, "test");
+        shard.applyDeleteOperationOnReplica(1, 1, 2, "test", "id", VersionType.EXTERNAL, mappingConsumer);
+        shard.getEngine().rollTranslogGeneration(); // isolate the delete in its own generation
+        shard.applyIndexOperationOnReplica(0, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+            SourceToParse.source(shard.shardId().getIndexName(), "test", "id", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
+
+        // index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation stick
+        // around
+        shard.applyIndexOperationOnReplica(3, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+            SourceToParse.source(shard.shardId().getIndexName(), "test", "id2", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
+
+        final int translogOps;
+        if (randomBoolean()) {
+            logger.info("--> flushing shard");
+            flushShard(shard);
+            translogOps = 2;
+        } else if (randomBoolean()) {
+            shard.getEngine().rollTranslogGeneration();
+            translogOps = 3;
+        } else {
+            translogOps = 3;
+        }
+
+        final ShardRouting replicaRouting = shard.routingEntry();
+        IndexShard newShard = reinitShard(shard,
+            newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
+                RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE));
+        DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+        newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
+        assertTrue(newShard.recoverFromStore());
+        assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
+        assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
+        assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
+        assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
+        updateRoutingEntry(newShard, ShardRoutingHelper.moveToStarted(newShard.routingEntry()));
+        assertDocCount(newShard, 1);
+        closeShards(newShard);
+    }
+
     public void testRecoverFromStore() throws IOException {
         final IndexShard shard = newStartedShard(true);
         int totalOps = randomInt(10);
@@ -1939,7 +1982,7 @@ public void testRecoverFromLocalShard() throws IOException {
 
         sourceShard.refresh("test");
 
-        ShardRouting targetRouting = TestShardRouting.newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
+        ShardRouting targetRouting = newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
             ShardRoutingState.INITIALIZING, RecoverySource.LocalShardsRecoverySource.INSTANCE);
 
         final IndexShard targetShard;

From 3db5e6d16cea0ac942b31fad236de341a0f28839 Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Thu, 20 Jul 2017 22:34:42 +0200
Subject: [PATCH 2/4] recovery failing test

---
 .../indices/recovery/RecoveryTests.java | 55 +++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index 2a95bf33908d0..a8154bd8a3c02 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -19,15 +19,22 @@
 package org.elasticsearch.indices.recovery;
 
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.mapper.Mapping;
+import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
 import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
 import org.elasticsearch.index.shard.IndexShard;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
+import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -79,4 +86,52 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception {
             assertBusy(() -> assertThat(replica.getTranslog().totalOperations(), equalTo(0)));
         }
     }
+
+    public void testRecoveryWithOutOfOrderDelete() throws Exception {
+        try (ReplicationGroup shards = createGroup(1)) {
+            shards.startAll();
+            // create out of order delete and index op on replica
+            final IndexShard orgReplica = shards.getReplicas().get(0);
+            final Consumer<Mapping> mappingConsumer = getMappingUpdater(orgReplica, "type");
+            orgReplica.applyDeleteOperationOnReplica(1, 1, 2, "type", "id", VersionType.EXTERNAL, mappingConsumer);
+            orgReplica.getTranslog().rollGeneration(); // isolate the delete in its own generation
+            orgReplica.applyIndexOperationOnReplica(0, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
+
+            // index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation stick
+            // around
+            orgReplica.applyIndexOperationOnReplica(3, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id2", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
+
+            final int translogOps;
+            if (randomBoolean()) {
+                if (randomBoolean()) {
+                    logger.info("--> flushing shard (translog will be trimmed)");
+                    IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
+                    builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
+                        .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
+                        .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
+                    );
+                    orgReplica.indexSettings().updateIndexMetaData(builder.build());
+                    orgReplica.onSettingsChanged();
+                    translogOps = 3; // 2 ops + seqno gaps
+                } else {
+                    logger.info("--> flushing shard (translog will be retained)");
+                    translogOps = 4; // 3 ops + seqno gaps
+                }
+                flushShard(orgReplica);
+            } else {
+                translogOps = 4; // 3 ops + seqno gaps
+            }
+
+            final IndexShard orgPrimary = shards.getPrimary();
+            shards.promoteReplicaToPrimary(orgReplica).get(); // wait for primary/replica sync to make sure seq# gap is closed.
+
+            IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
+            shards.recoverReplica(newReplica);
+            shards.assertAllEqual(1);
+
+            assertThat(newReplica.getTranslog().totalOperations(), equalTo(translogOps));
+        }
+    }
 }

From c9ce58fc0b8f6971163735b18545659d6f5ae163 Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Thu, 20 Jul 2017 22:53:19 +0200
Subject: [PATCH 3/4] fix

---
 .../index/engine/InternalEngine.java | 26 ++++++++++++++-----
 .../indices/recovery/RecoveryTests.java | 14 +++++-----
 2 files changed, 26 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index a69e901691de8..664108f46a6f2 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -693,14 +693,21 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
             // this allows to ignore the case where a document was found in the live version maps in
             // a delete state and return false for the created flag in favor of code simplicity
             final OpVsLuceneDocStatus opVsLucene;
-            if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
-                opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
-            } else {
+            if (index.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
                 // This can happen if the primary is still on an old node and send traffic without seq# or we recover from translog
                 // created by an old version.
                 assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
                     "index is newly created but op has no sequence numbers. op: " + index;
                 opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
+            } else if (index.seqNo() <= seqNoService.getLocalCheckpoint()){
+                // the operation seq# is lower than the current local checkpoint and thus was already put into lucene
+                // this can happen during recovery where older operations are sent from the translog that are already
+                // part of the lucene commit that was sent over to the replica (due to translog retention) or due to
+                // a concurrent indexing / recovery. For the former it is important to skip lucene as the operation in
+                // question may have been deleted in an out of order op that is not replayed. See testRecoverFromStoreWithOutOfOrderDelete
+                opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+            } else {
+                opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
             }
             if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
                 plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
@@ -979,12 +986,19 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept
             // this allows to ignore the case where a document was found in the live version maps in
             // a delete state and return true for the found flag in favor of code simplicity
             final OpVsLuceneDocStatus opVsLucene;
-            if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
-                opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
-            } else {
+            if (delete.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
                 assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
                     "index is newly created but op has no sequence numbers. op: " + delete;
                 opVsLucene = compareOpToLuceneDocBasedOnVersions(delete);
+            } else if (delete.seqNo() <= seqNoService.getLocalCheckpoint()) {
+                // the operation seq# is lower than the current local checkpoint and thus was already put into lucene
+                // this can happen during recovery where older operations are sent from the translog that are already
+                // part of the lucene commit that was sent over to the replica (due to translog retention) or due to
+                // a concurrent indexing / recovery. For the former it is important to skip lucene as the operation in
+                // question may have been deleted in an out of order op that is not replayed. See testRecoverFromStoreWithOutOfOrderDelete
+                opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+            } else {
+                opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
             }
 
             final DeletionStrategy plan;
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index a8154bd8a3c02..4c592d1c47245 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -26,7 +26,6 @@
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
 import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
@@ -34,7 +33,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
-import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -92,16 +90,16 @@ public void testRecoveryWithOutOfOrderDelete() throws Exception {
             shards.startAll();
             // create out of order delete and index op on replica
             final IndexShard orgReplica = shards.getReplicas().get(0);
-            final Consumer<Mapping> mappingConsumer = getMappingUpdater(orgReplica, "type");
-            orgReplica.applyDeleteOperationOnReplica(1, 1, 2, "type", "id", VersionType.EXTERNAL, mappingConsumer);
+            orgReplica.applyDeleteOperationOnReplica(1, 1, 2, "type", "id", VersionType.EXTERNAL, u -> {});
             orgReplica.getTranslog().rollGeneration(); // isolate the delete in its own generation
             orgReplica.applyIndexOperationOnReplica(0, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
-                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
+                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id", new BytesArray("{}"), XContentType.JSON),
+                u -> {});
 
-            // index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation stick
-            // around
+            // index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation
+            // stick around
             orgReplica.applyIndexOperationOnReplica(3, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
-                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id2", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
+                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id2", new BytesArray("{}"), XContentType.JSON), u -> {});
 
             final int translogOps;
             if (randomBoolean()) {

From 6538c558336b7fc628b0d00d0a0e74b7aac59361 Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Fri, 21 Jul 2017 17:15:48 +0200
Subject: [PATCH 4/4] improve comment

---
 .../index/engine/InternalEngine.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 664108f46a6f2..14f3a0e749bbb 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -702,9 +702,11 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0
             } else if (index.seqNo() <= seqNoService.getLocalCheckpoint()){
                 // the operation seq# is lower than the current local checkpoint and thus was already put into lucene
                 // this can happen during recovery where older operations are sent from the translog that are already
-                // part of the lucene commit that was sent over to the replica (due to translog retention) or due to
-                // a concurrent indexing / recovery. For the former it is important to skip lucene as the operation in
-                // question may have been deleted in an out of order op that is not replayed. See testRecoverFromStoreWithOutOfOrderDelete
+                // part of the lucene commit (either from a peer recovery or a local translog)
+                // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
+                // question may have been deleted in an out of order op that is not replayed.
+                // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
+                // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
                 opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
             } else {
                 opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
@@ -993,9 +995,11 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0
             } else if (delete.seqNo() <= seqNoService.getLocalCheckpoint()) {
                 // the operation seq# is lower than the current local checkpoint and thus was already put into lucene
                 // this can happen during recovery where older operations are sent from the translog that are already
-                // part of the lucene commit that was sent over to the replica (due to translog retention) or due to
-                // a concurrent indexing / recovery. For the former it is important to skip lucene as the operation in
-                // question may have been deleted in an out of order op that is not replayed. See testRecoverFromStoreWithOutOfOrderDelete
+                // part of the lucene commit (either from a peer recovery or a local translog)
+                // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
+                // question may have been deleted in an out of order op that is not replayed.
+                // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
+                // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
                 opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
             } else {
                 opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);