Skip to content

Commit 5b7a88d

Browse files
committed
reindex: automatically choose the number of slices (#26030)
In reindex APIs, when using the `slices` parameter to choose the number of slices, adds the option to specify `slices` as "auto" which will choose a reasonable number of slices. It uses the number of shards in the source index, up to a ceiling. If there is more than one source index, it uses the smallest number of shards among them. This gives users an easy way to use slicing in these APIs without having to make decisions about how to configure it, as it provides a good-enough configuration for them out of the box. This may become the default behavior for these APIs in the future.
1 parent 516ed36 commit 5b7a88d

File tree

42 files changed

+1181
-410
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1181
-410
lines changed

core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
4545
private static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5);
4646
private static final int DEFAULT_SCROLL_SIZE = 1000;
4747

48+
public static final int AUTO_SLICES = 0;
49+
public static final String AUTO_SLICES_VALUE = "auto";
50+
private static final int DEFAULT_SLICES = 1;
51+
4852
/**
4953
* The search to be executed.
5054
*/
@@ -102,7 +106,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
102106
/**
103107
* The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks.
104108
*/
105-
private int slices = 1;
109+
private int slices = DEFAULT_SLICES;
106110

107111
/**
108112
* Constructor for deserialization.
@@ -152,8 +156,8 @@ public ActionRequestValidationException validate() {
152156
+ size + "]",
153157
e);
154158
}
155-
if (searchRequest.source().slice() != null && slices != 1) {
156-
e = addValidationError("can't specify both slice and workers", e);
159+
if (searchRequest.source().slice() != null && slices != DEFAULT_SLICES) {
160+
e = addValidationError("can't specify both manual and automatic slicing at the same time", e);
157161
}
158162
return e;
159163
}
@@ -341,8 +345,8 @@ public boolean getShouldStoreResult() {
341345
* The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks.
342346
*/
343347
public Self setSlices(int slices) {
344-
if (slices < 1) {
345-
throw new IllegalArgumentException("[slices] must be at least 1");
348+
if (slices < 0) {
349+
throw new IllegalArgumentException("[slices] must be at least 0 but was [" + slices + "]");
346350
}
347351
this.slices = slices;
348352
return self();
@@ -358,24 +362,28 @@ public int getSlices() {
358362
/**
359363
* Build a new request for a slice of the parent request.
360364
*/
361-
public abstract Self forSlice(TaskId slicingTask, SearchRequest slice);
365+
public abstract Self forSlice(TaskId slicingTask, SearchRequest slice, int totalSlices);
362366

363367
/**
364368
* Setup a clone of this request with the information needed to process a slice of it.
365369
*/
366-
protected Self doForSlice(Self request, TaskId slicingTask) {
370+
protected Self doForSlice(Self request, TaskId slicingTask, int totalSlices) {
371+
if (totalSlices < 1) {
372+
throw new IllegalArgumentException("Number of total slices must be at least 1 but was [" + totalSlices + "]");
373+
}
374+
367375
request.setAbortOnVersionConflict(abortOnVersionConflict).setRefresh(refresh).setTimeout(timeout)
368376
.setWaitForActiveShards(activeShardCount).setRetryBackoffInitialTime(retryBackoffInitialTime).setMaxRetries(maxRetries)
369377
// Parent task will store result
370378
.setShouldStoreResult(false)
371379
// Split requests per second between all slices
372-
.setRequestsPerSecond(requestsPerSecond / slices)
380+
.setRequestsPerSecond(requestsPerSecond / totalSlices)
373381
// Sub requests don't have workers
374382
.setSlices(1);
375383
if (size != -1) {
376384
// Size is split between workers. This means the size might round
377385
// down!
378-
request.setSize(size == SIZE_ALL_MATCHES ? SIZE_ALL_MATCHES : size / slices);
386+
request.setSize(size == SIZE_ALL_MATCHES ? SIZE_ALL_MATCHES : size / totalSlices);
379387
}
380388
// Set the parent task so this task is cancelled if we cancel the parent
381389
request.setParentTask(slicingTask);
@@ -385,14 +393,7 @@ protected Self doForSlice(Self request, TaskId slicingTask) {
385393

386394
@Override
387395
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
388-
if (slices > 1) {
389-
return new ParentBulkByScrollTask(id, type, action, getDescription(), parentTaskId, slices);
390-
}
391-
/* Extract the slice from the search request so it'll be available in the status. This is potentially useful for users that manually
392-
* slice their search requests so they can keep track of it and **absolutely** useful for automatically sliced reindex requests so
393-
* they can properly track the responses. */
394-
Integer sliceId = searchRequest.source().slice() == null ? null : searchRequest.source().slice().getId();
395-
return new WorkingBulkByScrollTask(id, type, action, getDescription(), parentTaskId, sliceId, requestsPerSecond);
396+
return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId);
396397
}
397398

398399
@Override
@@ -408,11 +409,7 @@ public void readFrom(StreamInput in) throws IOException {
408409
retryBackoffInitialTime = new TimeValue(in);
409410
maxRetries = in.readVInt();
410411
requestsPerSecond = in.readFloat();
411-
if (in.getVersion().onOrAfter(Version.V_5_1_1)) {
412-
slices = in.readVInt();
413-
} else {
414-
slices = 1;
415-
}
412+
slices = in.readVInt();
416413
}
417414

418415
@Override
@@ -427,13 +424,11 @@ public void writeTo(StreamOutput out) throws IOException {
427424
retryBackoffInitialTime.writeTo(out);
428425
out.writeVInt(maxRetries);
429426
out.writeFloat(requestsPerSecond);
430-
if (out.getVersion().onOrAfter(Version.V_5_1_1)) {
431-
out.writeVInt(slices);
427+
if (out.getVersion().before(Version.V_6_1_0) && slices == AUTO_SLICES) {
428+
throw new IllegalArgumentException("Slices set as \"auto\" are not supported before version [" + Version.V_6_1_0 + "]. " +
429+
"Found version [" + out.getVersion() + "]");
432430
} else {
433-
if (slices > 1) {
434-
throw new IllegalArgumentException("Attempting to send sliced reindex-style request to a node that doesn't support "
435-
+ "it. Version is [" + out.getVersion() + "] but must be [" + Version.V_5_1_1 + "]");
436-
}
431+
out.writeVInt(slices);
437432
}
438433
}
439434

core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ public Self setShouldStoreResult(boolean shouldStoreResult) {
145145
/**
146146
* The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks.
147147
*/
148-
public Self setSlices(int workers) {
149-
request.setSlices(workers);
148+
public Self setSlices(int slices) {
149+
request.setSlices(slices);
150150
return self();
151151
}
152152
}

core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkIndexByScrollRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ public Self setScript(@Nullable Script script) {
6868
}
6969

7070
@Override
71-
protected Self doForSlice(Self request, TaskId slicingTask) {
72-
return super.doForSlice(request, slicingTask).setScript(script);
71+
protected Self doForSlice(Self request, TaskId slicingTask, int totalSlices) {
72+
return super.doForSlice(request, slicingTask, totalSlices).setScript(script);
7373
}
7474

7575
@Override

core/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
190190
@Override
191191
public String toString() {
192192
StringBuilder builder = new StringBuilder();
193-
builder.append("BulkIndexByScrollResponse[");
193+
builder.append(getClass().getSimpleName()).append("[");
194194
builder.append("took=").append(took).append(',');
195195
builder.append("timed_out=").append(timedOut).append(',');
196196
status.innerToString(builder);

core/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java

Lines changed: 119 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.elasticsearch.tasks.TaskInfo;
3535

3636
import java.io.IOException;
37+
import java.util.Arrays;
38+
import java.util.Collections;
3739
import java.util.List;
3840
import java.util.Objects;
3941

@@ -43,32 +45,138 @@
4345

4446
/**
4547
* Task storing information about a currently running BulkByScroll request.
48+
*
49+
* When the request is not sliced, this task is the only task created, and starts an action to perform search requests.
50+
*
51+
* When the request is sliced, this task can either represent a coordinating task (using
52+
* {@link BulkByScrollTask#setWorkerCount(int)}) or a worker task that performs search queries (using
53+
* {@link BulkByScrollTask#setWorker(float, Integer)}).
54+
*
55+
* We don't always know if this task will be a leader or worker task when it's created, because if slices is set to "auto" it may
56+
* be either depending on the number of shards in the source indices. We figure that out when the request is handled and set it on this
57+
* class with {@link #setWorkerCount(int)} or {@link #setWorker(float, Integer)}.
4658
*/
47-
public abstract class BulkByScrollTask extends CancellableTask {
59+
public class BulkByScrollTask extends CancellableTask {
60+
61+
private LeaderBulkByScrollTaskState leaderState;
62+
private WorkerBulkByScrollTaskState workerState;
63+
4864
public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) {
4965
super(id, type, action, description, parentTaskId);
5066
}
5167

68+
@Override
69+
public BulkByScrollTask.Status getStatus() {
70+
if (isLeader()) {
71+
return leaderState.getStatus();
72+
}
73+
74+
if (isWorker()) {
75+
return workerState.getStatus();
76+
}
77+
78+
return emptyStatus();
79+
}
80+
81+
/**
82+
* Build the status for this task given a snapshot of the information of running slices. This is only supported if the task is
83+
* set as a leader for slice subtasks
84+
*/
85+
public TaskInfo taskInfoGivenSubtaskInfo(String localNodeId, List<TaskInfo> sliceInfo) {
86+
if (isLeader() == false) {
87+
throw new IllegalStateException("This task is not set to be a leader of other slice subtasks");
88+
}
89+
90+
List<BulkByScrollTask.StatusOrException> sliceStatuses = Arrays.asList(
91+
new BulkByScrollTask.StatusOrException[leaderState.getSlices()]);
92+
for (TaskInfo t : sliceInfo) {
93+
BulkByScrollTask.Status status = (BulkByScrollTask.Status) t.getStatus();
94+
sliceStatuses.set(status.getSliceId(), new BulkByScrollTask.StatusOrException(status));
95+
}
96+
Status status = leaderState.getStatus(sliceStatuses);
97+
return taskInfo(localNodeId, getDescription(), status);
98+
}
99+
100+
private BulkByScrollTask.Status emptyStatus() {
101+
return new Status(Collections.emptyList(), getReasonCancelled());
102+
}
103+
104+
/**
105+
* Returns true if this task is a leader for other slice subtasks
106+
*/
107+
public boolean isLeader() {
108+
return leaderState != null;
109+
}
110+
52111
/**
53-
* The number of sub-slices that are still running. {@link WorkingBulkByScrollTask} will always have 0 and
54-
* {@link ParentBulkByScrollTask} will return the number of waiting tasks. Used to decide how to perform rethrottling.
112+
* Sets this task to be a leader task for {@code slices} sliced subtasks
55113
*/
56-
public abstract int runningSliceSubTasks();
114+
public void setWorkerCount(int slices) {
115+
if (isLeader()) {
116+
throw new IllegalStateException("This task is already a leader for other slice subtasks");
117+
}
118+
if (isWorker()) {
119+
throw new IllegalStateException("This task is already a worker");
120+
}
121+
122+
leaderState = new LeaderBulkByScrollTaskState(this, slices);
123+
}
57124

58125
/**
59-
* Apply the {@code newRequestsPerSecond}.
126+
* Returns the object that tracks the state of sliced subtasks. Throws IllegalStateException if this task is not set to be
127+
* a leader task.
60128
*/
61-
public abstract void rethrottle(float newRequestsPerSecond);
129+
public LeaderBulkByScrollTaskState getLeaderState() {
130+
if (!isLeader()) {
131+
throw new IllegalStateException("This task is not set to be a leader for other slice subtasks");
132+
}
133+
return leaderState;
134+
}
62135

63-
/*
64-
* Overridden to force children to return compatible status.
136+
/**
137+
* Returns true if this task is a worker task that performs search requests. False otherwise
65138
*/
66-
public abstract BulkByScrollTask.Status getStatus();
139+
public boolean isWorker() {
140+
return workerState != null;
141+
}
67142

68143
/**
69-
* Build the status for this task given a snapshot of the information of running slices.
144+
* Sets this task to be a worker task that performs search requests
145+
* @param requestsPerSecond How many search requests per second this task should make
146+
* @param sliceId If this is is a sliced task, which slice number this task corresponds to. Null if not sliced.
70147
*/
71-
public abstract TaskInfo getInfoGivenSliceInfo(String localNodeId, List<TaskInfo> sliceInfo);
148+
public void setWorker(float requestsPerSecond, @Nullable Integer sliceId) {
149+
if (isWorker()) {
150+
throw new IllegalStateException("This task is already a worker");
151+
}
152+
if (isLeader()) {
153+
throw new IllegalStateException("This task is already a leader for other slice subtasks");
154+
}
155+
156+
workerState = new WorkerBulkByScrollTaskState(this, sliceId, requestsPerSecond);
157+
}
158+
159+
/**
160+
* Returns the object that manages sending search requests. Throws IllegalStateException if this task is not set to be a
161+
* worker task.
162+
*/
163+
public WorkerBulkByScrollTaskState getWorkerState() {
164+
if (!isWorker()) {
165+
throw new IllegalStateException("This task is not set to be a worker");
166+
}
167+
return workerState;
168+
}
169+
170+
@Override
171+
public void onCancelled() {
172+
if (isLeader()) {
173+
// The task cancellation task automatically finds children and cancels them, nothing extra to do
174+
} else if (isWorker()) {
175+
workerState.handleCancel();
176+
} else {
177+
throw new IllegalStateException("This task has not had its sliced state initialized and doesn't know how to cancel itself");
178+
}
179+
}
72180

73181
@Override
74182
public boolean shouldCancelChildrenOnCancellation() {

core/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ public ActionRequestValidationException validate() {
8181
}
8282

8383
@Override
84-
public DeleteByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice) {
85-
return doForSlice(new DeleteByQueryRequest(slice, false), slicingTask);
84+
public DeleteByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice, int totalSlices) {
85+
return doForSlice(new DeleteByQueryRequest(slice, false), slicingTask, totalSlices);
8686
}
8787

8888
@Override

0 commit comments

Comments
 (0)