Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
3606365
Reduce mocking in ClusterStateChanges
DaveCTurner Apr 8, 2022
d113152
Dedicated API for unbatched master tasks
DaveCTurner Apr 8, 2022
ffc3a97
Radix-sorted, lookup-free, typesafe, nonblocking, priority-boosting p…
DaveCTurner Apr 8, 2022
e26a000
WIP working towards one processor to rule them all
DaveCTurner Apr 9, 2022
ae8daaa
Track insertion time/order
DaveCTurner Apr 10, 2022
da05096
Fix unbatched timeout handler
DaveCTurner Apr 10, 2022
143683d
TaskTimeoutHandler
DaveCTurner Apr 10, 2022
f3ec22a
Drop timedOut flag on unbatched tasks
DaveCTurner Apr 10, 2022
ca6ee19
Merge branch 'master' into 2022-04-07-master-service-new-queues
DaveCTurner Apr 26, 2022
924b19f
Fixup
DaveCTurner Apr 26, 2022
facbd4d
Moar fix
DaveCTurner Apr 26, 2022
f8350bb
Moar fix
DaveCTurner Apr 26, 2022
3bbfadb
Moar fix
DaveCTurner Apr 26, 2022
7cff962
Merge branch 'main' into 2022-04-07-master-service-new-queues
DaveCTurner Sep 26, 2022
d0e0d7f
Spotless
DaveCTurner Sep 26, 2022
2db9298
Misc fixups
DaveCTurner Sep 26, 2022
4bfc703
Moar fixup
DaveCTurner Sep 26, 2022
d5e7ef3
Moar fixup
DaveCTurner Sep 26, 2022
95f1ab9
Moar fixup
DaveCTurner Sep 26, 2022
db0b905
Precommit fixup
DaveCTurner Sep 26, 2022
ad3415a
Misc fixes
DaveCTurner Sep 27, 2022
037df5d
Missing thread context
DaveCTurner Sep 27, 2022
59baacd
WIP on MasterServiceTests
DaveCTurner Sep 27, 2022
092fb6a
Fix MasterServiceTests
DaveCTurner Sep 27, 2022
b7f5f07
Revert TestLogging
DaveCTurner Sep 27, 2022
9f9c5bf
Test fixes
DaveCTurner Sep 27, 2022
fd3b0be
More test fixes
DaveCTurner Sep 27, 2022
3512d47
Workaround mock mess
DaveCTurner Sep 27, 2022
5e8d9da
Merge branch 'main' into 2022-04-07-master-service-new-queues
DaveCTurner Sep 27, 2022
aef7fb1
Expect more kinds of failure
DaveCTurner Sep 27, 2022
c80fa58
Merge branch 'main' into 2022-04-07-master-service-new-queues
DaveCTurner Sep 27, 2022
d9899fc
Fix more tests
DaveCTurner Sep 27, 2022
c77c6cb
Fix more tests
DaveCTurner Sep 27, 2022
1ee5295
Revert
DaveCTurner Sep 27, 2022
c41ed76
Fix more tests
DaveCTurner Sep 27, 2022
d405c27
Reinstate tests
DaveCTurner Sep 27, 2022
afa6160
Fix warning
DaveCTurner Sep 27, 2022
b9d8a0e
Merge branch 'main' into 2022-04-07-master-service-new-queues
DaveCTurner Sep 27, 2022
d9f3b17
Merge branch 'main' into 2022-04-07-master-service-new-queues
DaveCTurner Sep 27, 2022
f240ca4
More robust delay
DaveCTurner Sep 27, 2022
6521ebe
Merge branch 'main' into 2022-04-07-master-service-new-queues
DaveCTurner Sep 28, 2022
50b3e40
Clean up some TODOs
DaveCTurner Sep 28, 2022
6268e72
Inline
DaveCTurner Sep 28, 2022
88e487c
Test fixes
DaveCTurner Sep 28, 2022
e58250a
Merge branch 'main' into 2022-04-07-master-service-new-queues
DaveCTurner Nov 28, 2022
6b6a279
Spotless
DaveCTurner Nov 28, 2022
939cbfa
Fixup after merge
DaveCTurner Nov 28, 2022
d66f673
Test fix
DaveCTurner Nov 28, 2022
929fb66
TODO temp fix
DaveCTurner Nov 28, 2022
8e2f21f
Merge branch 'main' into 2022-04-07-master-service-new-queues
DaveCTurner Nov 28, 2022
bd51e30
Make UnbatchedExecutor a class
DaveCTurner Nov 29, 2022
391dbd5
Comments
DaveCTurner Nov 29, 2022
315b2b5
Lazy BatchSummary again
DaveCTurner Nov 29, 2022
d8d61f5
Move constant
DaveCTurner Nov 29, 2022
686a89a
Lazy grouping of tasks by source too
DaveCTurner Nov 29, 2022
de02f27
Test already exists
DaveCTurner Nov 29, 2022
b3c0643
Tests for pending tasks APIs
DaveCTurner Nov 29, 2022
b3e21f2
Test behaviour on rejection/close
DaveCTurner Nov 29, 2022
8240db9
Fix masterService.getMaxTaskWaitTime
DaveCTurner Nov 29, 2022
d25e6b8
Fix tests that shut down master service too early
DaveCTurner Nov 29, 2022
3aa2260
Add test for timeouts
DaveCTurner Nov 29, 2022
343d76b
Migrate starvation watching
DaveCTurner Nov 29, 2022
4e459e4
Merge branch 'main' into 2022-04-07-master-service-new-queues
DaveCTurner Nov 29, 2022
bf5c34e
Trivial reverts
DaveCTurner Nov 29, 2022
4c38603
Move
DaveCTurner Nov 29, 2022
a325d1d
Merge branch 'main' into 2022-04-07-master-service-new-queues
DaveCTurner Nov 29, 2022
a6bc582
Catch exceptions from failure handler
DaveCTurner Nov 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -50,13 +50,14 @@ public class UpdateTimeSeriesRangeService extends AbstractLifecycleComponent imp
volatile TimeValue pollInterval;
volatile Scheduler.Cancellable job;
private final AtomicBoolean running = new AtomicBoolean(false);
private final ClusterStateTaskExecutor<UpdateTimeSeriesTask> taskExecutor = new UpdateTimeSeriesExecutor();
private final MasterServiceTaskQueue<UpdateTimeSeriesTask> taskQueue;

UpdateTimeSeriesRangeService(Settings settings, ThreadPool threadPool, ClusterService clusterService) {
this.pollInterval = DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.get(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
clusterService.getClusterSettings().addSettingsUpdateConsumer(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL, this::setPollInterval);
this.taskQueue = clusterService.getTaskQueue("update-time-series-range", Priority.URGENT, new UpdateTimeSeriesExecutor());
}

void perform(Runnable onComplete) {
Expand All @@ -69,8 +70,7 @@ void perform(Runnable onComplete) {
running.set(false);
onComplete.run();
});
var config = ClusterStateTaskConfig.build(Priority.URGENT);
clusterService.submitStateUpdateTask("update_tsdb_data_stream_end_times", task, config, taskExecutor);
taskQueue.submitTask("update_tsdb_data_stream_end_times", task, null);
} else {
LOGGER.debug("not starting tsdb update task, because another execution is still running");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws In
"test",
EsExecutors.daemonThreadFactory("test"),
threadPool.getThreadContext(),
threadPool.scheduler(),
PrioritizedEsThreadPoolExecutor.StarvationWatcher.NOOP_STARVATION_WATCHER
threadPool.scheduler()
);
try {
checkExecutionError(getExecuteRunner(prioritizedExecutor));
Expand Down Expand Up @@ -208,8 +207,7 @@ public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throw
"test",
EsExecutors.daemonThreadFactory("test"),
threadPool.getThreadContext(),
threadPool.scheduler(),
PrioritizedEsThreadPoolExecutor.StarvationWatcher.NOOP_STARVATION_WATCHER
threadPool.scheduler()
);
try {
checkExecutionException(getExecuteRunner(prioritizedExecutor), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.test.ESIntegTestCase;
Expand All @@ -28,17 +27,12 @@ public class AutoCreateIndexIT extends ESIntegTestCase {
public void testBatchingWithDeprecationWarnings() throws Exception {
final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final var barrier = new CyclicBarrier(2);
masterNodeClusterService.submitStateUpdateTask(
"block",
e -> { assert false : e; },
ClusterStateTaskConfig.build(Priority.NORMAL),
batchExecutionContext -> {
barrier.await(10, TimeUnit.SECONDS);
barrier.await(10, TimeUnit.SECONDS);
batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
return batchExecutionContext.initialState();
}
);
masterNodeClusterService.getTaskQueue("block", Priority.NORMAL, batchExecutionContext -> {
barrier.await(10, TimeUnit.SECONDS);
barrier.await(10, TimeUnit.SECONDS);
batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
return batchExecutionContext.initialState();
}).submitTask("block", e -> { assert false : e; }, null);

barrier.await(10, TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -25,6 +26,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -414,11 +416,7 @@ public void onFailure(Exception e) {
});
}

final var startNanoTime = System.nanoTime();
while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 0) {
// noinspection BusyWait
Thread.sleep(100);
}
waitForTimeToElapse();

pendingClusterTasks = clusterService.getMasterService().pendingTasks();
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5));
Expand All @@ -441,4 +439,28 @@ public void onFailure(Exception e) {
block2.countDown();
}
}

private static void waitForTimeToElapse() throws InterruptedException {
final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false)
.map(ClusterService::threadPool)
.toArray(ThreadPool[]::new);
final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray();

final var startNanoTime = System.nanoTime();
while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 100) {
// noinspection BusyWait
Thread.sleep(100);
}

outer: do {
for (int i = 0; i < threadPools.length; i++) {
if (threadPools[i].relativeTimeInMillis() <= startTimes[i]) {
// noinspection BusyWait
Thread.sleep(100);
continue outer;
}
}
return;
} while (true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SimpleBatchedExecutor;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DesiredNodesMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.Tuple;
Expand All @@ -31,7 +30,7 @@

public class TransportDeleteDesiredNodesAction extends TransportMasterNodeAction<DeleteDesiredNodesAction.Request, ActionResponse.Empty> {

private final ClusterStateTaskExecutor<DeleteDesiredNodesTask> taskExecutor = new DeleteDesiredNodesExecutor();
private final MasterServiceTaskQueue<DeleteDesiredNodesTask> taskQueue;

@Inject
public TransportDeleteDesiredNodesAction(
Expand All @@ -52,6 +51,7 @@ public TransportDeleteDesiredNodesAction(
in -> ActionResponse.Empty.INSTANCE,
ThreadPool.Names.SAME
);
this.taskQueue = clusterService.getTaskQueue("delete-desired-nodes", Priority.HIGH, new DeleteDesiredNodesExecutor());
}

@Override
Expand All @@ -61,12 +61,7 @@ protected void masterOperation(
ClusterState state,
ActionListener<ActionResponse.Empty> listener
) throws Exception {
clusterService.submitStateUpdateTask(
"delete-desired-nodes",
new DeleteDesiredNodesTask(listener),
ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
taskExecutor
);
taskQueue.submitTask("delete-desired-nodes", new DeleteDesiredNodesTask(listener), request.masterNodeTimeout());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -27,6 +26,7 @@
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
Expand All @@ -41,7 +41,7 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
private static final Logger logger = LogManager.getLogger(TransportUpdateDesiredNodesAction.class);

private final DesiredNodesSettingsValidator settingsValidator;
private final ClusterStateTaskExecutor<UpdateDesiredNodesTask> taskExecutor;
private final MasterServiceTaskQueue<UpdateDesiredNodesTask> taskQueue;

@Inject
public TransportUpdateDesiredNodesAction(
Expand All @@ -66,7 +66,11 @@ public TransportUpdateDesiredNodesAction(
ThreadPool.Names.SAME
);
this.settingsValidator = settingsValidator;
this.taskExecutor = new UpdateDesiredNodesExecutor(clusterService.getRerouteService(), allocationService);
this.taskQueue = clusterService.getTaskQueue(
"update-desired-nodes",
Priority.URGENT,
new UpdateDesiredNodesExecutor(clusterService.getRerouteService(), allocationService)
);
}

@Override
Expand All @@ -83,12 +87,7 @@ protected void masterOperation(
) throws Exception {
try {
settingsValidator.validate(request.getNodes());
clusterService.submitStateUpdateTask(
"update-desired-nodes",
new UpdateDesiredNodesTask(request, listener),
ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()),
taskExecutor
);
taskQueue.submitTask("update-desired-nodes", new UpdateDesiredNodesTask(request, listener), request.masterNodeTimeout());
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.LocalMasterServiceTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -163,13 +165,20 @@ public void onFailure(Exception e) {
if (e instanceof ProcessClusterEventTimeoutException) {
listener.onResponse(getResponse(request, clusterService.state(), waitCount, TimeoutState.TIMED_OUT));
} else {
final Level level = e instanceof NotMasterException ? Level.TRACE : Level.ERROR;
assert e instanceof NotMasterException : e; // task cannot fail, nor will it trigger a publication which fails
final Level level = isExpectedFailure(e) ? Level.TRACE : Level.ERROR;
logger.log(level, () -> "unexpected failure during [" + source + "]", e);
assert isExpectedFailure(e) : e; // task cannot fail, nor will it trigger a publication which fails
// TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException
listener.onFailure(e);
}
}

static boolean isExpectedFailure(Exception e) {
return e instanceof NotMasterException
|| e instanceof FailedToCommitClusterStateException
&& e.getCause()instanceof EsRejectedExecutionException esre
&& esre.isExecutorShutdown();
}
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateAckListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -36,6 +35,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionMultiListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -77,7 +77,7 @@ public static final class TransportAction extends TransportMasterNodeAction<Crea
private final AutoCreateIndex autoCreateIndex;
private final SystemIndices systemIndices;

private final ClusterStateTaskExecutor<CreateIndexTask> executor;
private final MasterServiceTaskQueue<CreateIndexTask> taskQueue;

@Inject
public TransportAction(
Expand Down Expand Up @@ -107,7 +107,7 @@ public TransportAction(
this.createIndexService = createIndexService;
this.metadataCreateDataStreamService = metadataCreateDataStreamService;
this.autoCreateIndex = autoCreateIndex;
this.executor = batchExecutionContext -> {
this.taskQueue = clusterService.getTaskQueue("auto-create", Priority.URGENT, batchExecutionContext -> {
final var listener = new AllocationActionMultiListener<CreateIndexResponse>(threadPool.getThreadContext());
final var taskContexts = batchExecutionContext.taskContexts();
final var successfulRequests = Maps.<CreateIndexRequest, String>newMapWithExpectedSize(taskContexts.size());
Expand All @@ -129,7 +129,7 @@ public TransportAction(
listener.noRerouteNeeded();
}
return state;
};
});
}

@Override
Expand All @@ -139,11 +139,10 @@ protected void masterOperation(
ClusterState state,
ActionListener<CreateIndexResponse> listener
) {
clusterService.submitStateUpdateTask(
taskQueue.submitTask(
"auto create [" + request.index() + "]",
new CreateIndexTask(request, listener),
ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()),
executor
request.masterNodeTimeout()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -35,6 +34,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionMultiListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -64,7 +64,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
private static final Logger logger = LogManager.getLogger(TransportRolloverAction.class);

private final Client client;
private final RolloverExecutor rolloverTaskExecutor;
private final MasterServiceTaskQueue<RolloverTask> rolloverTaskQueue;

@Inject
public TransportRolloverAction(
Expand All @@ -89,7 +89,11 @@ public TransportRolloverAction(
ThreadPool.Names.SAME
);
this.client = client;
this.rolloverTaskExecutor = new RolloverExecutor(clusterService, allocationService, rolloverService, threadPool);
this.rolloverTaskQueue = clusterService.getTaskQueue(
"rollover",
Priority.NORMAL,
new RolloverExecutor(clusterService, allocationService, rolloverService, threadPool)
);
}

@Override
Expand Down Expand Up @@ -173,8 +177,7 @@ protected void masterOperation(
if (rolloverRequest.areConditionsMet(trialConditionResults)) {
String source = "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]";
RolloverTask rolloverTask = new RolloverTask(rolloverRequest, statsResponse, trialRolloverResponse, listener);
ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.NORMAL, rolloverRequest.masterNodeTimeout());
clusterService.submitStateUpdateTask(source, rolloverTask, config, rolloverTaskExecutor);
rolloverTaskQueue.submitTask(source, rolloverTask, rolloverRequest.masterNodeTimeout());
} else {
// conditions not met
listener.onResponse(trialRolloverResponse);
Expand Down
Loading