Skip to content
29 changes: 22 additions & 7 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -332,12 +333,14 @@ public long getPrimaryTerm() {
}

/**
* notifies the shard of an increase in the primary term
* Notifies the shard of an increase in the primary term.
*
* @param newPrimaryTerm the new primary term
*/
public void updatePrimaryTerm(final long newTerm) {
public void updatePrimaryTerm(final long newPrimaryTerm) {
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
synchronized (mutex) {
if (newTerm != primaryTerm) {
if (newPrimaryTerm != primaryTerm) {
// Note that due to cluster state batching an initializing primary shard can fail and be re-assigned
// in one state, causing its term to be incremented. Note that if both current shard state and new
// shard state are initializing, we could replace the current shard and reinitialize it. It is however
Expand All @@ -354,10 +357,22 @@ public void updatePrimaryTerm(final long newTerm) {
"a started primary shard should never update its term; "
+ "shard " + shardRouting + ", "
+ "current term [" + primaryTerm + "], "
+ "new term [" + newTerm + "]";
assert newTerm > primaryTerm :
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]";
primaryTerm = newTerm;
+ "new term [" + newPrimaryTerm + "]";
assert newPrimaryTerm > primaryTerm :
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]";
/*
* Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we
* increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is
* incremented.
*/
final CountDownLatch latch = new CountDownLatch(1);
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
latch::await,
e -> failShard("exception during primary term transition", e));
primaryTerm = newPrimaryTerm;
latch.countDown();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
package org.elasticsearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.Assertions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -36,20 +38,35 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* Tracks shard operation permits. Each operation on the shard obtains a permit. When we need to block operations (e.g., to transition
* between terms) we immediately delay all operations to a queue, obtain all available permits, and wait for outstanding operations to drain
* and return their permits. Delayed operations will acquire permits and be completed after the operation that blocked all operations has
* completed.
*/
final class IndexShardOperationPermits implements Closeable {

private final ShardId shardId;
private final Logger logger;
private final ThreadPool threadPool;

private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
// fair semaphore to ensure that blockOperations() does not starve under thread contention
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
@Nullable private List<ActionListener<Releasable>> delayedOperations; // operations that are delayed
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved
private final List<ActionListener<Releasable>> delayedOperations = new ArrayList<>(); // operations that are delayed
private volatile boolean closed;
private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this

IndexShardOperationPermits(ShardId shardId, Logger logger, ThreadPool threadPool) {
/**
* Construct operation permits for the specified shard.
*
* @param shardId the shard
* @param logger the logger for the shard
* @param threadPool the thread pool (used to execute delayed operations)
*/
IndexShardOperationPermits(final ShardId shardId, final Logger logger, final ThreadPool threadPool) {
this.shardId = shardId;
this.logger = logger;
this.threadPool = threadPool;
Expand All @@ -61,99 +78,170 @@ public void close() {
}

/**
* Wait for in-flight operations to finish and executes onBlocked under the guarantee that no new operations are started. Queues
* operations that are occurring in the meanwhile and runs them once onBlocked has executed.
* Waits for in-flight operations to finish and executes {@code onBlocked} under the guarantee that no new operations are started. Queues
* operations that arrive in the meantime and runs them once {@code onBlocked} has executed.
*
* @param timeout the maximum time to wait for the in-flight operations block
* @param timeUnit the time unit of the {@code timeout} argument
* @param timeout the maximum time to wait for the in-flight operations block
* @param timeUnit the time unit of the {@code timeout} argument
* @param onBlocked the action to run once the block has been acquired
* @throws InterruptedException if calling thread is interrupted
* @throws TimeoutException if timed out waiting for in-flight operations to finish
* @param <E> the type of checked exception thrown by {@code onBlocked}
* @throws InterruptedException if calling thread is interrupted
* @throws TimeoutException if timed out waiting for in-flight operations to finish
* @throws IndexShardClosedException if operation permit has been closed
*/
public <E extends Exception> void blockOperations(long timeout, TimeUnit timeUnit, CheckedRunnable<E> onBlocked) throws
InterruptedException, TimeoutException, E {
<E extends Exception> void blockOperations(
final long timeout,
final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
if (closed) {
throw new IndexShardClosedException(shardId);
}
delayOperations();
try {
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
assert semaphore.availablePermits() == 0;
try {
onBlocked.run();
} finally {
semaphore.release(TOTAL_PERMITS);
}
doBlockOperations(timeout, timeUnit, onBlocked);
} finally {
releaseDelayedOperations();
}
}

/**
 * Starts delaying operations on the calling thread and then hands the actual blocking off to the generic thread pool. The forked task
 * waits (via {@code doBlockOperations}) for in-flight operations to drain, runs {@code onBlocked} while no new operations can start, and
 * finally releases the operations that were delayed in the meantime. This method returns to the caller as soon as operations are
 * delayed and the task has been submitted; it does not wait for {@code onBlocked} to run.
 *
 * @param timeout   the maximum time to wait for in-flight operations to finish
 * @param timeUnit  the time unit of the {@code timeout} argument
 * @param onBlocked the action to run once all operations are blocked
 * @param onFailure receives any exception reported by the forked task (a failure while blocking operations or while running
 *                  {@code onBlocked}); it is not invoked on the calling thread's stack unless the executor does so
 * @param <E>       the type of checked exception thrown by {@code onBlocked} (never thrown on the calling thread)
 * @throws IllegalStateException if operations are already delayed when this method is called
 */
<E extends Exception> void asyncBlockOperations(
        final long timeout,
        final TimeUnit timeUnit,
        final CheckedRunnable<E> onBlocked,
        final Consumer<Exception> onFailure) {
    // must happen on the calling thread: once we return, every new operation is already being queued
    delayOperations();

    final AbstractRunnable blockingTask = new AbstractRunnable() {
        @Override
        protected void doRun() throws Exception {
            doBlockOperations(timeout, timeUnit, onBlocked);
        }

        @Override
        public void onFailure(final Exception cause) {
            // here the simple name resolves to the Consumer parameter of the enclosing method, not to this method
            onFailure.accept(cause);
        }

        @Override
        public void onAfter() {
            // success or failure, the queued operations must be let go again
            releaseDelayedOperations();
        }
    };
    threadPool.executor(ThreadPool.Names.GENERIC).execute(blockingTask);
}

private void delayOperations() {
synchronized (this) {
if (delayed) {
throw new IllegalStateException("operations are already delayed");
} else {
throw new TimeoutException("timed out during blockOperations");
assert delayedOperations.isEmpty();
delayed = true;
}
} finally {
final List<ActionListener<Releasable>> queuedActions;
}
}

private <E extends Exception> void doBlockOperations(
final long timeout,
final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
if (Assertions.ENABLED) {
// since delayed is not volatile, we have to synchronize even here for visibility
synchronized (this) {
queuedActions = delayedOperations;
delayedOperations = null;
assert delayed;
}
if (queuedActions != null) {
// Try acquiring permits on fresh thread (for two reasons):
// - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled.
// Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by
// ThreadedActionListener if the queue of the thread pool on which it submits is full.
// - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure
// handler is executed on the calling thread. This should not be the recovery thread as it would delay the recovery.
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
for (ActionListener<Releasable> queuedAction : queuedActions) {
acquire(queuedAction, null, false);
}
});
}
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
assert semaphore.availablePermits() == 0;
try {
onBlocked.run();
} finally {
semaphore.release(TOTAL_PERMITS);
}
} else {
throw new TimeoutException("timeout while blocking operations");
}
}

/**
 * Stops delaying operations and re-submits every operation that was queued while the delay was in effect. The queue is snapshotted and
 * cleared, and the {@code delayed} flag reset, under the lock on this instance; the queued listeners are then replayed through
 * {@link #acquire(ActionListener, String, boolean)} on the generic thread pool. Nothing is submitted when the queue was empty.
 */
private void releaseDelayedOperations() {
    final List<ActionListener<Releasable>> drained;
    synchronized (this) {
        assert delayed;
        drained = new ArrayList<>(delayedOperations);
        delayedOperations.clear();
        delayed = false;
    }
    if (drained.isEmpty()) {
        return;
    }
    /*
     * Permits are re-acquired on a fresh thread rather than on the caller's thread, for two reasons:
     *  - blockOperations can be called on a recovery thread, which can be expected to be interrupted when the recovery is cancelled;
     *    an interrupt here would make permit acquisition throw an interrupted exception that the threaded action listener swallows
     *    if the queue of the thread pool it submits to is full
     *  - if a permit is acquired and the queue of the thread pool used by the threaded action listener is full, the onFailure
     *    handler is executed on the calling thread; that should not be the recovery thread as it would delay the recovery
     */
    threadPool.executor(ThreadPool.Names.GENERIC).execute(
        () -> drained.forEach(listener -> acquire(listener, null, false)));
}

/**
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
* {@link ActionListener} will be called on the calling thread. During calls of
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided ActionListener will
* then be called using the provided executor once operations are no longer blocked.
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener}
* will then be called using the provided executor once operations are no longer blocked.
*
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param executorOnDelay executor to use for delayed call
* @param forceExecution whether the runnable should force its execution in case it gets rejected
*/
public void acquire(ActionListener<Releasable> onAcquired, String executorOnDelay, boolean forceExecution) {
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution) {
if (closed) {
onAcquired.onFailure(new IndexShardClosedException(shardId));
return;
}
Releasable releasable;
final Releasable releasable;
try {
synchronized (this) {
releasable = tryAcquire();
if (releasable == null) {
// blockOperations is executing, this operation will be retried by blockOperations once it finishes
if (delayedOperations == null) {
delayedOperations = new ArrayList<>();
}
if (delayed) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
if (executorOnDelay != null) {
delayedOperations.add(
new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
} else {
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
}
return;
} else {
releasable = tryAcquire();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just rename this to acquire and make it never return null (but blow up if it can't acquire, as it is non-delayed)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I will do that in a follow-up immediately after this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened the follow-up: #24971

assert releasable != null;
}
}
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
onAcquired.onFailure(e);
return;
}
// execute this outside the synchronized block!
onAcquired.onResponse(releasable);
}

@Nullable private Releasable tryAcquire() throws InterruptedException {
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting
AtomicBoolean closed = new AtomicBoolean();
@Nullable
private Releasable tryAcquire() throws InterruptedException {
assert Thread.holdsLock(this);
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
final AtomicBoolean closed = new AtomicBoolean();
return () -> {
if (closed.compareAndSet(false, true)) {
semaphore.release(1);
Expand All @@ -163,13 +251,23 @@ public void acquire(ActionListener<Releasable> onAcquired, String executorOnDela
return null;
}

public int getActiveOperationsCount() {
/**
* Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight).
*
* @return the active operation count, or zero when all permits are held
*/
int getActiveOperationsCount() {
int availablePermits = semaphore.availablePermits();
if (availablePermits == 0) {
// when blockOperations is holding all permits
/*
* This occurs when either doBlockOperations is holding all the permits or there are outstanding operations in flight and the
* remainder of the permits are held by doBlockOperations. We do not distinguish between these two cases and simply say that
* the active operations count is zero.
*/
return 0;
} else {
return TOTAL_PERMITS - availablePermits;
}
}

}
Loading