Skip to content

Commit 7979fda

Browse files
committed
Fix race between replica reset and primary promotion (#32442)
We've recently seen a number of test failures that tripped an assertion in IndexShard (see the issues linked below), which led to the discovery of a race between resetting a replica when it learns about a higher term and promoting that same replica to primary. This commit fixes the race by distinguishing between a cluster-state primary term (called pendingPrimaryTerm) and a shard-level operation term. The former is set during the cluster state update or when a replica learns about a new primary. The latter is only incremented under the operation block, which can happen in a delayed fashion. The commit also solves the issue where a replica that is still adjusting to the new term receives a cluster state update that promotes it to primary, which can happen when multiple nodes are shut down in quick succession. In that case, the cluster state update thread would call `asyncBlockOperations` in `updateShardState`, which in turn would throw an exception because blocking permits is not allowed while an ongoing block is in place, subsequently failing the shard. This commit therefore extends IndexShardOperationPermits so that it can queue multiple blocks (all of which take precedence over operations acquiring permits). Finally, it moves the primary activation of the replication tracker under the operation block, so that the actual transition to primary only happens under that block. Relates to #32431, #32304 and #32118.
1 parent d56de98 commit 7979fda

28 files changed

+470
-255
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ private static BulkItemResultHolder executeIndexRequest(final IndexRequest index
142142
switch (indexResult.getResultType()) {
143143
case SUCCESS:
144144
IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
145-
indexResult.getSeqNo(), primary.getPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated());
145+
indexResult.getSeqNo(), indexResult.getTerm(), indexResult.getVersion(), indexResult.isCreated());
146146
return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
147147
case FAILURE:
148148
return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
@@ -159,7 +159,7 @@ private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest del
159159
switch (deleteResult.getResultType()) {
160160
case SUCCESS:
161161
DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
162-
deleteResult.getSeqNo(), primary.getPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound());
162+
deleteResult.getSeqNo(), deleteResult.getTerm(), deleteResult.getVersion(), deleteResult.isFound());
163163
return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
164164
case FAILURE:
165165
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
@@ -298,7 +298,7 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
298298
assert result instanceof Engine.IndexResult : result.getClass();
299299
final IndexRequest updateIndexRequest = translate.action();
300300
final IndexResponse indexResponse = new IndexResponse(primary.shardId(), updateIndexRequest.type(), updateIndexRequest.id(),
301-
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
301+
result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.IndexResult) result).isCreated());
302302
updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(),
303303
indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), indexResponse.getVersion(),
304304
indexResponse.getResult());
@@ -319,7 +319,7 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
319319
final DeleteRequest updateDeleteRequest = translate.action();
320320

321321
final DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), updateDeleteRequest.type(), updateDeleteRequest.id(),
322-
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());
322+
result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());
323323

324324
updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(),
325325
deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(),
@@ -355,7 +355,7 @@ static BulkItemResultHolder executeUpdateRequestOnce(UpdateRequest updateRequest
355355
} catch (Exception failure) {
356356
// we may fail translating a update to index or delete operation
357357
// we use index result to communicate failure while translating update request
358-
final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO);
358+
final Engine.Result result = primary.getFailedIndexResult(failure, updateRequest.version());
359359
return new BulkItemResultHolder(null, result, primaryItemRequest);
360360
}
361361

@@ -559,15 +559,15 @@ static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, Ind
559559
() ->
560560
primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
561561
request.getAutoGeneratedTimestamp(), request.isRetry()),
562-
e -> new Engine.IndexResult(e, request.version()),
562+
e -> primary.getFailedIndexResult(e, request.version()),
563563
mappingUpdater);
564564
}
565565

566566
private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary,
567567
MappingUpdatePerformer mappingUpdater) throws Exception {
568568
return executeOnPrimaryWhileHandlingMappingUpdates(primary.shardId(), request.type(),
569569
() -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType()),
570-
e -> new Engine.DeleteResult(e, request.version()),
570+
e -> primary.getFailedDeleteResult(e, request.version()),
571571
mappingUpdater);
572572
}
573573

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ private void acquirePrimaryShardReference(ShardId shardId, String allocationId,
946946
if (actualAllocationId.equals(allocationId) == false) {
947947
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
948948
}
949-
final long actualTerm = indexShard.getPrimaryTerm();
949+
final long actualTerm = indexShard.getPendingPrimaryTerm();
950950
if (actualTerm != primaryTerm) {
951951
throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
952952
primaryTerm, actualTerm);
@@ -1000,7 +1000,7 @@ class PrimaryShardReference extends ShardReference
10001000
}
10011001

10021002
public boolean isRelocated() {
1003-
return indexShard.isPrimaryMode() == false;
1003+
return indexShard.isRelocatedPrimary();
10041004
}
10051005

10061006
@Override

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -304,26 +304,29 @@ public abstract static class Result {
304304
private final Operation.TYPE operationType;
305305
private final Result.Type resultType;
306306
private final long version;
307+
private final long term;
307308
private final long seqNo;
308309
private final Exception failure;
309310
private final SetOnce<Boolean> freeze = new SetOnce<>();
310311
private final Mapping requiredMappingUpdate;
311312
private Translog.Location translogLocation;
312313
private long took;
313314

314-
protected Result(Operation.TYPE operationType, Exception failure, long version, long seqNo) {
315+
protected Result(Operation.TYPE operationType, Exception failure, long version, long term, long seqNo) {
315316
this.operationType = operationType;
316317
this.failure = Objects.requireNonNull(failure);
317318
this.version = version;
319+
this.term = term;
318320
this.seqNo = seqNo;
319321
this.requiredMappingUpdate = null;
320322
this.resultType = Type.FAILURE;
321323
}
322324

323-
protected Result(Operation.TYPE operationType, long version, long seqNo) {
325+
protected Result(Operation.TYPE operationType, long version, long term, long seqNo) {
324326
this.operationType = operationType;
325327
this.version = version;
326328
this.seqNo = seqNo;
329+
this.term = term;
327330
this.failure = null;
328331
this.requiredMappingUpdate = null;
329332
this.resultType = Type.SUCCESS;
@@ -333,6 +336,7 @@ protected Result(Operation.TYPE operationType, Mapping requiredMappingUpdate) {
333336
this.operationType = operationType;
334337
this.version = Versions.NOT_FOUND;
335338
this.seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
339+
this.term = 0L;
336340
this.failure = null;
337341
this.requiredMappingUpdate = requiredMappingUpdate;
338342
this.resultType = Type.MAPPING_UPDATE_REQUIRED;
@@ -357,6 +361,10 @@ public long getSeqNo() {
357361
return seqNo;
358362
}
359363

364+
public long getTerm() {
365+
return term;
366+
}
367+
360368
/**
361369
* If the operation was aborted due to missing mappings, this method will return the mappings
362370
* that are required to complete the operation.
@@ -415,20 +423,20 @@ public static class IndexResult extends Result {
415423

416424
private final boolean created;
417425

418-
public IndexResult(long version, long seqNo, boolean created) {
419-
super(Operation.TYPE.INDEX, version, seqNo);
426+
public IndexResult(long version, long term, long seqNo, boolean created) {
427+
super(Operation.TYPE.INDEX, version, term, seqNo);
420428
this.created = created;
421429
}
422430

423431
/**
424432
* use in case of the index operation failed before getting to internal engine
425433
**/
426-
public IndexResult(Exception failure, long version) {
427-
this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO);
434+
public IndexResult(Exception failure, long version, long term) {
435+
this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO);
428436
}
429437

430-
public IndexResult(Exception failure, long version, long seqNo) {
431-
super(Operation.TYPE.INDEX, failure, version, seqNo);
438+
public IndexResult(Exception failure, long version, long term, long seqNo) {
439+
super(Operation.TYPE.INDEX, failure, version, term, seqNo);
432440
this.created = false;
433441
}
434442

@@ -447,20 +455,20 @@ public static class DeleteResult extends Result {
447455

448456
private final boolean found;
449457

450-
public DeleteResult(long version, long seqNo, boolean found) {
451-
super(Operation.TYPE.DELETE, version, seqNo);
458+
public DeleteResult(long version, long term, long seqNo, boolean found) {
459+
super(Operation.TYPE.DELETE, version, term, seqNo);
452460
this.found = found;
453461
}
454462

455463
/**
456464
* use in case of the delete operation failed before getting to internal engine
457465
**/
458-
public DeleteResult(Exception failure, long version) {
459-
this(failure, version, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
466+
public DeleteResult(Exception failure, long version, long term) {
467+
this(failure, version, term, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
460468
}
461469

462-
public DeleteResult(Exception failure, long version, long seqNo, boolean found) {
463-
super(Operation.TYPE.DELETE, failure, version, seqNo);
470+
public DeleteResult(Exception failure, long version, long term, long seqNo, boolean found) {
471+
super(Operation.TYPE.DELETE, failure, version, term, seqNo);
464472
this.found = found;
465473
}
466474

@@ -477,12 +485,12 @@ public boolean isFound() {
477485

478486
public static class NoOpResult extends Result {
479487

480-
NoOpResult(long seqNo) {
481-
super(Operation.TYPE.NO_OP, 0, seqNo);
488+
NoOpResult(long term, long seqNo) {
489+
// Result(type, version, term, seqNo): a no-op has no document version (0), so term is the third argument.
super(Operation.TYPE.NO_OP, 0, term, seqNo);
482490
}
483491

484-
NoOpResult(long seqNo, Exception failure) {
485-
super(Operation.TYPE.NO_OP, failure, 0, seqNo);
492+
NoOpResult(long term, long seqNo, Exception failure) {
493+
// Result(type, failure, version, term, seqNo): a no-op has no document version (0), so term follows it.
super(Operation.TYPE.NO_OP, failure, 0, term, seqNo);
486494
}
487495

488496
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,10 @@ protected long doGenerateSeqNoForOperation(final Operation operation) {
789789
return localCheckpointTracker.generateSeqNo();
790790
}
791791

792+
private long getPrimaryTerm() {
793+
return engineConfig.getPrimaryTermSupplier().getAsLong();
794+
}
795+
792796
@Override
793797
public IndexResult index(Index index) throws IOException {
794798
assert Objects.equals(index.uid().field(), uidField) : index.uid().field();
@@ -842,7 +846,7 @@ public IndexResult index(Index index) throws IOException {
842846
indexResult = indexIntoLucene(index, plan);
843847
} else {
844848
indexResult = new IndexResult(
845-
plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
849+
plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
846850
}
847851
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
848852
final Translog.Location location;
@@ -963,7 +967,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
963967
currentVersion, index.version(), currentNotFoundOrDeleted)) {
964968
final VersionConflictEngineException e =
965969
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
966-
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
970+
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm());
967971
} else {
968972
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
969973
generateSeqNoForOperation(index),
@@ -993,7 +997,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
993997
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
994998
addDocs(index.docs(), indexWriter);
995999
}
996-
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
1000+
return new IndexResult(plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
9971001
} catch (Exception ex) {
9981002
if (indexWriter.getTragicException() == null) {
9991003
/* There is no tragic event recorded so this must be a document failure.
@@ -1009,7 +1013,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
10091013
* we return a `MATCH_ANY` version to indicate no document was index. The value is
10101014
* not used anyway
10111015
*/
1012-
return new IndexResult(ex, Versions.MATCH_ANY, plan.seqNoForIndexing);
1016+
return new IndexResult(ex, Versions.MATCH_ANY, getPrimaryTerm(), plan.seqNoForIndexing);
10131017
} else {
10141018
throw ex;
10151019
}
@@ -1082,8 +1086,8 @@ static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
10821086
}
10831087

10841088
static IndexingStrategy skipDueToVersionConflict(
1085-
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
1086-
final IndexResult result = new IndexResult(e, currentVersion);
1089+
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion, long term) {
1090+
final IndexResult result = new IndexResult(e, currentVersion, term);
10871091
return new IndexingStrategy(
10881092
currentNotFoundOrDeleted, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result);
10891093
}
@@ -1161,7 +1165,7 @@ public DeleteResult delete(Delete delete) throws IOException {
11611165
deleteResult = deleteInLucene(delete, plan);
11621166
} else {
11631167
deleteResult = new DeleteResult(
1164-
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
1168+
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
11651169
}
11661170
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
11671171
final Translog.Location location;
@@ -1250,7 +1254,7 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException
12501254
final DeletionStrategy plan;
12511255
if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
12521256
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
1253-
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
1257+
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted);
12541258
} else {
12551259
plan = DeletionStrategy.processNormally(
12561260
currentlyDeleted,
@@ -1273,12 +1277,12 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
12731277
new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(),
12741278
engineConfig.getThreadPool().relativeTimeInMillis()));
12751279
return new DeleteResult(
1276-
plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
1280+
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
12771281
} catch (Exception ex) {
12781282
if (indexWriter.getTragicException() == null) {
12791283
// there is no tragic event and such it must be a document level failure
12801284
return new DeleteResult(
1281-
ex, plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
1285+
ex, plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
12821286
} else {
12831287
throw ex;
12841288
}
@@ -1309,9 +1313,9 @@ private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted,
13091313
}
13101314

13111315
static DeletionStrategy skipDueToVersionConflict(
1312-
VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
1316+
VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) {
13131317
final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
1314-
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false);
1318+
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false);
13151319
return new DeletionStrategy(false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult);
13161320
}
13171321

@@ -1340,7 +1344,7 @@ public NoOpResult noOp(final NoOp noOp) {
13401344
try (ReleasableLock ignored = readLock.acquire()) {
13411345
noOpResult = innerNoOp(noOp);
13421346
} catch (final Exception e) {
1343-
noOpResult = new NoOpResult(noOp.seqNo(), e);
1347+
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e);
13441348
}
13451349
return noOpResult;
13461350
}
@@ -1350,7 +1354,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
13501354
assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
13511355
final long seqNo = noOp.seqNo();
13521356
try {
1353-
final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
1357+
final NoOpResult noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo());
13541358
if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
13551359
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
13561360
noOpResult.setTranslogLocation(location);

0 commit comments

Comments
 (0)