Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, ValidateQu
}

@Override
protected ValidateQueryResponse newResponse(ValidateQueryRequest request, AtomicReferenceArray shardsResponses,
protected ValidateQueryResponse newResponse(ValidateQueryRequest request, AtomicReferenceArray<?> shardsResponses,
ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
new AsyncBroadcastAction(task, request, listener).start();
}

protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses, ClusterState clusterState);
protected abstract Response newResponse(Request request, AtomicReferenceArray<?> shardsResponses, ClusterState clusterState);

protected abstract ShardRequest newShardRequest(int numShards, ShardRouting shard, Request request);

Expand All @@ -103,15 +103,15 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li

protected class AsyncBroadcastAction {

private final Task task;
private final Request request;
private final ActionListener<Response> listener;
private final ClusterState clusterState;
private final DiscoveryNodes nodes;
private final GroupShardsIterator<ShardIterator> shardsIts;
private final int expectedOps;
private final AtomicInteger counterOps = new AtomicInteger();
private final AtomicReferenceArray shardsResponses;
final Task task;
final Request request;
final ActionListener<Response> listener;
final ClusterState clusterState;
final DiscoveryNodes nodes;
final GroupShardsIterator<ShardIterator> shardsIts;
final int expectedOps;
final AtomicInteger counterOps = new AtomicInteger();
protected final AtomicReferenceArray shardsResponses;

protected AsyncBroadcastAction(Task task, Request request, ActionListener<Response> listener) {
this.task = task;
Expand Down Expand Up @@ -239,6 +239,10 @@ void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int
}
}

protected AtomicReferenceArray shardsResponses() {
return shardsResponses;
}

protected void finishHim() {
try {
listener.onResponse(newResponse(request, shardsResponses, clusterState));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
private final AtomicReference<IndexerState> state;
private final AtomicReference<JobPosition> position;
private final ThreadPool threadPool;
private final String executorName;

// throttling implementation
private volatile float currentMaxDocsPerSecond;
Expand All @@ -69,27 +68,25 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
*/
class ScheduledRunnable {
private final ThreadPool threadPool;
private final String executorName;
private final Runnable command;
private Scheduler.ScheduledCancellable scheduled;

ScheduledRunnable(ThreadPool threadPool, String executorName, TimeValue delay, Runnable command) {
ScheduledRunnable(ThreadPool threadPool, TimeValue delay, Runnable command) {
this.threadPool = threadPool;
this.executorName = executorName;

// with wrapping the command in RunOnce we ensure the command isn't executed twice, e.g. if the
// future is already running and cancel returns true
this.command = new RunOnce(command);
this.scheduled = threadPool.schedule(() -> { command.run(); }, delay, executorName);
this.scheduled = threadPool.schedule(command::run, delay, ThreadPool.Names.GENERIC);
}

public void reschedule(TimeValue delay) {
// note: cancel return true if the runnable is currently executing
if (scheduled.cancel()) {
if (delay.duration() > 0) {
scheduled = threadPool.schedule(() -> command.run(), delay, executorName);
scheduled = threadPool.schedule(command::run, delay, ThreadPool.Names.GENERIC);
} else {
threadPool.executor(executorName).execute(() -> command.run());
threadPool.executor(ThreadPool.Names.GENERIC).execute(command::run);
}
}
}
Expand All @@ -98,13 +95,11 @@ public void reschedule(TimeValue delay) {

protected AsyncTwoPhaseIndexer(
ThreadPool threadPool,
String executorName,
AtomicReference<IndexerState> initialState,
JobPosition initialPosition,
JobStats jobStats
) {
this.threadPool = threadPool;
this.executorName = executorName;
this.state = initialState;
this.position = new AtomicReference<>(initialPosition);
this.stats = jobStats;
Expand Down Expand Up @@ -215,7 +210,7 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {

if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
// fire off the search. Note this is async, the method will return from here
threadPool.executor(executorName).execute(() -> {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
onStart(now, ActionListener.wrap(r -> {
assert r != null;
if (r) {
Expand Down Expand Up @@ -576,7 +571,6 @@ protected void nextSearch() {
);
scheduledNextSearch = new ScheduledRunnable(
threadPool,
executorName,
executionDelay,
() -> triggerNextSearch(executionDelay.getNanos())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@
package org.elasticsearch.xpack.core.rollup.v2;


import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.broadcast.BroadcastShardRequest;
import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

Expand All @@ -25,15 +28,14 @@
import java.util.Objects;

public class RollupAction extends ActionType<RollupAction.Response> {

public static final RollupAction INSTANCE = new RollupAction();
public static final String NAME = "cluster:admin/xpack/rollup/action";

private RollupAction() {
super(NAME, RollupAction.Response::new);
}

public static class Request extends ActionRequest implements ToXContentObject {
public static class Request extends BroadcastRequest<Request> implements ToXContentObject {
private String sourceIndex;
private String rollupIndex;
private RollupActionConfig rollupConfig;
Expand All @@ -44,7 +46,8 @@ public Request(String sourceIndex, String rollupIndex, RollupActionConfig rollup
this.rollupConfig = rollupConfig;
}

public Request() {}
public Request() {
}

public Request(StreamInput in) throws IOException {
super(in);
Expand Down Expand Up @@ -120,8 +123,7 @@ protected RequestBuilder(ElasticsearchClient client, RollupAction action) {
}
}

public static class Response extends ActionResponse implements Writeable, ToXContentObject {

public static class Response extends BroadcastResponse implements Writeable, ToXContentObject {
private final boolean created;

public Response(boolean created) {
Expand Down Expand Up @@ -163,4 +165,38 @@ public int hashCode() {
return Objects.hash(created);
}
}

public static class ShardRequest extends BroadcastShardRequest {
private final Request request;

public ShardRequest(StreamInput in) throws IOException {
super(in);
this.request = new Request(in);
}

public ShardRequest(ShardId shardId, Request request) {
super(shardId, request);
this.request = request;
}

public RollupActionConfig getRollupConfig() {
return request.getRollupConfig();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}
}

public static class ShardResponse extends BroadcastShardResponse {
public ShardResponse(StreamInput in) throws IOException {
super(in);
}

public ShardResponse(ShardId shardId) {
super(shardId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
private volatile int step;
private final boolean stoppedBeforeFinished;

protected MockIndexer(ThreadPool threadPool, String executorName, AtomicReference<IndexerState> initialState,
protected MockIndexer(ThreadPool threadPool, AtomicReference<IndexerState> initialState,
Integer initialPosition, CountDownLatch latch, boolean stoppedBeforeFinished) {
super(threadPool, executorName, initialState, initialPosition, new MockJobStats());
super(threadPool, initialState, initialPosition, new MockJobStats());
this.latch = latch;
this.stoppedBeforeFinished = stoppedBeforeFinished;
}
Expand Down Expand Up @@ -160,9 +160,9 @@ private class MockIndexerFiveRuns extends AsyncTwoPhaseIndexer<Integer, MockJobS
private volatile int processOps = 0;
private volatile int bulkOps = 0;

protected MockIndexerFiveRuns(ThreadPool threadPool, String executorName, AtomicReference<IndexerState> initialState,
protected MockIndexerFiveRuns(ThreadPool threadPool, AtomicReference<IndexerState> initialState,
Integer initialPosition, float maxDocsPerSecond, CountDownLatch latch) {
super(threadPool, executorName, initialState, initialPosition, new MockJobStats());
super(threadPool, initialState, initialPosition, new MockJobStats());
startTime = System.nanoTime();
this.latch = latch;
this.maxDocsPerSecond = maxDocsPerSecond;
Expand Down Expand Up @@ -288,7 +288,7 @@ private class MockIndexerThrowsFirstSearch extends AsyncTwoPhaseIndexer<Integer,

protected MockIndexerThrowsFirstSearch(ThreadPool threadPool, String executorName, AtomicReference<IndexerState> initialState,
Integer initialPosition) {
super(threadPool, executorName, initialState, initialPosition, new MockJobStats());
super(threadPool, initialState, initialPosition, new MockJobStats());
}

@Override
Expand Down Expand Up @@ -379,7 +379,7 @@ public void testStateMachine() throws Exception {
final ThreadPool threadPool = new TestThreadPool(getTestName());
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(threadPool, ThreadPool.Names.GENERIC, state, 2, countDownLatch, false);
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, false);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
Expand Down Expand Up @@ -422,7 +422,7 @@ public void testStop_WhileIndexing() throws Exception {
final ThreadPool threadPool = new TestThreadPool(getTestName());
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(threadPool, ThreadPool.Names.GENERIC, state, 2, countDownLatch, true);
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, true);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
Expand Down Expand Up @@ -466,7 +466,7 @@ public void doTestFiveRuns(float docsPerSecond, Collection<TimeValue> expectedDe
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final MockThreadPool threadPool = new MockThreadPool(getTestName());
try {
MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, ThreadPool.Names.GENERIC, state, 2, docsPerSecond,
MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, state, 2, docsPerSecond,
null);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
Expand Down Expand Up @@ -505,7 +505,7 @@ public void doTestFiveRunsRethrottle(
final MockThreadPool threadPool = new MockThreadPool(getTestName());
try {
CountDownLatch latch = new CountDownLatch(1);
MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, ThreadPool.Names.GENERIC, state, 2, docsPerSecond,
MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, state, 2, docsPerSecond,
latch);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rollup.RollupV2;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
Expand Down Expand Up @@ -90,7 +88,6 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
public static final int CURRENT_ROLLUP_VERSION = ROLLUP_VERSION_V2;

public static final String TASK_THREAD_POOL_NAME = RollupField.NAME + "_indexing";
public static final String SCHEDULE_THREAD_POOL_NAME = RollupField.NAME + "_scheduler";

public static final String ROLLUP_TEMPLATE_VERSION_FIELD = "rollup-version";

Expand Down Expand Up @@ -172,18 +169,6 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
return actions;
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (transportClientMode) {
return emptyList();
}

FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, Rollup.TASK_THREAD_POOL_NAME,
4, 4, "xpack.rollup.task_thread_pool");

return Collections.singletonList(indexing);
}

@Override
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
ThreadPool threadPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,28 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
/**
* Ctr
* @param threadPool ThreadPool to use to fire the first request of a background job.
* @param executorName Name of the executor to use to fire the first request of a background job.
* @param job The rollup job
* @param initialState Initial state for the indexer
* @param initialPosition The last indexed bucket of the task
* @param upgradedDocumentID whether job has updated IDs (for BWC)
*/
RollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
RollupIndexer(ThreadPool threadPool, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, AtomicBoolean upgradedDocumentID) {
this(threadPool, executorName, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats());
this(threadPool, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats());
}

/**
* Ctr
* @param threadPool ThreadPool to use to fire the first request of a background job.
* @param executorName Name of the executor to use to fire the first request of a background job.
* @param job The rollup job
* @param initialState Initial state for the indexer
* @param initialPosition The last indexed bucket of the task
* @param upgradedDocumentID whether job has updated IDs (for BWC)
* @param jobStats jobstats instance for collecting stats
*/
RollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
RollupIndexer(ThreadPool threadPool, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, AtomicBoolean upgradedDocumentID, RollupIndexerJobStats jobStats) {
super(threadPool, executorName, initialState, initialPosition, jobStats);
super(threadPool, initialState, initialPosition, jobStats);
this.job = job;
this.compositeBuilder = createCompositeBuilder(job.getConfig());
this.upgradedDocumentID = upgradedDocumentID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
import org.elasticsearch.xpack.core.scheduler.CronSchedule;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.rollup.Rollup;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -59,7 +58,7 @@ public static class RollupJobPersistentTasksExecutor extends PersistentTasksExec
private final ThreadPool threadPool;

public RollupJobPersistentTasksExecutor(Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool) {
super(RollupField.TASK_NAME, Rollup.TASK_THREAD_POOL_NAME);
super(RollupField.TASK_NAME, ThreadPool.Names.GENERIC);
this.client = client;
this.schedulerEngine = schedulerEngine;
this.threadPool = threadPool;
Expand Down Expand Up @@ -102,7 +101,7 @@ protected class ClientRollupPageManager extends RollupIndexer {

ClientRollupPageManager(RollupJob job, IndexerState initialState, Map<String, Object> initialPosition,
Client client, AtomicBoolean upgradedDocumentID) {
super(threadPool, ThreadPool.Names.GENERIC, job, new AtomicReference<>(initialState),
super(threadPool, job, new AtomicReference<>(initialState),
initialPosition, upgradedDocumentID);
this.client = client;
this.job = job;
Expand Down
Loading