Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima

/**
* Synchronously execute the specified replica operation. This is done under a permit from
* {@link IndexShard#acquireReplicaOperationPermit(long, ActionListener, String)}.
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String)}.
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
Expand Down Expand Up @@ -521,7 +521,6 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
@Override
public void onResponse(Releasable releasable) {
try {
replica.updateGlobalCheckpointOnReplica(globalCheckpoint);
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
releasable.close(); // release shard operation lock before responding to caller
final TransportReplicationAction.ReplicaResponse response =
Expand Down Expand Up @@ -596,7 +595,7 @@ protected void doRun() throws Exception {
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
replica.acquireReplicaOperationPermit(request.primaryTerm, this, executor);
replica.acquireReplicaOperationPermit(request.primaryTerm, globalCheckpoint, this, executor);
}

/**
Expand Down
31 changes: 29 additions & 2 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2031,29 +2031,47 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
* name.
*
* @param operationPrimaryTerm the operation primary term
* @param globalCheckpoint the global checkpoint associated with the request
* @param onPermitAcquired the listener for permit acquisition
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
*/
public void acquireReplicaOperationPermit(
final long operationPrimaryTerm, final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final long globalCheckpoint,
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
verifyNotClosed();
verifyReplicationTarget();
final boolean globalCheckpointUpdated;
if (operationPrimaryTerm > primaryTerm) {
synchronized (primaryTermMutex) {
if (operationPrimaryTerm > primaryTerm) {
IndexShardState shardState = state();
// Only roll the translog and update the primary term if the shard has made it past recovery.
// Having a new primary term here means that the old primary failed and that there is a new primary, which in turn
// means that the master will fail this shard, as all initializing shards are failed when a new primary is selected.
// We abort early here to prevent an ongoing recovery from the failed primary from messing with the global / local checkpoint.
if (shardState != IndexShardState.POST_RECOVERY &&
shardState != IndexShardState.STARTED &&
shardState != IndexShardState.RELOCATED) {
throw new IndexShardNotStartedException(shardId, shardState);
}
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
assert operationPrimaryTerm > primaryTerm :
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
primaryTerm = operationPrimaryTerm;
updateGlobalCheckpointOnReplica(globalCheckpoint);
getEngine().getTranslog().rollGeneration();
});
globalCheckpointUpdated = true;
} catch (final Exception e) {
onPermitAcquired.onFailure(e);
return;
}
} else {
globalCheckpointUpdated = false;
}
}
} else {
globalCheckpointUpdated = false;
}

assert operationPrimaryTerm <= primaryTerm
Expand All @@ -2072,6 +2090,15 @@ public void onResponse(final Releasable releasable) {
primaryTerm);
onPermitAcquired.onFailure(new IllegalStateException(message));
} else {
if (globalCheckpointUpdated == false) {
try {
updateGlobalCheckpointOnReplica(globalCheckpoint);
} catch (Exception e) {
releasable.close();
onPermitAcquired.onFailure(e);
return;
}
}
onPermitAcquired.onResponse(releasable);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString());
doAnswer(invocation -> {
long term = (Long)invocation.getArguments()[0];
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[2];
final long primaryTerm = indexShard.getPrimaryTerm();
if (term < primaryTerm) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])",
Expand All @@ -1170,7 +1170,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,11 @@ public void performOn(
.filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
replica.acquireReplicaOperationPermit(
request.primaryTerm(),
globalCheckpoint,
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
try {
replica.updateGlobalCheckpointOnReplica(globalCheckpoint);
performOnReplica(request, replica);
releasable.close();
listener.onResponse(
Expand Down
Loading