Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq
if (!TransportActions.isShardNotAvailableException(failure)) {
throw failure;
}
} else {
} else if (operationResult.getTranslogLocation() != null) { // out of order ops are not added to the translog
location = locationToSync(location, operationResult.getTranslogLocation());
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,9 +502,11 @@ public IndexResult index(Index index) throws IOException {
} else if (plan.indexIntoLucene) {
indexResult = indexIntoLucene(index, plan);
} else {
assert index.origin() != Operation.Origin.PRIMARY;
indexResult = new IndexResult(plan.versionForIndexing, plan.currentNotFoundOrDeleted);
}
if (indexResult.hasFailure() == false &&
plan.indexIntoLucene && // if we didn't store it in lucene, there is no need to store it in the translog
index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
Translog.Location location =
translog.add(new Translog.Index(index, indexResult));
Expand Down Expand Up @@ -541,7 +543,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
// a delete state and return false for the created flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processButSkipLucene(false, index.version());
plan = IndexingStrategy.skipAsStale(false, index.version());
} else {
plan = IndexingStrategy.processNormally(
opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version()
Expand Down Expand Up @@ -704,7 +706,7 @@ static IndexingStrategy overrideExistingAsIfNotThere(long versionForIndexing) {
return new IndexingStrategy(true, true, true, versionForIndexing, null);
}

static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) {
static IndexingStrategy skipAsStale(boolean currentNotFoundOrDeleted, long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, versionForIndexing, null);
}
}
Expand Down Expand Up @@ -758,9 +760,11 @@ public DeleteResult delete(Delete delete) throws IOException {
} else if (plan.deleteFromLucene) {
deleteResult = deleteInLucene(delete, plan);
} else {
assert delete.origin() != Operation.Origin.PRIMARY;
deleteResult = new DeleteResult(plan.versionOfDeletion, plan.currentlyDeleted == false);
}
if (!deleteResult.hasFailure() &&
plan.deleteFromLucene && // if it wasn't applied to lucene, we don't store it in the translog
delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
Translog.Location location =
translog.add(new Translog.Delete(delete, deleteResult));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2522,8 +2522,8 @@ public BytesRef binaryValue() {
public void testDoubleDeliveryPrimary() throws IOException {
final ParsedDocument doc = testParsedDocument("1", "test", null, System.currentTimeMillis(), -1L,
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index operation = appendOnlyPrimary(doc, false, 1);
Engine.Index retry = appendOnlyPrimary(doc, true, 1);
final Engine.Index operation = appendOnlyPrimary(doc, false, 1);
final Engine.Index retry = appendOnlyPrimary(doc, true, 1);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertFalse(engine.indexWriterHasDeletions());
Expand Down Expand Up @@ -2551,8 +2551,6 @@ public void testDoubleDeliveryPrimary() throws IOException {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
assertEquals(1, topDocs.totalHits);
}
operation = randomAppendOnly(doc, false, 1);
retry = randomAppendOnly(doc, true, 1);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(indexResult.getTranslogLocation());
Expand All @@ -2563,7 +2561,7 @@ public void testDoubleDeliveryPrimary() throws IOException {
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getTranslogLocation());
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(retryResult.getTranslogLocation());
assertNotNull(indexResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
}

Expand All @@ -2577,8 +2575,8 @@ public void testDoubleDeliveryPrimary() throws IOException {
public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
final ParsedDocument doc = testParsedDocument("1", "test", null, System.currentTimeMillis(), -1,
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index operation = appendOnlyReplica(doc, false, 1);
Engine.Index retry = appendOnlyReplica(doc, true, 1);
final Engine.Index operation = appendOnlyReplica(doc, false, 1);
final Engine.Index retry = appendOnlyReplica(doc, true, 1);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertFalse(engine.indexWriterHasDeletions());
Expand All @@ -2587,8 +2585,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
Engine.IndexResult retryResult = engine.index(retry);
assertFalse(engine.indexWriterHasDeletions());
assertEquals(1, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
assertNull(retryResult.getTranslogLocation()); // we didn't index it nor put it in the translog
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertFalse(engine.indexWriterHasDeletions());
Expand All @@ -2597,29 +2594,24 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException {
Engine.IndexResult indexResult = engine.index(operation);
assertFalse(engine.indexWriterHasDeletions());
assertEquals(2, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
assertNull(indexResult.getTranslogLocation()); // we didn't index it nor put it in the translog
}

engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
assertEquals(1, topDocs.totalHits);
}
operation = randomAppendOnly(doc, false, 1);
retry = randomAppendOnly(doc, true, 1);
if (randomBoolean()) {
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(indexResult.getTranslogLocation());
assertNull(indexResult.getTranslogLocation()); // we don't index because a retry has already been processed.
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
assertNull(retryResult.getTranslogLocation());
} else {
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getTranslogLocation());
assertNull(retryResult.getTranslogLocation());
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
assertNull(indexResult.getTranslogLocation());
}

engine.refresh("test");
Expand All @@ -2645,8 +2637,7 @@ public void testDoubleDeliveryReplica() throws IOException {
Engine.IndexResult retryResult = engine.index(duplicate);
assertFalse(engine.indexWriterHasDeletions());
assertEquals(2, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0);
assertNull(retryResult.getTranslogLocation());
} else {
Engine.IndexResult retryResult = engine.index(duplicate);
assertFalse(engine.indexWriterHasDeletions());
Expand All @@ -2658,8 +2649,7 @@ public void testDoubleDeliveryReplica() throws IOException {
Engine.IndexResult indexResult = engine.index(operation);
assertFalse(engine.indexWriterHasDeletions());
assertEquals(2, engine.getNumVersionLookups());
assertNotNull(retryResult.getTranslogLocation());
assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0);
assertNull(indexResult.getTranslogLocation()); // we didn't index, no need to put in translog
}

engine.refresh("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
Expand Down Expand Up @@ -77,6 +78,7 @@
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
Expand Down Expand Up @@ -123,6 +125,7 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -282,14 +285,14 @@ public void testOperationLocksOnPrimaryShards() throws InterruptedException, Exe

if (randomBoolean()) {
// relocation target
indexShard = newShard(TestShardRouting.newShardRouting(shardId, "local_node", "other node",
indexShard = newShard(newShardRouting(shardId, "local_node", "other node",
true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing())));
} else if (randomBoolean()) {
// simulate promotion
indexShard = newStartedShard(false);
ShardRouting replicaRouting = indexShard.routingEntry();
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
} else {
Expand Down Expand Up @@ -341,7 +344,7 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe
case 1: {
// initializing replica / primary
final boolean relocating = randomBoolean();
ShardRouting routing = TestShardRouting.newShardRouting(shardId, "local_node",
ShardRouting routing = newShardRouting(shardId, "local_node",
relocating ? "sourceNode" : null,
relocating ? randomBoolean() : false,
ShardRoutingState.INITIALIZING,
Expand All @@ -353,7 +356,7 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe
// relocation source
indexShard = newStartedShard(true);
ShardRouting routing = indexShard.routingEntry();
routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
routing = newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
indexShard.updateRoutingEntry(routing);
indexShard.relocated("test");
Expand Down Expand Up @@ -914,6 +917,38 @@ public void testRecoverFromStore() throws IOException {
closeShards(newShard);
}

public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
final IndexShard shard = newStartedShard(false);
final Engine.Index index = shard.prepareIndexOnReplica(
SourceToParse.source(SourceToParse.Origin.REPLICA, shard.shardId().getIndexName(), "type", "id", new BytesArray("{}"),
XContentType.JSON), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
final Engine.Delete delete = shard.prepareDeleteOnReplica("type", "id", 2, VersionType.EXTERNAL);
shard.delete(delete);
final int translogOps;
if (randomBoolean()) {
flushShard(shard, true); // lucene won't flush due to just one pending delete
translogOps = 0;
} else {
translogOps = 1;
}
final Engine.IndexResult result = shard.index(index);
assertThat(result.getTranslogLocation(), nullValue());
final ShardRouting replicaRouting = shard.routingEntry();
IndexShard newShard = reinitShard(shard,
newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE));
DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
assertDocCount(newShard, 0);
closeShards(newShard);
}

public void testRecoverFromCleanStore() throws IOException {
final IndexShard shard = newStartedShard(true);
indexDoc(shard, "test", "0");
Expand Down Expand Up @@ -1336,7 +1371,7 @@ public void testRecoverFromLocalShard() throws IOException {
sourceShard.refresh("test");


ShardRouting targetRouting = TestShardRouting.newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
ShardRouting targetRouting = newShardRouting(new ShardId("index_1", "index_1", 0), "n1", true,
ShardRoutingState.INITIALIZING, RecoverySource.LocalShardsRecoverySource.INSTANCE);

final IndexShard targetShard;
Expand Down