Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
e3191f1
WIP
Tim-Brooks Jun 4, 2019
c5cf831
Work on task
Tim-Brooks Jun 5, 2019
d3f76d9
Merge remote-tracking branch 'upstream/master' into persistent_reindex
Tim-Brooks Jun 6, 2019
03c55d6
WIP
Tim-Brooks Jun 6, 2019
8a29bb7
WIP
Tim-Brooks Jun 6, 2019
7272fc0
WIP
Tim-Brooks Jun 7, 2019
15d6a45
Changes
Tim-Brooks Jun 7, 2019
a8fdc23
Merge remote-tracking branch 'upstream/master' into persistent_reindex
Tim-Brooks Jun 10, 2019
7dc09e3
Changes
Tim-Brooks Jun 10, 2019
addb374
Changes
Tim-Brooks Jun 10, 2019
6e856c4
Merge remote-tracking branch 'upstream/master' into persistent_reindex
Tim-Brooks Jun 11, 2019
7265547
WIP
Tim-Brooks Jun 12, 2019
e354574
Merge remote-tracking branch 'upstream/master' into persistent_reindex
Tim-Brooks Jun 14, 2019
2037fdc
Merge remote-tracking branch 'upstream/master' into persistent_reindex
Tim-Brooks Jun 17, 2019
a8ffa25
Work on test
Tim-Brooks Jun 18, 2019
4023500
REmove
Tim-Brooks Jun 18, 2019
22ba62b
Changes
Tim-Brooks Jun 18, 2019
1fb3f57
Change
Tim-Brooks Jun 19, 2019
ccb1a00
Merge remote-tracking branch 'upstream/master' into persistent_reindex
Tim-Brooks Jun 19, 2019
02c3e59
License
Tim-Brooks Jun 19, 2019
9ee47c9
Security changes
Tim-Brooks Jun 19, 2019
0db2bae
Security fixes
Tim-Brooks Jun 19, 2019
b394a10
Small cleanup
Tim-Brooks Jun 19, 2019
544365a
Change
Tim-Brooks Jun 20, 2019
b578938
Changes
Tim-Brooks Jun 20, 2019
03e38d9
Merge remote-tracking branch 'upstream/reindex_v2' into persistent_re…
Tim-Brooks Jun 27, 2019
4941da5
Changes
Tim-Brooks Jun 27, 2019
e852a72
Changes
Tim-Brooks Jun 27, 2019
0065306
WIP
Tim-Brooks Jun 27, 2019
852a6b0
Merge branch 'reindex_v2' into persistent_reindex
Tim-Brooks Jun 28, 2019
5274f9a
Changes
Tim-Brooks Jun 28, 2019
19a4636
Merge remote-tracking branch 'upstream/master' into persistent_reindex
Tim-Brooks Jul 1, 2019
1bdf6e2
Changes
Tim-Brooks Jul 1, 2019
c8cf5e2
Merge branch 'reindex_v2' into persistent_reindex
Tim-Brooks Jul 1, 2019
d691b83
Change
Tim-Brooks Jul 1, 2019
599a3b0
Merge remote-tracking branch 'upstream/master' into persistent_reindex
Tim-Brooks Jul 2, 2019
67a82cf
Fixes
Tim-Brooks Jul 2, 2019
65ed892
Changes
Tim-Brooks Jul 2, 2019
6f9d1bb
Work on serialization
Tim-Brooks Jul 3, 2019
b9cca0b
Change
Tim-Brooks Jul 4, 2019
0db4a2b
Merge remote-tracking branch 'upstream/reindex_v2' into persistent_re…
Tim-Brooks Jul 5, 2019
fd55935
Changes
Tim-Brooks Jul 6, 2019
6b9a1f8
Change assertion
Tim-Brooks Jul 6, 2019
321a790
Store task
Tim-Brooks Jul 6, 2019
15fe610
Changes
Tim-Brooks Jul 8, 2019
ab39730
WIP
Tim-Brooks Jul 9, 2019
82f338a
Changes
Tim-Brooks Jul 9, 2019
8b9c7b6
Change
Tim-Brooks Jul 9, 2019
5995437
Propogate headers
Tim-Brooks Jul 9, 2019
99b906a
Changes
Tim-Brooks Jul 10, 2019
a3e7f1f
Fix
Tim-Brooks Jul 10, 2019
1c26b82
Rethrottle
Tim-Brooks Jul 11, 2019
59c671f
Add validation
Tim-Brooks Jul 11, 2019
ff67a35
Changes
Tim-Brooks Jul 11, 2019
1e3de1a
Tests
Tim-Brooks Jul 11, 2019
9dc3c1a
Fix test
Tim-Brooks Jul 11, 2019
d42bd72
Merge branch 'reindex_v2' into persistent_reindex
Tim-Brooks Jul 11, 2019
41837ff
Change
Tim-Brooks Jul 11, 2019
a847c4e
Dispatch
Tim-Brooks Jul 12, 2019
ce5ec9b
Wait
Tim-Brooks Jul 12, 2019
6b59df0
Merge branch 'reindex_v2' into persistent_reindex
Tim-Brooks Jul 15, 2019
68586d9
Changes
Tim-Brooks Jul 15, 2019
f347160
Review changes
Tim-Brooks Jul 16, 2019
9fd28bf
Changes
Tim-Brooks Jul 17, 2019
555c8a0
Merge branch 'reindex_v2' into persistent_reindex
Tim-Brooks Jul 17, 2019
b1beb76
Fix
Tim-Brooks Jul 17, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public void testGetValidTask() throws Exception {
}
TaskInfo info = taskResponse.getTaskInfo();
assertTrue(info.isCancellable());
assertEquals("reindex from [source1] to [dest][_doc]", info.getDescription());
assertEquals("indices:data/write/reindex", info.getAction());
assertEquals("persistent reindex from [source1] to [dest][_doc]", info.getDescription());
assertEquals("reindex/job[c]", info.getAction());
if (taskResponse.isCompleted() == false) {
assertBusy(ReindexIT.checkCompletionStatus(client(), taskId.toString()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,22 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
Expand All @@ -50,23 +56,36 @@
import java.util.List;
import java.util.function.Supplier;

import static java.util.Collections.singletonList;
public class ReindexPlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {

public class ReindexPlugin extends Plugin implements ActionPlugin {
public static final String NAME = "reindex";

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(new ActionHandler<>(ReindexAction.INSTANCE, TransportReindexAction.class),
new ActionHandler<>(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class),
new ActionHandler<>(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class),
new ActionHandler<>(RethrottleAction.INSTANCE, TransportRethrottleAction.class));
new ActionHandler<>(RethrottleAction.INSTANCE, TransportRethrottleAction.class),
new ActionHandler<>(StartReindexJobAction.INSTANCE, TransportStartReindexJobAction.class)
);
}

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return singletonList(
new NamedWriteableRegistry.Entry(Task.Status.class, BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new));
return Arrays.asList(
new NamedWriteableRegistry.Entry(Task.Status.class, BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ReindexJob.NAME, ReindexJob::new),
new NamedWriteableRegistry.Entry(Task.Status.class, ReindexJobState.NAME, ReindexJobState::new),
new NamedWriteableRegistry.Entry(PersistentTaskState.class, ReindexJobState.NAME, ReindexJobState::new));
}

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Arrays.asList(
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(ReindexJob.NAME), ReindexJob::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(ReindexJobState.NAME), ReindexJobState::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(ReindexJobState.NAME),
ReindexJobState::fromXContent));
}

@Override
Expand Down Expand Up @@ -95,4 +114,10 @@ public List<Setting<?>> getSettings() {
settings.addAll(ReindexSslConfig.getSettings());
return settings;
}

@Override
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, Client client,
SettingsModule settingsModule) {
return Collections.singletonList(new ReindexTask.ReindexPersistentTasksExecutor(clusterService, client));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.rest.RestRequest.Method.POST;
Expand All @@ -47,7 +56,55 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
return doPrepareRequest(request, client, true, true);
boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", true);

// Build the internal request
StartReindexJobAction.Request internal = new StartReindexJobAction.Request(setCommonOptions(request, buildRequest(request)),
waitForCompletion);
/*
* Let's try and validate before forking so the user gets some error. The
* task can't totally validate until it starts but this is better than
* nothing.
*/
ActionRequestValidationException validationException = internal.getReindexRequest().validate();
if (validationException != null) {
throw validationException;
}

// Executes the request and waits for completion
if (waitForCompletion) {
Map<String, String> params = new HashMap<>();
params.put(BulkByScrollTask.Status.INCLUDE_CREATED, Boolean.toString(true));
params.put(BulkByScrollTask.Status.INCLUDE_UPDATED, Boolean.toString(true));

return channel -> client.execute(StartReindexJobAction.INSTANCE, internal, new ActionListener<>() {

private BulkIndexByScrollResponseContentListener listener = new BulkIndexByScrollResponseContentListener(channel, params);

@Override
public void onResponse(StartReindexJobAction.Response response) {
listener.onResponse(response.getReindexResponse());
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} else {
return channel -> client.execute(StartReindexJobAction.INSTANCE, internal, new RestBuilderListener<>(channel) {
@Override
public RestResponse buildResponse(StartReindexJobAction.Response response, XContentBuilder builder) throws Exception {
builder.startObject();
// This is the ephemeral task-id from the first node that is assigned the task (for BWC).
builder.field("task", response.getTaskId());

// TODO: Are there error conditions for the non-wait case?
return new BytesRestResponse(RestStatus.OK, builder.endObject());
}
});
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.reindex;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;

public class StartReindexJobAction extends ActionType<StartReindexJobAction.Response> {

public static final StartReindexJobAction INSTANCE = new StartReindexJobAction();
// TODO: Name
public static final String NAME = "indices:data/write/start_reindex";

private StartReindexJobAction() {
super(NAME, Response::new);
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject, CompositeIndicesRequest {

private final ReindexRequest reindexRequest;
private final boolean waitForCompletion;


public Request(ReindexRequest reindexRequest) {
this(reindexRequest, false);
}

public Request(ReindexRequest reindexRequest, boolean waitForCompletion) {
this.reindexRequest = reindexRequest;
this.waitForCompletion = waitForCompletion;
}

public Request(StreamInput in) throws IOException {
super(in);
reindexRequest = new ReindexRequest(in);
waitForCompletion = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
reindexRequest.writeTo(out);
out.writeBoolean(waitForCompletion);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) {
return builder;
}

public ReindexRequest getReindexRequest() {
return reindexRequest;
}

public boolean getWaitForCompletion() {
return waitForCompletion;
}
}

public static class Response extends ActionResponse {

static final ParseField TASK_ID = new ParseField("task_id");
static final ParseField REINDEX_RESPONSE = new ParseField("reindex_response");

private static final ConstructingObjectParser<Response, Void> PARSER = new ConstructingObjectParser<>(
"start_reindex_response", true, args -> new Response((String) args[0], (BulkByScrollResponse) args[1]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), TASK_ID);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> BulkByScrollResponse.fromXContent(parser), REINDEX_RESPONSE);
}

private final String taskId;
@Nullable private final BulkByScrollResponse reindexResponse;

public Response(String taskId) {
this(taskId, null);
}

public Response(String taskId, BulkByScrollResponse reindexResponse) {
this.taskId = taskId;
this.reindexResponse = reindexResponse;
}

public Response(StreamInput in) throws IOException {
super(in);
taskId = in.readString();
reindexResponse = in.readOptionalWriteable(BulkByScrollResponse::new);
}

@Override
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(taskId);
out.writeOptionalWriteable(reindexResponse);
}

public String getTaskId() {
return taskId;
}

public BulkByScrollResponse getReindexResponse() {
return reindexResponse;
}

public static Response fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.lucene.util.automaton.Operations;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -107,7 +108,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ScriptService scriptService,
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService, ReindexSslConfig sslConfig) {
super(ReindexAction.NAME, transportService, actionFilters, (Writeable.Reader<ReindexRequest>)ReindexRequest::new);
super(ReindexAction.NAME, transportService, actionFilters, (Writeable.Reader<ReindexRequest>) ReindexRequest::new);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.scriptService = scriptService;
Expand All @@ -120,22 +121,30 @@ public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFi

@Override
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo());
ClusterState state = clusterService.state();
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(),
indexNameExpressionResolver, autoCreateIndex, state);

BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;

BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, ReindexAction.INSTANCE, listener, client,
clusterService.localNode(),
() -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
bulkByScrollTask);
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, state,
listener).start();
// We dispatch here because the new ReindexTask uses this action. When an action is executed locally,
// it is not dispatched from the ctor argument.
threadPool.generic().execute(new ActionRunnable<>(listener) {

@Override
protected void doRun() {
checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo());
ClusterState state = clusterService.state();
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(),
indexNameExpressionResolver, autoCreateIndex, state);

BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;

BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, ReindexAction.INSTANCE, listener, client,
clusterService.localNode(),
() -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
bulkByScrollTask);
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, TransportReindexAction.this,
request, state, listener).start();
}
);
}
);
});
}

static void checkRemoteWhitelist(CharacterRunAutomaton whitelist, RemoteInfo remoteInfo) {
Expand Down
Loading