Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
Expand Down Expand Up @@ -53,7 +52,7 @@ final class IndexShardOperationPermits implements Closeable {
private final Logger logger;
private final ThreadPool threadPool;

private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
static final int TOTAL_PERMITS = Integer.MAX_VALUE;
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved
private final List<ActionListener<Releasable>> delayedOperations = new ArrayList<>(); // operations that are delayed
private volatile boolean closed;
Expand Down Expand Up @@ -225,8 +224,7 @@ public void acquire(final ActionListener<Releasable> onAcquired, final String ex
}
return;
} else {
releasable = tryAcquire();
assert releasable != null;
releasable = acquire();
}
}
} catch (final InterruptedException e) {
Expand All @@ -237,8 +235,7 @@ public void acquire(final ActionListener<Releasable> onAcquired, final String ex
onAcquired.onResponse(releasable);
}

@Nullable
private Releasable tryAcquire() throws InterruptedException {
private Releasable acquire() throws InterruptedException {
assert Thread.holdsLock(this);
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
final AtomicBoolean closed = new AtomicBoolean();
Expand All @@ -247,8 +244,10 @@ private Releasable tryAcquire() throws InterruptedException {
semaphore.release(1);
}
};
} else {
// this should never happen, if it does something is deeply wrong
throw new IllegalStateException("failed to obtain permit but operations are not delayed");
}
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,28 @@ public void testTimeout() throws BrokenBarrierException, InterruptedException {
thread.join();
}

public void testNoPermitsRemaining() throws InterruptedException {
permits.semaphore.tryAcquire(IndexShardOperationPermits.TOTAL_PERMITS, 1, TimeUnit.SECONDS);
final IllegalStateException e = expectThrows(
IllegalStateException.class,
() -> this.permits.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
assert false;
}

@Override
public void onFailure(Exception e) {
assert false;
}
},
ThreadPool.Names.GENERIC,
false));
assertThat(e, hasToString(containsString("failed to obtain permit but operations are not delayed")));
permits.semaphore.release(IndexShardOperationPermits.TOTAL_PERMITS);
}

/**
* Returns an operation that acquires a permit and synchronizes in the following manner:
* <ul>
Expand Down