Skip to content

Commit fd86420

Browse files
authored
Engine - Do not store operations that are not indexed into Lucene in the translog (5.x only) (#25592)
When a replica processes out-of-order operations, it can drop some due to version comparisons. In the past that would have resulted in a VersionConflictException being thrown and ignored higher up. We changed this to have a cleaner flow that doesn't use exceptions. However, when backporting that change from master, we also backported a change that isn't good for 5.x: we started storing these out-of-order ops in the translog. This is needed for the sequence number push, which also gives us some mechanism to deal with it later on during recovery. Without the seq# this is not needed and can lead to deletes being lost (see the added test `testRecoverFromStoreWithOutOfOrderDelete`, which fails without the fix). Note that master also suffers from a similar issue, but we will be pursuing a different solution there (still under discussion).
1 parent ef84778 commit fd86420

File tree

4 files changed

+60
-31
lines changed

4 files changed

+60
-31
lines changed

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq
392392
if (!TransportActions.isShardNotAvailableException(failure)) {
393393
throw failure;
394394
}
395-
} else {
395+
} else if (operationResult.getTranslogLocation() != null) { // out of order ops are not added to the translog
396396
location = locationToSync(location, operationResult.getTranslogLocation());
397397
}
398398
} catch (Exception e) {

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -502,9 +502,11 @@ public IndexResult index(Index index) throws IOException {
502502
} else if (plan.indexIntoLucene) {
503503
indexResult = indexIntoLucene(index, plan);
504504
} else {
505+
assert index.origin() != Operation.Origin.PRIMARY;
505506
indexResult = new IndexResult(plan.versionForIndexing, plan.currentNotFoundOrDeleted);
506507
}
507508
if (indexResult.hasFailure() == false &&
509+
plan.indexIntoLucene && // if we didn't store it in lucene, there is no need to store it in the translog
508510
index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
509511
Translog.Location location =
510512
translog.add(new Translog.Index(index, indexResult));
@@ -541,7 +543,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
541543
// a delete state and return false for the created flag in favor of code simplicity
542544
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
543545
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
544-
plan = IndexingStrategy.processButSkipLucene(false, index.version());
546+
plan = IndexingStrategy.skipAsStale(false, index.version());
545547
} else {
546548
plan = IndexingStrategy.processNormally(
547549
opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version()
@@ -704,7 +706,7 @@ static IndexingStrategy overrideExistingAsIfNotThere(long versionForIndexing) {
704706
return new IndexingStrategy(true, true, true, versionForIndexing, null);
705707
}
706708

707-
static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) {
709+
static IndexingStrategy skipAsStale(boolean currentNotFoundOrDeleted, long versionForIndexing) {
708710
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, versionForIndexing, null);
709711
}
710712
}
@@ -758,9 +760,11 @@ public DeleteResult delete(Delete delete) throws IOException {
758760
} else if (plan.deleteFromLucene) {
759761
deleteResult = deleteInLucene(delete, plan);
760762
} else {
763+
assert delete.origin() != Operation.Origin.PRIMARY;
761764
deleteResult = new DeleteResult(plan.versionOfDeletion, plan.currentlyDeleted == false);
762765
}
763766
if (!deleteResult.hasFailure() &&
767+
plan.deleteFromLucene && // if it wasn't applied to lucene, we don't store it in the translog
764768
delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
765769
Translog.Location location =
766770
translog.add(new Translog.Delete(delete, deleteResult));

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2522,8 +2522,8 @@ public BytesRef binaryValue() {
25222522
public void testDoubleDeliveryPrimary() throws IOException {
25232523
final ParsedDocument doc = testParsedDocument("1", "test", null, System.currentTimeMillis(), -1L,
25242524
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
2525-
Engine.Index operation = appendOnlyPrimary(doc, false, 1);
2526-
Engine.Index retry = appendOnlyPrimary(doc, true, 1);
2525+
final Engine.Index operation = appendOnlyPrimary(doc, false, 1);
2526+
final Engine.Index retry = appendOnlyPrimary(doc, true, 1);
25272527
if (randomBoolean()) {
25282528
Engine.IndexResult indexResult = engine.index(operation);
25292529
assertFalse(engine.indexWriterHasDeletions());
@@ -2551,8 +2551,6 @@ public void testDoubleDeliveryPrimary() throws IOException {
25512551
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
25522552
assertEquals(1, topDocs.totalHits);
25532553
}
2554-
operation = randomAppendOnly(doc, false, 1);
2555-
retry = randomAppendOnly(doc, true, 1);
25562554
if (randomBoolean()) {
25572555
Engine.IndexResult indexResult = engine.index(operation);
25582556
assertNotNull(indexResult.getTranslogLocation());
@@ -2563,7 +2561,7 @@ public void testDoubleDeliveryPrimary() throws IOException {
25632561
Engine.IndexResult retryResult = engine.index(retry);
25642562
assertNotNull(retryResult.getTranslogLocation());
25652563
Engine.IndexResult indexResult = engine.index(operation);
2566-
assertNotNull(retryResult.getTranslogLocation());
2564+
assertNotNull(indexResult.getTranslogLocation());
25672565
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
25682566
}
25692567

@@ -2577,8 +2575,8 @@ public void testDoubleDeliveryPrimary() throws IOException {
25772575
public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
25782576
final ParsedDocument doc = testParsedDocument("1", "test", null, System.currentTimeMillis(), -1,
25792577
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
2580-
Engine.Index operation = appendOnlyReplica(doc, false, 1);
2581-
Engine.Index retry = appendOnlyReplica(doc, true, 1);
2578+
final Engine.Index operation = appendOnlyReplica(doc, false, 1);
2579+
final Engine.Index retry = appendOnlyReplica(doc, true, 1);
25822580
if (randomBoolean()) {
25832581
Engine.IndexResult indexResult = engine.index(operation);
25842582
assertFalse(engine.indexWriterHasDeletions());
@@ -2587,8 +2585,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
25872585
Engine.IndexResult retryResult = engine.index(retry);
25882586
assertFalse(engine.indexWriterHasDeletions());
25892587
assertEquals(1, engine.getNumVersionLookups());
2590-
assertNotNull(retryResult.getTranslogLocation());
2591-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
2588+
assertNull(retryResult.getTranslogLocation()); // we didn't index it nor put it in the translog
25922589
} else {
25932590
Engine.IndexResult retryResult = engine.index(retry);
25942591
assertFalse(engine.indexWriterHasDeletions());
@@ -2597,29 +2594,24 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
25972594
Engine.IndexResult indexResult = engine.index(operation);
25982595
assertFalse(engine.indexWriterHasDeletions());
25992596
assertEquals(2, engine.getNumVersionLookups());
2600-
assertNotNull(retryResult.getTranslogLocation());
2601-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
2597+
assertNull(indexResult.getTranslogLocation()); // we didn't index it nor put it in the translog
26022598
}
26032599

26042600
engine.refresh("test");
26052601
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
26062602
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
26072603
assertEquals(1, topDocs.totalHits);
26082604
}
2609-
operation = randomAppendOnly(doc, false, 1);
2610-
retry = randomAppendOnly(doc, true, 1);
26112605
if (randomBoolean()) {
26122606
Engine.IndexResult indexResult = engine.index(operation);
2613-
assertNotNull(indexResult.getTranslogLocation());
2607+
assertNull(indexResult.getTranslogLocation()); // we don't index because a retry has already been processed.
26142608
Engine.IndexResult retryResult = engine.index(retry);
2615-
assertNotNull(retryResult.getTranslogLocation());
2616-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
2609+
assertNull(retryResult.getTranslogLocation());
26172610
} else {
26182611
Engine.IndexResult retryResult = engine.index(retry);
2619-
assertNotNull(retryResult.getTranslogLocation());
2612+
assertNull(retryResult.getTranslogLocation());
26202613
Engine.IndexResult indexResult = engine.index(operation);
2621-
assertNotNull(retryResult.getTranslogLocation());
2622-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
2614+
assertNull(indexResult.getTranslogLocation());
26232615
}
26242616

26252617
engine.refresh("test");
@@ -2645,8 +2637,7 @@ public void testDoubleDeliveryReplica() throws IOException {
26452637
Engine.IndexResult retryResult = engine.index(duplicate);
26462638
assertFalse(engine.indexWriterHasDeletions());
26472639
assertEquals(2, engine.getNumVersionLookups());
2648-
assertNotNull(retryResult.getTranslogLocation());
2649-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
2640+
assertNull(retryResult.getTranslogLocation());
26502641
} else {
26512642
Engine.IndexResult retryResult = engine.index(duplicate);
26522643
assertFalse(engine.indexWriterHasDeletions());
@@ -2658,8 +2649,7 @@ public void testDoubleDeliveryReplica() throws IOException {
26582649
Engine.IndexResult indexResult = engine.index(operation);
26592650
assertFalse(engine.indexWriterHasDeletions());
26602651
assertEquals(2, engine.getNumVersionLookups());
2661-
assertNotNull(retryResult.getTranslogLocation());
2662-
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
2652+
assertNull(indexResult.getTranslogLocation()); // we didn't index, no need to put in translog
26632653
}
26642654

26652655
engine.refresh("test");

core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.action.admin.indices.stats.CommonStats;
3939
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
4040
import org.elasticsearch.action.admin.indices.stats.ShardStats;
41+
import org.elasticsearch.action.index.IndexRequest;
4142
import org.elasticsearch.action.support.PlainActionFuture;
4243
import org.elasticsearch.cluster.metadata.IndexMetaData;
4344
import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -77,6 +78,7 @@
7778
import org.elasticsearch.index.mapper.Mapping;
7879
import org.elasticsearch.index.mapper.ParseContext;
7980
import org.elasticsearch.index.mapper.ParsedDocument;
81+
import org.elasticsearch.index.mapper.SourceToParse;
8082
import org.elasticsearch.index.mapper.Uid;
8183
import org.elasticsearch.index.mapper.UidFieldMapper;
8284
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@@ -123,6 +125,7 @@
123125

124126
import static java.util.Collections.emptyMap;
125127
import static java.util.Collections.emptySet;
128+
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
126129
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
127130
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
128131
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -282,14 +285,14 @@ public void testOperationLocksOnPrimaryShards() throws InterruptedException, Exe
282285

283286
if (randomBoolean()) {
284287
// relocation target
285-
indexShard = newShard(TestShardRouting.newShardRouting(shardId, "local_node", "other node",
288+
indexShard = newShard(newShardRouting(shardId, "local_node", "other node",
286289
true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing())));
287290
} else if (randomBoolean()) {
288291
// simulate promotion
289292
indexShard = newStartedShard(false);
290293
ShardRouting replicaRouting = indexShard.routingEntry();
291294
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
292-
ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
295+
ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
293296
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
294297
indexShard.updateRoutingEntry(primaryRouting);
295298
} else {
@@ -341,7 +344,7 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe
341344
case 1: {
342345
// initializing replica / primary
343346
final boolean relocating = randomBoolean();
344-
ShardRouting routing = TestShardRouting.newShardRouting(shardId, "local_node",
347+
ShardRouting routing = newShardRouting(shardId, "local_node",
345348
relocating ? "sourceNode" : null,
346349
relocating ? randomBoolean() : false,
347350
ShardRoutingState.INITIALIZING,
@@ -353,7 +356,7 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe
353356
// relocation source
354357
indexShard = newStartedShard(true);
355358
ShardRouting routing = indexShard.routingEntry();
356-
routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
359+
routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
357360
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
358361
indexShard.updateRoutingEntry(routing);
359362
indexShard.relocated("test");
@@ -914,6 +917,38 @@ public void testRecoverFromStore() throws IOException {
914917
closeShards(newShard);
915918
}
916919

920+
public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
921+
final IndexShard shard = newStartedShard(false);
922+
final Engine.Index index = shard.prepareIndexOnReplica(
923+
SourceToParse.source(SourceToParse.Origin.REPLICA, shard.shardId().getIndexName(), "type", "id", new BytesArray("{}"),
924+
XContentType.JSON), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
925+
final Engine.Delete delete = shard.prepareDeleteOnReplica("type", "id", 2, VersionType.EXTERNAL);
926+
shard.delete(delete);
927+
final int translogOps;
928+
if (randomBoolean()) {
929+
flushShard(shard, true); // lucene won't flush due to just one pending delete
930+
translogOps = 0;
931+
} else {
932+
translogOps = 1;
933+
}
934+
final Engine.IndexResult result = shard.index(index);
935+
assertThat(result.getTranslogLocation(), nullValue());
936+
final ShardRouting replicaRouting = shard.routingEntry();
937+
IndexShard newShard = reinitShard(shard,
938+
newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
939+
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE));
940+
DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
941+
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
942+
assertTrue(newShard.recoverFromStore());
943+
assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
944+
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
945+
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
946+
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
947+
newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
948+
assertDocCount(newShard, 0);
949+
closeShards(newShard);
950+
}
951+
917952
public void testRecoverFromCleanStore() throws IOException {
918953
final IndexShard shard = newStartedShard(true);
919954
indexDoc(shard, "test", "0");
@@ -1336,7 +1371,7 @@ public void testRecoverFromLocalShard() throws IOException {
13361371
sourceShard.refresh("test");
13371372

13381373

1339-
ShardRouting targetRouting = TestShardRouting.newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
1374+
ShardRouting targetRouting = newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
13401375
ShardRoutingState.INITIALIZING, RecoverySource.LocalShardsRecoverySource.INSTANCE);
13411376

13421377
final IndexShard targetShard;

0 commit comments

Comments
 (0)