Skip to content

Commit 8ae61c0

Browse files
authored
Update global checkpoint when increasing primary term on replica (#25422)
When a replica shard increases its primary term under the mandate of a new primary, it should also update its global checkpoint. This guarantees that the replica's global checkpoint is at least as high as the new primary's global checkpoint, and it provides a starting point for the primary/replica resync. Relates to #25355, #10708
1 parent dd6751d commit 8ae61c0

File tree

6 files changed

+135
-58
lines changed

6 files changed

+135
-58
lines changed

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima
183183

184184
/**
185185
* Synchronously execute the specified replica operation. This is done under a permit from
186-
* {@link IndexShard#acquireReplicaOperationPermit(long, ActionListener, String)}.
186+
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String)}.
187187
*
188188
* @param shardRequest the request to the replica shard
189189
* @param replica the replica shard to perform the operation on
@@ -521,7 +521,6 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
521521
@Override
522522
public void onResponse(Releasable releasable) {
523523
try {
524-
replica.updateGlobalCheckpointOnReplica(globalCheckpoint);
525524
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
526525
releasable.close(); // release shard operation lock before responding to caller
527526
final TransportReplicationAction.ReplicaResponse response =
@@ -596,7 +595,7 @@ protected void doRun() throws Exception {
596595
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
597596
actualAllocationId);
598597
}
599-
replica.acquireReplicaOperationPermit(request.primaryTerm, this, executor);
598+
replica.acquireReplicaOperationPermit(request.primaryTerm, globalCheckpoint, this, executor);
600599
}
601600

602601
/**

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2031,29 +2031,47 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
20312031
* name.
20322032
*
20332033
* @param operationPrimaryTerm the operation primary term
2034+
* @param globalCheckpoint the global checkpoint associated with the request
20342035
* @param onPermitAcquired the listener for permit acquisition
20352036
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
20362037
*/
2037-
public void acquireReplicaOperationPermit(
2038-
final long operationPrimaryTerm, final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
2038+
public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final long globalCheckpoint,
2039+
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
20392040
verifyNotClosed();
20402041
verifyReplicationTarget();
2042+
final boolean globalCheckpointUpdated;
20412043
if (operationPrimaryTerm > primaryTerm) {
20422044
synchronized (primaryTermMutex) {
20432045
if (operationPrimaryTerm > primaryTerm) {
2046+
IndexShardState shardState = state();
2047+
// only roll translog and update primary term if shard has made it past recovery
2048+
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
2049+
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
2050+
// We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
2051+
if (shardState != IndexShardState.POST_RECOVERY &&
2052+
shardState != IndexShardState.STARTED &&
2053+
shardState != IndexShardState.RELOCATED) {
2054+
throw new IndexShardNotStartedException(shardId, shardState);
2055+
}
20442056
try {
20452057
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
20462058
assert operationPrimaryTerm > primaryTerm :
20472059
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
20482060
primaryTerm = operationPrimaryTerm;
2061+
updateGlobalCheckpointOnReplica(globalCheckpoint);
20492062
getEngine().getTranslog().rollGeneration();
20502063
});
2064+
globalCheckpointUpdated = true;
20512065
} catch (final Exception e) {
20522066
onPermitAcquired.onFailure(e);
20532067
return;
20542068
}
2069+
} else {
2070+
globalCheckpointUpdated = false;
20552071
}
20562072
}
2073+
} else {
2074+
globalCheckpointUpdated = false;
20572075
}
20582076

20592077
assert operationPrimaryTerm <= primaryTerm
@@ -2072,6 +2090,15 @@ public void onResponse(final Releasable releasable) {
20722090
primaryTerm);
20732091
onPermitAcquired.onFailure(new IllegalStateException(message));
20742092
} else {
2093+
if (globalCheckpointUpdated == false) {
2094+
try {
2095+
updateGlobalCheckpointOnReplica(globalCheckpoint);
2096+
} catch (Exception e) {
2097+
releasable.close();
2098+
onPermitAcquired.onFailure(e);
2099+
return;
2100+
}
2101+
}
20752102
onPermitAcquired.onResponse(releasable);
20762103
}
20772104
}

core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,7 +1161,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
11611161
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString());
11621162
doAnswer(invocation -> {
11631163
long term = (Long)invocation.getArguments()[0];
1164-
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
1164+
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[2];
11651165
final long primaryTerm = indexShard.getPrimaryTerm();
11661166
if (term < primaryTerm) {
11671167
throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])",
@@ -1170,7 +1170,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
11701170
count.incrementAndGet();
11711171
callback.onResponse(count::decrementAndGet);
11721172
return null;
1173-
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
1173+
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString());
11741174
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
11751175
final ClusterState state = clusterService.state();
11761176
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());

core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
456456
count.incrementAndGet();
457457
callback.onResponse(count::decrementAndGet);
458458
return null;
459-
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
459+
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString());
460460
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
461461
final ClusterState state = clusterService.state();
462462
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());

core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,11 +518,11 @@ public void performOn(
518518
.filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
519519
replica.acquireReplicaOperationPermit(
520520
request.primaryTerm(),
521+
globalCheckpoint,
521522
new ActionListener<Releasable>() {
522523
@Override
523524
public void onResponse(Releasable releasable) {
524525
try {
525-
replica.updateGlobalCheckpointOnReplica(globalCheckpoint);
526526
performOnReplica(request, replica);
527527
releasable.close();
528528
listener.onResponse(

0 commit comments

Comments
 (0)