From 976dcb03e7f9de722a054e2deb239a984c9dc4fd Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 26 Nov 2018 16:31:29 +0100 Subject: [PATCH 1/2] Allow asynchronous block operations to be delayed --- .../shard/IndexShardOperationPermits.java | 108 +++++++++++++++--- 1 file changed, 89 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index d5d0d7f3e9753..ffc32576e72fb 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -34,9 +34,9 @@ import java.io.Closeable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -103,7 +103,11 @@ void blockOperations( final long timeout, final TimeUnit timeUnit, final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { - delayOperations(); + verifyNotClosed(); + synchronized (this) { + assert queuedBlockOperations > 0 || delayedOperations.isEmpty(); + queuedBlockOperations++; + } try (Releasable ignored = acquireAll(timeout, timeUnit)) { onBlocked.run(); } finally { @@ -121,9 +125,37 @@ void blockOperations( * @param timeout the maximum time to wait for the in-flight operations block * @param timeUnit the time unit of the {@code timeout} argument */ - public void asyncBlockOperations(final ActionListener onAcquired, final long timeout, final TimeUnit timeUnit) { - delayOperations(); - threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { + public void asyncBlockOperations(final ActionListener onAcquired, final long timeout, final TimeUnit timeUnit) { + verifyNotClosed(); + synchronized (this) { + assert queuedBlockOperations > 0 || delayedOperations.isEmpty(); + final boolean delayed = queuedBlockOperations > 0; + queuedBlockOperations++; + if (delayed) { + delayedOperations.add(new AsyncBlockDelayedOperation(onAcquired, timeout, timeUnit)); + return; + } + } + asyncBlockOperations(onAcquired, timeout, timeUnit, ThreadPool.Names.GENERIC); + } + + private void asyncBlockOperations(final ActionListener onAcquired, + final long timeout, + final TimeUnit timeUnit, + final String executor) { + if (Assertions.ENABLED) { + // since queuedBlockOperations is not volatile, we have to synchronize even here for visibility + synchronized (this) { + assert queuedBlockOperations > 0; + } + } + try { + verifyNotClosed(); + } catch (final IndexShardClosedException e) { + onAcquired.onFailure(new IndexShardClosedException(shardId)); + return; + } + threadPool.executor(executor).execute(new AbstractRunnable() { final RunOnce released = new RunOnce(() -> releaseDelayedOperations()); @@ -150,19 +182,15 @@ protected void doRun() throws Exception { }); } - private void delayOperations() { + private void verifyNotClosed() { if (closed) { throw new IndexShardClosedException(shardId); } - synchronized (this) { - assert queuedBlockOperations > 0 || delayedOperations.isEmpty(); - queuedBlockOperations++; - } } private Releasable acquireAll(final long timeout, final TimeUnit timeUnit) throws InterruptedException, TimeoutException { if (Assertions.ENABLED) { - // since delayed is not volatile, we have to synchronize even here for visibility + // since queuedBlockOperations is not volatile, we have to synchronize even here for visibility synchronized (this) { assert queuedBlockOperations > 0; } @@ -187,10 +215,19 @@ private void releaseDelayedOperations() { queuedActions = new ArrayList<>(delayedOperations); delayedOperations.clear(); } else { - queuedActions = Collections.emptyList(); + queuedActions = new ArrayList<>(); + // Execute the next async block operation if there is one + final Optional nextAsyncOperation = delayedOperations.stream() + .filter(op -> op instanceof AsyncBlockDelayedOperation) + .findFirst(); + if (nextAsyncOperation.isPresent()) { + if (delayedOperations.remove(nextAsyncOperation.get())) { + queuedActions.add(nextAsyncOperation.get()); + } + } } } - if (!queuedActions.isEmpty()) { + if (queuedActions.isEmpty() == false) { /* * Try acquiring permits on fresh thread (for two reasons): * - blockOperations can be called on a recovery thread which can be expected to be interrupted when recovery is cancelled; @@ -202,7 +239,7 @@ private void releaseDelayedOperations() { */ threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { for (DelayedOperation queuedAction : queuedActions) { - acquire(queuedAction.listener, null, false, queuedAction.debugInfo, queuedAction.stackTrace); + queuedAction.run(); } }); } @@ -255,7 +292,7 @@ private void acquire(final ActionListener onAcquired, final String e } else { wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired); } - delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo, stackTrace)); + delayedOperations.add(new SyncDelayedOperation(wrappedListener, debugInfo, stackTrace)); return; } else { releasable = acquire(debugInfo, stackTrace); @@ -326,13 +363,25 @@ List getActiveOperations() { .collect(Collectors.toList()); } - private static class DelayedOperation { - private final ActionListener listener; + private abstract class DelayedOperation extends AbstractRunnable { + protected final ActionListener listener; + + private DelayedOperation(final ActionListener listener) { + this.listener = listener; + } + + @Override + public void onFailure(final Exception e) { + listener.onFailure(e); + } + } + + private class SyncDelayedOperation extends DelayedOperation { private final String debugInfo; private final StackTraceElement[] stackTrace; - private DelayedOperation(ActionListener listener, Object debugInfo, StackTraceElement[] stackTrace) { - this.listener = listener; + private SyncDelayedOperation(ActionListener listener, Object debugInfo, StackTraceElement[] stackTrace) { + super(listener); if (Assertions.ENABLED) { this.debugInfo = "[delayed] " + debugInfo; this.stackTrace = stackTrace; @@ -341,6 +390,27 @@ private DelayedOperation(ActionListener listener, Object debugInfo, this.stackTrace = null; } } + + @Override + public void doRun() throws Exception { + acquire(listener, null, false, debugInfo, stackTrace); + } + } + + private class AsyncBlockDelayedOperation extends DelayedOperation { + private final long timeout; + private final TimeUnit timeUnit; + + private AsyncBlockDelayedOperation(ActionListener listener, long timeout, TimeUnit timeUnit) { + super(listener); + this.timeout = timeout; + this.timeUnit = timeUnit; + } + + @Override + public void doRun() throws Exception { + asyncBlockOperations(listener, timeout, timeUnit, ThreadPool.Names.SAME); + } } /** From 54f80e0218ca4b7347349889fd1d90e001ea7cb4 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 26 Nov 2018 16:31:29 +0100 Subject: [PATCH 2/2] Update tests --- .../IndexShardOperationPermitsTests.java | 44 +++++++++++++++++++ .../index/shard/IndexShardTests.java | 3 -- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index a785c2c4d8224..f9d05220cf5c0 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -48,6 +49,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -704,6 +706,48 @@ public void testPermitTraceCapturing() throws ExecutionException, InterruptedExc assertThat(permits.getActiveOperations(), emptyIterable()); } + public void testAsyncBlockOperationsAreExecutedInSubmissionOrder() throws Exception { + final int nbOps = scaledRandomIntBetween(10, 64); + final CountDownLatch latch = new CountDownLatch(nbOps); + + final AtomicInteger counter = new AtomicInteger(0); + final AtomicArray invocations = new AtomicArray<>(nbOps); + + try (Releasable ignored = blockAndWait()) { + for (int i = 0; i < nbOps; i++) { + final int operationId = i; + permits.asyncBlockOperations(new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignored = releasable) { + invocations.setOnce(counter.getAndIncrement(), operationId); + if (rarely()) { + try { + Thread.sleep(scaledRandomIntBetween(1, 100)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(final Exception e) { + throw new AssertionError(e); + } + }, 30, TimeUnit.SECONDS); + } + } + latch.await(); + + for (int i = 0; i < nbOps; i++) { + assertThat("Expected the operation with id [" + i + "] to be executed at place [" + i + + "] but got [" + invocations.get(i) + "] instead", invocations.get(i), equalTo(i)); + } + } + private static ActionListener wrap(final CheckedRunnable onResponse) { return new ActionListener() { @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 970e8ed5c9fab..cccc51b8017d3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -732,7 +732,6 @@ private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard return fut.get(); } - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850") public void testOperationPermitOnReplicaShards() throws Exception { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; @@ -1023,7 +1022,6 @@ public void testGlobalCheckpointSync() throws IOException { closeShards(replicaShard, primaryShard); } - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850") public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); final int operations = 1024 - scaledRandomIntBetween(0, 1024); @@ -1088,7 +1086,6 @@ public void onFailure(Exception e) { closeShard(indexShard, false); } - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850") public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false);