Skip to content

Commit 71b89ef

Browse files
committed
Add replica ops with version conflict to translog
An operation that completed successfully on a primary can result in a version conflict on a replica because operations are replicated asynchronously. Previously, when a replica operation resulted in a version conflict, the operation was not added to the translog. This leaves gaps in the translog, which is problematic because it can lead to situations where a replica shard can never advance its local checkpoint. Because such conflicts are part of the normal course of business for a replica shard, these operations should be treated as if they completed successfully. This commit adds these operations to the translog.
1 parent 4f89455 commit 71b89ef

File tree

4 files changed

+122
-96
lines changed

4 files changed

+122
-96
lines changed

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.apache.logging.log4j.util.Supplier;
24+
import org.elasticsearch.ExceptionsHelper;
2425
import org.elasticsearch.action.DocWriteRequest;
2526
import org.elasticsearch.action.DocWriteResponse;
2627
import org.elasticsearch.action.delete.DeleteRequest;
@@ -29,6 +30,7 @@
2930
import org.elasticsearch.action.index.IndexResponse;
3031
import org.elasticsearch.action.support.ActionFilters;
3132
import org.elasticsearch.action.support.replication.ReplicationOperation;
33+
import org.elasticsearch.action.support.TransportActions;
3234
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
3335
import org.elasticsearch.action.support.replication.TransportWriteAction;
3436
import org.elasticsearch.action.update.UpdateHelper;
@@ -65,9 +67,6 @@
6567

6668
import java.util.Map;
6769

68-
import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException;
69-
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;
70-
7170
/** Performs shard-level bulk (index, delete or update) operations */
7271
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
7372

@@ -235,6 +234,10 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
235234
return location;
236235
}
237236

237+
private static boolean isConflictException(final Exception e) {
238+
return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException;
239+
}
240+
238241
private static class UpdateResultHolder {
239242
final BulkItemRequest replicaRequest;
240243
final Engine.Result operationResult;
@@ -392,7 +395,7 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq
392395
|| failure instanceof IndexShardClosedException
393396
: "expected any one of [version conflict, mapper parsing, engine closed, index shard closed]" +
394397
" failures. got " + failure;
395-
if (!ignoreReplicaException(failure)) {
398+
if (!TransportActions.isShardNotAvailableException(failure)) {
396399
throw failure;
397400
}
398401
} else {
@@ -401,7 +404,7 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq
401404
} catch (Exception e) {
402405
// if it's not a shard-not-available failure, we need to make sure to bubble up the failure
403406
// so we will fail the shard
404-
if (!ignoreReplicaException(e)) {
407+
if (!TransportActions.isShardNotAvailableException(e)) {
405408
throw e;
406409
}
407410
}

core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public void onFailure(Exception replicaException) {
202202
shard,
203203
replicaRequest),
204204
replicaException);
205-
if (ignoreReplicaException(replicaException)) {
205+
if (TransportActions.isShardNotAvailableException(replicaException)) {
206206
decPendingAndFinishIfNeeded();
207207
} else {
208208
RestStatus restStatus = ExceptionsHelper.status(replicaException);
@@ -314,30 +314,6 @@ private void finishAsFailed(Exception exception) {
314314
}
315315
}
316316

317-
318-
/**
319-
* Should an exception be ignored when the operation is performed on the replica.
320-
*/
321-
public static boolean ignoreReplicaException(Exception e) {
322-
if (TransportActions.isShardNotAvailableException(e)) {
323-
return true;
324-
}
325-
// on version conflict or document missing, it means
326-
// that a new change has crept into the replica, and it's fine
327-
if (isConflictException(e)) {
328-
return true;
329-
}
330-
return false;
331-
}
332-
333-
public static boolean isConflictException(Throwable t) {
334-
final Throwable cause = ExceptionsHelper.unwrapCause(t);
335-
// on version conflict or document missing, it means
336-
// that a new change has crept into the replica, and it's fine
337-
return cause instanceof VersionConflictEngineException;
338-
}
339-
340-
341317
public interface Primary<
342318
Request extends ReplicationRequest<Request>,
343319
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -477,10 +477,7 @@ private <T extends Result> Optional<T> checkVersionConflict(
477477
}
478478

479479
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
480-
if (op.origin().isRecovery()) {
481-
// version conflict, but okay
482-
result = onSuccess.get();
483-
} else {
480+
if (op.origin() == Operation.Origin.PRIMARY) {
484481
// fatal version conflict
485482
final VersionConflictEngineException e =
486483
new VersionConflictEngineException(
@@ -489,8 +486,9 @@ private <T extends Result> Optional<T> checkVersionConflict(
489486
op.id(),
490487
op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
491488
result = onFailure.apply(e);
489+
} else {
490+
result = onSuccess.get();
492491
}
493-
494492
return Optional.of(result);
495493
} else {
496494
return Optional.empty();
@@ -690,7 +688,7 @@ private IndexResult innerIndex(Index index) throws IOException {
690688
seqNo = seqNoService().generateSeqNo();
691689
}
692690

693-
/**
691+
/*
694692
* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
695693
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
696694
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
@@ -707,10 +705,12 @@ private IndexResult innerIndex(Index index) throws IOException {
707705
update(index.uid(), index.docs(), indexWriter);
708706
}
709707
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
708+
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
709+
}
710+
if (!indexResult.hasFailure()) {
710711
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
711712
? translog.add(new Translog.Index(index, indexResult))
712713
: null;
713-
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
714714
indexResult.setTranslogLocation(location);
715715
}
716716
indexResult.setTook(System.nanoTime() - index.startTime());
@@ -804,18 +804,17 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
804804

805805
final long expectedVersion = delete.version();
806806

807-
final Optional<DeleteResult> result =
807+
final Optional<DeleteResult> checkVersionConflictResult =
808808
checkVersionConflict(
809809
delete,
810810
currentVersion,
811811
expectedVersion,
812812
deleted,
813813
() -> new DeleteResult(expectedVersion, delete.seqNo(), true),
814814
e -> new DeleteResult(e, expectedVersion, delete.seqNo()));
815-
816815
final DeleteResult deleteResult;
817-
if (result.isPresent()) {
818-
deleteResult = result.get();
816+
if (checkVersionConflictResult.isPresent()) {
817+
deleteResult = checkVersionConflictResult.get();
819818
} else {
820819
if (delete.origin() == Operation.Origin.PRIMARY) {
821820
seqNo = seqNoService().generateSeqNo();
@@ -824,11 +823,14 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
824823
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
825824
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
826825
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
826+
827+
versionMap.putUnderLock(delete.uid().bytes(),
828+
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
829+
}
830+
if (!deleteResult.hasFailure()) {
827831
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
828832
? translog.add(new Translog.Delete(delete, deleteResult))
829833
: null;
830-
versionMap.putUnderLock(delete.uid().bytes(),
831-
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
832834
deleteResult.setTranslogLocation(location);
833835
}
834836
deleteResult.setTook(System.nanoTime() - delete.startTime());

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 98 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1478,76 +1478,121 @@ public void testVersioningCreateExistsExceptionWithFlush() {
14781478
}
14791479

14801480
public void testVersioningReplicaConflict1() {
1481-
ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
1482-
Engine.Index index = new Engine.Index(newUid("1"), doc);
1483-
Engine.IndexResult indexResult = engine.index(index);
1484-
assertThat(indexResult.getVersion(), equalTo(1L));
1481+
final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
1482+
final Engine.Index v1Index = new Engine.Index(newUid("1"), doc);
1483+
final Engine.IndexResult v1Result = engine.index(v1Index);
1484+
assertThat(v1Result.getVersion(), equalTo(1L));
14851485

1486-
index = new Engine.Index(newUid("1"), doc);
1487-
indexResult = engine.index(index);
1488-
assertThat(indexResult.getVersion(), equalTo(2L));
1486+
final Engine.Index v2Index = new Engine.Index(newUid("1"), doc);
1487+
final Engine.IndexResult v2Result = engine.index(v2Index);
1488+
assertThat(v2Result.getVersion(), equalTo(2L));
14891489

14901490
// apply the second index to the replica, should work fine
1491-
index = new Engine.Index(newUid("1"), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
1492-
indexResult = replicaEngine.index(index);
1493-
assertThat(indexResult.getVersion(), equalTo(2L));
1494-
1495-
long seqNo = indexResult.getSeqNo();
1496-
long primaryTerm = index.primaryTerm();
1497-
// now, the old one should not work
1498-
index = new Engine.Index(newUid("1"), doc, seqNo, primaryTerm, 1L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
1499-
indexResult = replicaEngine.index(index);
1500-
assertTrue(indexResult.hasFailure());
1501-
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
1491+
final Engine.Index replicaV2Index = new Engine.Index(
1492+
newUid("1"),
1493+
doc,
1494+
v2Result.getSeqNo(),
1495+
v2Index.primaryTerm(),
1496+
v2Result.getVersion(),
1497+
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
1498+
REPLICA,
1499+
0,
1500+
-1,
1501+
false);
1502+
final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index);
1503+
assertThat(replicaV2Result.getVersion(), equalTo(2L));
1504+
1505+
// now, the old one should produce an indexing result
1506+
final Engine.Index replicaV1Index = new Engine.Index(
1507+
newUid("1"),
1508+
doc,
1509+
v1Result.getSeqNo(),
1510+
v1Index.primaryTerm(),
1511+
v1Result.getVersion(),
1512+
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
1513+
REPLICA,
1514+
0,
1515+
-1,
1516+
false);
1517+
final Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index);
1518+
assertFalse(replicaV1Result.hasFailure());
1519+
assertFalse(replicaV1Result.isCreated());
1520+
assertThat(replicaV1Result.getVersion(), equalTo(2L));
15021521

15031522
// replaying the second version on the replica should not fail either; it yields the current version
1504-
index = new Engine.Index(newUid("1"), doc, seqNo, primaryTerm, 2L
1505-
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
1506-
indexResult = replicaEngine.index(index);
1507-
assertThat(indexResult.getVersion(), equalTo(2L));
1508-
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
1523+
final Engine.IndexResult replicaV2ReplayResult = replicaEngine.index(replicaV2Index);
1524+
assertFalse(replicaV2Result.hasFailure());
1525+
assertFalse(replicaV1Result.isCreated());
1526+
assertThat(replicaV2ReplayResult.getVersion(), equalTo(2L));
15091527
}
15101528

15111529
public void testVersioningReplicaConflict2() {
1512-
ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
1513-
Engine.Index index = new Engine.Index(newUid("1"), doc);
1514-
Engine.IndexResult indexResult = engine.index(index);
1515-
assertThat(indexResult.getVersion(), equalTo(1L));
1530+
final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
1531+
final Engine.Index v1Index = new Engine.Index(newUid("1"), doc);
1532+
final Engine.IndexResult v1Result = engine.index(v1Index);
1533+
assertThat(v1Result.getVersion(), equalTo(1L));
15161534

15171535
// apply the first index to the replica, should work fine
1518-
index = new Engine.Index(newUid("1"), doc, indexResult.getSeqNo(), index.primaryTerm(), 1L,
1519-
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
1520-
indexResult = replicaEngine.index(index);
1521-
assertThat(indexResult.getVersion(), equalTo(1L));
1536+
final Engine.Index replicaV1Index = new Engine.Index(
1537+
newUid("1"),
1538+
doc,
1539+
v1Result.getSeqNo(),
1540+
v1Index.primaryTerm(),
1541+
v1Result.getVersion(),
1542+
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
1543+
REPLICA,
1544+
0,
1545+
-1,
1546+
false);
1547+
Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index);
1548+
assertThat(replicaV1Result.getVersion(), equalTo(1L));
15221549

15231550
// index it again
1524-
index = new Engine.Index(newUid("1"), doc);
1525-
indexResult = engine.index(index);
1526-
assertThat(indexResult.getVersion(), equalTo(2L));
1551+
final Engine.Index v2Index = new Engine.Index(newUid("1"), doc);
1552+
final Engine.IndexResult v2Result = engine.index(v2Index);
1553+
assertThat(v2Result.getVersion(), equalTo(2L));
15271554

15281555
// now delete it
1529-
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"));
1530-
Engine.DeleteResult deleteResult = engine.delete(delete);
1556+
final Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"));
1557+
final Engine.DeleteResult deleteResult = engine.delete(delete);
15311558
assertThat(deleteResult.getVersion(), equalTo(3L));
15321559

15331560
// apply the delete on the replica (skipping the second index)
1534-
delete = new Engine.Delete("test", "1", newUid("1"), deleteResult.getSeqNo(), delete.primaryTerm(), 3L
1535-
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
1536-
deleteResult = replicaEngine.delete(delete);
1537-
assertThat(deleteResult.getVersion(), equalTo(3L));
1538-
1539-
// second time delete with same version should fail
1540-
delete = new Engine.Delete("test", "1", newUid("1"), deleteResult.getSeqNo(), delete.primaryTerm(), 3L
1541-
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
1542-
deleteResult = replicaEngine.delete(delete);
1543-
assertTrue(deleteResult.hasFailure());
1544-
assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class));
1545-
1546-
// now do the second index on the replica, it should fail
1547-
index = new Engine.Index(newUid("1"), doc, deleteResult.getSeqNo(), delete.primaryTerm(), 2L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
1548-
indexResult = replicaEngine.index(index);
1549-
assertTrue(indexResult.hasFailure());
1550-
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
1561+
final Engine.Delete replicaDelete = new Engine.Delete(
1562+
"test",
1563+
"1",
1564+
newUid("1"),
1565+
deleteResult.getSeqNo(),
1566+
delete.primaryTerm(),
1567+
deleteResult.getVersion(),
1568+
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
1569+
REPLICA,
1570+
0);
1571+
final Engine.DeleteResult replicaDeleteResult = replicaEngine.delete(replicaDelete);
1572+
assertThat(replicaDeleteResult.getVersion(), equalTo(3L));
1573+
1574+
// second time delete with same version should just produce the same version
1575+
final Engine.DeleteResult deleteReplayResult = replicaEngine.delete(replicaDelete);
1576+
assertFalse(deleteReplayResult.hasFailure());
1577+
assertTrue(deleteReplayResult.isFound());
1578+
assertThat(deleteReplayResult.getVersion(), equalTo(3L));
1579+
1580+
// now do the second index on the replica, it should result in the current version
1581+
final Engine.Index replicaV2Index = new Engine.Index(
1582+
newUid("1"),
1583+
doc,
1584+
v2Result.getSeqNo(),
1585+
v2Index.primaryTerm(),
1586+
v2Result.getVersion(),
1587+
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
1588+
REPLICA,
1589+
0,
1590+
-1,
1591+
false);
1592+
final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index);
1593+
assertFalse(replicaV2Result.hasFailure());
1594+
assertFalse(replicaV2Result.isCreated());
1595+
assertThat(replicaV2Result.getVersion(), equalTo(3L));
15511596
}
15521597

15531598
public void testBasicCreatedFlag() {

0 commit comments

Comments
 (0)