Skip to content

Commit 0d60e8a

Browse files
authored
Fix race between replica reset and primary promotion (#32442)
We've recently seen a number of test failures that tripped an assertion in IndexShard (see issues linked below), leading to the discovery of a race between resetting a replica when it learns about a higher term and when the same replica is promoted to primary. This commit fixes the race by distinguishing between a cluster state primary term (called pendingPrimaryTerm) and a shard-level operation term. The former is set during the cluster state update or when a replica learns about a new primary. The latter is only incremented under the operation block, which can happen in a delayed fashion. It also solves the issue where a replica that's still adjusting to the new term receives a cluster state update that promotes it to primary, which can happen in the situation of multiple nodes being shut down in short succession. In that case, the cluster state update thread would call `asyncBlockOperations` in `updateShardState`, which in turn would throw an exception as blocking permits is not allowed while an ongoing block is in place, subsequently failing the shard. This commit therefore extends the IndexShardOperationPermits to allow it to queue multiple blocks (which will all take precedence over operations acquiring permits). Finally, it also moves the primary activation of the replication tracker under the operation block, so that the actual transition to primary only happens under the operation block. Relates to #32431, #32304 and #32118
1 parent 9dcf3f5 commit 0d60e8a

28 files changed

+471
-256
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ private static BulkItemResultHolder executeIndexRequest(final IndexRequest index
144144
switch (indexResult.getResultType()) {
145145
case SUCCESS:
146146
IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
147-
indexResult.getSeqNo(), primary.getPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated());
147+
indexResult.getSeqNo(), indexResult.getTerm(), indexResult.getVersion(), indexResult.isCreated());
148148
return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
149149
case FAILURE:
150150
return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
@@ -161,7 +161,7 @@ private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest del
161161
switch (deleteResult.getResultType()) {
162162
case SUCCESS:
163163
DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
164-
deleteResult.getSeqNo(), primary.getPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound());
164+
deleteResult.getSeqNo(), deleteResult.getTerm(), deleteResult.getVersion(), deleteResult.isFound());
165165
return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
166166
case FAILURE:
167167
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
@@ -300,7 +300,7 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
300300
assert result instanceof Engine.IndexResult : result.getClass();
301301
final IndexRequest updateIndexRequest = translate.action();
302302
final IndexResponse indexResponse = new IndexResponse(primary.shardId(), updateIndexRequest.type(), updateIndexRequest.id(),
303-
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
303+
result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
304304
updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(),
305305
indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), indexResponse.getVersion(),
306306
indexResponse.getResult());
@@ -320,7 +320,7 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
320320
final DeleteRequest updateDeleteRequest = translate.action();
321321

322322
final DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), updateDeleteRequest.type(), updateDeleteRequest.id(),
323-
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());
323+
result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());
324324

325325
updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(),
326326
deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(),
@@ -356,7 +356,7 @@ static BulkItemResultHolder executeUpdateRequestOnce(UpdateRequest updateRequest
356356
} catch (Exception failure) {
357357
// we may fail translating a update to index or delete operation
358358
// we use index result to communicate failure while translating update request
359-
final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO);
359+
final Engine.Result result = primary.getFailedIndexResult(failure, updateRequest.version());
360360
return new BulkItemResultHolder(null, result, primaryItemRequest);
361361
}
362362

@@ -559,15 +559,15 @@ static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, Ind
559559
() ->
560560
primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
561561
request.getAutoGeneratedTimestamp(), request.isRetry()),
562-
e -> new Engine.IndexResult(e, request.version()),
562+
e -> primary.getFailedIndexResult(e, request.version()),
563563
mappingUpdater);
564564
}
565565

566566
private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary,
567567
MappingUpdatePerformer mappingUpdater) throws Exception {
568568
return executeOnPrimaryWhileHandlingMappingUpdates(primary.shardId(), request.type(),
569569
() -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType()),
570-
e -> new Engine.DeleteResult(e, request.version()),
570+
e -> primary.getFailedDeleteResult(e, request.version()),
571571
mappingUpdater);
572572
}
573573

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,7 @@ private void acquirePrimaryShardReference(ShardId shardId, String allocationId,
929929
if (actualAllocationId.equals(allocationId) == false) {
930930
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
931931
}
932-
final long actualTerm = indexShard.getPrimaryTerm();
932+
final long actualTerm = indexShard.getPendingPrimaryTerm();
933933
if (actualTerm != primaryTerm) {
934934
throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
935935
primaryTerm, actualTerm);
@@ -983,7 +983,7 @@ class PrimaryShardReference extends ShardReference
983983
}
984984

985985
public boolean isRelocated() {
986-
return indexShard.isPrimaryMode() == false;
986+
return indexShard.isRelocatedPrimary();
987987
}
988988

989989
@Override

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -304,26 +304,29 @@ public abstract static class Result {
304304
private final Operation.TYPE operationType;
305305
private final Result.Type resultType;
306306
private final long version;
307+
private final long term;
307308
private final long seqNo;
308309
private final Exception failure;
309310
private final SetOnce<Boolean> freeze = new SetOnce<>();
310311
private final Mapping requiredMappingUpdate;
311312
private Translog.Location translogLocation;
312313
private long took;
313314

314-
protected Result(Operation.TYPE operationType, Exception failure, long version, long seqNo) {
315+
protected Result(Operation.TYPE operationType, Exception failure, long version, long term, long seqNo) {
315316
this.operationType = operationType;
316317
this.failure = Objects.requireNonNull(failure);
317318
this.version = version;
319+
this.term = term;
318320
this.seqNo = seqNo;
319321
this.requiredMappingUpdate = null;
320322
this.resultType = Type.FAILURE;
321323
}
322324

323-
protected Result(Operation.TYPE operationType, long version, long seqNo) {
325+
protected Result(Operation.TYPE operationType, long version, long term, long seqNo) {
324326
this.operationType = operationType;
325327
this.version = version;
326328
this.seqNo = seqNo;
329+
this.term = term;
327330
this.failure = null;
328331
this.requiredMappingUpdate = null;
329332
this.resultType = Type.SUCCESS;
@@ -333,6 +336,7 @@ protected Result(Operation.TYPE operationType, Mapping requiredMappingUpdate) {
333336
this.operationType = operationType;
334337
this.version = Versions.NOT_FOUND;
335338
this.seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
339+
this.term = 0L;
336340
this.failure = null;
337341
this.requiredMappingUpdate = requiredMappingUpdate;
338342
this.resultType = Type.MAPPING_UPDATE_REQUIRED;
@@ -357,6 +361,10 @@ public long getSeqNo() {
357361
return seqNo;
358362
}
359363

364+
public long getTerm() {
365+
return term;
366+
}
367+
360368
/**
361369
* If the operation was aborted due to missing mappings, this method will return the mappings
362370
* that are required to complete the operation.
@@ -415,20 +423,20 @@ public static class IndexResult extends Result {
415423

416424
private final boolean created;
417425

418-
public IndexResult(long version, long seqNo, boolean created) {
419-
super(Operation.TYPE.INDEX, version, seqNo);
426+
public IndexResult(long version, long term, long seqNo, boolean created) {
427+
super(Operation.TYPE.INDEX, version, term, seqNo);
420428
this.created = created;
421429
}
422430

423431
/**
424432
* use in case of the index operation failed before getting to internal engine
425433
**/
426-
public IndexResult(Exception failure, long version) {
427-
this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO);
434+
public IndexResult(Exception failure, long version, long term) {
435+
this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO);
428436
}
429437

430-
public IndexResult(Exception failure, long version, long seqNo) {
431-
super(Operation.TYPE.INDEX, failure, version, seqNo);
438+
public IndexResult(Exception failure, long version, long term, long seqNo) {
439+
super(Operation.TYPE.INDEX, failure, version, term, seqNo);
432440
this.created = false;
433441
}
434442

@@ -447,20 +455,20 @@ public static class DeleteResult extends Result {
447455

448456
private final boolean found;
449457

450-
public DeleteResult(long version, long seqNo, boolean found) {
451-
super(Operation.TYPE.DELETE, version, seqNo);
458+
public DeleteResult(long version, long term, long seqNo, boolean found) {
459+
super(Operation.TYPE.DELETE, version, term, seqNo);
452460
this.found = found;
453461
}
454462

455463
/**
456464
* use in case of the delete operation failed before getting to internal engine
457465
**/
458-
public DeleteResult(Exception failure, long version) {
459-
this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
466+
public DeleteResult(Exception failure, long version, long term) {
467+
this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
460468
}
461469

462-
public DeleteResult(Exception failure, long version, long seqNo, boolean found) {
463-
super(Operation.TYPE.DELETE, failure, version, seqNo);
470+
public DeleteResult(Exception failure, long version, long term, long seqNo, boolean found) {
471+
super(Operation.TYPE.DELETE, failure, version, term, seqNo);
464472
this.found = found;
465473
}
466474

@@ -477,12 +485,12 @@ public boolean isFound() {
477485

478486
public static class NoOpResult extends Result {
479487

480-
NoOpResult(long seqNo) {
481-
super(Operation.TYPE.NO_OP, 0, seqNo);
488+
NoOpResult(long term, long seqNo) {
489+
super(Operation.TYPE.NO_OP, term, 0, seqNo);
482490
}
483491

484-
NoOpResult(long seqNo, Exception failure) {
485-
super(Operation.TYPE.NO_OP, failure, 0, seqNo);
492+
NoOpResult(long term, long seqNo, Exception failure) {
493+
super(Operation.TYPE.NO_OP, failure, term, 0, seqNo);
486494
}
487495

488496
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,10 @@ protected long doGenerateSeqNoForOperation(final Operation operation) {
736736
return localCheckpointTracker.generateSeqNo();
737737
}
738738

739+
private long getPrimaryTerm() {
740+
return engineConfig.getPrimaryTermSupplier().getAsLong();
741+
}
742+
739743
@Override
740744
public IndexResult index(Index index) throws IOException {
741745
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
@@ -788,7 +792,7 @@ public IndexResult index(Index index) throws IOException {
788792
indexResult = indexIntoLucene(index, plan);
789793
} else {
790794
indexResult = new IndexResult(
791-
plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
795+
plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
792796
}
793797
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
794798
final Translog.Location location;
@@ -900,7 +904,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
900904
currentVersion, index.version(), currentNotFoundOrDeleted)) {
901905
final VersionConflictEngineException e =
902906
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
903-
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
907+
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
904908
} else {
905909
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
906910
generateSeqNoForOperation(index),
@@ -930,7 +934,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
930934
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
931935
addDocs(index.docs(), indexWriter);
932936
}
933-
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
937+
return new IndexResult(plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
934938
} catch (Exception ex) {
935939
if (indexWriter.getTragicException() == null) {
936940
/* There is no tragic event recorded so this must be a document failure.
@@ -946,7 +950,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
946950
* we return a `MATCH_ANY` version to indicate no document was index. The value is
947951
* not used anyway
948952
*/
949-
return new IndexResult(ex, Versions.MATCH_ANY, plan.seqNoForIndexing);
953+
return new IndexResult(ex, Versions.MATCH_ANY, getPrimaryTerm(), plan.seqNoForIndexing);
950954
} else {
951955
throw ex;
952956
}
@@ -1019,8 +1023,8 @@ static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
10191023
}
10201024

10211025
static IndexingStrategy skipDueToVersionConflict(
1022-
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
1023-
final IndexResult result = new IndexResult(e, currentVersion);
1026+
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion, long term) {
1027+
final IndexResult result = new IndexResult(e, currentVersion, term);
10241028
return new IndexingStrategy(
10251029
currentNotFoundOrDeleted, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result);
10261030
}
@@ -1097,7 +1101,7 @@ public DeleteResult delete(Delete delete) throws IOException {
10971101
deleteResult = deleteInLucene(delete, plan);
10981102
} else {
10991103
deleteResult = new DeleteResult(
1100-
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
1104+
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
11011105
}
11021106
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
11031107
final Translog.Location location;
@@ -1178,7 +1182,7 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException
11781182
final DeletionStrategy plan;
11791183
if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
11801184
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
1181-
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
1185+
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
11821186
} else {
11831187
plan = DeletionStrategy.processNormally(
11841188
currentlyDeleted,
@@ -1201,12 +1205,12 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
12011205
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
12021206
engineConfig.getThreadPool().relativeTimeInMillis()));
12031207
return new DeleteResult(
1204-
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
1208+
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
12051209
} catch (Exception ex) {
12061210
if (indexWriter.getTragicException() == null) {
12071211
// there is no tragic event and such it must be a document level failure
12081212
return new DeleteResult(
1209-
ex, plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
1213+
ex, plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
12101214
} else {
12111215
throw ex;
12121216
}
@@ -1237,9 +1241,9 @@ private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted,
12371241
}
12381242

12391243
static DeletionStrategy skipDueToVersionConflict(
1240-
VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
1244+
VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) {
12411245
final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
1242-
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false);
1246+
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false);
12431247
return new DeletionStrategy(false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult);
12441248
}
12451249

@@ -1268,7 +1272,7 @@ public NoOpResult noOp(final NoOp noOp) {
12681272
try (ReleasableLock ignored = readLock.acquire()) {
12691273
noOpResult = innerNoOp(noOp);
12701274
} catch (final Exception e) {
1271-
noOpResult = new NoOpResult(noOp.seqNo(), e);
1275+
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e);
12721276
}
12731277
return noOpResult;
12741278
}
@@ -1278,7 +1282,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
12781282
assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
12791283
final long seqNo = noOp.seqNo();
12801284
try {
1281-
final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
1285+
final NoOpResult noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo());
12821286
if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
12831287
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
12841288
noOpResult.setTranslogLocation(location);

0 commit comments

Comments
 (0)