Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
private static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5);
private static final int DEFAULT_SCROLL_SIZE = 1000;

public static final int AUTO_SLICES = 0;
public static final String AUTO_SLICES_VALUE = "auto";
private static final int DEFAULT_SLICES = 1;

/**
* The search to be executed.
*/
Expand Down Expand Up @@ -102,7 +106,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
/**
* The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks.
*/
private int slices = 1;
private int slices = DEFAULT_SLICES;

/**
* Constructor for deserialization.
Expand Down Expand Up @@ -152,8 +156,8 @@ public ActionRequestValidationException validate() {
+ size + "]",
e);
}
if (searchRequest.source().slice() != null && slices != 1) {
e = addValidationError("can't specify both slice and workers", e);
if (searchRequest.source().slice() != null && slices != DEFAULT_SLICES) {
e = addValidationError("can't specify both manual and automatic slicing at the same time", e);
}
return e;
}
Expand Down Expand Up @@ -341,8 +345,8 @@ public boolean getShouldStoreResult() {
* The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks.
*/
public Self setSlices(int slices) {
if (slices < 1) {
throw new IllegalArgumentException("[slices] must be at least 1");
if (slices < 0) {
throw new IllegalArgumentException("[slices] must be at least 0 but was [" + slices + "]");
}
this.slices = slices;
return self();
Expand All @@ -358,24 +362,28 @@ public int getSlices() {
/**
* Build a new request for a slice of the parent request.
*/
public abstract Self forSlice(TaskId slicingTask, SearchRequest slice);
public abstract Self forSlice(TaskId slicingTask, SearchRequest slice, int totalSlices);

/**
* Setup a clone of this request with the information needed to process a slice of it.
*/
protected Self doForSlice(Self request, TaskId slicingTask) {
protected Self doForSlice(Self request, TaskId slicingTask, int totalSlices) {
if (totalSlices < 1) {
throw new IllegalArgumentException("Number of total slices must be at least 1 but was [" + totalSlices + "]");
}

request.setAbortOnVersionConflict(abortOnVersionConflict).setRefresh(refresh).setTimeout(timeout)
.setWaitForActiveShards(activeShardCount).setRetryBackoffInitialTime(retryBackoffInitialTime).setMaxRetries(maxRetries)
// Parent task will store result
.setShouldStoreResult(false)
// Split requests per second between all slices
.setRequestsPerSecond(requestsPerSecond / slices)
.setRequestsPerSecond(requestsPerSecond / totalSlices)
// Sub requests don't have workers
.setSlices(1);
if (size != -1) {
// Size is split between workers. This means the size might round
// down!
request.setSize(size == SIZE_ALL_MATCHES ? SIZE_ALL_MATCHES : size / slices);
request.setSize(size == SIZE_ALL_MATCHES ? SIZE_ALL_MATCHES : size / totalSlices);
}
// Set the parent task so this task is cancelled if we cancel the parent
request.setParentTask(slicingTask);
Expand All @@ -385,14 +393,7 @@ protected Self doForSlice(Self request, TaskId slicingTask) {

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
if (slices > 1) {
return new ParentBulkByScrollTask(id, type, action, getDescription(), parentTaskId, slices);
}
/* Extract the slice from the search request so it'll be available in the status. This is potentially useful for users that manually
* slice their search requests so they can keep track of it and **absolutely** useful for automatically sliced reindex requests so
* they can properly track the responses. */
Integer sliceId = searchRequest.source().slice() == null ? null : searchRequest.source().slice().getId();
return new WorkingBulkByScrollTask(id, type, action, getDescription(), parentTaskId, sliceId, requestsPerSecond);
return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId);
}

@Override
Expand All @@ -408,11 +409,7 @@ public void readFrom(StreamInput in) throws IOException {
retryBackoffInitialTime = new TimeValue(in);
maxRetries = in.readVInt();
requestsPerSecond = in.readFloat();
if (in.getVersion().onOrAfter(Version.V_5_1_1)) {
slices = in.readVInt();
} else {
slices = 1;
}
slices = in.readVInt();
}

@Override
Expand All @@ -427,13 +424,11 @@ public void writeTo(StreamOutput out) throws IOException {
retryBackoffInitialTime.writeTo(out);
out.writeVInt(maxRetries);
out.writeFloat(requestsPerSecond);
if (out.getVersion().onOrAfter(Version.V_5_1_1)) {
out.writeVInt(slices);
if (out.getVersion().before(Version.V_6_1_0) && slices == AUTO_SLICES) {
throw new IllegalArgumentException("Slices set as \"auto\" are not supported before version [" + Version.V_6_1_0 + "]. " +
"Found version [" + out.getVersion() + "]");
} else {
if (slices > 1) {
throw new IllegalArgumentException("Attempting to send sliced reindex-style request to a node that doesn't support "
+ "it. Version is [" + out.getVersion() + "] but must be [" + Version.V_5_1_1 + "]");
}
out.writeVInt(slices);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ public Self setShouldStoreResult(boolean shouldStoreResult) {
/**
* The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks.
*/
public Self setSlices(int workers) {
request.setSlices(workers);
public Self setSlices(int slices) {
request.setSlices(slices);
return self();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public Self setScript(@Nullable Script script) {
}

@Override
protected Self doForSlice(Self request, TaskId slicingTask) {
return super.doForSlice(request, slicingTask).setScript(script);
protected Self doForSlice(Self request, TaskId slicingTask, int totalSlices) {
return super.doForSlice(request, slicingTask, totalSlices).setScript(script);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("BulkIndexByScrollResponse[");
builder.append(getClass().getSimpleName()).append("[");
builder.append("took=").append(took).append(',');
builder.append("timed_out=").append(timedOut).append(',');
status.innerToString(builder);
Expand Down
130 changes: 119 additions & 11 deletions core/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.elasticsearch.tasks.TaskInfo;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

Expand All @@ -43,32 +45,138 @@

/**
* Task storing information about a currently running BulkByScroll request.
*
* When the request is not sliced, this task is the only task created, and starts an action to perform search requests.
*
* When the request is sliced, this task can either represent a coordinating task (using
* {@link BulkByScrollTask#setWorkerCount(int)}) or a worker task that performs search queries (using
* {@link BulkByScrollTask#setWorker(float, Integer)}).
*
* We don't always know if this task will be a leader or worker task when it's created, because if slices is set to "auto" it may
* be either depending on the number of shards in the source indices. We figure that out when the request is handled and set it on this
* class with {@link #setWorkerCount(int)} or {@link #setWorker(float, Integer)}.
*/
public abstract class BulkByScrollTask extends CancellableTask {
public class BulkByScrollTask extends CancellableTask {

private LeaderBulkByScrollTaskState leaderState;
private WorkerBulkByScrollTaskState workerState;

public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) {
super(id, type, action, description, parentTaskId);
}

@Override
public BulkByScrollTask.Status getStatus() {
if (isLeader()) {
return leaderState.getStatus();
}

if (isWorker()) {
return workerState.getStatus();
}

return emptyStatus();
}

/**
* Build the status for this task given a snapshot of the information of running slices. This is only supported if the task is
* set as a leader for slice subtasks
*/
public TaskInfo taskInfoGivenSubtaskInfo(String localNodeId, List<TaskInfo> sliceInfo) {
if (isLeader() == false) {
throw new IllegalStateException("This task is not set to be a leader of other slice subtasks");
}

List<BulkByScrollTask.StatusOrException> sliceStatuses = Arrays.asList(
new BulkByScrollTask.StatusOrException[leaderState.getSlices()]);
for (TaskInfo t : sliceInfo) {
BulkByScrollTask.Status status = (BulkByScrollTask.Status) t.getStatus();
sliceStatuses.set(status.getSliceId(), new BulkByScrollTask.StatusOrException(status));
}
Status status = leaderState.getStatus(sliceStatuses);
return taskInfo(localNodeId, getDescription(), status);
}

private BulkByScrollTask.Status emptyStatus() {
return new Status(Collections.emptyList(), getReasonCancelled());
}

/**
* Returns true if this task is a leader for other slice subtasks
*/
public boolean isLeader() {
return leaderState != null;
}

/**
* The number of sub-slices that are still running. {@link WorkingBulkByScrollTask} will always have 0 and
* {@link ParentBulkByScrollTask} will return the number of waiting tasks. Used to decide how to perform rethrottling.
* Sets this task to be a leader task for {@code slices} sliced subtasks
*/
public abstract int runningSliceSubTasks();
public void setWorkerCount(int slices) {
if (isLeader()) {
throw new IllegalStateException("This task is already a leader for other slice subtasks");
}
if (isWorker()) {
throw new IllegalStateException("This task is already a worker");
}

leaderState = new LeaderBulkByScrollTaskState(this, slices);
}

/**
* Apply the {@code newRequestsPerSecond}.
* Returns the object that tracks the state of sliced subtasks. Throws IllegalStateException if this task is not set to be
* a leader task.
*/
public abstract void rethrottle(float newRequestsPerSecond);
public LeaderBulkByScrollTaskState getLeaderState() {
if (!isLeader()) {
throw new IllegalStateException("This task is not set to be a leader for other slice subtasks");
}
return leaderState;
}

/*
* Overridden to force children to return compatible status.
/**
* Returns true if this task is a worker task that performs search requests. False otherwise
*/
public abstract BulkByScrollTask.Status getStatus();
public boolean isWorker() {
return workerState != null;
}

/**
* Build the status for this task given a snapshot of the information of running slices.
* Sets this task to be a worker task that performs search requests
* @param requestsPerSecond How many search requests per second this task should make
* @param sliceId If this is is a sliced task, which slice number this task corresponds to. Null if not sliced.
*/
public abstract TaskInfo getInfoGivenSliceInfo(String localNodeId, List<TaskInfo> sliceInfo);
public void setWorker(float requestsPerSecond, @Nullable Integer sliceId) {
if (isWorker()) {
throw new IllegalStateException("This task is already a worker");
}
if (isLeader()) {
throw new IllegalStateException("This task is already a leader for other slice subtasks");
}

workerState = new WorkerBulkByScrollTaskState(this, sliceId, requestsPerSecond);
}

/**
* Returns the object that manages sending search requests. Throws IllegalStateException if this task is not set to be a
* worker task.
*/
public WorkerBulkByScrollTaskState getWorkerState() {
if (!isWorker()) {
throw new IllegalStateException("This task is not set to be a worker");
}
return workerState;
}

@Override
public void onCancelled() {
if (isLeader()) {
// The task cancellation task automatically finds children and cancels them, nothing extra to do
} else if (isWorker()) {
workerState.handleCancel();
} else {
throw new IllegalStateException("This task has not had its sliced state initialized and doesn't know how to cancel itself");
}
}

@Override
public boolean shouldCancelChildrenOnCancellation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public ActionRequestValidationException validate() {
}

@Override
public DeleteByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice) {
return doForSlice(new DeleteByQueryRequest(slice, false), slicingTask);
public DeleteByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice, int totalSlices) {
return doForSlice(new DeleteByQueryRequest(slice, false), slicingTask, totalSlices);
}

@Override
Expand Down
Loading