From 3b4a5c0b6d6e8c07c7e664953ba72604ed6f24e1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 26 May 2017 22:19:43 -0400 Subject: [PATCH 01/12] Introduce clean transition on primary promotion This commit introduces a clean transition from the old primary term to the new primary term when a replica is promoted primary. To accomplish this, we delay all operations before incrementing the primary term. The delay is guaranteed to be in place before we increment the term, and then all operations that are delayed are executed after the delay is removed which asynchronously happens on another thread. This thread does not progress until in-flight operations that were executing are completed, and after these operations drain, the delayed operations re-acquire permits and are executed. --- .../elasticsearch/index/shard/IndexShard.java | 25 +- .../shard/IndexShardOperationPermits.java | 117 ++++++++-- .../IndexShardOperationPermitsTests.java | 213 +++++++++++++++++- .../index/shard/IndexShardTests.java | 108 +++++++++ 4 files changed, 429 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 35ab1db14189a..d3fbcad1351ea 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -336,12 +336,14 @@ public long getPrimaryTerm() { } /** - * notifies the shard of an increase in the primary term + * Notifies the shard of an increase in the primary term. 
+ * + * @param newPrimaryTerm the new primary term */ - public void updatePrimaryTerm(final long newTerm) { + public void updatePrimaryTerm(final long newPrimaryTerm) { assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard"; synchronized (mutex) { - if (newTerm != primaryTerm) { + if (newPrimaryTerm != primaryTerm) { // Note that due to cluster state batching an initializing primary shard term can failed and re-assigned // in one state causing it's term to be incremented. Note that if both current shard state and new // shard state are initializing, we could replace the current shard and reinitialize it. It is however @@ -358,10 +360,15 @@ public void updatePrimaryTerm(final long newTerm) { "a started primary shard should never update its term; " + "shard " + shardRouting + ", " + "current term [" + primaryTerm + "], " - + "new term [" + newTerm + "]"; - assert newTerm > primaryTerm : - "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]"; - primaryTerm = newTerm; + + "new term [" + newPrimaryTerm + "]"; + assert newPrimaryTerm > primaryTerm : + "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]"; + /* + * Before this call returns, we are guarantee that all future operations are delayed and so this happens before we increment + * the primary term. 
+ */ + indexShardOperationPermits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> {}); + primaryTerm = newPrimaryTerm; } } } @@ -460,7 +467,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try { - indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { + indexShardOperationPermits.syncBlockOperations(30, TimeUnit.MINUTES, () -> { // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == 0 : "in-flight operations in progress while moving shard state to relocated"; @@ -1876,7 +1883,7 @@ public void acquireReplicaOperationPermit( synchronized (primaryTermMutex) { if (operationPrimaryTerm > primaryTerm) { try { - indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { + indexShardOperationPermits.syncBlockOperations(30, TimeUnit.MINUTES, () -> { assert operationPrimaryTerm > primaryTerm : "shard term already update. 
op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; primaryTerm = operationPrimaryTerm; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index fea26168efa9d..7fc56a154481f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Assertions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ThreadedActionListener; @@ -38,18 +39,32 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +/** + * Tracks shard operation permits. Each operation on the shard obtains a permit. When we need to block operations (e.g., to transition + * between terms) we immediately delay all operations to a queue, obtain all available permits, and wait for outstanding operations to drain + * and return their permits. Delayed operations will acquire permits and be completed after the operation that blocked all operations has + * completed. 
+ */ final class IndexShardOperationPermits implements Closeable { + private final ShardId shardId; private final Logger logger; private final ThreadPool threadPool; private static final int TOTAL_PERMITS = Integer.MAX_VALUE; - // fair semaphore to ensure that blockOperations() does not starve under thread contention - final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); + final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE, true); // fair to ensure a blocking thread is not starved @Nullable private List> delayedOperations; // operations that are delayed private volatile boolean closed; + private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this - IndexShardOperationPermits(ShardId shardId, Logger logger, ThreadPool threadPool) { + /** + * Construct operation permits for the specified shards. + * + * @param shardId the shard + * @param logger the logger for the shard + * @param threadPool the thread pool (used to execute delayed operations) + */ + IndexShardOperationPermits(final ShardId shardId, final Logger logger, final ThreadPool threadPool) { this.shardId = shardId; this.logger = logger; this.threadPool = threadPool; @@ -61,21 +76,68 @@ public void close() { } /** - * Wait for in-flight operations to finish and executes onBlocked under the guarantee that no new operations are started. Queues - * operations that are occurring in the meanwhile and runs them once onBlocked has executed. + * Wait for in-flight operations to finish and executes {@code onBlocked} under the guarantee that no new operations are started. Queues + * operations that are occurring in the meanwhile and runs them once {@code onBlocked} has executed. 
* - * @param timeout the maximum time to wait for the in-flight operations block - * @param timeUnit the time unit of the {@code timeout} argument + * @param timeout the maximum time to wait for the in-flight operations block + * @param timeUnit the time unit of the {@code timeout} argument * @param onBlocked the action to run once the block has been acquired - * @throws InterruptedException if calling thread is interrupted - * @throws TimeoutException if timed out waiting for in-flight operations to finish + * @param the type of checked exception thrown by {@code onBlocked} + * @throws InterruptedException if calling thread is interrupted + * @throws TimeoutException if timed out waiting for in-flight operations to finish * @throws IndexShardClosedException if operation permit has been closed */ - public void blockOperations(long timeout, TimeUnit timeUnit, CheckedRunnable onBlocked) throws - InterruptedException, TimeoutException, E { + void syncBlockOperations( + final long timeout, + final TimeUnit timeUnit, + final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { if (closed) { throw new IndexShardClosedException(shardId); } + delayOperations(); + doBlockOperations(timeout, timeUnit, onBlocked); + } + + /** + * Immediately delays operations and on another thread waits for in-flight operations to finish and then executes {@code onBlocked} + * under the guarantee that no new operations are started. Delayed operations are run after {@code onBlocked} has executed. After + * operations are delayed and the blocking is forked to another thread, returns to the caller. 
+ * + * @param timeout the maximum time to wait for the in-flight operations block + * @param timeUnit the time unit of the {@code timeout} argument + * @param onBlocked the action to run once the block has been acquired + * @param the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread) + */ + void asyncBlockOperations(final long timeout, final TimeUnit timeUnit, final CheckedRunnable onBlocked) { + delayOperations(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + try { + doBlockOperations(timeout, timeUnit, onBlocked); + } catch (final Exception e) { + throw new RuntimeException(e); + } + }); + } + + private void delayOperations() { + synchronized (this) { + if (delayed) { + throw new IllegalStateException("operations are already delayed"); + } else { + delayed = true; + } + } + } + + private void doBlockOperations( + final long timeout, + final TimeUnit timeUnit, + final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { + if (Assertions.ENABLED) { + synchronized (this) { + assert delayed; + } + } try { if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { assert semaphore.availablePermits() == 0; @@ -92,6 +154,7 @@ public void blockOperations(long timeout, TimeUnit timeUni synchronized (this) { queuedActions = delayedOperations; delayedOperations = null; + delayed = false; } if (queuedActions != null) { // Try acquiring permits on fresh thread (for two reasons): @@ -112,24 +175,25 @@ public void blockOperations(long timeout, TimeUnit timeUni /** * Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided * {@link ActionListener} will be called on the calling thread. During calls of - * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided ActionListener will - * then be called using the provided executor once operations are no longer blocked. 
+ * {@link #syncBlockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener} + * will then be called using the provided executor once operations are no longer blocked. * * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed * @param executorOnDelay executor to use for delayed call * @param forceExecution whether the runnable should force its execution in case it gets rejected */ - public void acquire(ActionListener onAcquired, String executorOnDelay, boolean forceExecution) { + public void acquire(final ActionListener onAcquired, final String executorOnDelay, final boolean forceExecution) { if (closed) { onAcquired.onFailure(new IndexShardClosedException(shardId)); return; } - Releasable releasable; + final Releasable releasable; try { synchronized (this) { releasable = tryAcquire(); if (releasable == null) { - // blockOperations is executing, this operation will be retried by blockOperations once it finishes + assert delayed; + // operations are delayed, this operation will be retried by doBlockOperations once the delay is removed if (delayedOperations == null) { delayedOperations = new ArrayList<>(); } @@ -144,7 +208,7 @@ public void acquire(ActionListener onAcquired, String executorOnDela return; } } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { onAcquired.onFailure(e); return; } @@ -152,8 +216,9 @@ public void acquire(ActionListener onAcquired, String executorOnDela } @Nullable private Releasable tryAcquire() throws InterruptedException { - if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting - AtomicBoolean closed = new AtomicBoolean(); + assert Thread.holdsLock(this); + if (!delayed && semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting + final AtomicBoolean closed = new AtomicBoolean(); 
return () -> { if (closed.compareAndSet(false, true)) { semaphore.release(1); @@ -163,13 +228,23 @@ public void acquire(ActionListener onAcquired, String executorOnDela return null; } - public int getActiveOperationsCount() { + /** + * Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight). + * + * @return the active operation count, or zero when all permits are held + */ + int getActiveOperationsCount() { int availablePermits = semaphore.availablePermits(); if (availablePermits == 0) { - // when blockOperations is holding all permits + /* + * This occurs when either doBlockOperations is holding all the permits or there are outstanding operations in flight and the + * remainder of the permits are held by doBlockOperations. We do not distinguish between these two cases and simply say that + * the active operations count is zero. + */ return 0; } else { return TOTAL_PERMITS - availablePermits; } } + } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 18a250a42827d..e263839e37b02 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -25,17 +25,24 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; @@ -137,13 +144,13 @@ public void testOperationsIfClosed() throws ExecutionException, InterruptedExcep public void testBlockIfClosed() throws ExecutionException, InterruptedException { permits.close(); - expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES, + expectThrows(IndexShardClosedException.class, () -> permits.syncBlockOperations(randomInt(10), TimeUnit.MINUTES, () -> { throw new IllegalArgumentException("fake error"); })); } public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { PlainActionFuture future = new PlainActionFuture<>(); - try (Releasable releasable = blockAndWait()) { + try (Releasable ignored = blockAndWait()) { permits.acquire(future, ThreadPool.Names.GENERIC, true); assertFalse(future.isDone()); } @@ -184,7 +191,7 @@ public void onResponse(Releasable releasable) { } }; - try (Releasable releasable = blockAndWait()) { + try (Releasable ignored = blockAndWait()) { // we preserve the thread context here so that we have a different context in the call to acquire than the context present // when the releasable is closed try (ThreadContext.StoredContext ignore = context.newStoredContext(false)) { @@ -208,7 +215,7 @@ protected Releasable blockAndWait() throws InterruptedException { IndexShardClosedException exception = new IndexShardClosedException(new ShardId("blubb", "id", 0)); threadPool.generic().execute(() -> { try { - permits.blockOperations(1, TimeUnit.MINUTES, () -> { + permits.syncBlockOperations(1, TimeUnit.MINUTES, () -> { try { blockAcquired.countDown(); releaseBlock.await(); @@ -238,6 +245,204 @@ protected Releasable blockAndWait() 
throws InterruptedException { }; } + public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedException { + final CountDownLatch blockAcquired = new CountDownLatch(1); + final CountDownLatch releaseBlock = new CountDownLatch(1); + final AtomicBoolean blocked = new AtomicBoolean(); + permits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> { + blocked.set(true); + blockAcquired.countDown(); + releaseBlock.await(); + }); + blockAcquired.await(); + assertTrue(blocked.get()); + + // an operation that is submitted while there is a delay in place should be delayed + final CountDownLatch delayedOperation = new CountDownLatch(1); + final AtomicBoolean delayed = new AtomicBoolean(); + final Thread thread = new Thread(() -> + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + delayed.set(true); + releasable.close(); + delayedOperation.countDown(); + } + + @Override + public void onFailure(Exception e) { + + } + }, + ThreadPool.Names.GENERIC, + false)); + thread.start(); + assertFalse(delayed.get()); + releaseBlock.countDown(); + delayedOperation.await(); + assertTrue(delayed.get()); + thread.join(); + } + + public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedException, BrokenBarrierException { + final CountDownLatch operationExecuting = new CountDownLatch(1); + final CountDownLatch firstOperationLatch = new CountDownLatch(1); + final CountDownLatch firstOperationComplete = new CountDownLatch(1); + final Thread firstOperationThread = new Thread(() -> { + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + operationExecuting.countDown(); + try { + firstOperationLatch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + releasable.close(); + firstOperationComplete.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + 
ThreadPool.Names.GENERIC, + false); + }); + firstOperationThread.start(); + + operationExecuting.await(); + + // now we will delay operations while the first operation is still executing (because it is latched) + final CountDownLatch blockedLatch = new CountDownLatch(1); + final AtomicBoolean onBlocked = new AtomicBoolean(); + permits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> { + onBlocked.set(true); + blockedLatch.countDown(); + }); + + assertFalse(onBlocked.get()); + + // if we submit another operation, it should be delayed + final CountDownLatch secondOperationExecuting = new CountDownLatch(1); + final CountDownLatch secondOperationComplete = new CountDownLatch(1); + final AtomicBoolean secondOperation = new AtomicBoolean(); + final Thread secondOperationThread = new Thread(() -> { + secondOperationExecuting.countDown(); + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + secondOperation.set(true); + releasable.close(); + secondOperationComplete.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.GENERIC, + false); + }); + secondOperationThread.start(); + + secondOperationExecuting.await(); + assertFalse(secondOperation.get()); + + firstOperationLatch.countDown(); + firstOperationComplete.await(); + blockedLatch.await(); + assertTrue(onBlocked.get()); + + secondOperationComplete.await(); + assertTrue(secondOperation.get()); + + firstOperationThread.join(); + secondOperationThread.join(); + } + + public void testAsyncBlockOperationsRace() throws Exception { + // we racily submit operations and a delay, and then ensure that all operations were actually completed + final int operations = scaledRandomIntBetween(1, 64); + final CyclicBarrier barrier = new CyclicBarrier(1 + 1 + operations); + final CountDownLatch operationLatch = new CountDownLatch(1 + operations); + final Set values = Collections.newSetFromMap(new 
ConcurrentHashMap<>()); + final List threads = new ArrayList<>(); + for (int i = 0; i < operations; i++) { + final int value = i; + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + values.add(value); + releasable.close(); + operationLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + + } + }, + ThreadPool.Names.GENERIC, + false); + }); + thread.start(); + threads.add(thread); + } + + final Thread blockingThread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + permits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> { + values.add(operations); + operationLatch.countDown(); + }); + }); + blockingThread.start(); + + barrier.await(); + + operationLatch.await(); + for (final Thread thread : threads) { + thread.join(); + } + blockingThread.join(); + + // check that all operations completed + for (int i = 0; i < operations; i++) { + assertTrue(values.contains(i)); + } + assertTrue(values.contains(operations)); + /* + * The block operation is executed on another thread and the operations can have completed before this thread has returned all the + * permits to the semaphore. We wait here until all generic threads are idle as an indication that all permits have been returned to + * the semaphore. 
+ */ + awaitBusy(() -> { + for (final ThreadPoolStats.Stats stats : threadPool.stats()) { + if (ThreadPool.Names.GENERIC.equals(stats.getName())) { + return stats.getActive() == 0; + } + } + return false; + }); + } + public void testActiveOperationsCount() throws ExecutionException, InterruptedException { PlainActionFuture future1 = new PlainActionFuture<>(); permits.acquire(future1, ThreadPool.Names.GENERIC, true); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index a113132351b78..72f19371bc547 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -280,6 +280,114 @@ public void testClosesPreventsNewOperations() throws InterruptedException, Execu } } + public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException { + final IndexShard indexShard = newStartedShard(false); + + final int operations = scaledRandomIntBetween(1, 64); + final CyclicBarrier barrier = new CyclicBarrier(1 + operations); + final CountDownLatch latch = new CountDownLatch(operations); + final CountDownLatch operationLatch = new CountDownLatch(1); + final List threads = new ArrayList<>(); + for (int i = 0; i < operations; i++) { + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + indexShard.acquireReplicaOperationPermit( + indexShard.getPrimaryTerm(), + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + latch.countDown(); + try { + operationLatch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + releasable.close(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + 
ThreadPool.Names.INDEX); + }); + thread.start(); + threads.add(thread); + } + + barrier.await(); + latch.await(); + + // promote the replica + final ShardRouting replicaRouting = indexShard.routingEntry(); + final ShardRouting primaryRouting = + TestShardRouting.newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), + null, + true, + ShardRoutingState.STARTED, + replicaRouting.allocationId()); + indexShard.updateRoutingEntry(primaryRouting); + indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); + + final int delayedOperations = scaledRandomIntBetween(1, 64); + final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations); + final CountDownLatch delayedOperationsLatch = new CountDownLatch(delayedOperations); + final AtomicLong counter = new AtomicLong(); + final List delayedThreads = new ArrayList<>(); + for (int i = 0; i < delayedOperations; i++) { + final Thread thread = new Thread(() -> { + try { + delayedOperationsBarrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + indexShard.acquirePrimaryOperationPermit( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + counter.incrementAndGet(); + releasable.close(); + delayedOperationsLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.INDEX); + }); + thread.start(); + delayedThreads.add(thread); + } + + delayedOperationsBarrier.await(); + + assertThat(counter.get(), equalTo(0L)); + + operationLatch.countDown(); + for (final Thread thread : threads) { + thread.join(); + } + + delayedOperationsLatch.await(); + + assertThat(counter.get(), equalTo((long) delayedOperations)); + + for (final Thread thread : delayedThreads) { + thread.join(); + } + + closeShards(indexShard); + } + public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, 
IOException { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; From 069745e615a0b8548225f400ac02c2e0473e222f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 29 May 2017 12:50:23 -0400 Subject: [PATCH 02/12] Separate block/release --- .../shard/IndexShardOperationPermits.java | 87 +++++++++++-------- 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 7fc56a154481f..1c639904ec86c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; import org.elasticsearch.threadpool.ThreadPool; @@ -95,7 +96,11 @@ void syncBlockOperations( throw new IndexShardClosedException(shardId); } delayOperations(); - doBlockOperations(timeout, timeUnit, onBlocked); + try { + doBlockOperations(timeout, timeUnit, onBlocked); + } finally { + releasedDelayedOperations(); + } } /** @@ -110,12 +115,21 @@ void syncBlockOperations( */ void asyncBlockOperations(final long timeout, final TimeUnit timeUnit, final CheckedRunnable onBlocked) { delayOperations(); - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - try { - doBlockOperations(timeout, timeUnit, onBlocked); - } catch (final Exception e) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { throw new RuntimeException(e); } + + @Override + protected void doRun() throws Exception { + doBlockOperations(timeout, 
timeUnit, onBlocked); + } + + @Override + public void onAfter() { + releasedDelayedOperations(); + } }); } @@ -138,37 +152,40 @@ private void doBlockOperations( assert delayed; } } - try { - if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { - assert semaphore.availablePermits() == 0; - try { - onBlocked.run(); - } finally { - semaphore.release(TOTAL_PERMITS); - } - } else { - throw new TimeoutException("timed out during blockOperations"); - } - } finally { - final List> queuedActions; - synchronized (this) { - queuedActions = delayedOperations; - delayedOperations = null; - delayed = false; - } - if (queuedActions != null) { - // Try acquiring permits on fresh thread (for two reasons): - // - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled. - // Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by - // ThreadedActionListener if the queue of the thread pool on which it submits is full. - // - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure - // handler is executed on the calling thread. This should not be the recovery thread as it would delay the recovery. 
- threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - for (ActionListener queuedAction : queuedActions) { - acquire(queuedAction, null, false); - } - }); + if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { + assert semaphore.availablePermits() == 0; + try { + onBlocked.run(); + } finally { + semaphore.release(TOTAL_PERMITS); } + } else { + throw new TimeoutException("timed out during blockOperations"); + } + } + + private void releasedDelayedOperations() { + final List> queuedActions; + synchronized (this) { + assert delayed; + queuedActions = delayedOperations; + delayedOperations = null; + delayed = false; + } + if (queuedActions != null) { + /* + * Try acquiring permits on fresh thread (for two reasons): + * - blockOperations can be called on a recovery thread which can be expected to be interrupted when recovery is cancelled; + * interruptions are bad here as permit acquisition will throw an interrupted exception which will be swallowed by + * the threaded action listener if the queue of the thread pool on which it submits is full + * - if a permit is acquired and the queue of the thread pool which the threaded action listener uses is full, the onFailure + * handler is executed on the calling thread; this should not be the recovery thread as it would delay the recovery + */ + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + for (ActionListener queuedAction : queuedActions) { + acquire(queuedAction, null, false); + } + }); } } From 46978eeb60fc7b9bbe43b305ce7291dbcf0af79c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 29 May 2017 12:50:37 -0400 Subject: [PATCH 03/12] Fix permits --- .../elasticsearch/index/shard/IndexShardOperationPermits.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 1c639904ec86c..4901229a2d063 100644 
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -53,7 +53,7 @@ final class IndexShardOperationPermits implements Closeable { private final ThreadPool threadPool; private static final int TOTAL_PERMITS = Integer.MAX_VALUE; - final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE, true); // fair to ensure a blocking thread is not starved + final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved @Nullable private List> delayedOperations; // operations that are delayed private volatile boolean closed; private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this From b420228c566d453a0dd85e7286e633c006de6250 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 29 May 2017 13:28:49 -0400 Subject: [PATCH 04/12] Failure listener --- .../elasticsearch/index/shard/IndexShard.java | 6 +- .../shard/IndexShardOperationPermits.java | 11 +- .../IndexShardOperationPermitsTests.java | 128 ++++++++++++++++-- 3 files changed, 127 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d3fbcad1351ea..e3e2bf4cc2b62 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -367,7 +367,11 @@ public void updatePrimaryTerm(final long newPrimaryTerm) { * Before this call returns, we are guarantee that all future operations are delayed and so this happens before we increment * the primary term. 
*/ - indexShardOperationPermits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> {}); + indexShardOperationPermits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> {}, + e -> failShard("exception during primary term transition", e)); primaryTerm = newPrimaryTerm; } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 4901229a2d063..50994211971c8 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -111,14 +112,16 @@ void syncBlockOperations( * @param timeout the maximum time to wait for the in-flight operations block * @param timeUnit the time unit of the {@code timeout} argument * @param onBlocked the action to run once the block has been acquired + * @param onFailure the action to run if a failure occurs while blocking operations * @param the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread) */ - void asyncBlockOperations(final long timeout, final TimeUnit timeUnit, final CheckedRunnable onBlocked) { + void asyncBlockOperations( + final long timeout, final TimeUnit timeUnit, final CheckedRunnable onBlocked, final Consumer onFailure) { delayOperations(); threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); + public void onFailure(final Exception e) { + onFailure.accept(e); } @Override @@ -160,7 +163,7 @@ private void doBlockOperations( semaphore.release(TOTAL_PERMITS); } } else { - throw new TimeoutException("timed out during 
blockOperations"); + throw new TimeoutException("timeout while blocking operations"); } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index e263839e37b02..bc10b2764a7fe 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -43,9 +43,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; public class IndexShardOperationPermitsTests extends ESTestCase { @@ -249,11 +252,17 @@ public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedEx final CountDownLatch blockAcquired = new CountDownLatch(1); final CountDownLatch releaseBlock = new CountDownLatch(1); final AtomicBoolean blocked = new AtomicBoolean(); - permits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> { - blocked.set(true); - blockAcquired.countDown(); - releaseBlock.await(); - }); + permits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> { + blocked.set(true); + blockAcquired.countDown(); + releaseBlock.await(); + }, + e -> { + throw new RuntimeException(e); + }); blockAcquired.await(); assertTrue(blocked.get()); @@ -319,10 +328,15 @@ public void onFailure(Exception e) { // now we will delay operations while the first operation is still executing (because it is latched) final CountDownLatch blockedLatch = new CountDownLatch(1); final AtomicBoolean onBlocked = new AtomicBoolean(); - permits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> { 
- onBlocked.set(true); - blockedLatch.countDown(); - }); + permits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> { + onBlocked.set(true); + blockedLatch.countDown(); + }, e -> { + throw new RuntimeException(e); + }); assertFalse(onBlocked.get()); @@ -408,10 +422,15 @@ public void onFailure(Exception e) { } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); } - permits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> { - values.add(operations); - operationLatch.countDown(); - }); + permits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> { + values.add(operations); + operationLatch.countDown(); + }, e -> { + throw new RuntimeException(e); + }); }); blockingThread.start(); @@ -472,4 +491,87 @@ public void testActiveOperationsCount() throws ExecutionException, InterruptedEx future3.get().close(); assertThat(permits.getActiveOperationsCount(), equalTo(0)); } + + public void testAsyncBlockOperationsOnFailure() throws InterruptedException { + final AtomicReference reference = new AtomicReference<>(); + final CountDownLatch onFailureLatch = new CountDownLatch(1); + permits.asyncBlockOperations( + 10, + TimeUnit.MINUTES, + () -> { + throw new RuntimeException("simulated"); + }, + e -> { + reference.set(e); + onFailureLatch.countDown(); + }); + onFailureLatch.await(); + assertThat(reference.get(), instanceOf(RuntimeException.class)); + assertThat(reference.get(), hasToString(containsString("simulated"))); + } + + public void testTimeout() throws BrokenBarrierException, InterruptedException { + final CyclicBarrier barrier = new CyclicBarrier(2); + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch operationLatch = new CountDownLatch(1); + + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + permits.acquire( + new ActionListener() { + @Override + public void 
onResponse(Releasable releasable) { + try { + latch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + releasable.close(); + operationLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.GENERIC, + false); + }); + thread.start(); + + barrier.await(); + + { + final TimeoutException e = + expectThrows(TimeoutException.class, () -> permits.syncBlockOperations(1, TimeUnit.MILLISECONDS, () -> {})); + assertThat(e, hasToString(containsString("timeout while blocking operations"))); + } + + { + final AtomicReference reference = new AtomicReference<>(); + final CountDownLatch onFailureLatch = new CountDownLatch(1); + permits.asyncBlockOperations( + 1, + TimeUnit.MILLISECONDS, + () -> {}, + e -> { + reference.set(e); + onFailureLatch.countDown(); + }); + onFailureLatch.await(); + assertThat(reference.get(), hasToString(containsString("timeout while blocking operations"))); + } + + latch.countDown(); + + operationLatch.await(); + + thread.join(); + } + } From ebb9f3d05ca8a3872d6c1c635266c2c357d062c5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 29 May 2017 14:09:27 -0400 Subject: [PATCH 05/12] Simpler --- .../index/shard/IndexShardOperationPermits.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 50994211971c8..d3e2df2e5a08b 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -210,34 +210,35 @@ public void acquire(final ActionListener onAcquired, final String ex final Releasable releasable; try { synchronized (this) { - releasable = tryAcquire(); - if (releasable == null) { - assert delayed; - // operations are 
delayed, this operation will be retried by doBlockOperations once the delay is remoked + if (delayed) { if (delayedOperations == null) { delayedOperations = new ArrayList<>(); } final Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(false); if (executorOnDelay != null) { delayedOperations.add( - new ThreadedActionListener<>(logger, threadPool, executorOnDelay, - new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution)); + new ThreadedActionListener<>(logger, threadPool, executorOnDelay, + new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution)); } else { delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired)); } return; + } else { + releasable = tryAcquire(); + assert releasable != null; } } } catch (final InterruptedException e) { onAcquired.onFailure(e); return; } + // execute this outside the synchronized block! onAcquired.onResponse(releasable); } @Nullable private Releasable tryAcquire() throws InterruptedException { assert Thread.holdsLock(this); - if (!delayed && semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting + if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting final AtomicBoolean closed = new AtomicBoolean(); return () -> { if (closed.compareAndSet(false, true)) { From e9224dd24c08846ef98d8b1904e3cb366f8cfb98 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 29 May 2017 14:32:55 -0400 Subject: [PATCH 06/12] Delayed --- .../shard/IndexShardOperationPermits.java | 12 +- .../IndexShardOperationPermitsTests.java | 123 ++++++++++-------- 2 files changed, 71 insertions(+), 64 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index d3e2df2e5a08b..c0d63c8f4baff 100644 --- 
a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -55,7 +55,7 @@ final class IndexShardOperationPermits implements Closeable { private static final int TOTAL_PERMITS = Integer.MAX_VALUE; final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved - @Nullable private List> delayedOperations; // operations that are delayed + private final List> delayedOperations = new ArrayList<>(); // operations that are delayed private volatile boolean closed; private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this @@ -141,6 +141,7 @@ private void delayOperations() { if (delayed) { throw new IllegalStateException("operations are already delayed"); } else { + assert delayedOperations.isEmpty(); delayed = true; } } @@ -171,11 +172,11 @@ private void releasedDelayedOperations() { final List> queuedActions; synchronized (this) { assert delayed; - queuedActions = delayedOperations; - delayedOperations = null; + queuedActions = new ArrayList<>(delayedOperations); + delayedOperations.clear(); delayed = false; } - if (queuedActions != null) { + if (!queuedActions.isEmpty()) { /* * Try acquiring permits on fresh thread (for two reasons): * - blockOperations can be called on a recovery thread which can be expected to be interrupted when recovery is cancelled; @@ -211,9 +212,6 @@ public void acquire(final ActionListener onAcquired, final String ex try { synchronized (this) { if (delayed) { - if (delayedOperations == null) { - delayedOperations = new ArrayList<>(); - } final Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(false); if (executorOnDelay != null) { delayedOperations.add( diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java 
b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index bc10b2764a7fe..de53619ba99e1 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -295,35 +295,17 @@ public void onFailure(Exception e) { } public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedException, BrokenBarrierException { - final CountDownLatch operationExecuting = new CountDownLatch(1); + final CyclicBarrier barrier = new CyclicBarrier(2); + final CountDownLatch operationExecutingLatch = new CountDownLatch(1); final CountDownLatch firstOperationLatch = new CountDownLatch(1); - final CountDownLatch firstOperationComplete = new CountDownLatch(1); - final Thread firstOperationThread = new Thread(() -> { - permits.acquire( - new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - operationExecuting.countDown(); - try { - firstOperationLatch.await(); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - releasable.close(); - firstOperationComplete.countDown(); - } - - @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); - } - }, - ThreadPool.Names.GENERIC, - false); - }); + final CountDownLatch firstOperationCompleteLatch = new CountDownLatch(1); + final Thread firstOperationThread = + new Thread(controlledAcquire(barrier, operationExecutingLatch, firstOperationLatch, firstOperationCompleteLatch)); firstOperationThread.start(); - operationExecuting.await(); + barrier.await(); + + operationExecutingLatch.await(); // now we will delay operations while the first operation is still executing (because it is latched) final CountDownLatch blockedLatch = new CountDownLatch(1); @@ -369,7 +351,7 @@ public void onFailure(Exception e) { assertFalse(secondOperation.get()); firstOperationLatch.countDown(); - 
firstOperationComplete.await(); + firstOperationCompleteLatch.await(); blockedLatch.await(); assertTrue(onBlocked.get()); @@ -512,40 +494,17 @@ public void testAsyncBlockOperationsOnFailure() throws InterruptedException { public void testTimeout() throws BrokenBarrierException, InterruptedException { final CyclicBarrier barrier = new CyclicBarrier(2); - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch operationExecutingLatch = new CountDownLatch(1); final CountDownLatch operationLatch = new CountDownLatch(1); + final CountDownLatch operationCompleteLatch = new CountDownLatch(1); - final Thread thread = new Thread(() -> { - try { - barrier.await(); - } catch (final BrokenBarrierException | InterruptedException e) { - throw new RuntimeException(e); - } - permits.acquire( - new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - try { - latch.await(); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - releasable.close(); - operationLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); - } - }, - ThreadPool.Names.GENERIC, - false); - }); + final Thread thread = new Thread(controlledAcquire(barrier, operationExecutingLatch, operationLatch, operationCompleteLatch)); thread.start(); barrier.await(); + operationExecutingLatch.await(); + { final TimeoutException e = expectThrows(TimeoutException.class, () -> permits.syncBlockOperations(1, TimeUnit.MILLISECONDS, () -> {})); @@ -567,11 +526,61 @@ public void onFailure(Exception e) { assertThat(reference.get(), hasToString(containsString("timeout while blocking operations"))); } - latch.countDown(); + operationLatch.countDown(); - operationLatch.await(); + operationCompleteLatch.await(); thread.join(); } + /** + * Returns an operation that acquires a permit and synchronizes in the following manner: + *
    + *
  • waits on the {@code barrier} before acquiring a permit
  • + *
  • counts down the {@code operationExecutingLatch} when it acquires the permit
  • + *
  • waits on the {@code operationLatch} before releasing the permit
  • + *
  • counts down the {@code operationCompleteLatch} after releasing the permit
  • + *
+ * + * @param barrier the barrier to wait on + * @param operationExecutingLatch the latch to countdown after acquiring the permit + * @param operationLatch the latch to wait on before releasing the permit + * @param operationCompleteLatch the latch to countdown after releasing the permit + * @return a controllable runnable that acquires a permit + */ + private Runnable controlledAcquire( + final CyclicBarrier barrier, + final CountDownLatch operationExecutingLatch, + final CountDownLatch operationLatch, + final CountDownLatch operationCompleteLatch) { + return () -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + operationExecutingLatch.countDown(); + try { + operationLatch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + releasable.close(); + operationCompleteLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.GENERIC, + false); + }; + } + } From 73c48fe81cb5baa603f88edc832627e11b5a94b3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 29 May 2017 14:38:00 -0400 Subject: [PATCH 07/12] Fix comment formatting --- .../index/shard/IndexShardOperationPermits.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index c0d63c8f4baff..c2ffb943979e4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -179,11 +179,12 @@ private void releasedDelayedOperations() { if (!queuedActions.isEmpty()) { /* * Try acquiring permits on fresh thread 
(for two reasons): - * - blockOperations can be called on a recovery thread which can be expected to be interrupted when recovery is cancelled; - * interruptions are bad here as permit acquisition will throw an interrupted exception which will be swallowed by - * the threaded action listener if the queue of the thread pool on which it submits is full - * - if a permit is acquired and the queue of the thread pool which the the threaded action listener uses is full, the onFailure - * handler is executed on the calling thread; this should not be the recovery thread as it would delay the recovery + * - blockOperations can be called on a recovery thread which can be expected to be interrupted when recovery is cancelled; + * interruptions are bad here as permit acquisition will throw an interrupted exception which will be swallowed by + * the threaded action listener if the queue of the thread pool on which it submits is full + * - if a permit is acquired and the queue of the thread pool which the the threaded action listener uses is full, the + * onFailure handler is executed on the calling thread; this should not be the recovery thread as it would delay the + * recovery */ threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { for (ActionListener queuedAction : queuedActions) { From f04ff7e4627911043f4bf6dfe992be81e3c8bc6f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 29 May 2017 20:01:33 -0400 Subject: [PATCH 08/12] Comments --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 4 ++-- .../elasticsearch/index/shard/IndexShardOperationPermits.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 9f373c3e1bf49..806d3ca96c7f0 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -360,8 +360,8 
@@ public void updatePrimaryTerm(final long newPrimaryTerm) { assert newPrimaryTerm > primaryTerm : "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]"; /* - * Before this call returns, we are guarantee that all future operations are delayed and so this happens before we increment - * the primary term. + * Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we + * increment the primary term. */ indexShardOperationPermits.asyncBlockOperations( 30, diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index c2ffb943979e4..ee2e56839891d 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -107,7 +107,8 @@ void syncBlockOperations( /** * Immediately delays operations and on another thread waits for in-flight operations to finish and then executes {@code onBlocked} * under the guarantee that no new operations are started. Delayed operations are run after {@code onBlocked} has executed. After - * operations are delayed and the blocking is forked to another thread, returns to the caller. + * operations are delayed and the blocking is forked to another thread, returns to the caller. If a failure occurs while blocking + * operations or executing {@code onBlocked} then the {@code onFailure} handler will be invoked. 
* * @param timeout the maximum time to wait for the in-flight operations block * @param timeUnit the time unit of the {@code timeout} argument From a4d2bf3294e4ff30c2554d97f1cb7ea161ffad48 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 29 May 2017 20:08:41 -0400 Subject: [PATCH 09/12] Latch to ensure ordering --- .../java/org/elasticsearch/index/shard/IndexShard.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 806d3ca96c7f0..7e0fe05790641 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -131,6 +131,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -361,14 +362,17 @@ public void updatePrimaryTerm(final long newPrimaryTerm) { "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]"; /* * Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we - * increment the primary term. + * increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is + * incremented. 
*/ + final CountDownLatch latch = new CountDownLatch(1); indexShardOperationPermits.asyncBlockOperations( 30, TimeUnit.MINUTES, - () -> {}, + latch::await, e -> failShard("exception during primary term transition", e)); primaryTerm = newPrimaryTerm; + latch.countDown(); } } } From 27b4f850bfac5833da62b65fdbc64c4b7f3aff2d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 29 May 2017 20:37:51 -0400 Subject: [PATCH 10/12] Formatting so I can stop fighting IntelliJ --- .../elasticsearch/index/shard/IndexShardOperationPermits.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index ee2e56839891d..1025c89033103 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -236,7 +236,8 @@ public void acquire(final ActionListener onAcquired, final String ex onAcquired.onResponse(releasable); } - @Nullable private Releasable tryAcquire() throws InterruptedException { + @Nullable + private Releasable tryAcquire() throws InterruptedException { assert Thread.holdsLock(this); if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting final AtomicBoolean closed = new AtomicBoolean(); From 419528a8c7a3f4db74f8f3872170e9a747c021c6 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 06:18:41 -0400 Subject: [PATCH 11/12] Rename --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 4 ++-- .../index/shard/IndexShardOperationPermits.java | 4 ++-- .../index/shard/IndexShardOperationPermitsTests.java | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java 
b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7e0fe05790641..6a79026125243 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -471,7 +471,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try { - indexShardOperationPermits.syncBlockOperations(30, TimeUnit.MINUTES, () -> { + indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == 0 : "in-flight operations in progress while moving shard state to relocated"; @@ -1878,7 +1878,7 @@ public void acquireReplicaOperationPermit( synchronized (primaryTermMutex) { if (operationPrimaryTerm > primaryTerm) { try { - indexShardOperationPermits.syncBlockOperations(30, TimeUnit.MINUTES, () -> { + indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { assert operationPrimaryTerm > primaryTerm : "shard term already update. 
op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; primaryTerm = operationPrimaryTerm; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 1025c89033103..4e116cbe75e30 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -89,7 +89,7 @@ public void close() { * @throws TimeoutException if timed out waiting for in-flight operations to finish * @throws IndexShardClosedException if operation permit has been closed */ - void syncBlockOperations( + void blockOperations( final long timeout, final TimeUnit timeUnit, final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { @@ -198,7 +198,7 @@ private void releasedDelayedOperations() { /** * Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided * {@link ActionListener} will be called on the calling thread. During calls of - * {@link #syncBlockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener} + * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener} * will then be called using the provided executor once operations are no longer blocked. 
* * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index de53619ba99e1..ec22f9d862bbc 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -147,7 +147,7 @@ public void testOperationsIfClosed() throws ExecutionException, InterruptedExcep public void testBlockIfClosed() throws ExecutionException, InterruptedException { permits.close(); - expectThrows(IndexShardClosedException.class, () -> permits.syncBlockOperations(randomInt(10), TimeUnit.MINUTES, + expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES, () -> { throw new IllegalArgumentException("fake error"); })); } @@ -218,7 +218,7 @@ protected Releasable blockAndWait() throws InterruptedException { IndexShardClosedException exception = new IndexShardClosedException(new ShardId("blubb", "id", 0)); threadPool.generic().execute(() -> { try { - permits.syncBlockOperations(1, TimeUnit.MINUTES, () -> { + permits.blockOperations(1, TimeUnit.MINUTES, () -> { try { blockAcquired.countDown(); releaseBlock.await(); @@ -507,7 +507,7 @@ public void testTimeout() throws BrokenBarrierException, InterruptedException { { final TimeoutException e = - expectThrows(TimeoutException.class, () -> permits.syncBlockOperations(1, TimeUnit.MILLISECONDS, () -> {})); + expectThrows(TimeoutException.class, () -> permits.blockOperations(1, TimeUnit.MILLISECONDS, () -> {})); assertThat(e, hasToString(containsString("timeout while blocking operations"))); } From 632af5a54e175a9cc765950a557fc85244726720 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 11:38:07 -0400 Subject: [PATCH 12/12] Last comments 
--- .../index/shard/IndexShardOperationPermits.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 4e116cbe75e30..83a372dd453f7 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -100,7 +100,7 @@ void blockOperations( try { doBlockOperations(timeout, timeUnit, onBlocked); } finally { - releasedDelayedOperations(); + releaseDelayedOperations(); } } @@ -132,7 +132,7 @@ protected void doRun() throws Exception { @Override public void onAfter() { - releasedDelayedOperations(); + releaseDelayedOperations(); } }); } @@ -153,6 +153,7 @@ private void doBlockOperations( final TimeUnit timeUnit, final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { if (Assertions.ENABLED) { + // since delayed is not volatile, we have to synchronize even here for visibility synchronized (this) { assert delayed; } @@ -169,7 +170,7 @@ private void doBlockOperations( } } - private void releasedDelayedOperations() { + private void releaseDelayedOperations() { final List> queuedActions; synchronized (this) { assert delayed;