Skip to content

Commit 12b21ee

Browse files
authored
Remove the rollup thread pool (#65958) (#66463)
This commit removes the rollup thread pool usage in the RollupAsyncIndexer that is used by the legacy rollup and transforms. All actions performed by this indexer are asynchronous so using the generic thread pool to fire search and bulk requests is ok.
1 parent de9ddc5 commit 12b21ee

File tree

19 files changed

+124
-129
lines changed

19 files changed

+124
-129
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, ValidateQu
152152
}
153153

154154
@Override
155-
protected ValidateQueryResponse newResponse(ValidateQueryRequest request, AtomicReferenceArray shardsResponses,
155+
protected ValidateQueryResponse newResponse(ValidateQueryRequest request, AtomicReferenceArray<?> shardsResponses,
156156
ClusterState clusterState) {
157157
int successfulShards = 0;
158158
int failedShards = 0;

server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
8383
new AsyncBroadcastAction(task, request, listener).start();
8484
}
8585

86-
protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses, ClusterState clusterState);
86+
protected abstract Response newResponse(Request request, AtomicReferenceArray<?> shardsResponses, ClusterState clusterState);
8787

8888
protected abstract ShardRequest newShardRequest(int numShards, ShardRouting shard, Request request);
8989

@@ -103,15 +103,15 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
103103

104104
protected class AsyncBroadcastAction {
105105

106-
private final Task task;
107-
private final Request request;
108-
private final ActionListener<Response> listener;
109-
private final ClusterState clusterState;
110-
private final DiscoveryNodes nodes;
111-
private final GroupShardsIterator<ShardIterator> shardsIts;
112-
private final int expectedOps;
113-
private final AtomicInteger counterOps = new AtomicInteger();
114-
private final AtomicReferenceArray shardsResponses;
106+
final Task task;
107+
final Request request;
108+
final ActionListener<Response> listener;
109+
final ClusterState clusterState;
110+
final DiscoveryNodes nodes;
111+
final GroupShardsIterator<ShardIterator> shardsIts;
112+
final int expectedOps;
113+
final AtomicInteger counterOps = new AtomicInteger();
114+
protected final AtomicReferenceArray shardsResponses;
115115

116116
protected AsyncBroadcastAction(Task task, Request request, ActionListener<Response> listener) {
117117
this.task = task;
@@ -239,6 +239,10 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int
239239
}
240240
}
241241

242+
protected AtomicReferenceArray shardsResponses() {
243+
return shardsResponses;
244+
}
245+
242246
protected void finishHim() {
243247
try {
244248
listener.onResponse(newResponse(request, shardsResponses, clusterState));

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
5656
private final AtomicReference<IndexerState> state;
5757
private final AtomicReference<JobPosition> position;
5858
private final ThreadPool threadPool;
59-
private final String executorName;
6059

6160
// throttling implementation
6261
private volatile float currentMaxDocsPerSecond;
@@ -69,27 +68,25 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
6968
*/
7069
class ScheduledRunnable {
7170
private final ThreadPool threadPool;
72-
private final String executorName;
7371
private final Runnable command;
7472
private Scheduler.ScheduledCancellable scheduled;
7573

76-
ScheduledRunnable(ThreadPool threadPool, String executorName, TimeValue delay, Runnable command) {
74+
ScheduledRunnable(ThreadPool threadPool, TimeValue delay, Runnable command) {
7775
this.threadPool = threadPool;
78-
this.executorName = executorName;
7976

8077
// with wrapping the command in RunOnce we ensure the command isn't executed twice, e.g. if the
8178
// future is already running and cancel returns true
8279
this.command = new RunOnce(command);
83-
this.scheduled = threadPool.schedule(() -> { command.run(); }, delay, executorName);
80+
this.scheduled = threadPool.schedule(command::run, delay, ThreadPool.Names.GENERIC);
8481
}
8582

8683
public void reschedule(TimeValue delay) {
8784
// note: cancel return true if the runnable is currently executing
8885
if (scheduled.cancel()) {
8986
if (delay.duration() > 0) {
90-
scheduled = threadPool.schedule(() -> command.run(), delay, executorName);
87+
scheduled = threadPool.schedule(command::run, delay, ThreadPool.Names.GENERIC);
9188
} else {
92-
threadPool.executor(executorName).execute(() -> command.run());
89+
threadPool.executor(ThreadPool.Names.GENERIC).execute(command::run);
9390
}
9491
}
9592
}
@@ -98,13 +95,11 @@ public void reschedule(TimeValue delay) {
9895

9996
protected AsyncTwoPhaseIndexer(
10097
ThreadPool threadPool,
101-
String executorName,
10298
AtomicReference<IndexerState> initialState,
10399
JobPosition initialPosition,
104100
JobStats jobStats
105101
) {
106102
this.threadPool = threadPool;
107-
this.executorName = executorName;
108103
this.state = initialState;
109104
this.position = new AtomicReference<>(initialPosition);
110105
this.stats = jobStats;
@@ -215,7 +210,7 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
215210

216211
if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
217212
// fire off the search. Note this is async, the method will return from here
218-
threadPool.executor(executorName).execute(() -> {
213+
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
219214
onStart(now, ActionListener.wrap(r -> {
220215
assert r != null;
221216
if (r) {
@@ -576,7 +571,6 @@ protected void nextSearch() {
576571
);
577572
scheduledNextSearch = new ScheduledRunnable(
578573
threadPool,
579-
executorName,
580574
executionDelay,
581575
() -> triggerNextSearch(executionDelay.getNanos())
582576
);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/v2/RollupAction.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,20 @@
66
package org.elasticsearch.xpack.core.rollup.v2;
77

88

9-
import org.elasticsearch.action.ActionRequest;
109
import org.elasticsearch.action.ActionRequestBuilder;
1110
import org.elasticsearch.action.ActionRequestValidationException;
12-
import org.elasticsearch.action.ActionResponse;
1311
import org.elasticsearch.action.ActionType;
12+
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
13+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
14+
import org.elasticsearch.action.support.broadcast.BroadcastShardRequest;
15+
import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
1416
import org.elasticsearch.client.ElasticsearchClient;
1517
import org.elasticsearch.common.io.stream.StreamInput;
1618
import org.elasticsearch.common.io.stream.StreamOutput;
1719
import org.elasticsearch.common.io.stream.Writeable;
1820
import org.elasticsearch.common.xcontent.ToXContentObject;
1921
import org.elasticsearch.common.xcontent.XContentBuilder;
22+
import org.elasticsearch.index.shard.ShardId;
2023
import org.elasticsearch.tasks.Task;
2124
import org.elasticsearch.tasks.TaskId;
2225

@@ -25,15 +28,14 @@
2528
import java.util.Objects;
2629

2730
public class RollupAction extends ActionType<RollupAction.Response> {
28-
2931
public static final RollupAction INSTANCE = new RollupAction();
3032
public static final String NAME = "cluster:admin/xpack/rollup/action";
3133

3234
private RollupAction() {
3335
super(NAME, RollupAction.Response::new);
3436
}
3537

36-
public static class Request extends ActionRequest implements ToXContentObject {
38+
public static class Request extends BroadcastRequest<Request> implements ToXContentObject {
3739
private String sourceIndex;
3840
private String rollupIndex;
3941
private RollupActionConfig rollupConfig;
@@ -44,7 +46,8 @@ public Request(String sourceIndex, String rollupIndex, RollupActionConfig rollup
4446
this.rollupConfig = rollupConfig;
4547
}
4648

47-
public Request() {}
49+
public Request() {
50+
}
4851

4952
public Request(StreamInput in) throws IOException {
5053
super(in);
@@ -120,8 +123,7 @@ protected RequestBuilder(ElasticsearchClient client, RollupAction action) {
120123
}
121124
}
122125

123-
public static class Response extends ActionResponse implements Writeable, ToXContentObject {
124-
126+
public static class Response extends BroadcastResponse implements Writeable, ToXContentObject {
125127
private final boolean created;
126128

127129
public Response(boolean created) {
@@ -163,4 +165,38 @@ public int hashCode() {
163165
return Objects.hash(created);
164166
}
165167
}
168+
169+
public static class ShardRequest extends BroadcastShardRequest {
170+
private final Request request;
171+
172+
public ShardRequest(StreamInput in) throws IOException {
173+
super(in);
174+
this.request = new Request(in);
175+
}
176+
177+
public ShardRequest(ShardId shardId, Request request) {
178+
super(shardId, request);
179+
this.request = request;
180+
}
181+
182+
public RollupActionConfig getRollupConfig() {
183+
return request.getRollupConfig();
184+
}
185+
186+
@Override
187+
public void writeTo(StreamOutput out) throws IOException {
188+
super.writeTo(out);
189+
request.writeTo(out);
190+
}
191+
}
192+
193+
public static class ShardResponse extends BroadcastShardResponse {
194+
public ShardResponse(StreamInput in) throws IOException {
195+
super(in);
196+
}
197+
198+
public ShardResponse(ShardId shardId) {
199+
super(shardId);
200+
}
201+
}
166202
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
5656
private volatile int step;
5757
private final boolean stoppedBeforeFinished;
5858

59-
protected MockIndexer(ThreadPool threadPool, String executorName, AtomicReference<IndexerState> initialState,
59+
protected MockIndexer(ThreadPool threadPool, AtomicReference<IndexerState> initialState,
6060
Integer initialPosition, CountDownLatch latch, boolean stoppedBeforeFinished) {
61-
super(threadPool, executorName, initialState, initialPosition, new MockJobStats());
61+
super(threadPool, initialState, initialPosition, new MockJobStats());
6262
this.latch = latch;
6363
this.stoppedBeforeFinished = stoppedBeforeFinished;
6464
}
@@ -160,9 +160,9 @@ private class MockIndexerFiveRuns extends AsyncTwoPhaseIndexer<Integer, MockJobS
160160
private volatile int processOps = 0;
161161
private volatile int bulkOps = 0;
162162

163-
protected MockIndexerFiveRuns(ThreadPool threadPool, String executorName, AtomicReference<IndexerState> initialState,
163+
protected MockIndexerFiveRuns(ThreadPool threadPool, AtomicReference<IndexerState> initialState,
164164
Integer initialPosition, float maxDocsPerSecond, CountDownLatch latch) {
165-
super(threadPool, executorName, initialState, initialPosition, new MockJobStats());
165+
super(threadPool, initialState, initialPosition, new MockJobStats());
166166
startTime = System.nanoTime();
167167
this.latch = latch;
168168
this.maxDocsPerSecond = maxDocsPerSecond;
@@ -288,7 +288,7 @@ private class MockIndexerThrowsFirstSearch extends AsyncTwoPhaseIndexer<Integer,
288288

289289
protected MockIndexerThrowsFirstSearch(ThreadPool threadPool, String executorName, AtomicReference<IndexerState> initialState,
290290
Integer initialPosition) {
291-
super(threadPool, executorName, initialState, initialPosition, new MockJobStats());
291+
super(threadPool, initialState, initialPosition, new MockJobStats());
292292
}
293293

294294
@Override
@@ -379,7 +379,7 @@ public void testStateMachine() throws Exception {
379379
final ThreadPool threadPool = new TestThreadPool(getTestName());
380380
try {
381381
CountDownLatch countDownLatch = new CountDownLatch(1);
382-
MockIndexer indexer = new MockIndexer(threadPool, ThreadPool.Names.GENERIC, state, 2, countDownLatch, false);
382+
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, false);
383383
indexer.start();
384384
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
385385
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
@@ -422,7 +422,7 @@ public void testStop_WhileIndexing() throws Exception {
422422
final ThreadPool threadPool = new TestThreadPool(getTestName());
423423
try {
424424
CountDownLatch countDownLatch = new CountDownLatch(1);
425-
MockIndexer indexer = new MockIndexer(threadPool, ThreadPool.Names.GENERIC, state, 2, countDownLatch, true);
425+
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, true);
426426
indexer.start();
427427
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
428428
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
@@ -466,7 +466,7 @@ public void doTestFiveRuns(float docsPerSecond, Collection<TimeValue> expectedDe
466466
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
467467
final MockThreadPool threadPool = new MockThreadPool(getTestName());
468468
try {
469-
MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, ThreadPool.Names.GENERIC, state, 2, docsPerSecond,
469+
MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, state, 2, docsPerSecond,
470470
null);
471471
indexer.start();
472472
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
@@ -505,7 +505,7 @@ public void doTestFiveRunsRethrottle(
505505
final MockThreadPool threadPool = new MockThreadPool(getTestName());
506506
try {
507507
CountDownLatch latch = new CountDownLatch(1);
508-
MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, ThreadPool.Names.GENERIC, state, 2, docsPerSecond,
508+
MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, state, 2, docsPerSecond,
509509
latch);
510510
indexer.start();
511511
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
import org.elasticsearch.rest.RestHandler;
3333
import org.elasticsearch.rollup.RollupV2;
3434
import org.elasticsearch.script.ScriptService;
35-
import org.elasticsearch.threadpool.ExecutorBuilder;
36-
import org.elasticsearch.threadpool.FixedExecutorBuilder;
3735
import org.elasticsearch.threadpool.ThreadPool;
3836
import org.elasticsearch.watcher.ResourceWatcherService;
3937
import org.elasticsearch.xpack.core.XPackPlugin;
@@ -88,7 +86,6 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
8886
public static final int CURRENT_ROLLUP_VERSION = ROLLUP_VERSION_V2;
8987

9088
public static final String TASK_THREAD_POOL_NAME = RollupField.NAME + "_indexing";
91-
public static final String SCHEDULE_THREAD_POOL_NAME = RollupField.NAME + "_scheduler";
9289

9390
public static final String ROLLUP_TEMPLATE_VERSION_FIELD = "rollup-version";
9491

@@ -166,18 +163,6 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
166163
return actions;
167164
}
168165

169-
@Override
170-
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
171-
if (transportClientMode) {
172-
return emptyList();
173-
}
174-
175-
FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, Rollup.TASK_THREAD_POOL_NAME,
176-
4, 4, "xpack.rollup.task_thread_pool");
177-
178-
return Collections.singletonList(indexing);
179-
}
180-
181166
@Override
182167
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
183168
ThreadPool threadPool,

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,30 +68,28 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
6868
/**
6969
* Ctr
7070
* @param threadPool ThreadPool to use to fire the first request of a background job.
71-
* @param executorName Name of the executor to use to fire the first request of a background job.
7271
* @param job The rollup job
7372
* @param initialState Initial state for the indexer
7473
* @param initialPosition The last indexed bucket of the task
7574
* @param upgradedDocumentID whether job has updated IDs (for BWC)
7675
*/
77-
RollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
76+
RollupIndexer(ThreadPool threadPool, RollupJob job, AtomicReference<IndexerState> initialState,
7877
Map<String, Object> initialPosition, AtomicBoolean upgradedDocumentID) {
79-
this(threadPool, executorName, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats());
78+
this(threadPool, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats());
8079
}
8180

8281
/**
8382
* Ctr
8483
* @param threadPool ThreadPool to use to fire the first request of a background job.
85-
* @param executorName Name of the executor to use to fire the first request of a background job.
8684
* @param job The rollup job
8785
* @param initialState Initial state for the indexer
8886
* @param initialPosition The last indexed bucket of the task
8987
* @param upgradedDocumentID whether job has updated IDs (for BWC)
9088
* @param jobStats jobstats instance for collecting stats
9189
*/
92-
RollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
90+
RollupIndexer(ThreadPool threadPool, RollupJob job, AtomicReference<IndexerState> initialState,
9391
Map<String, Object> initialPosition, AtomicBoolean upgradedDocumentID, RollupIndexerJobStats jobStats) {
94-
super(threadPool, executorName, initialState, initialPosition, jobStats);
92+
super(threadPool, initialState, initialPosition, jobStats);
9593
this.job = job;
9694
this.compositeBuilder = createCompositeBuilder(job.getConfig());
9795
this.upgradedDocumentID = upgradedDocumentID;

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
3737
import org.elasticsearch.xpack.core.scheduler.CronSchedule;
3838
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
39-
import org.elasticsearch.xpack.rollup.Rollup;
4039

4140
import java.util.Map;
4241
import java.util.concurrent.atomic.AtomicBoolean;
@@ -59,7 +58,7 @@ public static class RollupJobPersistentTasksExecutor extends PersistentTasksExec
5958
private final ThreadPool threadPool;
6059

6160
public RollupJobPersistentTasksExecutor(Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool) {
62-
super(RollupField.TASK_NAME, Rollup.TASK_THREAD_POOL_NAME);
61+
super(RollupField.TASK_NAME, ThreadPool.Names.GENERIC);
6362
this.client = client;
6463
this.schedulerEngine = schedulerEngine;
6564
this.threadPool = threadPool;
@@ -102,7 +101,7 @@ protected class ClientRollupPageManager extends RollupIndexer {
102101

103102
ClientRollupPageManager(RollupJob job, IndexerState initialState, Map<String, Object> initialPosition,
104103
Client client, AtomicBoolean upgradedDocumentID) {
105-
super(threadPool, ThreadPool.Names.GENERIC, job, new AtomicReference<>(initialState),
104+
super(threadPool, job, new AtomicReference<>(initialState),
106105
initialPosition, upgradedDocumentID);
107106
this.client = client;
108107
this.job = job;

0 commit comments

Comments
 (0)