From bafb5f65d52daee4b7d364abc131e2600466533a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 6 Jul 2017 14:30:11 +0200 Subject: [PATCH 1/5] Engine - Do not store operations that are not index into lucene in the translog When a replica processes out of order operations, it can drop some due to version comparisons. In the past that would have resulted in a VersionConflictException being thrown and ignored higher up. We changed this to have a cleaner flow that doesn't use exceptions. However, when backporting that change from master, we also back ported a change that isn't good for 5.x: we started storing these out of order ops in the translog. This is needed for the sequence number push, which also gives us some mechanism to deal with it later on during recovery. With the seq# this is not needed and can lead to deletes being lost (see test). --- .../index/engine/InternalEngine.java | 8 +++- .../index/engine/InternalEngineTests.java | 36 ++++++---------- .../index/shard/IndexShardTests.java | 42 ++++++++++++++++--- 3 files changed, 56 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bce68ced35c98..0e3fe396601a8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -502,9 +502,11 @@ public IndexResult index(Index index) throws IOException { } else if (plan.indexIntoLucene) { indexResult = indexIntoLucene(index, plan); } else { + assert index.origin() != Operation.Origin.PRIMARY; indexResult = new IndexResult(plan.versionForIndexing, plan.currentNotFoundOrDeleted); } if (indexResult.hasFailure() == false && + plan.indexIntoLucene && // if we didn't store it in lucene, there is no need to store it in the translog index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { Translog.Location location = translog.add(new Translog.Index(index, indexResult)); @@ -541,7 +543,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio // a delete state and return false for the created flag in favor of code simplicity final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processButSkipLucene(false, index.version()); + plan = IndexingStrategy.skipAsStale(false, index.version()); } else { plan = IndexingStrategy.processNormally( opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version() @@ -704,7 +706,7 @@ static IndexingStrategy overrideExistingAsIfNotThere(long versionForIndexing) { return new IndexingStrategy(true, true, true, versionForIndexing, null); } - static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) { + static IndexingStrategy skipAsStale(boolean currentNotFoundOrDeleted, long versionForIndexing) { return new IndexingStrategy(currentNotFoundOrDeleted, false, false, versionForIndexing, null); } } @@ -758,9 +760,11 @@ public DeleteResult delete(Delete delete) throws IOException { } else if (plan.deleteFromLucene) { deleteResult = deleteInLucene(delete, plan); } else { + assert delete.origin() != Operation.Origin.PRIMARY; deleteResult = new DeleteResult(plan.versionOfDeletion, plan.currentlyDeleted == false); } if (!deleteResult.hasFailure() && + plan.deleteFromLucene && // if it wasn't applied to lucene, we don't store it in the translog delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f951d26e2a7ca..1cd42c66b52c4 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2522,8 +2522,8 @@ public BytesRef binaryValue() { public void testDoubleDeliveryPrimary() throws IOException { final ParsedDocument doc = testParsedDocument("1", "test", null, System.currentTimeMillis(), -1L, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); - Engine.Index operation = appendOnlyPrimary(doc, false, 1); - Engine.Index retry = appendOnlyPrimary(doc, true, 1); + final Engine.Index operation = appendOnlyPrimary(doc, false, 1); + final Engine.Index retry = appendOnlyPrimary(doc, true, 1); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertFalse(engine.indexWriterHasDeletions()); @@ -2551,8 +2551,6 @@ public void testDoubleDeliveryPrimary() throws IOException { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(1, topDocs.totalHits); } - operation = randomAppendOnly(doc, false, 1); - retry = randomAppendOnly(doc, true, 1); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertNotNull(indexResult.getTranslogLocation()); @@ -2563,7 +2561,7 @@ public void testDoubleDeliveryPrimary() throws IOException { Engine.IndexResult retryResult = engine.index(retry); assertNotNull(retryResult.getTranslogLocation()); Engine.IndexResult indexResult = engine.index(operation); - assertNotNull(retryResult.getTranslogLocation()); + assertNotNull(indexResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -2577,8 +2575,8 @@ public void testDoubleDeliveryPrimary() throws IOException { public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { final ParsedDocument doc = testParsedDocument("1", "test", null, System.currentTimeMillis(), -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); - Engine.Index operation = appendOnlyReplica(doc, false, 1); - Engine.Index retry = appendOnlyReplica(doc, true, 1); + final Engine.Index operation = appendOnlyReplica(doc, false, 1); + final Engine.Index retry = appendOnlyReplica(doc, true, 1); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertFalse(engine.indexWriterHasDeletions()); @@ -2587,8 +2585,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { Engine.IndexResult retryResult = engine.index(retry); assertFalse(engine.indexWriterHasDeletions()); assertEquals(1, engine.getNumVersionLookups()); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); + assertNull(retryResult.getTranslogLocation()); // we didn't index it nor put it in the translog } else { Engine.IndexResult retryResult = engine.index(retry); assertFalse(engine.indexWriterHasDeletions()); @@ -2597,8 +2594,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { Engine.IndexResult indexResult = engine.index(operation); assertFalse(engine.indexWriterHasDeletions()); assertEquals(2, engine.getNumVersionLookups()); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); + assertNull(indexResult.getTranslogLocation()); // we didn't index it nor put it in the translog } engine.refresh("test"); @@ -2606,20 +2602,16 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(1, topDocs.totalHits); } - operation = randomAppendOnly(doc, false, 1); - retry = randomAppendOnly(doc, true, 1); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); - assertNotNull(indexResult.getTranslogLocation()); + assertNull(indexResult.getTranslogLocation()); // we don't index because a retry has already been processed. Engine.IndexResult retryResult = engine.index(retry); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); + assertNull(retryResult.getTranslogLocation()); } else { Engine.IndexResult retryResult = engine.index(retry); - assertNotNull(retryResult.getTranslogLocation()); + assertNull(retryResult.getTranslogLocation()); Engine.IndexResult indexResult = engine.index(operation); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); + assertNull(indexResult.getTranslogLocation()); } engine.refresh("test"); @@ -2645,8 +2637,7 @@ public void testDoubleDeliveryReplica() throws IOException { Engine.IndexResult retryResult = engine.index(duplicate); assertFalse(engine.indexWriterHasDeletions()); assertEquals(2, engine.getNumVersionLookups()); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); + assertNull(retryResult.getTranslogLocation()); } else { Engine.IndexResult retryResult = engine.index(duplicate); assertFalse(engine.indexWriterHasDeletions()); @@ -2658,8 +2649,7 @@ public void testDoubleDeliveryReplica() throws IOException { Engine.IndexResult indexResult = engine.index(operation); assertFalse(engine.indexWriterHasDeletions()); assertEquals(2, engine.getNumVersionLookups()); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); + assertNull(indexResult.getTranslogLocation()); // we didn't index, no need to put in translog } engine.refresh("test"); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 3f64ff04ce717..aa6d880ec2f8d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -77,6 +78,7 @@ import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; @@ -123,6 +125,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -282,14 +285,14 @@ public void testOperationLocksOnPrimaryShards() throws InterruptedException, Exe if (randomBoolean()) { // relocation target - indexShard = newShard(TestShardRouting.newShardRouting(shardId, "local_node", "other node", + indexShard = newShard(newShardRouting(shardId, "local_node", "other node", true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing()))); } else if (randomBoolean()) { // simulate promotion indexShard = newStartedShard(false); ShardRouting replicaRouting = indexShard.routingEntry(); indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); - ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, + ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true, ShardRoutingState.STARTED, replicaRouting.allocationId()); indexShard.updateRoutingEntry(primaryRouting); } else { @@ -341,7 +344,7 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe case 1: { // initializing replica / primary final boolean relocating = randomBoolean(); - ShardRouting routing = TestShardRouting.newShardRouting(shardId, "local_node", + ShardRouting routing = newShardRouting(shardId, "local_node", relocating ? "sourceNode" : null, relocating ? randomBoolean() : false, ShardRoutingState.INITIALIZING, @@ -353,7 +356,7 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe // relocation source indexShard = newStartedShard(true); ShardRouting routing = indexShard.routingEntry(); - routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", + routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); indexShard.updateRoutingEntry(routing); indexShard.relocated("test"); @@ -914,6 +917,35 @@ public void testRecoverFromStore() throws IOException { closeShards(newShard); } + public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { + final IndexShard shard = newStartedShard(false); + int translogOps = 1; + final Engine.Index index = shard.prepareIndexOnReplica( + SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), "type", "id", new BytesArray("{}"), + XContentType.JSON), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + final Engine.Delete delete = shard.prepareDeleteOnReplica("type", "id", 2, VersionType.EXTERNAL); + shard.delete(delete); + if (randomBoolean()) { + flushShard(shard); + translogOps = 0; + } + final Engine.IndexResult result = shard.index(index); + assertThat(result.getTranslogLocation(), nullValue()); + final ShardRouting replicaRouting = shard.routingEntry(); + IndexShard newShard = reinitShard(shard, + newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING)); + DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); + newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); + assertTrue(newShard.recoverFromStore()); + assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); + assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); + assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); + assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + assertDocCount(newShard, 0); + closeShards(newShard); + } + public void testRecoverFromCleanStore() throws IOException { final IndexShard shard = newStartedShard(true); indexDoc(shard, "test", "0"); @@ -1336,7 +1368,7 @@ public void testRecoverFromLocalShard() throws IOException { sourceShard.refresh("test"); - ShardRouting targetRouting = TestShardRouting.newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true, + ShardRouting targetRouting = newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true, ShardRoutingState.INITIALIZING, RecoverySource.LocalShardsRecoverySource.INSTANCE); final IndexShard targetShard; From f2253bea0ff1a107225a55f4605e3a07d6fe656e Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 6 Jul 2017 15:00:45 +0200 Subject: [PATCH 2/5] fix testRecoverFromStoreWithOutOfOrderDelete --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index aa6d880ec2f8d..86c7e138b4970 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -921,7 +921,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { final IndexShard shard = newStartedShard(false); int translogOps = 1; final Engine.Index index = shard.prepareIndexOnReplica( - SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), "type", "id", new BytesArray("{}"), + SourceToParse.source(SourceToParse.Origin.REPLICA, shard.shardId().getIndexName(), "type", "id", new BytesArray("{}"), XContentType.JSON), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); final Engine.Delete delete = shard.prepareDeleteOnReplica("type", "id", 2, VersionType.EXTERNAL); shard.delete(delete); @@ -933,7 +933,8 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { assertThat(result.getTranslogLocation(), nullValue()); final ShardRouting replicaRouting = shard.routingEntry(); IndexShard newShard = reinitShard(shard, - newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING)); + newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING, + RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE)); DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue(newShard.recoverFromStore()); From 566245035960ff709fd364bf87507807e0bf206f Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 6 Jul 2017 15:24:50 +0200 Subject: [PATCH 3/5] allow for null locations on replicas in TransportShardBulkAction.java --- .../elasticsearch/action/bulk/TransportShardBulkAction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index b4a188578b3b7..408653d3372dc 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -29,8 +29,8 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.update.UpdateHelper; @@ -391,7 +391,7 @@ public WriteReplicaResult shardOperationOnReplica(BulkShardReq if (!TransportActions.isShardNotAvailableException(failure)) { throw failure; } - } else { + } else if (operationResult.getTranslogLocation() != null) { // out of order ops are not added to the translog location = locationToSync(location, operationResult.getTranslogLocation()); } } catch (Exception e) { From 09b3ee13d4f5e2316873799b659dc6cdc8033ae6 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 6 Jul 2017 20:05:00 +0200 Subject: [PATCH 4/5] testRecoverFromStoreWithOutOfOrderDelete tweak --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 86c7e138b4970..8b35bd8bcb925 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -926,7 +926,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { final Engine.Delete delete = shard.prepareDeleteOnReplica("type", "id", 2, VersionType.EXTERNAL); shard.delete(delete); if (randomBoolean()) { - flushShard(shard); + flushShard(shard, true); // lucene won't flush due to just one pending delete translogOps = 0; } final Engine.IndexResult result = shard.index(index); From c6bc91cd7f24acb3085e09da1da99a64d74dc4ad Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 10 Jul 2017 08:50:27 +0200 Subject: [PATCH 5/5] feedback --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 8b35bd8bcb925..d27242f0532ef 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -919,15 +919,17 @@ public void testRecoverFromStore() throws IOException { public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { final IndexShard shard = newStartedShard(false); - int translogOps = 1; final Engine.Index index = shard.prepareIndexOnReplica( SourceToParse.source(SourceToParse.Origin.REPLICA, shard.shardId().getIndexName(), "type", "id", new BytesArray("{}"), XContentType.JSON), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); final Engine.Delete delete = shard.prepareDeleteOnReplica("type", "id", 2, VersionType.EXTERNAL); shard.delete(delete); + final int translogOps; if (randomBoolean()) { flushShard(shard, true); // lucene won't flush due to just one pending delete translogOps = 0; + } else { + translogOps = 1; } final Engine.IndexResult result = shard.index(index); assertThat(result.getTranslogLocation(), nullValue());