@@ -693,14 +693,23 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
} else {
if (index.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
// This can happen if the primary is still on an old node and sends traffic without seq#, or we recover from a translog
// created by an old version.
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
"index is newly created but op has no sequence numbers. op: " + index;
opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
} else if (index.seqNo() <= seqNoService.getLocalCheckpoint()) {
// the operation seq# is lower than the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
// part of the lucene commit (either from a peer recovery or a local translog)
// or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
// question may have been deleted by an out-of-order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
}
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
@@ -979,12 +988,21 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
} else {
if (delete.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_alpha1) :
"index is newly created but op has no sequence numbers. op: " + delete;
opVsLucene = compareOpToLuceneDocBasedOnVersions(delete);
} else if (delete.seqNo() <= seqNoService.getLocalCheckpoint()) {
// the operation seq# is lower than the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
// part of the lucene commit (either from a peer recovery or a local translog)
// or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
// question may have been deleted by an out-of-order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
}

final DeletionStrategy plan;
@@ -125,12 +125,14 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -339,7 +341,7 @@ public void onFailure(Exception e) {
// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting =
TestShardRouting.newShardRouting(
newShardRouting(
replicaRouting.shardId(),
replicaRouting.currentNodeId(),
null,
@@ -416,7 +418,7 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting =
TestShardRouting.newShardRouting(
newShardRouting(
replicaRouting.shardId(),
replicaRouting.currentNodeId(),
null,
@@ -458,13 +460,13 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E

if (randomBoolean()) {
// relocation target
indexShard = newShard(TestShardRouting.newShardRouting(shardId, "local_node", "other node",
indexShard = newShard(newShardRouting(shardId, "local_node", "other node",
true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing())));
} else if (randomBoolean()) {
// simulate promotion
indexShard = newStartedShard(false);
ShardRouting replicaRouting = indexShard.routingEntry();
ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, 0L,
Collections.singleton(indexShard.routingEntry().allocationId().getId()),
@@ -520,7 +522,7 @@ public void testOperationPermitOnReplicaShards() throws Exception {
case 1: {
// initializing replica / primary
final boolean relocating = randomBoolean();
ShardRouting routing = TestShardRouting.newShardRouting(shardId, "local_node",
ShardRouting routing = newShardRouting(shardId, "local_node",
relocating ? "sourceNode" : null,
relocating ? randomBoolean() : false,
ShardRoutingState.INITIALIZING,
@@ -533,7 +535,7 @@
// relocation source
indexShard = newStartedShard(true);
ShardRouting routing = indexShard.routingEntry();
routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
IndexShardTestCase.updateRoutingEntry(indexShard, routing);
indexShard.relocated("test", primaryContext -> {});
@@ -1377,6 +1379,47 @@ protected void doRun() throws Exception {
closeShards(shard);
}

public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
final IndexShard shard = newStartedShard(false);
final Consumer<Mapping> mappingConsumer = getMappingUpdater(shard, "test");
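// apply a delete with seq# 1 before the index op with seq# 0 to simulate out-of-order delivery on the replica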
shard.applyDeleteOperationOnReplica(1, 1, 2, "test", "id", VersionType.EXTERNAL, mappingConsumer);
shard.getEngine().rollTranslogGeneration(); // isolate the delete in its own generation
shard.applyIndexOperationOnReplica(0, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id", new BytesArray("{}"), XContentType.JSON), mappingConsumer);

// index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation stick
// around
shard.applyIndexOperationOnReplica(3, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id2", new BytesArray("{}"), XContentType.JSON), mappingConsumer);

final int translogOps;
if (randomBoolean()) {
logger.info("--> flushing shard");
flushShard(shard);
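// after the flush, the generation holding only the stale delete (seq# <= local checkpoint) can be trimmed, leaving two ops to replay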
translogOps = 2;
} else if (randomBoolean()) {
shard.getEngine().rollTranslogGeneration();
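// rolling the generation without flushing keeps all three ops in the translog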
translogOps = 3;
} else {
translogOps = 3;
}

final ShardRouting replicaRouting = shard.routingEntry();
IndexShard newShard = reinitShard(shard,
newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE));
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
updateRoutingEntry(newShard, ShardRoutingHelper.moveToStarted(newShard.routingEntry()));
assertDocCount(newShard, 1);
closeShards(newShard);
}

public void testRecoverFromStore() throws IOException {
final IndexShard shard = newStartedShard(true);
int totalOps = randomInt(10);
@@ -1939,7 +1982,7 @@ public void testRecoverFromLocalShard() throws IOException {
sourceShard.refresh("test");


ShardRouting targetRouting = TestShardRouting.newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
ShardRouting targetRouting = newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
ShardRoutingState.INITIALIZING, RecoverySource.LocalShardsRecoverySource.INSTANCE);

final IndexShard targetShard;
@@ -19,9 +19,14 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
import org.elasticsearch.index.shard.IndexShard;
@@ -79,4 +84,52 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception {
assertBusy(() -> assertThat(replica.getTranslog().totalOperations(), equalTo(0)));
}
}

public void testRecoveryWithOutOfOrderDelete() throws Exception {
try (ReplicationGroup shards = createGroup(1)) {
shards.startAll();
// create out of order delete and index op on replica
final IndexShard orgReplica = shards.getReplicas().get(0);
orgReplica.applyDeleteOperationOnReplica(1, 1, 2, "type", "id", VersionType.EXTERNAL, u -> {});
orgReplica.getTranslog().rollGeneration(); // isolate the delete in its own generation
orgReplica.applyIndexOperationOnReplica(0, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id", new BytesArray("{}"), XContentType.JSON),
u -> {});

// index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation
// stick around
orgReplica.applyIndexOperationOnReplica(3, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id2", new BytesArray("{}"), XContentType.JSON), u -> {});

final int translogOps;
if (randomBoolean()) {
if (randomBoolean()) {
logger.info("--> flushing shard (translog will be trimmed)");
IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
);
orgReplica.indexSettings().updateIndexMetaData(builder.build());
orgReplica.onSettingsChanged();
translogOps = 3; // 2 ops + seqno gaps
} else {
logger.info("--> flushing shard (translog will be retained)");
translogOps = 4; // 3 ops + seqno gaps
}
flushShard(orgReplica);
} else {
translogOps = 4; // 3 ops + seqno gaps
}

final IndexShard orgPrimary = shards.getPrimary();
shards.promoteReplicaToPrimary(orgReplica).get(); // wait for primary/replica sync to make sure seq# gap is closed.

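// the new replica reuses the old primary's on-disk shard data, so peer recovery replays the promoted primary's translog ops on top of an existing Lucene index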
IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
shards.recoverReplica(newReplica);
shards.assertAllEqual(1);

assertThat(newReplica.getTranslog().totalOperations(), equalTo(translogOps));
}
}
}