Skip to content

Commit 23917ed

Browse files
pquentinarteam
authored and committed
Add possibility to acquire permits on primary shards with different checks
Since #42241 we check that the shard is in primary mode before acquiring a primary permit on it. We would like to customize this check and add an option to perform different checks before running the `onPermitAcquired` listener. For example, we would like to skip the primary mode check when we acquire primary permits during the recovery of a hollow indexing shard. See ES-10487
1 parent 385040d commit 23917ed

File tree

2 files changed

+71
-17
lines changed

2 files changed

+71
-17
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 56 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -189,6 +189,7 @@
189189
import static org.elasticsearch.core.Strings.format;
190190
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
191191
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
192+
import static org.elasticsearch.index.shard.IndexShard.PrimaryPermitCheck.PRIMARY_MODE;
192193

193194
public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
194195

@@ -3568,58 +3569,96 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
35683569
);
35693570
}
35703571

3572+
/**
3573+
* Checks to run before running the primary permit operation
3574+
*/
3575+
public enum PrimaryPermitCheck {
3576+
PRIMARY_MODE,
3577+
NONE
3578+
}
3579+
35713580
/**
35723581
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
35733582
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
35743583
* ActionListener will then be called using the provided executor.
3575-
*
35763584
*/
35773585
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, Executor executorOnDelay) {
3578-
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false);
3586+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, PRIMARY_MODE);
35793587
}
35803588

35813589
public void acquirePrimaryOperationPermit(
35823590
ActionListener<Releasable> onPermitAcquired,
35833591
Executor executorOnDelay,
35843592
boolean forceExecution
3593+
) {
3594+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, forceExecution, PRIMARY_MODE);
3595+
}
3596+
3597+
public void acquirePrimaryOperationPermit(
3598+
ActionListener<Releasable> onPermitAcquired,
3599+
Executor executorOnDelay,
3600+
boolean forceExecution,
3601+
PrimaryPermitCheck primaryPermitCheck
35853602
) {
35863603
verifyNotClosed();
35873604
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
3588-
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution);
3605+
indexShardOperationPermits.acquire(
3606+
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
3607+
executorOnDelay,
3608+
forceExecution
3609+
);
35893610
}
35903611

35913612
public boolean isPrimaryMode() {
35923613
assert indexShardOperationPermits.getActiveOperationsCount() != 0 : "must hold permit to check primary mode";
35933614
return replicationTracker.isPrimaryMode();
35943615
}
35953616

3617+
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
3618+
acquireAllPrimaryOperationsPermits(onPermitAcquired, timeout, PRIMARY_MODE);
3619+
}
3620+
35963621
/**
35973622
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
35983623
* It is the responsibility of the caller to close the {@link Releasable}.
35993624
*/
3600-
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
3625+
public void acquireAllPrimaryOperationsPermits(
3626+
final ActionListener<Releasable> onPermitAcquired,
3627+
final TimeValue timeout,
3628+
final PrimaryPermitCheck primaryPermitCheck
3629+
) {
36013630
verifyNotClosed();
36023631
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;
36033632

3604-
asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
3633+
asyncBlockOperations(
3634+
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
3635+
timeout.duration(),
3636+
timeout.timeUnit()
3637+
);
36053638
}
36063639

36073640
/**
3608-
* Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before
3609-
* executing the action.
3641+
* Wraps the action to run on a primary after acquiring permit.
36103642
*
3643+
* @param primaryPermitCheck check to run before running the primary permit operation
36113644
* @param listener the listener to wrap
36123645
* @return the wrapped listener
36133646
*/
3614-
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
3615-
return listener.delegateFailure((l, r) -> {
3616-
if (isPrimaryMode()) {
3617-
l.onResponse(r);
3618-
} else {
3619-
r.close();
3620-
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
3621-
}
3622-
});
3647+
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(
3648+
final PrimaryPermitCheck primaryPermitCheck,
3649+
final ActionListener<Releasable> listener
3650+
) {
3651+
return switch (primaryPermitCheck) {
3652+
case PRIMARY_MODE -> listener.delegateFailure((l, r) -> {
3653+
if (isPrimaryMode()) {
3654+
l.onResponse(r);
3655+
} else {
3656+
r.close();
3657+
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
3658+
}
3659+
});
3660+
case NONE -> listener;
3661+
};
36233662
}
36243663

36253664
private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
@@ -3657,7 +3696,7 @@ public void runUnderPrimaryPermit(final Runnable runnable, final Consumer<Except
36573696
runnable.run();
36583697
}
36593698
}, onFailure);
3660-
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay);
3699+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, PRIMARY_MODE);
36613700
}
36623701

36633702
private <E extends Exception> void bumpPrimaryTerm(

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,21 @@ public void onFailure(final Exception e) {
790790
}
791791
}, TimeValue.timeValueSeconds(30));
792792
latch.await();
793+
794+
// It is possible to acquire permits if we skip the primary mode check
795+
var permitAcquiredLatch = new CountDownLatch(1);
796+
indexShard.acquirePrimaryOperationPermit(ActionListener.wrap(r -> {
797+
r.close();
798+
permitAcquiredLatch.countDown();
799+
}, Assert::assertNotNull), EsExecutors.DIRECT_EXECUTOR_SERVICE, false, IndexShard.PrimaryPermitCheck.NONE);
800+
safeAwait(permitAcquiredLatch);
801+
802+
var allPermitsAcquiredLatch = new CountDownLatch(1);
803+
indexShard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
804+
r.close();
805+
allPermitsAcquiredLatch.countDown();
806+
}, Assert::assertNotNull), TimeValue.timeValueSeconds(30), IndexShard.PrimaryPermitCheck.NONE);
807+
safeAwait(allPermitsAcquiredLatch);
793808
}
794809

795810
if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {

0 commit comments

Comments
 (0)