From 3f11b76c4c47b686122ea9d7a1ffc715844ac116 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 15:11:05 -0400 Subject: [PATCH] Clarify acquiring index shard permit In previous work, we refactored the delay mechanism in index shard operation permits to allow for async delaying of acquisition. This refactoring made explicit when permit acquisition is disabled whereas previously we were relying on an implicit condition, namely that all permits were acquired by the thread trying to delay acquisition. When using the implicit mechanism, we tried to acquire a permit and if this failed, we returned a null releasable as an indication that our operation should be queued. Yet, now we know when we are delayed and we should not even try to acquire a permit. If we try to acquire a permit and one is not available, we know that we are not delayed, and so acquisition should be successful. If it is not successful, something is deeply wrong. This commit takes advantage of this refactoring to simplify the internal implementation. --- .../shard/IndexShardOperationPermits.java | 13 +++++------ .../IndexShardOperationPermitsTests.java | 22 +++++++++++++++++++ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 83a372dd453f7..de539026e7a75 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.common.CheckedRunnable; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; @@ -53,7 +52,7 @@ final class IndexShardOperationPermits implements Closeable { private final Logger logger; private final ThreadPool threadPool; - private static final int TOTAL_PERMITS = Integer.MAX_VALUE; + static final int TOTAL_PERMITS = Integer.MAX_VALUE; final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved private final List> delayedOperations = new ArrayList<>(); // operations that are delayed private volatile boolean closed; @@ -225,8 +224,7 @@ public void acquire(final ActionListener onAcquired, final String ex } return; } else { - releasable = tryAcquire(); - assert releasable != null; + releasable = acquire(); } } } catch (final InterruptedException e) { @@ -237,8 +235,7 @@ public void acquire(final ActionListener onAcquired, final String ex onAcquired.onResponse(releasable); } - @Nullable - private Releasable tryAcquire() throws InterruptedException { + private Releasable acquire() throws InterruptedException { assert Thread.holdsLock(this); if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting final AtomicBoolean closed = new AtomicBoolean(); @@ -247,8 +244,10 @@ private Releasable tryAcquire() throws InterruptedException { semaphore.release(1); } }; + } else { + // this should never happen, if it does something is deeply wrong + throw new IllegalStateException("failed to obtain permit but operations are not delayed"); } - return null; } /** diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index ec22f9d862bbc..41dc8f520cc4a 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -533,6 +533,28 @@ public void testTimeout() throws BrokenBarrierException, InterruptedException { thread.join(); } + public void testNoPermitsRemaining() throws InterruptedException { + permits.semaphore.tryAcquire(IndexShardOperationPermits.TOTAL_PERMITS, 1, TimeUnit.SECONDS); + final IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> this.permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + assert false; + } + + @Override + public void onFailure(Exception e) { + assert false; + } + }, + ThreadPool.Names.GENERIC, + false)); + assertThat(e, hasToString(containsString("failed to obtain permit but operations are not delayed"))); + permits.semaphore.release(IndexShardOperationPermits.TOTAL_PERMITS); + } + /** * Returns an operation that acquires a permit and synchronizes in the following manner: *