@@ -420,8 +420,7 @@ public boolean isCancelled() {
                 }
             };
         }
-        final Runnable flushRunnable = scheduler.preserveContext(new Flush());
-        return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
+        return scheduler.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.GENERIC);
     }
 
     // needs to be executed under a lock
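Reviewer note: the hunk above and every caller-side hunk below apply the same mechanical rewrite. Because `schedule` and `scheduleWithFixedDelay` now capture the calling thread's context themselves (see the `ThreadPool` change further down), the manual wrap is redundant at each call site. A minimal sketch of the pattern, with illustrative names (`execute`, `request`, `delay` are not from any one file here):

```java
// Before: each caller wrapped its task by hand.
Runnable command = scheduler.preserveContext(() -> execute(request));
scheduler.schedule(command, delay, ThreadPool.Names.SAME);

// After: schedule() preserves the calling thread's context internally,
// so the bare lambda suffices.
scheduler.schedule(() -> execute(request), delay, ThreadPool.Names.SAME);
```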
@@ -136,8 +136,7 @@ private void retry(BulkRequest bulkRequestForRetry) {
         assert backoff.hasNext();
         TimeValue next = backoff.next();
         logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
-        Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
-        retryCancellable = scheduler.schedule(command, next, ThreadPool.Names.SAME);
+        retryCancellable = scheduler.schedule(() -> this.execute(bulkRequestForRetry), next, ThreadPool.Names.SAME);
     }
 
     private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
@@ -72,7 +72,6 @@ public void onRejection(Exception e) {
     }
 
     private void schedule(Runnable runnable, TimeValue delay) {
-        // schedule does not preserve context so have to do this manually
-        threadPool.schedule(threadPool.preserveContext(runnable), delay, ThreadPool.Names.SAME);
+        threadPool.schedule(runnable, delay, ThreadPool.Names.SAME);
     }
 }
12 changes: 1 addition & 11 deletions server/src/main/java/org/elasticsearch/threadpool/Scheduler.java
@@ -83,17 +83,7 @@ static boolean awaitTermination(final ScheduledThreadPoolExecutor scheduledThrea
     }
 
     /**
-     * Does nothing by default but can be used by subclasses to save the current thread context and wraps the command in a Runnable
-     * that restores that context before running the command.
-     */
-    default Runnable preserveContext(Runnable command) {
-        return command;
-    }
-
-    /**
-     * Schedules a one-shot command to be run after a given delay. The command is not run in the context of the calling thread.
-     * To preserve the context of the calling thread you may call {@link #preserveContext(Runnable)} on the runnable before passing
-     * it to this method.
+     * Schedules a one-shot command to be run after a given delay. The command is run in the context of the calling thread.
      * The command runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow
      * to execute on a different executor, in which case blocking calls are allowed.
      *
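For reviewers unfamiliar with the mechanism being centralized here: `ThreadContext.preserveContext` returns a runnable that snapshots the current headers and transients at wrap time and restores them around `run()` on whichever thread eventually executes. A simplified sketch of that behavior (the real implementation is `ThreadContext`'s `ContextPreservingRunnable`; `newRestorableContext` is the actual ES API, while the helper name below is ours):

```java
import java.util.function.Supplier;

import org.elasticsearch.common.util.concurrent.ThreadContext;

// Simplified sketch of a context-preserving wrapper, not the real implementation.
final class ContextSketch {
    static Runnable preservingContext(ThreadContext threadContext, Runnable command) {
        // Snapshot the calling thread's context now, at wrap time.
        final Supplier<ThreadContext.StoredContext> restore = threadContext.newRestorableContext(false);
        return () -> {
            // Restore the snapshot on the executing thread; revert when done.
            try (ThreadContext.StoredContext ignore = restore.get()) {
                command.run();
            }
        };
    }
}
```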
@@ -323,9 +323,7 @@ public ExecutorService executor(String name) {
     }
 
     /**
-     * Schedules a one-shot command to run after a given delay. The command is not run in the context of the calling thread. To preserve the
-     * context of the calling thread you may call <code>threadPool.getThreadContext().preserveContext</code> on the runnable before passing
-     * it to this method.
+     * Schedules a one-shot command to run after a given delay. The command is run in the context of the calling thread.
      *
      * @param command the command to run
     * @param delay delay before the task executes
@@ -339,6 +337,7 @@ public ExecutorService executor(String name) {
      */
     @Override
     public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
+        command = threadContext.preserveContext(command);
         if (!Names.SAME.equals(executor)) {
             command = new ThreadedRunnable(command, executor(executor));
         }
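The single added line above is the heart of the PR: the context wrap now happens exactly once, inside `ThreadPool#schedule`, regardless of the target executor. What callers observe, in an illustrative snippet mirroring the new test below (the header name is made up):

```java
// Headers and transients present when schedule() is called are visible inside
// the task; anything put into the context after the call is not.
threadPool.getThreadContext().putHeader("X-Illustrative-Id", "my-request");
threadPool.schedule(() -> {
    assert "my-request".equals(threadPool.getThreadContext().getHeader("X-Illustrative-Id"));
}, TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC);
```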
@@ -371,11 +370,6 @@ public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval,
             command, executor), e));
     }
 
-    @Override
-    public Runnable preserveContext(Runnable command) {
-        return getThreadContext().preserveContext(command);
-    }
-
     protected final void stopCachedTimeThread() {
         cachedTimeThread.running = false;
         cachedTimeThread.interrupt();
@@ -20,10 +20,12 @@
 package org.elasticsearch.threadpool;
 
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.test.ESTestCase;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 
 import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING;
@@ -102,4 +104,35 @@ public void testAssertCurrentMethodIsNotCalledRecursively() {
             equalTo("org.elasticsearch.threadpool.ThreadPoolTests#factorialForked is called recursively"));
         terminate(threadPool);
     }
+
+    public void testInheritContextOnSchedule() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final CountDownLatch executed = new CountDownLatch(1);
+
+        TestThreadPool threadPool = new TestThreadPool("test");
+        try {
+            threadPool.getThreadContext().putHeader("foo", "bar");
+            final Integer one = Integer.valueOf(1);
+            threadPool.getThreadContext().putTransient("foo", one);
+            threadPool.schedule(() -> {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    fail();
+                }
+                assertEquals(threadPool.getThreadContext().getHeader("foo"), "bar");
+                assertSame(threadPool.getThreadContext().getTransient("foo"), one);
+                assertNull(threadPool.getThreadContext().getHeader("bar"));
+                assertNull(threadPool.getThreadContext().getTransient("bar"));
+                executed.countDown();
+            }, TimeValue.timeValueMillis(randomInt(100)), randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC));
+            threadPool.getThreadContext().putTransient("bar", "boom");
+            threadPool.getThreadContext().putHeader("bar", "boom");
+            latch.countDown();
+            executed.await();
+        } finally {
+            latch.countDown();
+            terminate(threadPool);
+        }
+    }
 }
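The new test pins down capture-at-schedule-time semantics: the "foo" header and transient are put into the context before `schedule` and must be visible inside the task, while "bar" is added after scheduling and must stay invisible there, even though the task only begins running once `latch` is released, well after "bar" was set. It also picks randomly between the `SAME` and `GENERIC` executors, covering both the direct and the threaded scheduling paths.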
@@ -381,11 +381,6 @@ public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval,
         return super.scheduleWithFixedDelay(command, interval, executor);
     }
 
-    @Override
-    public Runnable preserveContext(Runnable command) {
-        return command;
-    }
-
     @Override
     public void shutdown() {
         throw new UnsupportedOperationException();
@@ -217,13 +217,13 @@ private void internalAddCompletionListener(ActionListener<AsyncSearchResponse> l
 
         final Cancellable cancellable;
         try {
-            cancellable = threadPool.schedule(threadPool.preserveContext(() -> {
+            cancellable = threadPool.schedule(() -> {
                 if (hasRun.compareAndSet(false, true)) {
                     // timeout occurred before completion
                     removeCompletionListener(id);
                     listener.onResponse(getResponse());
                 }
-            }), waitForCompletion, "generic");
+            }, waitForCompletion, "generic");
         } catch (EsRejectedExecutionException exc) {
             listener.onFailure(exc);
             return;
@@ -786,10 +786,8 @@ private void indexInvalidation(Collection<String> tokenIds, SecurityIndexManager
                     retryTokenDocIds.size(), tokenIds.size());
                 final TokensInvalidationResult incompleteResult = new TokensInvalidationResult(invalidated,
                     previouslyInvalidated, failedRequestResponses);
-                final Runnable retryWithContextRunnable = client.threadPool().getThreadContext()
-                    .preserveContext(() -> indexInvalidation(retryTokenDocIds, tokensIndexManager, backoff,
-                        srcPrefix, incompleteResult, listener));
-                client.threadPool().schedule(retryWithContextRunnable, backoff.next(), GENERIC);
+                client.threadPool().schedule(() -> indexInvalidation(retryTokenDocIds, tokensIndexManager, backoff,
+                    srcPrefix, incompleteResult, listener), backoff.next(), GENERIC);
             } else {
                 if (retryTokenDocIds.isEmpty() == false) {
                     logger.warn("failed to invalidate [{}] tokens out of [{}] after all retries", retryTokenDocIds.size(),
@@ -809,10 +807,8 @@ private void indexInvalidation(Collection<String> tokenIds, SecurityIndexManager
             traceLog("invalidate tokens", cause);
             if (isShardNotAvailableException(cause) && backoff.hasNext()) {
                 logger.debug("failed to invalidate tokens, retrying ");
-                final Runnable retryWithContextRunnable = client.threadPool().getThreadContext()
-                    .preserveContext(() -> indexInvalidation(tokenIds, tokensIndexManager, backoff, srcPrefix,
-                        previousResult, listener));
-                client.threadPool().schedule(retryWithContextRunnable, backoff.next(), GENERIC);
+                client.threadPool().schedule(() -> indexInvalidation(tokenIds, tokensIndexManager, backoff, srcPrefix,
+                    previousResult, listener), backoff.next(), GENERIC);
             } else {
                 listener.onFailure(e);
             }
@@ -894,9 +890,8 @@ private void findTokenFromRefreshToken(String refreshToken, SecurityIndexManager
             if (backoff.hasNext()) {
                 final TimeValue backofTimeValue = backoff.next();
                 logger.debug("retrying after [{}] back off", backofTimeValue);
-                final Runnable retryWithContextRunnable = client.threadPool().getThreadContext()
-                    .preserveContext(() -> findTokenFromRefreshToken(refreshToken, tokensIndexManager, backoff, listener));
-                client.threadPool().schedule(retryWithContextRunnable, backofTimeValue, GENERIC);
+                client.threadPool().schedule(() -> findTokenFromRefreshToken(refreshToken, tokensIndexManager, backoff, listener),
+                    backofTimeValue, GENERIC);
             } else {
                 logger.warn("failed to find token from refresh token after all retries");
                 onFailure.accept(ex);
@@ -1018,10 +1013,8 @@ private void innerRefresh(String refreshToken, String tokenDocId, Map<String, Ob
             } else if (backoff.hasNext()) {
                 logger.info("failed to update the original token document [{}], the update result was [{}]. Retrying",
                     tokenDocId, updateResponse.getResult());
-                final Runnable retryWithContextRunnable = client.threadPool().getThreadContext()
-                    .preserveContext(() -> innerRefresh(refreshToken, tokenDocId, source, seqNo, primaryTerm, clientAuth,
-                        backoff, refreshRequested, listener));
-                client.threadPool().schedule(retryWithContextRunnable, backoff.next(), GENERIC);
+                client.threadPool().schedule(() -> innerRefresh(refreshToken, tokenDocId, source, seqNo, primaryTerm,
+                    clientAuth, backoff, refreshRequested, listener), backoff.next(), GENERIC);
             } else {
                 logger.info("failed to update the original token document [{}] after all retries, the update result was [{}]. ",
                     tokenDocId, updateResponse.getResult());
@@ -1049,9 +1042,8 @@ public void onFailure(Exception e) {
                 if (isShardNotAvailableException(e)) {
                     if (backoff.hasNext()) {
                         logger.info("could not get token document [{}] for refresh, retrying", tokenDocId);
-                        final Runnable retryWithContextRunnable = client.threadPool().getThreadContext()
-                            .preserveContext(() -> getTokenDocAsync(tokenDocId, refreshedTokenIndex, this));
-                        client.threadPool().schedule(retryWithContextRunnable, backoff.next(), GENERIC);
+                        client.threadPool().schedule(() -> getTokenDocAsync(tokenDocId, refreshedTokenIndex, this),
+                            backoff.next(), GENERIC);
                     } else {
                         logger.warn("could not get token document [{}] for refresh after all retries", tokenDocId);
                         onFailure.accept(invalidGrantException("could not refresh the requested token"));
@@ -1064,10 +1056,8 @@ public void onFailure(Exception e) {
         } else if (isShardNotAvailableException(e)) {
             if (backoff.hasNext()) {
                 logger.debug("failed to update the original token document [{}], retrying", tokenDocId);
-                final Runnable retryWithContextRunnable = client.threadPool().getThreadContext()
-                    .preserveContext(() -> innerRefresh(refreshToken, tokenDocId, source, seqNo, primaryTerm,
-                        clientAuth, backoff, refreshRequested, listener));
-                client.threadPool().schedule(retryWithContextRunnable, backoff.next(), GENERIC);
+                client.threadPool().schedule(() -> innerRefresh(refreshToken, tokenDocId, source, seqNo, primaryTerm,
+                    clientAuth, backoff, refreshRequested, listener), backoff.next(), GENERIC);
             } else {
                 logger.warn("failed to update the original token document [{}], after all retries", tokenDocId);
                 onFailure.accept(invalidGrantException("could not refresh the requested token"));
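All six TokenService hunks above simplify the same retry-with-backoff idiom, and the security context (authentication headers and the like) now follows each retry automatically because `schedule` preserves it. The recurring shape, sketched with illustrative names (`attempt` re-invokes the failed operation; this is not TokenService API):

```java
// Sketch of the retry idiom after this PR (names illustrative).
// `backoff` would typically come from BackoffPolicy.exponentialBackoff().iterator().
private void retry(Runnable attempt, Iterator<TimeValue> backoff) {
    if (backoff.hasNext()) {
        // schedule() carries the caller's thread context, so the retried attempt
        // still runs with the original security headers and transients.
        client.threadPool().schedule(attempt, backoff.next(), GENERIC);
    } else {
        listener.onFailure(invalidGrantException("could not refresh the requested token"));
    }
}
```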