Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -103,7 +103,11 @@ <E extends Exception> void blockOperations(
final long timeout,
final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
delayOperations();
verifyNotClosed();
synchronized (this) {
assert queuedBlockOperations > 0 || delayedOperations.isEmpty();
queuedBlockOperations++;
}
try (Releasable ignored = acquireAll(timeout, timeUnit)) {
onBlocked.run();
} finally {
Expand All @@ -121,9 +125,37 @@ <E extends Exception> void blockOperations(
* @param timeout the maximum time to wait for the in-flight operations block
* @param timeUnit the time unit of the {@code timeout} argument
*/
public void asyncBlockOperations(final ActionListener<Releasable> onAcquired, final long timeout, final TimeUnit timeUnit) {
delayOperations();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
public void asyncBlockOperations(final ActionListener<Releasable> onAcquired, final long timeout, final TimeUnit timeUnit) {
    verifyNotClosed();
    // Decided under the lock: true when at least one block operation was already queued before this one.
    final boolean mustQueue;
    synchronized (this) {
        assert queuedBlockOperations > 0 || delayedOperations.isEmpty();
        mustQueue = queuedBlockOperations > 0;
        queuedBlockOperations++;
        if (mustQueue) {
            // Another block operation is already queued: park this request among the delayed operations.
            delayedOperations.add(new AsyncBlockDelayedOperation(onAcquired, timeout, timeUnit));
        }
    }
    if (mustQueue == false) {
        // No block operation was queued: start this one right away on the generic executor.
        asyncBlockOperations(onAcquired, timeout, timeUnit, ThreadPool.Names.GENERIC);
    }
}

private void asyncBlockOperations(final ActionListener<Releasable> onAcquired,
final long timeout,
final TimeUnit timeUnit,
final String executor) {
if (Assertions.ENABLED) {
// since queuedBlockOperations is not volatile, we have to synchronize even here for visibility
synchronized (this) {
assert queuedBlockOperations > 0;
}
}
try {
verifyNotClosed();
} catch (final IndexShardClosedException e) {
onAcquired.onFailure(new IndexShardClosedException(shardId));
return;
}
threadPool.executor(executor).execute(new AbstractRunnable() {

final RunOnce released = new RunOnce(() -> releaseDelayedOperations());

Expand All @@ -150,19 +182,15 @@ protected void doRun() throws Exception {
});
}

private void delayOperations() {
private void verifyNotClosed() {
if (closed) {
throw new IndexShardClosedException(shardId);
}
synchronized (this) {
assert queuedBlockOperations > 0 || delayedOperations.isEmpty();
queuedBlockOperations++;
}
}

private Releasable acquireAll(final long timeout, final TimeUnit timeUnit) throws InterruptedException, TimeoutException {
if (Assertions.ENABLED) {
// since delayed is not volatile, we have to synchronize even here for visibility
// since queuedBlockOperations is not volatile, we have to synchronize even here for visibility
synchronized (this) {
assert queuedBlockOperations > 0;
}
Expand All @@ -187,10 +215,19 @@ private void releaseDelayedOperations() {
queuedActions = new ArrayList<>(delayedOperations);
delayedOperations.clear();
} else {
queuedActions = Collections.emptyList();
queuedActions = new ArrayList<>();
// Execute the next async block operation if there is one
final Optional<DelayedOperation> nextAsyncOperation = delayedOperations.stream()
.filter(op -> op instanceof AsyncBlockDelayedOperation)
.findFirst();
if (nextAsyncOperation.isPresent()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was happy with the simplicity of this change, but there is a pitfall here: if a block op sneaks in and terminates, it "releases" another async block op.

if (delayedOperations.remove(nextAsyncOperation.get())) {
queuedActions.add(nextAsyncOperation.get());
}
}
}
}
if (!queuedActions.isEmpty()) {
if (queuedActions.isEmpty() == false) {
/*
* Try acquiring permits on fresh thread (for two reasons):
* - blockOperations can be called on a recovery thread which can be expected to be interrupted when recovery is cancelled;
Expand All @@ -202,7 +239,7 @@ private void releaseDelayedOperations() {
*/
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
for (DelayedOperation queuedAction : queuedActions) {
acquire(queuedAction.listener, null, false, queuedAction.debugInfo, queuedAction.stackTrace);
queuedAction.run();
}
});
}
Expand Down Expand Up @@ -255,7 +292,7 @@ private void acquire(final ActionListener<Releasable> onAcquired, final String e
} else {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
}
delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo, stackTrace));
delayedOperations.add(new SyncDelayedOperation(wrappedListener, debugInfo, stackTrace));
return;
} else {
releasable = acquire(debugInfo, stackTrace);
Expand Down Expand Up @@ -326,13 +363,25 @@ List<String> getActiveOperations() {
.collect(Collectors.toList());
}

private static class DelayedOperation {
private final ActionListener<Releasable> listener;
/**
 * Base class for operations that are queued in {@code delayedOperations} and run later.
 * Each operation carries the listener to notify; any failure reported through
 * {@link #onFailure(Exception)} is forwarded to that listener.
 */
private abstract class DelayedOperation extends AbstractRunnable {
    protected final ActionListener<Releasable> listener;

    private DelayedOperation(final ActionListener<Releasable> delayedListener) {
        this.listener = delayedListener;
    }

    /** Forwards the failure to the listener of this delayed operation. */
    @Override
    public void onFailure(final Exception cause) {
        listener.onFailure(cause);
    }
}

private class SyncDelayedOperation extends DelayedOperation {
private final String debugInfo;
private final StackTraceElement[] stackTrace;

private DelayedOperation(ActionListener<Releasable> listener, Object debugInfo, StackTraceElement[] stackTrace) {
this.listener = listener;
private SyncDelayedOperation(ActionListener<Releasable> listener, Object debugInfo, StackTraceElement[] stackTrace) {
super(listener);
if (Assertions.ENABLED) {
this.debugInfo = "[delayed] " + debugInfo;
this.stackTrace = stackTrace;
Expand All @@ -341,6 +390,27 @@ private DelayedOperation(ActionListener<Releasable> listener, Object debugInfo,
this.stackTrace = null;
}
}

@Override
public void doRun() throws Exception {
acquire(listener, null, false, debugInfo, stackTrace);
}
}

/**
 * A delayed request to block all operations. It remembers the timeout that was passed to
 * {@code asyncBlockOperations} and, once run, starts the block using the
 * {@code ThreadPool.Names.SAME} executor.
 */
private class AsyncBlockDelayedOperation extends DelayedOperation {
    private final long timeout;
    private final TimeUnit timeUnit;

    private AsyncBlockDelayedOperation(final ActionListener<Releasable> blockListener,
                                       final long blockTimeout,
                                       final TimeUnit blockTimeUnit) {
        super(blockListener);
        this.timeout = blockTimeout;
        this.timeUnit = blockTimeUnit;
    }

    /** Starts the delayed block operation with the stored listener and timeout. */
    @Override
    public void doRun() throws Exception {
        asyncBlockOperations(listener, timeout, timeUnit, ThreadPool.Names.SAME);
    }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand All @@ -48,6 +49,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -704,6 +706,48 @@ public void testPermitTraceCapturing() throws ExecutionException, InterruptedExc
assertThat(permits.getActiveOperations(), emptyIterable());
}

/**
 * Submits a random number of async block operations while all operations are blocked and
 * verifies that, once the block is released, they are executed in the order they were submitted.
 *
 * Each operation records its id at the next free slot of {@code invocations}; the slot index
 * therefore reflects the execution order and must equal the submission order.
 */
public void testAsyncBlockOperationsAreExecutedInSubmissionOrder() throws Exception {
    final int nbOps = scaledRandomIntBetween(10, 64);
    final CountDownLatch latch = new CountDownLatch(nbOps);

    final AtomicInteger counter = new AtomicInteger(0);
    final AtomicArray<Integer> invocations = new AtomicArray<>(nbOps);
    // First failure reported to any listener; checked on the test thread after the latch is released.
    final AtomicReference<Exception> failure = new AtomicReference<>();

    try (Releasable ignored = blockAndWait()) {
        for (int i = 0; i < nbOps; i++) {
            final int operationId = i;
            permits.asyncBlockOperations(new ActionListener<Releasable>() {
                @Override
                public void onResponse(final Releasable releasable) {
                    try (Releasable ignored = releasable) {
                        invocations.setOnce(counter.getAndIncrement(), operationId);
                        if (rarely()) {
                            // occasionally hold the block for a while so that later operations pile up
                            try {
                                Thread.sleep(scaledRandomIntBetween(1, 100));
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    } finally {
                        latch.countDown();
                    }
                }

                @Override
                public void onFailure(final Exception e) {
                    // Throwing here would happen on the thread invoking the listener and would leave the
                    // latch short by one, hanging the test thread in latch.await(). Record the failure
                    // and release the latch so the test fails promptly with the real cause.
                    failure.compareAndSet(null, e);
                    latch.countDown();
                }
            }, 30, TimeUnit.SECONDS);
        }
    }
    latch.await();

    if (failure.get() != null) {
        throw new AssertionError("async block operation failed unexpectedly", failure.get());
    }

    for (int i = 0; i < nbOps; i++) {
        assertThat("Expected the operation with id [" + i + "] to be executed at place [" + i
            + "] but got [" + invocations.get(i) + "] instead", invocations.get(i), equalTo(i));
    }
}

private static ActionListener<Releasable> wrap(final CheckedRunnable<Exception> onResponse) {
return new ActionListener<Releasable>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,6 @@ private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard
return fut.get();
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testOperationPermitOnReplicaShards() throws Exception {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;
Expand Down Expand Up @@ -1023,7 +1022,6 @@ public void testGlobalCheckpointSync() throws IOException {
closeShards(replicaShard, primaryShard);
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
Expand Down Expand Up @@ -1088,7 +1086,6 @@ public void onFailure(Exception e) {
closeShard(indexShard, false);
}

@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);

Expand Down