diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 95c3d4b69301e..9210622ecec23 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -392,7 +392,7 @@ public WriteReplicaResult shardOperationOnReplica(BulkShardReq if (!TransportActions.isShardNotAvailableException(failure)) { throw failure; } - } else { + } else if (operationResult.getTranslogLocation() != null) { // out of order ops are not added to the translog location = locationToSync(location, operationResult.getTranslogLocation()); } } catch (Exception e) { diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bce68ced35c98..0e3fe396601a8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -502,9 +502,11 @@ public IndexResult index(Index index) throws IOException { } else if (plan.indexIntoLucene) { indexResult = indexIntoLucene(index, plan); } else { + assert index.origin() != Operation.Origin.PRIMARY; indexResult = new IndexResult(plan.versionForIndexing, plan.currentNotFoundOrDeleted); } if (indexResult.hasFailure() == false && + plan.indexIntoLucene && // if we didn't store it in lucene, there is no need to store it in the translog index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { Translog.Location location = translog.add(new Translog.Index(index, indexResult)); @@ -541,7 +543,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio // a delete state and return false for the created flag in favor of code simplicity final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processButSkipLucene(false, index.version()); + plan = IndexingStrategy.skipAsStale(false, index.version()); } else { plan = IndexingStrategy.processNormally( opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version() @@ -704,7 +706,7 @@ static IndexingStrategy overrideExistingAsIfNotThere(long versionForIndexing) { return new IndexingStrategy(true, true, true, versionForIndexing, null); } - static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) { + static IndexingStrategy skipAsStale(boolean currentNotFoundOrDeleted, long versionForIndexing) { return new IndexingStrategy(currentNotFoundOrDeleted, false, false, versionForIndexing, null); } } @@ -758,9 +760,11 @@ public DeleteResult delete(Delete delete) throws IOException { } else if (plan.deleteFromLucene) { deleteResult = deleteInLucene(delete, plan); } else { + assert delete.origin() != Operation.Origin.PRIMARY; deleteResult = new DeleteResult(plan.versionOfDeletion, plan.currentlyDeleted == false); } if (!deleteResult.hasFailure() && + plan.deleteFromLucene && // if it wasn't applied to lucene, we don't store it in the translog delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f951d26e2a7ca..1cd42c66b52c4 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2522,8 +2522,8 @@ public BytesRef binaryValue() { public void testDoubleDeliveryPrimary() throws IOException { final ParsedDocument doc = testParsedDocument("1", "test", null, System.currentTimeMillis(), -1L, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); - Engine.Index operation = appendOnlyPrimary(doc, false, 1); - Engine.Index retry = appendOnlyPrimary(doc, true, 1); + final Engine.Index operation = appendOnlyPrimary(doc, false, 1); + final Engine.Index retry = appendOnlyPrimary(doc, true, 1); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertFalse(engine.indexWriterHasDeletions()); @@ -2551,8 +2551,6 @@ public void testDoubleDeliveryPrimary() throws IOException { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(1, topDocs.totalHits); } - operation = randomAppendOnly(doc, false, 1); - retry = randomAppendOnly(doc, true, 1); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertNotNull(indexResult.getTranslogLocation()); @@ -2563,7 +2561,7 @@ public void testDoubleDeliveryPrimary() throws IOException { Engine.IndexResult retryResult = engine.index(retry); assertNotNull(retryResult.getTranslogLocation()); Engine.IndexResult indexResult = engine.index(operation); - assertNotNull(retryResult.getTranslogLocation()); + assertNotNull(indexResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -2577,8 +2575,8 @@ public void testDoubleDeliveryPrimary() throws IOException { public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { final ParsedDocument doc = testParsedDocument("1", "test", null, System.currentTimeMillis(), -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); - Engine.Index operation = appendOnlyReplica(doc, false, 1); - Engine.Index retry = appendOnlyReplica(doc, true, 1); + final Engine.Index operation = appendOnlyReplica(doc, false, 1); + final Engine.Index retry = appendOnlyReplica(doc, true, 1); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertFalse(engine.indexWriterHasDeletions()); @@ -2587,8 +2585,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { Engine.IndexResult retryResult = engine.index(retry); assertFalse(engine.indexWriterHasDeletions()); assertEquals(1, engine.getNumVersionLookups()); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); + assertNull(retryResult.getTranslogLocation()); // we didn't index it nor put it in the translog } else { Engine.IndexResult retryResult = engine.index(retry); assertFalse(engine.indexWriterHasDeletions()); @@ -2597,8 +2594,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { Engine.IndexResult indexResult = engine.index(operation); assertFalse(engine.indexWriterHasDeletions()); assertEquals(2, engine.getNumVersionLookups()); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); + assertNull(indexResult.getTranslogLocation()); // we didn't index it nor put it in the translog } engine.refresh("test"); @@ -2606,20 +2602,16 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(1, topDocs.totalHits); } - operation = randomAppendOnly(doc, false, 1); - retry = randomAppendOnly(doc, true, 1); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); - assertNotNull(indexResult.getTranslogLocation()); + assertNull(indexResult.getTranslogLocation()); // we don't index because a retry has already been processed. Engine.IndexResult retryResult = engine.index(retry); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); + assertNull(retryResult.getTranslogLocation()); } else { Engine.IndexResult retryResult = engine.index(retry); - assertNotNull(retryResult.getTranslogLocation()); + assertNull(retryResult.getTranslogLocation()); Engine.IndexResult indexResult = engine.index(operation); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); + assertNull(indexResult.getTranslogLocation()); } engine.refresh("test"); @@ -2645,8 +2637,7 @@ public void testDoubleDeliveryReplica() throws IOException { Engine.IndexResult retryResult = engine.index(duplicate); assertFalse(engine.indexWriterHasDeletions()); assertEquals(2, engine.getNumVersionLookups()); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); + assertNull(retryResult.getTranslogLocation()); } else { Engine.IndexResult retryResult = engine.index(duplicate); assertFalse(engine.indexWriterHasDeletions()); @@ -2658,8 +2649,7 @@ public void testDoubleDeliveryReplica() throws IOException { Engine.IndexResult indexResult = engine.index(operation); assertFalse(engine.indexWriterHasDeletions()); assertEquals(2, engine.getNumVersionLookups()); - assertNotNull(retryResult.getTranslogLocation()); - assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); + assertNull(indexResult.getTranslogLocation()); // we didn't index, no need to put in translog } engine.refresh("test"); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 3f64ff04ce717..d27242f0532ef 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -77,6 +78,7 @@ import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; @@ -123,6 +125,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -282,14 +285,14 @@ public void testOperationLocksOnPrimaryShards() throws InterruptedException, Exe if (randomBoolean()) { // relocation target - indexShard = newShard(TestShardRouting.newShardRouting(shardId, "local_node", "other node", + indexShard = newShard(newShardRouting(shardId, "local_node", "other node", true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing()))); } else if (randomBoolean()) { // simulate promotion indexShard = newStartedShard(false); ShardRouting replicaRouting = indexShard.routingEntry(); indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); - ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, + ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true, ShardRoutingState.STARTED, replicaRouting.allocationId()); indexShard.updateRoutingEntry(primaryRouting); } else { @@ -341,7 +344,7 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe case 1: { // initializing replica / primary final boolean relocating = randomBoolean(); - ShardRouting routing = TestShardRouting.newShardRouting(shardId, "local_node", + ShardRouting routing = newShardRouting(shardId, "local_node", relocating ? "sourceNode" : null, relocating ? randomBoolean() : false, ShardRoutingState.INITIALIZING, @@ -353,7 +356,7 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe // relocation source indexShard = newStartedShard(true); ShardRouting routing = indexShard.routingEntry(); - routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", + routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); indexShard.updateRoutingEntry(routing); indexShard.relocated("test"); @@ -914,6 +917,38 @@ public void testRecoverFromStore() throws IOException { closeShards(newShard); } + public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { + final IndexShard shard = newStartedShard(false); + final Engine.Index index = shard.prepareIndexOnReplica( + SourceToParse.source(SourceToParse.Origin.REPLICA, shard.shardId().getIndexName(), "type", "id", new BytesArray("{}"), + XContentType.JSON), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + final Engine.Delete delete = shard.prepareDeleteOnReplica("type", "id", 2, VersionType.EXTERNAL); + shard.delete(delete); + final int translogOps; + if (randomBoolean()) { + flushShard(shard, true); // lucene won't flush due to just one pending delete + translogOps = 0; + } else { + translogOps = 1; + } + final Engine.IndexResult result = shard.index(index); + assertThat(result.getTranslogLocation(), nullValue()); + final ShardRouting replicaRouting = shard.routingEntry(); + IndexShard newShard = reinitShard(shard, + newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING, + RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE)); + DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); + newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); + assertTrue(newShard.recoverFromStore()); + assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); + assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); + assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); + assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + assertDocCount(newShard, 0); + closeShards(newShard); + } + public void testRecoverFromCleanStore() throws IOException { final IndexShard shard = newStartedShard(true); indexDoc(shard, "test", "0"); @@ -1336,7 +1371,7 @@ public void testRecoverFromLocalShard() throws IOException { sourceShard.refresh("test"); - ShardRouting targetRouting = TestShardRouting.newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true, + ShardRouting targetRouting = newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true, ShardRoutingState.INITIALIZING, RecoverySource.LocalShardsRecoverySource.INSTANCE); final IndexShard targetShard;