Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
package org.elasticsearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Assertions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -198,11 +198,14 @@ private void releaseDelayedOperations() {
/**
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
* {@link ActionListener} will be called on the calling thread. During calls of
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener}
* will then be called using the provided executor once operations are no longer blocked.
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed.
* The {@link ActionListener#onResponse(Object)} method will then be called using the provided executor once operations are no
* longer blocked. Note that the executor will not be used for {@link ActionListener#onFailure(Exception)} calls. Those will run
* directly on the calling thread, which in case of delays, will be a generic thread. Callers should thus make sure
* that the {@link ActionListener#onFailure(Exception)} method provided here only contains lightweight operations.
*
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param executorOnDelay executor to use for delayed call
* @param executorOnDelay executor to use for the possibly delayed {@link ActionListener#onResponse(Object)} call
* @param forceExecution whether the runnable should force its execution in case it gets rejected
*/
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution) {
Expand All @@ -217,7 +220,7 @@ public void acquire(final ActionListener<Releasable> onAcquired, final String ex
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
if (executorOnDelay != null) {
delayedOperations.add(
new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
} else {
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
Expand Down Expand Up @@ -269,4 +272,56 @@ int getActiveOperationsCount() {
}
}

/**
* A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool.
* Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the
* invoker's thread to communicate failures.
*/
private static class PermitAwareThreadedActionListener implements ActionListener<Releasable> {

private final ThreadPool threadPool;
private final String executor;
private final ActionListener<Releasable> listener;
private final boolean forceExecution;

private PermitAwareThreadedActionListener(ThreadPool threadPool, String executor, ActionListener<Releasable> listener,
boolean forceExecution) {
this.threadPool = threadPool;
this.executor = executor;
this.listener = listener;
this.forceExecution = forceExecution;
}

@Override
public void onResponse(final Releasable releasable) {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public boolean isForceExecution() {
return forceExecution;
}

@Override
protected void doRun() throws Exception {
listener.onResponse(releasable);
}

@Override
public void onRejection(Exception e) {
IOUtils.closeWhileHandlingException(releasable);
super.onRejection(e);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e); // will possibly execute on the caller thread
}
});
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
Expand All @@ -47,6 +50,7 @@
import java.util.function.Function;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -59,7 +63,18 @@ public class IndexShardOperationPermitsTests extends ESTestCase {

@BeforeClass
public static void setupThreadPool() {
threadPool = new TestThreadPool("IndexShardOperationsLockTests");
int bulkThreadPoolSize = randomIntBetween(1, 2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ to randomly use limited capacity.

int bulkThreadPoolQueueSize = randomIntBetween(1, 2);
threadPool = new TestThreadPool("IndexShardOperationsLockTests",
Settings.builder()
.put("thread_pool." + ThreadPool.Names.BULK + ".size", bulkThreadPoolSize)
.put("thread_pool." + ThreadPool.Names.BULK + ".queue_size", bulkThreadPoolQueueSize)
.build());
assertThat(threadPool.executor(ThreadPool.Names.BULK), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getCorePoolSize(), equalTo(bulkThreadPoolSize));
assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getMaximumPoolSize(), equalTo(bulkThreadPoolSize));
assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getQueue().remainingCapacity(),
equalTo(bulkThreadPoolQueueSize));
}

@AfterClass
Expand All @@ -82,33 +97,53 @@ public void checkNoInflightOperations() {
public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException {
int numThreads = 10;

class DummyException extends RuntimeException {}

List<PlainActionFuture<Releasable>> futures = new ArrayList<>();
List<Thread> operationThreads = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(numThreads / 2);
CountDownLatch latch = new CountDownLatch(numThreads / 4);
boolean forceExecution = randomBoolean();
for (int i = 0; i < numThreads; i++) {
// the bulk thread pool uses a bounded size and can get rejections, see setupThreadPool
String threadPoolName = randomFrom(ThreadPool.Names.BULK, ThreadPool.Names.GENERIC);
boolean failingListener = randomBoolean();
PlainActionFuture<Releasable> future = new PlainActionFuture<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
releasable.close();
super.onResponse(releasable);
if (failingListener) {
throw new DummyException();
} else {
super.onResponse(releasable);
}
}
};
Thread thread = new Thread() {
public void run() {
latch.countDown();
permits.acquire(future, ThreadPool.Names.GENERIC, true);
try {
permits.acquire(future, threadPoolName, forceExecution);
} catch (DummyException dummyException) {
// ok, notify future
assertTrue(failingListener);
future.onFailure(dummyException);
}
}
};
futures.add(future);
operationThreads.add(thread);
}

boolean closeAfterBlocking = randomBoolean();
CountDownLatch blockFinished = new CountDownLatch(1);
threadPool.generic().execute(() -> {
try {
latch.await();
blockAndWait().close();
blockFinished.countDown();
if (closeAfterBlocking) {
permits.close();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -119,7 +154,16 @@ public void run() {
}

for (PlainActionFuture<Releasable> future : futures) {
assertNotNull(future.get(1, TimeUnit.MINUTES));
try {
assertNotNull(future.get(1, TimeUnit.MINUTES));
} catch (ExecutionException e) {
if (closeAfterBlocking) {
assertThat(e.getCause(), either(instanceOf(DummyException.class)).or(instanceOf(EsRejectedExecutionException.class))
.or(instanceOf(IndexShardClosedException.class)));
} else {
assertThat(e.getCause(), either(instanceOf(DummyException.class)).or(instanceOf(EsRejectedExecutionException.class)));
}
}
}

for (Thread thread : operationThreads) {
Expand Down