diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 8627a465805ce..4a3a88b728ec5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -11,7 +11,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.ContextPreservingActionListener; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState.Builder; @@ -38,7 +39,6 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -96,12 +96,11 @@ public class MasterService extends AbstractLifecycleComponent { private final ClusterStateTaskExecutor unbatchedExecutor; - ClusterStatePublisher clusterStatePublisher; + private ClusterStatePublisher clusterStatePublisher; + private Supplier clusterStateSupplier; private final String nodeName; - private java.util.function.Supplier clusterStateSupplier; - private volatile TimeValue slowTaskLoggingThreshold; private final TimeValue starvationLoggingThreshold; @@ -144,7 +143,7 @@ public synchronized void setClusterStatePublisher(ClusterStatePublisher publishe clusterStatePublisher = publisher; } - public synchronized void setClusterStateSupplier(java.util.function.Supplier clusterStateSupplier) { + public synchronized void setClusterStateSupplier(Supplier clusterStateSupplier) { this.clusterStateSupplier = clusterStateSupplier; } @@ -204,10 +203,12 @@ assert isMasterUpdateThread() == false private void executeAndPublishBatch( final ClusterStateTaskExecutor executor, final List> executionResults, - final BatchSummary summary + final BatchSummary summary, + final ActionListener listener ) { if (lifecycle.started() == false) { logger.debug("processing [{}]: ignoring, master service not started", summary); + listener.onResponse(null); return; } @@ -220,6 +221,7 @@ private void executeAndPublishBatch( executionResult.onBatchFailure(new NotMasterException("no longer master")); executionResult.notifyFailure(); } + listener.onResponse(null); return; } @@ -244,9 +246,41 @@ private void executeAndPublishBatch( final TimeValue executionTime = getTimeSince(notificationStartTime); logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary); clusterStateUpdateStatsTracker.onUnchangedClusterState(computationTime.millis(), executionTime.millis()); + listener.onResponse(null); } else { + final long publicationStartTime = threadPool.rawRelativeTimeInMillis(); try (var ignored = threadPool.getThreadContext().newTraceContext()) { - publishClusterStateUpdate(executor, summary, previousClusterState, executionResults, newClusterState, computationTime); + final Task task = taskManager.register("master", STATE_UPDATE_ACTION_NAME, new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) {} + + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; + } + + @Override + public String getDescription() { + return "publication of cluster state [" + newClusterState.getVersion() + "]"; + } + }); + ActionListener.run(ActionListener.runAfter(listener, () -> taskManager.unregister(task)).delegateResponse((l, e) -> { + assert publicationMayFail() : e; + handleException(summary, publicationStartTime, newClusterState, e); + l.onResponse(null); + }), + l -> publishClusterStateUpdate( + executor, + summary, + previousClusterState, + executionResults, + newClusterState, + computationTime, + publicationStartTime, + task, + l + ) + ); } } } @@ -257,147 +291,140 @@ private void publishClusterStateUpdate( ClusterState previousClusterState, List> executionResults, ClusterState newClusterState, - TimeValue computationTime + TimeValue computationTime, + long publicationStartTime, + Task task, + ActionListener listener ) { - final Task task = taskManager.register("master", STATE_UPDATE_ACTION_NAME, new TaskAwareRequest() { - @Override - public void setParentTask(TaskId taskId) {} + if (logger.isTraceEnabled()) { + logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState); + } else { + logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary); + } - @Override - public TaskId getParentTask() { - return TaskId.EMPTY_TASK_ID; - } + final ClusterStatePublicationEvent clusterStatePublicationEvent = new ClusterStatePublicationEvent( + summary, + previousClusterState, + newClusterState, + task, + computationTime.millis(), + publicationStartTime + ); - @Override - public String getDescription() { - return "publication of cluster state [" + newClusterState.getVersion() + "]"; - } - }); - try { - if (logger.isTraceEnabled()) { - logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState); - } else { - logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary); - } - final long publicationStartTime = threadPool.rawRelativeTimeInMillis(); - try { - final ClusterStatePublicationEvent clusterStatePublicationEvent = new ClusterStatePublicationEvent( + // new cluster state, notify all listeners + final DiscoveryNodes.Delta nodesDelta = newClusterState.nodes().delta(previousClusterState.nodes()); + if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { + String nodesDeltaSummary = nodesDelta.shortSummary(); + if (nodesDeltaSummary.length() > 0) { + logger.info( + "{}, term: {}, version: {}, delta: {}", summary, - previousClusterState, - newClusterState, - task, - computationTime.millis(), - publicationStartTime + newClusterState.term(), + newClusterState.version(), + nodesDeltaSummary ); + } + } - // new cluster state, notify all listeners - final DiscoveryNodes.Delta nodesDelta = newClusterState.nodes().delta(previousClusterState.nodes()); - if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { - String nodesDeltaSummary = nodesDelta.shortSummary(); - if (nodesDeltaSummary.length() > 0) { - logger.info( - "{}, term: {}, version: {}, delta: {}", - summary, - newClusterState.term(), + logger.debug("publishing cluster state version [{}]", newClusterState.version()); + // initialize routing nodes and the indices lookup concurrently, we will need both of them for the cluster state + // application and can compute them while we wait for the other nodes during publication + newClusterState.initializeAsync(threadPool.generic()); + publish( + clusterStatePublicationEvent, + new CompositeTaskAckListener( + executionResults.stream() + .map(ExecutionResult::getContextPreservingAckListener) + .filter(Objects::nonNull) + .map( + contextPreservingAckListener -> new TaskAckListener( + contextPreservingAckListener, newClusterState.version(), - nodesDeltaSummary + newClusterState.nodes(), + threadPool + ) + ) + .toList() + ), + ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(Void unused) { + final long notificationStartTime = threadPool.rawRelativeTimeInMillis(); + for (final var executionResult : executionResults) { + executionResult.onPublishSuccess(newClusterState); + } + + try { + executor.clusterStatePublished(newClusterState); + } catch (Exception e) { + logger.error( + () -> format("exception thrown while notifying executor of new cluster state publication [%s]", summary), + e ); } + final TimeValue executionTime = getTimeSince(notificationStartTime); + logExecutionTime( + executionTime, + "notify listeners on successful publication of cluster state (version: " + + newClusterState.version() + + ", uuid: " + + newClusterState.stateUUID() + + ')', + summary + ); + clusterStateUpdateStatsTracker.onPublicationSuccess( + threadPool.rawRelativeTimeInMillis(), + clusterStatePublicationEvent, + executionTime.millis() + ); } - logger.debug("publishing cluster state version [{}]", newClusterState.version()); - // initialize routing nodes and the indices lookup concurrently, we will need both of them for the cluster state - // application and can compute them while we wait for the other nodes during publication - newClusterState.initializeAsync(threadPool.generic()); - publish( - clusterStatePublicationEvent, - new CompositeTaskAckListener( - executionResults.stream() - .map(ExecutionResult::getContextPreservingAckListener) - .filter(Objects::nonNull) - .map( - contextPreservingAckListener -> new TaskAckListener( - contextPreservingAckListener, - newClusterState.version(), - newClusterState.nodes(), - threadPool - ) - ) - .toList() - ), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - final long notificationStartTime = threadPool.rawRelativeTimeInMillis(); - for (final var executionResult : executionResults) { - executionResult.onPublishSuccess(newClusterState); - } - - try { - executor.clusterStatePublished(newClusterState); - } catch (Exception e) { - logger.error( - () -> format( - "exception thrown while notifying executor of new cluster state publication [%s]", - summary - ), - e - ); - } - final TimeValue executionTime = getTimeSince(notificationStartTime); - logExecutionTime( - executionTime, - "notify listeners on successful publication of cluster state (version: " - + newClusterState.version() - + ", uuid: " - + newClusterState.stateUUID() - + ')', - summary - ); - clusterStateUpdateStatsTracker.onPublicationSuccess( - threadPool.rawRelativeTimeInMillis(), - clusterStatePublicationEvent, - executionTime.millis() - ); - } - - @Override - public void onFailure(Exception exception) { - if (exception instanceof FailedToCommitClusterStateException failedToCommitClusterStateException) { - final long notificationStartTime = threadPool.rawRelativeTimeInMillis(); - final long version = newClusterState.version(); - logger.warn( - () -> format("failing [%s]: failed to commit cluster state version [%s]", summary, version), - exception - ); - for (final var executionResult : executionResults) { - executionResult.onPublishFailure(failedToCommitClusterStateException); - } - final long notificationMillis = threadPool.rawRelativeTimeInMillis() - notificationStartTime; - clusterStateUpdateStatsTracker.onPublicationFailure( - threadPool.rawRelativeTimeInMillis(), - clusterStatePublicationEvent, - notificationMillis - ); - } else { - assert publicationMayFail() : exception; - clusterStateUpdateStatsTracker.onPublicationFailure( - threadPool.rawRelativeTimeInMillis(), - clusterStatePublicationEvent, - 0L - ); - handleException(summary, publicationStartTime, newClusterState, exception); - } + @Override + public void onFailure(Exception exception) { + if (exception instanceof FailedToCommitClusterStateException failedToCommitClusterStateException) { + final long notificationStartTime = threadPool.rawRelativeTimeInMillis(); + final long version = newClusterState.version(); + logger.warn(() -> format("failing [%s]: failed to commit cluster state version [%s]", summary, version), exception); + for (final var executionResult : executionResults) { + executionResult.onPublishFailure(failedToCommitClusterStateException); } + final long notificationMillis = threadPool.rawRelativeTimeInMillis() - notificationStartTime; + clusterStateUpdateStatsTracker.onPublicationFailure( + threadPool.rawRelativeTimeInMillis(), + clusterStatePublicationEvent, + notificationMillis + ); + } else { + assert publicationMayFail() || (exception instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) + : exception; + clusterStateUpdateStatsTracker.onPublicationFailure( + threadPool.rawRelativeTimeInMillis(), + clusterStatePublicationEvent, + 0L + ); + handleException(summary, publicationStartTime, newClusterState, exception); } - ); - } catch (Exception e) { - assert publicationMayFail() : e; - handleException(summary, publicationStartTime, newClusterState, e); - } - } finally { - taskManager.unregister(task); - } + } + + @Override + public String toString() { + return Strings.format( + "publication completion listener for version [%d]", + clusterStatePublicationEvent.getNewState().version() + ); + } + }, new Runnable() { + @Override + public void run() { + listener.onResponse(null); + } + + @Override + public String toString() { + return listener + "/onResponse"; + } + }) + ); } protected boolean publicationMayFail() { @@ -413,17 +440,17 @@ protected void publish( ClusterStatePublisher.AckListener ackListener, ActionListener publicationListener ) { - final var fut = new PlainActionFuture() { - @Override - protected boolean blockingAllowed() { - return isMasterUpdateThread() || super.blockingAllowed(); - } - }; - clusterStatePublisher.publish(clusterStatePublicationEvent, fut, ackListener); - - ActionListener.completeWith( - publicationListener, - () -> FutureUtils.get(fut) // indefinitely wait for publication to complete + clusterStatePublisher.publish( + clusterStatePublicationEvent, + // Fork the completion of publicationListener back onto the master service thread, mainly for legacy reasons; note that this + // might be rejected if the MasterService shut down mid-publication. The master service thread remains idle until this listener + // is completed at the end of the publication, at which point the publicationListener performs various bits of cleanup and then + // picks up the next waiting task. + new ThreadedActionListener<>( + threadPoolExecutor, + new ContextPreservingActionListener<>(threadPool.getThreadContext().newRestorableContext(false), publicationListener) + ), + ackListener ); } @@ -1147,31 +1174,50 @@ public void doRun() { assert threadPool.getThreadContext().isSystemContext(); assert totalQueueSize.get() > 0; assert currentlyExecutingBatch == null; - final var nextBatch = takeNextBatch(); - assert currentlyExecutingBatch == nextBatch; - if (lifecycle.started()) { - nextBatch.run(); - } else { - nextBatch.onRejection(new FailedToCommitClusterStateException("node closed", getRejectionException())); - } + + ActionListener.run(new ActionListener() { + @Override + public void onResponse(Void unused) { + onCompletion(); + } + + @Override + public void onFailure(Exception e) { + logger.error("unexpected exception executing queue entry", e); + assert false : e; + onCompletion(); + } + + @Override + public String toString() { + return "master service batch completion listener"; + } + }, batchCompletionListener -> { + final var nextBatch = takeNextBatch(); + assert currentlyExecutingBatch == nextBatch; + if (lifecycle.started()) { + nextBatch.run(batchCompletionListener); + } else { + nextBatch.onRejection(new FailedToCommitClusterStateException("node closed", getRejectionException())); + batchCompletionListener.onResponse(null); + } + }); } @Override public void onFailure(Exception e) { logger.error("unexpected exception executing queue entry", e); assert false : e; + onCompletion(); } - @Override - public void onAfter() { - if (currentlyExecutingBatch != null) { - currentlyExecutingBatch = null; - if (totalQueueSize.decrementAndGet() > 0) { - starvationWatcher.onNonemptyQueue(); - forkQueueProcessor(); - } else { - starvationWatcher.onEmptyQueue(); - } + private void onCompletion() { + currentlyExecutingBatch = null; + if (totalQueueSize.decrementAndGet() > 0) { + starvationWatcher.onNonemptyQueue(); + forkQueueProcessor(); + } else { + starvationWatcher.onEmptyQueue(); } } @@ -1271,7 +1317,7 @@ Priority priority() { private interface Batch { - void run(); + void run(ActionListener listener); /** * Called when the batch is rejected due to the master service shutting down. @@ -1333,7 +1379,12 @@ public MasterServiceTaskQueue createTask @FunctionalInterface private interface BatchConsumer { - void runBatch(ClusterStateTaskExecutor executor, List> tasks, BatchSummary summary); + void runBatch( + ClusterStateTaskExecutor executor, + List> tasks, + BatchSummary summary, + ActionListener listener + ); } private static class TaskTimeoutHandler extends AbstractRunnable { @@ -1504,7 +1555,7 @@ public void onRejection(FailedToCommitClusterStateException e) { } @Override - public void run() { + public void run(ActionListener listener) { assert executing.isEmpty() : executing; final var entryCount = queueSize.getAndSet(0); var taskCount = 0; @@ -1517,6 +1568,7 @@ public void run() { } } if (taskCount == 0) { + listener.onResponse(null); return; } final var finalTaskCount = taskCount; @@ -1526,12 +1578,10 @@ public void run() { new ExecutionResult<>(entry.source(), entry.task(), threadPool.getThreadContext(), entry.storedContextSupplier()) ); } - try { - batchConsumer.runBatch(executor, tasks, new BatchSummary(() -> buildTasksDescription(tasks))); - } finally { + ActionListener.run(ActionListener.runBefore(listener, () -> { assert executing.size() == finalTaskCount; executing.clear(); - } + }), l -> batchConsumer.runBatch(executor, tasks, new BatchSummary(() -> buildTasksDescription(tasks)), l)); } private String buildTasksDescription(List> tasks) { diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index b1f6e78790780..2fbebcc7f4df4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -176,7 +176,8 @@ protected ExecutorService createThreadPoolExecutor() { masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> { clusterStateRef.set(clusterStatePublicationEvent.getNewState()); ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent); - publishListener.onResponse(null); + threadPool.executor(randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC)) + .execute(() -> publishListener.onResponse(null)); }); masterService.setClusterStateSupplier(clusterStateRef::get); masterService.start(); @@ -2365,7 +2366,7 @@ public void taskSucceeded(ClusterStateUpdateTask clusterStateTaskListener, Void public ClusterState execute(ClusterState currentState) { assertEquals(priority, prioritiesQueue.poll()); assertEquals(priority, priority()); - return currentState; + return randomBoolean() ? currentState : ClusterState.builder(currentState).build(); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java index ede6d5dc1e9ed..0122a0acbd436 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java @@ -7,8 +7,6 @@ */ package org.elasticsearch.cluster.service; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStatePublicationEvent; @@ -16,45 +14,30 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.StoppableExecutorServiceWrapper; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; -import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static org.apache.lucene.tests.util.LuceneTestCase.random; -import static org.elasticsearch.test.ESTestCase.randomInt; public class FakeThreadPoolMasterService extends MasterService { - private static final Logger logger = LogManager.getLogger(FakeThreadPoolMasterService.class); - private final String name; - private final List pendingTasks = new ArrayList<>(); - private final Consumer onTaskAvailableToRun; - private boolean scheduledNextTask = false; - private boolean taskInProgress = false; - private boolean waitForPublish = false; + private final Consumer taskExecutor; + private final ThreadContext threadContext; - public FakeThreadPoolMasterService( - String nodeName, - String serviceName, - ThreadPool threadPool, - Consumer onTaskAvailableToRun - ) { + public FakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool, Consumer taskExecutor) { this( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool, - serviceName, - onTaskAvailableToRun + taskExecutor ); } @@ -62,76 +45,23 @@ private FakeThreadPoolMasterService( Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, - String serviceName, - Consumer onTaskAvailableToRun + Consumer taskExecutor ) { super(settings, clusterSettings, threadPool, new TaskManager(settings, threadPool, Set.of())); - this.name = serviceName; - this.onTaskAvailableToRun = onTaskAvailableToRun; + this.taskExecutor = taskExecutor; + this.threadContext = threadPool.getThreadContext(); } @Override protected ExecutorService createThreadPoolExecutor() { - return new PrioritizedEsThreadPoolExecutor( - name, - 1, - 1, - 1, - TimeUnit.SECONDS, - r -> { throw new AssertionError("should not create new threads"); }, - null, - null - ) { - - @Override - public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) { - execute(command); - } - + return new StoppableExecutorServiceWrapper(EsExecutors.DIRECT_EXECUTOR_SERVICE) { @Override public void execute(Runnable command) { - if (command.toString().equals("awaitsfix thread keepalive") == false) { - // TODO remove this temporary fix - pendingTasks.add(command); - scheduleNextTaskIfNecessary(); - } + taskExecutor.accept(threadContext.preserveContext(command)); } }; } - private void scheduleNextTaskIfNecessary() { - if (taskInProgress == false && pendingTasks.isEmpty() == false && scheduledNextTask == false) { - scheduledNextTask = true; - onTaskAvailableToRun.accept(new Runnable() { - @Override - public String toString() { - return "master service scheduling next task"; - } - - @Override - public void run() { - assert taskInProgress == false; - assert waitForPublish == false; - assert scheduledNextTask; - final int taskIndex = randomInt(pendingTasks.size() - 1); - logger.debug("next master service task: choosing task {} of {}", taskIndex, pendingTasks.size()); - final Runnable task = pendingTasks.remove(taskIndex); - taskInProgress = true; - scheduledNextTask = false; - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { - threadContext.markAsSystemContext(); - task.run(); - } - if (waitForPublish == false) { - taskInProgress = false; - } - FakeThreadPoolMasterService.this.scheduleNextTaskIfNecessary(); - } - }); - } - } - @Override public ClusterState.Builder incrementVersion(ClusterState clusterState) { // generate cluster UUID deterministically for repeatable tests @@ -144,44 +74,11 @@ protected void publish( AckListener ackListener, ActionListener publicationListener ) { - assert waitForPublish == false; - waitForPublish = true; - final ActionListener publishListener = new ActionListener<>() { - - private boolean listenerCalled = false; - - @Override - public void onResponse(Void aVoid) { - assert listenerCalled == false; - listenerCalled = true; - assert waitForPublish; - waitForPublish = false; - try { - publicationListener.onResponse(null); - } finally { - taskInProgress = false; - scheduleNextTaskIfNecessary(); - } - } - - @Override - public void onFailure(Exception e) { - assert listenerCalled == false; - listenerCalled = true; - assert waitForPublish; - waitForPublish = false; - try { - publicationListener.onFailure(e); - } finally { - taskInProgress = false; - scheduleNextTaskIfNecessary(); - } - } - }; + // fork the publication to add a little extra room for concurrent activity here threadPool.generic().execute(threadPool.getThreadContext().preserveContext(new Runnable() { @Override public void run() { - clusterStatePublisher.publish(clusterStatePublicationEvent, publishListener, wrapAckListener(ackListener)); + FakeThreadPoolMasterService.super.publish(clusterStatePublicationEvent, wrapAckListener(ackListener), publicationListener); } @Override diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java index 130ca526ec786..202bae6177353 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java @@ -97,7 +97,7 @@ public void onFailure(Exception e) { assertFalse(firstTaskCompleted.get()); final Runnable scheduleTask = runnableTasks.remove(0); - assertThat(scheduleTask, hasToString("master service scheduling next task")); + assertThat(scheduleTask, hasToString("master service queue processor")); scheduleTask.run(); // run tasks for computing routing nodes and indices lookup @@ -137,6 +137,7 @@ public void onFailure(Exception e) { assertThat(runnableTasks.size(), equalTo(0)); publishingCallback.getAndSet(null).onResponse(null); + runnableTasks.remove(0).run(); // complete publication back on master service thread assertTrue(firstTaskCompleted.get()); assertThat(runnableTasks.size(), equalTo(1)); // check that new task gets queued @@ -151,6 +152,7 @@ public void onFailure(Exception e) { assertNotNull(publishingCallback.get()); assertFalse(secondTaskCompleted.get()); publishingCallback.getAndSet(null).onResponse(null); + runnableTasks.remove(0).run(); // complete publication back on master service thread assertTrue(secondTaskCompleted.get()); assertThat(runnableTasks.size(), equalTo(0)); // check that no more tasks are queued }