
Commit ab1636d

Engine - do not index operations with seq# lower than the local checkpoint into lucene (#25827)
When a replica processes out of order operations, it can drop some due to version comparisons. In the past that would have resulted in a VersionConflictException being thrown and the operation being totally ignored. With the seq# push, we started storing these operations in the translog (but not indexing them into lucene) in order to have complete op histories to facilitate ops based recoveries. This in turn had the undesired effect that deleted docs may be resurrected during recovery in some extreme edge cases (see the complete explanation below). This PR contains a simple fix, which is also an optimization for the recovery process: incoming operations that have a seq# at or below the current local checkpoint (i.e., that have already been processed) should not be indexed into lucene. Note that sometimes we can also skip storing them in the translog, but this is not required for the fix and is more complicated.

This is the equivalent of #25592.

## More details on resurrected ops

Consider two operations:
- Index d1, seq no 1
- Delete d1, seq no 3

On a replica they come out of order:
- Translog gen 1 contains:
  - delete (seqNo 3)
- Translog gen 2 contains:
  - index (seqNo 1) (wasn't indexed into lucene, but put into the translog)
  - another operation (seqNo 10)
- Translog gen 3 contains:
  - another op (seqNo 9)
- Engine commits with:
  - local checkpoint 9
  - refers to gen 2

If this replica becomes a primary:
- Local recovery will replay translog gen 2 and up, causing index #1 to be re-indexed.
- Even if recovery starts at gen 3, the translog retention policy will cause file based recovery to replay the entire translog. If it happens to start at gen 2 (but not 1), we run into the same problem.

#### Some context - out of order delivery involving deletes:

In normal operation, this relies on the gc_deletes setting. We assume that the setting represents an upper bound on the time between the index and the delete operation. The index operation will then be detected as stale based on the tombstone map in the LiveVersionMap.

Recovery presents a challenge, as it can replay an old index operation that was in the translog and override a delete operation that was done when the engine was opened (and is not part of the replayed snapshot). To deal with this situation, we disable GC deletes (i.e. retain all deletes) for the duration of recoveries. This means that the delete operation will be remembered and the index operation ignored.

Both of the above scenarios (local recovery + peer recovery) create a situation where the delete operation is never replayed. It is thus "lost": lucene doesn't remember it happened and our LiveVersionMap is not populated with it.

#### Solution:

Note that both local and peer recovery represent a scenario where we replay translog ops on top of an existing lucene index, potentially with ongoing indexing, so we can treat them the same. The local checkpoint stored in the Lucene commit is a marker indicating that all operations at or below it were performed on the index. This is the only form of "memory" we have that relates to deletes. If we can achieve the following:

1) All ops at or below the local checkpoint are not indexed into lucene.
2) All ops above the local checkpoint are.

then all variants are covered (i# == index op seq#, d# == delete op seq#, lc == local checkpoint in the commit):

1) i# < d# <= lc - the document is already deleted in lucene and stays that way.
2) i# <= lc < d# - the delete is replayed on top of the index - the document is deleted.
3) lc < i# < d# - the index is replayed and then the delete - the document is deleted.
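Before the more formal argument that follows, here is a minimal, self-contained sketch of the resulting decision for a non-primary operation. The names (`ReplicaOpPlanSketch`, `planNonPrimaryOp`, `Plan`) and the `UNASSIGNED_SEQ_NO` stand-in value are illustrative only, not Elasticsearch code; the real three-way branch lives in `InternalEngine#planIndexingAsNonPrimary` / `planDeletionAsNonPrimary` and is shown in the diff below.

```java
// Standalone sketch (not Elasticsearch code); all names are illustrative stand-ins.
public class ReplicaOpPlanSketch {

    static final long UNASSIGNED_SEQ_NO = -2; // stand-in for SequenceNumbersService.UNASSIGNED_SEQ_NO

    enum Plan { COMPARE_BY_VERSION, SKIP_LUCENE_BUT_KEEP_IN_TRANSLOG, COMPARE_BY_SEQ_NO }

    /**
     * localCheckpoint: the highest seq# such that every operation at or below it has already
     * been applied to the lucene index backing this shard copy.
     */
    static Plan planNonPrimaryOp(long opSeqNo, long localCheckpoint) {
        if (opSeqNo == UNASSIGNED_SEQ_NO) {
            // op from a pre-seq# primary or from a translog written by an old version:
            // fall back to version-based comparison
            return Plan.COMPARE_BY_VERSION;
        } else if (opSeqNo <= localCheckpoint) {
            // already processed (e.g. replayed from the translog during recovery, or a stale
            // concurrent replication op); re-indexing it could resurrect a deleted doc, so
            // keep it in the translog for op-based recovery but do not touch lucene
            return Plan.SKIP_LUCENE_BUT_KEEP_IN_TRANSLOG;
        } else {
            // above the checkpoint: compare against whatever lucene / the live version map holds
            return Plan.COMPARE_BY_SEQ_NO;
        }
    }

    public static void main(String[] args) {
        // the scenario above: the commit has local checkpoint 9, so the replayed index op (seq# 1) is skipped
        System.out.println(planNonPrimaryOp(1, 9));   // SKIP_LUCENE_BUT_KEEP_IN_TRANSLOG
        System.out.println(planNonPrimaryOp(10, 9));  // COMPARE_BY_SEQ_NO
    }
}
```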
More formally, we want to make sure that for every pair of operations o1 and o2 performed on the primary, if o2 is processed on a shard before o1, o1 will be dropped. We have the following scenarios:

1) If both o1 and o2 are not included in the replayed snapshot and are above it (i.e., have a higher seq#), they fall under the gc deletes assumption.
2) If o1 is part of the replayed snapshot but o2 is above it:
   - If o2 arrives first, o1 must arrive due to the recovery and potentially via replication as well. Since gc deletes is disabled, we are guaranteed to know of o2's existence when o1 arrives.
3) If both o2 and o1 are part of the replayed snapshot:
   - We fall under the same scenario as #2 - disabling GC deletes ensures we know of o2 if it arrives first.
4) If o1 falls before the snapshot and o2 is either part of the snapshot or higher:
   - Since the snapshot is guaranteed to contain all ops that are not part of lucene and are above the lc in the commit used, o1 must be part of lucene and o1 <= local checkpoint. This means it won't be processed, so we're not in the scenario we're discussing.
5) If o2 falls before the snapshot but o1 is part of it:
   - By the same reasoning, o2 <= local checkpoint. Since o1 < o2, we also get o1 <= local checkpoint, so o1 will be dropped.

#### Implementation:

For local recovery, we can filter the ops we read off the translog and avoid replaying them. For peer recovery this is trickier, as we do want to send the operations in order to have some history on the target shard. Filtering operations at the engine level (i.e., not indexing into lucene if op seq# <= lc) works for both.
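To make the case analysis in the solution section concrete, here is a toy simulation. All names are invented for this sketch and a plain HashMap stands in for lucene plus the live version map; it only illustrates the argument, it is not engine code.

```java
import java.util.HashMap;
import java.util.Map;

// Toy simulation of the i#/d#/lc cases above; a HashMap stands in for lucene + the live version map.
public class ReplayCasesSketch {

    /** Returns true if the doc survives replaying "delete first, then the older index op". */
    static boolean docExistsAfterReplay(long indexSeqNo, long deleteSeqNo, long localCheckpoint) {
        Map<String, Long> docs = new HashMap<>(); // docId -> seq# of the op that last indexed it

        // State before replay: everything at or below the local checkpoint is already reflected.
        if (indexSeqNo <= localCheckpoint) {
            docs.put("d1", indexSeqNo);
        }
        if (deleteSeqNo <= localCheckpoint) {
            docs.remove("d1");
        }

        // Replay, with the delete arriving before the older index op.
        // Rule under discussion: ops at or below the local checkpoint are skipped.
        if (deleteSeqNo > localCheckpoint) {
            docs.remove("d1");
        }
        if (indexSeqNo > localCheckpoint && indexSeqNo > deleteSeqNo) {
            // the second condition stands in for the gc_deletes / tombstone check that
            // drops an index op older than a delete we have already seen
            docs.put("d1", indexSeqNo);
        }
        return docs.containsKey("d1");
    }

    public static void main(String[] args) {
        System.out.println(docExistsAfterReplay(1, 3, 9)); // case 1: i# < d# <= lc -> false (stays deleted)
        System.out.println(docExistsAfterReplay(1, 3, 1)); // case 2: i# <= lc < d# -> false (delete replayed)
        System.out.println(docExistsAfterReplay(5, 7, 3)); // case 3: lc < i# < d#  -> false (index, then delete)
    }
}
```

All three calls print `false`, matching cases 1-3 of the solution section: the document stays deleted regardless of where the local checkpoint falls.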
1 parent c378432 commit ab1636d

3 files changed (+127 additions, −13 deletions)


core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 24 additions & 6 deletions
@@ -693,14 +693,23 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
             // this allows to ignore the case where a document was found in the live version maps in
             // a delete state and return false for the created flag in favor of code simplicity
             final OpVsLuceneDocStatus opVsLucene;
-            if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
-                opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
-            } else {
+            if (index.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
                 // This can happen if the primary is still on an old node and send traffic without seq# or we recover from translog
                 // created by an old version.
                 assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
                     "index is newly created but op has no sequence numbers. op: " + index;
                 opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
+            } else if (index.seqNo() <= seqNoService.getLocalCheckpoint()){
+                // the operation seq# is lower then the current local checkpoint and thus was already put into lucene
+                // this can happen during recovery where older operations are sent from the translog that are already
+                // part of the lucene commit (either from a peer recovery or a local translog)
+                // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
+                // question may have been deleted in an out of order op that is not replayed.
+                // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
+                // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
+                opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+            } else {
+                opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
             }
             if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
                 plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
@@ -979,12 +988,21 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOExcept
             // this allows to ignore the case where a document was found in the live version maps in
             // a delete state and return true for the found flag in favor of code simplicity
             final OpVsLuceneDocStatus opVsLucene;
-            if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
-                opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
-            } else {
+            if (delete.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
                 assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
                     "index is newly created but op has no sequence numbers. op: " + delete;
                 opVsLucene = compareOpToLuceneDocBasedOnVersions(delete);
+            } else if (delete.seqNo() <= seqNoService.getLocalCheckpoint()) {
+                // the operation seq# is lower then the current local checkpoint and thus was already put into lucene
+                // this can happen during recovery where older operations are sent from the translog that are already
+                // part of the lucene commit (either from a peer recovery or a local translog)
+                // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
+                // question may have been deleted in an out of order op that is not replayed.
+                // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
+                // See testRecoveryWithOutOfOrderDelete for an example of peer recovery
+                opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+            } else {
+                opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
             }

             final DeletionStrategy plan;

core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 50 additions & 7 deletions
@@ -125,12 +125,14 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
 import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -339,7 +341,7 @@ public void onFailure(Exception e) {
         // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
         final ShardRouting primaryRouting =
-                TestShardRouting.newShardRouting(
+                newShardRouting(
                         replicaRouting.shardId(),
                         replicaRouting.currentNodeId(),
                         null,
@@ -416,7 +418,7 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
         // promote the replica
         final ShardRouting replicaRouting = indexShard.routingEntry();
         final ShardRouting primaryRouting =
-                TestShardRouting.newShardRouting(
+                newShardRouting(
                         replicaRouting.shardId(),
                         replicaRouting.currentNodeId(),
                         null,
@@ -458,13 +460,13 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E

         if (randomBoolean()) {
             // relocation target
-            indexShard = newShard(TestShardRouting.newShardRouting(shardId, "local_node", "other node",
+            indexShard = newShard(newShardRouting(shardId, "local_node", "other node",
                 true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing())));
         } else if (randomBoolean()) {
             // simulate promotion
             indexShard = newStartedShard(false);
             ShardRouting replicaRouting = indexShard.routingEntry();
-            ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
+            ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
                 true, ShardRoutingState.STARTED, replicaRouting.allocationId());
             indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, 0L,
                 Collections.singleton(indexShard.routingEntry().allocationId().getId()),
@@ -520,7 +522,7 @@ public void testOperationPermitOnReplicaShards() throws Exception {
             case 1: {
                 // initializing replica / primary
                 final boolean relocating = randomBoolean();
-                ShardRouting routing = TestShardRouting.newShardRouting(shardId, "local_node",
+                ShardRouting routing = newShardRouting(shardId, "local_node",
                     relocating ? "sourceNode" : null,
                     relocating ? randomBoolean() : false,
                     ShardRoutingState.INITIALIZING,
@@ -533,7 +535,7 @@ public void testOperationPermitOnReplicaShards() throws Exception {
                 // relocation source
                 indexShard = newStartedShard(true);
                 ShardRouting routing = indexShard.routingEntry();
-                routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
+                routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
                     true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
                 IndexShardTestCase.updateRoutingEntry(indexShard, routing);
                 indexShard.relocated("test", primaryContext -> {});
@@ -1377,6 +1379,47 @@ protected void doRun() throws Exception {
         closeShards(shard);
     }

+    public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
+        final IndexShard shard = newStartedShard(false);
+        final Consumer<Mapping> mappingConsumer = getMappingUpdater(shard, "test");
+        shard.applyDeleteOperationOnReplica(1, 1, 2, "test", "id", VersionType.EXTERNAL, mappingConsumer);
+        shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation
+        shard.applyIndexOperationOnReplica(0, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+            SourceToParse.source(shard.shardId().getIndexName(), "test", "id", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
+
+        // index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation stick
+        // around
+        shard.applyIndexOperationOnReplica(3, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+            SourceToParse.source(shard.shardId().getIndexName(), "test", "id2", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
+
+        final int translogOps;
+        if (randomBoolean()) {
+            logger.info("--> flushing shard");
+            flushShard(shard);
+            translogOps = 2;
+        } else if (randomBoolean()) {
+            shard.getEngine().rollTranslogGeneration();
+            translogOps = 3;
+        } else {
+            translogOps = 3;
+        }
+
+        final ShardRouting replicaRouting = shard.routingEntry();
+        IndexShard newShard = reinitShard(shard,
+            newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
+                RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE));
+        DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+        newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
+        assertTrue(newShard.recoverFromStore());
+        assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
+        assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
+        assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
+        assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
+        updateRoutingEntry(newShard, ShardRoutingHelper.moveToStarted(newShard.routingEntry()));
+        assertDocCount(newShard, 1);
+        closeShards(newShard);
+    }
+
     public void testRecoverFromStore() throws IOException {
         final IndexShard shard = newStartedShard(true);
         int totalOps = randomInt(10);
@@ -1939,7 +1982,7 @@ public void testRecoverFromLocalShard() throws IOException {
         sourceShard.refresh("test");


-        ShardRouting targetRouting = TestShardRouting.newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
+        ShardRouting targetRouting = newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
             ShardRoutingState.INITIALIZING, RecoverySource.LocalShardsRecoverySource.INSTANCE);

         final IndexShard targetShard;

core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java

Lines changed: 53 additions & 0 deletions
@@ -19,9 +19,14 @@

 package org.elasticsearch.indices.recovery;

+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
 import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
 import org.elasticsearch.index.shard.IndexShard;
@@ -79,4 +84,52 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception {
             assertBusy(() -> assertThat(replica.getTranslog().totalOperations(), equalTo(0)));
         }
     }
+
+    public void testRecoveryWithOutOfOrderDelete() throws Exception {
+        try (ReplicationGroup shards = createGroup(1)) {
+            shards.startAll();
+            // create out of order delete and index op on replica
+            final IndexShard orgReplica = shards.getReplicas().get(0);
+            orgReplica.applyDeleteOperationOnReplica(1, 1, 2, "type", "id", VersionType.EXTERNAL, u -> {});
+            orgReplica.getTranslog().rollGeneration(); // isolate the delete in it's own generation
+            orgReplica.applyIndexOperationOnReplica(0, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id", new BytesArray("{}"), XContentType.JSON),
+                u -> {});
+
+            // index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation
+            // stick around
+            orgReplica.applyIndexOperationOnReplica(3, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id2", new BytesArray("{}"), XContentType.JSON), u -> {});
+
+            final int translogOps;
+            if (randomBoolean()) {
+                if (randomBoolean()) {
+                    logger.info("--> flushing shard (translog will be trimmed)");
+                    IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
+                    builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
+                        .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
+                        .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
+                    );
+                    orgReplica.indexSettings().updateIndexMetaData(builder.build());
+                    orgReplica.onSettingsChanged();
+                    translogOps = 3; // 2 ops + seqno gaps
+                } else {
+                    logger.info("--> flushing shard (translog will be retained)");
+                    translogOps = 4; // 3 ops + seqno gaps
+                }
+                flushShard(orgReplica);
+            } else {
+                translogOps = 4; // 3 ops + seqno gaps
+            }
+
+            final IndexShard orgPrimary = shards.getPrimary();
+            shards.promoteReplicaToPrimary(orgReplica).get(); // wait for primary/replica sync to make sure seq# gap is closed.
+
+            IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
+            shards.recoverReplica(newReplica);
+            shards.assertAllEqual(1);
+
+            assertThat(newReplica.getTranslog().totalOperations(), equalTo(translogOps));
+        }
+    }
 }
