diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
index c12eb5011ed5c..d5ddbc2c17592 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
@@ -554,10 +554,22 @@ static Request submitReindex(ReindexRequest reindexRequest) throws IOException {
return prepareReindexRequest(reindexRequest, false);
}
+ static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
+ return prepareDeleteByQueryRequest(deleteByQueryRequest, true);
+ }
+
static Request submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
return prepareDeleteByQueryRequest(deleteByQueryRequest, false);
}
+ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
+ return prepareUpdateByQueryRequest(updateByQueryRequest, true);
+ }
+
+ static Request submitUpdateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
+ return prepareUpdateByQueryRequest(updateByQueryRequest, false);
+ }
+
private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException {
String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
@@ -608,7 +620,8 @@ private static Request prepareDeleteByQueryRequest(DeleteByQueryRequest deleteBy
return request;
}
- static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
+ static Request prepareUpdateByQueryRequest(UpdateByQueryRequest updateByQueryRequest,
+ boolean waitForCompletion) throws IOException {
String endpoint = endpoint(updateByQueryRequest.indices(), "_update_by_query");
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params()
@@ -619,6 +632,7 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards())
.withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond())
.withIndicesOptions(updateByQueryRequest.indicesOptions())
+ .withWaitForCompletion(waitForCompletion)
.withSlices(updateByQueryRequest.getSlices());
if (updateByQueryRequest.isAbortOnVersionConflict() == false) {
params.putParam("conflicts", "proceed");
@@ -637,10 +651,6 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I
return request;
}
- static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
- return prepareDeleteByQueryRequest(deleteByQueryRequest, true);
- }
-
static Request rethrottleReindex(RethrottleRequest rethrottleRequest) {
return rethrottle(rethrottleRequest, "_reindex");
}
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
index 44d0cb6994925..4d1e9dc96f580 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
@@ -601,6 +601,21 @@ public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQue
);
}
+ /**
+ * Submits a update by query task.
+ * See
+ * Update By Query API on elastic.co
+ * @param updateByQueryRequest the request
+ * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @return the submission response
+ */
+ public final TaskSubmissionResponse submitUpdateByQueryTask(UpdateByQueryRequest updateByQueryRequest,
+ RequestOptions options) throws IOException {
+ return performRequestAndParseEntity(
+ updateByQueryRequest, RequestConverters::submitUpdateByQuery, options, TaskSubmissionResponse::fromXContent, emptySet()
+ );
+ }
+
/**
* Asynchronously executes an update by query request.
* See
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java
index 174c98f3642df..5f8bd9a9907f6 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java
@@ -21,6 +21,9 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
@@ -31,6 +34,7 @@
import org.elasticsearch.client.cluster.RemoteInfoResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -42,6 +46,7 @@
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.Before;
@@ -51,18 +56,25 @@
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
+ protected static final String CONFLICT_PIPELINE_ID = "conflict_pipeline";
+
private static RestHighLevelClient restHighLevelClient;
private static boolean async = Booleans.parseBoolean(System.getProperty("tests.rest.async", "false"));
@@ -216,6 +228,29 @@ protected static void clusterUpdateSettings(Settings persistentSettings,
request, highLevelClient().cluster()::putSettings, highLevelClient().cluster()::putSettingsAsync).isAcknowledged());
}
+ protected void putConflictPipeline() throws IOException {
+ final XContentBuilder pipelineBuilder = jsonBuilder()
+ .startObject()
+ .startArray("processors")
+ .startObject()
+ .startObject("set")
+ .field("field", "_version")
+ .field("value", 1)
+ .endObject()
+ .endObject()
+ .startObject()
+ .startObject("set")
+ .field("field", "_id")
+ .field("value", "1")
+ .endObject()
+ .endObject()
+ .endArray()
+ .endObject();
+ final PutPipelineRequest putPipelineRequest = new PutPipelineRequest(CONFLICT_PIPELINE_ID, BytesReference.bytes(pipelineBuilder),
+ pipelineBuilder.contentType());
+ assertTrue(highLevelClient().ingest().putPipeline(putPipelineRequest, RequestOptions.DEFAULT).isAcknowledged());
+ }
+
@Override
protected Settings restClientSettings() {
final String user = Objects.requireNonNull(System.getProperty("tests.rest.cluster.username"));
@@ -280,4 +315,36 @@ protected static void setupRemoteClusterConfig(String remoteClusterName) throws
protected static Map toMap(Response response) throws IOException {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
}
+
+ protected static TaskId findTaskToRethrottle(String actionName, String description) throws IOException {
+ long start = System.nanoTime();
+ ListTasksRequest request = new ListTasksRequest();
+ request.setActions(actionName);
+ request.setDetailed(true);
+ do {
+ ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
+ list.rethrowFailures("Finding tasks to rethrottle");
+ List taskGroups =
+ list.getTaskGroups().stream()
+ .filter(taskGroup -> taskGroup.getTaskInfo().getDescription().equals(description)).collect(Collectors.toList());
+ assertThat("tasks are left over from the last execution of this test",
+ taskGroups, hasSize(lessThan(2)));
+ if (0 == taskGroups.size()) {
+ // The parent task hasn't started yet
+ continue;
+ }
+ TaskGroup taskGroup = taskGroups.get(0);
+ assertThat(taskGroup.getChildTasks(), empty());
+ return taskGroup.getTaskInfo().getTaskId();
+ } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
+ throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
+ highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
+ }
+
+ protected static CheckedRunnable checkTaskCompletionStatus(RestClient client, String taskId) {
+ return () -> {
+ Response response = client.performRequest(new Request("GET", "/_tasks/" + taskId));
+ assertTrue((boolean) entityAsMap(response).get("completed"));
+ };
+ }
}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java
index 8c764bd253e8c..d0f63922f6078 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java
@@ -20,54 +20,39 @@
package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
-import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
-import org.elasticsearch.common.CheckedRunnable;
-import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
-import org.elasticsearch.index.reindex.UpdateByQueryAction;
-import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.script.Script;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Collections;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.lessThan;
public class ReindexIT extends ESRestHighLevelClientTestCase {
- private static final String CONFLICT_PIPELINE_ID = "conflict_pipeline";
-
public void testReindex() throws IOException {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
@@ -150,7 +135,7 @@ public void testReindexTask() throws Exception {
String taskId = reindexSubmission.getTask(); // <3>
// end::submit-reindex-task
- assertBusy(checkCompletionStatus(client(), taskId));
+ assertBusy(checkTaskCompletionStatus(client(), taskId));
}
}
@@ -196,155 +181,6 @@ public void testReindexConflict() throws IOException {
assertTrue(response.getTook().getMillis() > 0);
}
- public void testUpdateByQuery() throws Exception {
- final String sourceIndex = "source1";
- {
- // Prepare
- Settings settings = Settings.builder()
- .put("number_of_shards", 1)
- .put("number_of_replicas", 0)
- .build();
- createIndex(sourceIndex, settings);
- assertEquals(
- RestStatus.OK,
- highLevelClient().bulk(
- new BulkRequest()
- .add(new IndexRequest(sourceIndex).id("1")
- .source(Collections.singletonMap("foo", 1), XContentType.JSON))
- .add(new IndexRequest(sourceIndex).id("2")
- .source(Collections.singletonMap("foo", 2), XContentType.JSON))
- .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
- RequestOptions.DEFAULT
- ).status()
- );
- }
- {
- // test1: create one doc in dest
- UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
- updateByQueryRequest.indices(sourceIndex);
- updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
- updateByQueryRequest.setRefresh(true);
- BulkByScrollResponse bulkResponse =
- execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
- assertEquals(1, bulkResponse.getTotal());
- assertEquals(1, bulkResponse.getUpdated());
- assertEquals(0, bulkResponse.getNoops());
- assertEquals(0, bulkResponse.getVersionConflicts());
- assertEquals(1, bulkResponse.getBatches());
- assertTrue(bulkResponse.getTook().getMillis() > 0);
- assertEquals(1, bulkResponse.getBatches());
- assertEquals(0, bulkResponse.getBulkFailures().size());
- assertEquals(0, bulkResponse.getSearchFailures().size());
- }
- {
- // test2: update using script
- UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
- updateByQueryRequest.indices(sourceIndex);
- updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
- updateByQueryRequest.setRefresh(true);
- BulkByScrollResponse bulkResponse =
- execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
- assertEquals(2, bulkResponse.getTotal());
- assertEquals(2, bulkResponse.getUpdated());
- assertEquals(0, bulkResponse.getDeleted());
- assertEquals(0, bulkResponse.getNoops());
- assertEquals(0, bulkResponse.getVersionConflicts());
- assertEquals(1, bulkResponse.getBatches());
- assertTrue(bulkResponse.getTook().getMillis() > 0);
- assertEquals(1, bulkResponse.getBatches());
- assertEquals(0, bulkResponse.getBulkFailures().size());
- assertEquals(0, bulkResponse.getSearchFailures().size());
- assertEquals(
- 3,
- (int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT)
- .getSourceAsMap().get("foo"))
- );
- }
- {
- // test update-by-query rethrottling
- UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
- updateByQueryRequest.indices(sourceIndex);
- updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
- updateByQueryRequest.setRefresh(true);
-
- // this following settings are supposed to halt reindexing after first document
- updateByQueryRequest.setBatchSize(1);
- updateByQueryRequest.setRequestsPerSecond(0.00001f);
- final CountDownLatch taskFinished = new CountDownLatch(1);
- highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener() {
-
- @Override
- public void onResponse(BulkByScrollResponse response) {
- taskFinished.countDown();
- }
-
- @Override
- public void onFailure(Exception e) {
- fail(e.toString());
- }
- });
-
- TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME, updateByQueryRequest.getDescription());
- float requestsPerSecond = 1000f;
- ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
- highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
- assertThat(response.getTasks(), hasSize(1));
- assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
- assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
- assertEquals(Float.toString(requestsPerSecond),
- ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
- assertTrue(taskFinished.await(10, TimeUnit.SECONDS));
-
- // any rethrottling after the update-by-query is done performed with the same taskId should result in a failure
- response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
- highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
- assertTrue(response.getTasks().isEmpty());
- assertFalse(response.getNodeFailures().isEmpty());
- assertEquals(1, response.getNodeFailures().size());
- assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
- response.getNodeFailures().get(0).getCause().getMessage());
- }
- }
-
- public void testUpdateByQueryConflict() throws IOException {
- final String index = "testupdatebyqueryconflict";
-
- final Settings settings = Settings.builder()
- .put("number_of_shards", 1)
- .put("number_of_replicas", 0)
- .build();
- createIndex(index, settings);
- final BulkRequest bulkRequest = new BulkRequest()
- .add(new IndexRequest(index).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
- .add(new IndexRequest(index).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
- .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK));
-
- putConflictPipeline();
-
- final UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
- updateByQueryRequest.indices(index);
- updateByQueryRequest.setRefresh(true);
- updateByQueryRequest.setPipeline(CONFLICT_PIPELINE_ID);
- final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
-
- assertThat(response.getVersionConflicts(), equalTo(1L));
- assertThat(response.getSearchFailures(), empty());
- assertThat(response.getBulkFailures(), hasSize(1));
- assertThat(
- response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()),
- everyItem(containsString("version conflict"))
- );
-
- assertThat(response.getTotal(), equalTo(2L));
- assertThat(response.getCreated(), equalTo(0L));
- assertThat(response.getUpdated(), equalTo(1L));
- assertThat(response.getDeleted(), equalTo(0L));
- assertThat(response.getNoops(), equalTo(0L));
- assertThat(response.getBatches(), equalTo(1));
- assertTrue(response.getTook().getMillis() > 0);
- }
-
public void testDeleteByQuery() throws Exception {
final String sourceIndex = "source1";
{
@@ -474,62 +310,7 @@ public void testDeleteByQueryTask() throws Exception {
String taskId = deleteByQuerySubmission.getTask();
// end::submit-delete_by_query-task
- assertBusy(checkCompletionStatus(client(), taskId));
+ assertBusy(checkTaskCompletionStatus(client(), taskId));
}
}
-
- private static TaskId findTaskToRethrottle(String actionName, String description) throws IOException {
- long start = System.nanoTime();
- ListTasksRequest request = new ListTasksRequest();
- request.setActions(actionName);
- request.setDetailed(true);
- do {
- ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
- list.rethrowFailures("Finding tasks to rethrottle");
- List taskGroups =
- list.getTaskGroups().stream()
- .filter(taskGroup -> taskGroup.getTaskInfo().getDescription().equals(description)).collect(Collectors.toList());
- assertThat("tasks are left over from the last execution of this test",
- taskGroups, hasSize(lessThan(2)));
- if (0 == taskGroups.size()) {
- // The parent task hasn't started yet
- continue;
- }
- TaskGroup taskGroup = taskGroups.get(0);
- assertThat(taskGroup.getChildTasks(), empty());
- return taskGroup.getTaskInfo().getTaskId();
- } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
- throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
- highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
- }
-
- static CheckedRunnable checkCompletionStatus(RestClient client, String taskId) {
- return () -> {
- Response response = client.performRequest(new Request("GET", "/_tasks/" + taskId));
- assertTrue((boolean) entityAsMap(response).get("completed"));
- };
- }
-
- private void putConflictPipeline() throws IOException {
- final XContentBuilder pipelineBuilder = jsonBuilder()
- .startObject()
- .startArray("processors")
- .startObject()
- .startObject("set")
- .field("field", "_version")
- .field("value", 1)
- .endObject()
- .endObject()
- .startObject()
- .startObject("set")
- .field("field", "_id")
- .field("value", "1")
- .endObject()
- .endObject()
- .endArray()
- .endObject();
- final PutPipelineRequest putPipelineRequest = new PutPipelineRequest(CONFLICT_PIPELINE_ID, BytesReference.bytes(pipelineBuilder),
- pipelineBuilder.contentType());
- assertTrue(highLevelClient().ingest().putPipeline(putPipelineRequest, RequestOptions.DEFAULT).isAcknowledged());
- }
}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
index fcea9044856c5..58c6b998f56df 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
@@ -516,6 +516,7 @@ public void testUpdateByQuery() throws IOException {
}
setRandomIndicesOptions(updateByQueryRequest::setIndicesOptions, updateByQueryRequest::indicesOptions, expectedParams);
setRandomTimeout(updateByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
+ expectedParams.put("wait_for_completion", Boolean.TRUE.toString());
Request request = RequestConverters.updateByQuery(updateByQueryRequest);
StringJoiner joiner = new StringJoiner("/", "/", "");
joiner.add(String.join(",", updateByQueryRequest.indices()));
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java
index 59183dbf0c0f2..00cd6bc307d15 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java
@@ -121,7 +121,7 @@ public void testGetValidTask() throws Exception {
assertEquals("reindex from [source1] to [dest]", info.getDescription());
assertEquals("indices:data/write/reindex", info.getAction());
if (taskResponse.isCompleted() == false) {
- assertBusy(ReindexIT.checkCompletionStatus(client(), taskId.toString()));
+ assertBusy(checkTaskCompletionStatus(client(), taskId.toString()));
}
}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/UpdateByQueryIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/UpdateByQueryIT.java
new file mode 100644
index 0000000000000..1b7b9370ce228
--- /dev/null
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/UpdateByQueryIT.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.tasks.TaskSubmissionResponse;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.IdsQueryBuilder;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.UpdateByQueryAction;
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.tasks.RawTaskStatus;
+import org.elasticsearch.tasks.TaskId;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+
+public class UpdateByQueryIT extends ESRestHighLevelClientTestCase {
+
+ public void testUpdateByQuery() throws Exception {
+ final String sourceIndex = "source1";
+ {
+ // Prepare
+ Settings settings = Settings.builder()
+ .put("number_of_shards", 1)
+ .put("number_of_replicas", 0)
+ .build();
+ createIndex(sourceIndex, settings);
+ assertEquals(
+ RestStatus.OK,
+ highLevelClient().bulk(
+ new BulkRequest()
+ .add(new IndexRequest(sourceIndex).id("1")
+ .source(Collections.singletonMap("foo", 1), XContentType.JSON))
+ .add(new IndexRequest(sourceIndex).id("2")
+ .source(Collections.singletonMap("foo", 2), XContentType.JSON))
+ .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
+ RequestOptions.DEFAULT
+ ).status()
+ );
+ }
+ {
+ // test1: create one doc in dest
+ UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
+ updateByQueryRequest.indices(sourceIndex);
+ updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
+ updateByQueryRequest.setRefresh(true);
+ BulkByScrollResponse bulkResponse =
+ execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
+ assertEquals(1, bulkResponse.getTotal());
+ assertEquals(1, bulkResponse.getUpdated());
+ assertEquals(0, bulkResponse.getNoops());
+ assertEquals(0, bulkResponse.getVersionConflicts());
+ assertEquals(1, bulkResponse.getBatches());
+ assertTrue(bulkResponse.getTook().getMillis() > 0);
+ assertEquals(1, bulkResponse.getBatches());
+ assertEquals(0, bulkResponse.getBulkFailures().size());
+ assertEquals(0, bulkResponse.getSearchFailures().size());
+ }
+ {
+ // test2: update using script
+ UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
+ updateByQueryRequest.indices(sourceIndex);
+ updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
+ updateByQueryRequest.setRefresh(true);
+ BulkByScrollResponse bulkResponse =
+ execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
+ assertEquals(2, bulkResponse.getTotal());
+ assertEquals(2, bulkResponse.getUpdated());
+ assertEquals(0, bulkResponse.getDeleted());
+ assertEquals(0, bulkResponse.getNoops());
+ assertEquals(0, bulkResponse.getVersionConflicts());
+ assertEquals(1, bulkResponse.getBatches());
+ assertTrue(bulkResponse.getTook().getMillis() > 0);
+ assertEquals(1, bulkResponse.getBatches());
+ assertEquals(0, bulkResponse.getBulkFailures().size());
+ assertEquals(0, bulkResponse.getSearchFailures().size());
+ assertEquals(
+ 3,
+ (int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT)
+ .getSourceAsMap().get("foo"))
+ );
+ }
+ {
+ // test update-by-query rethrottling
+ UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
+ updateByQueryRequest.indices(sourceIndex);
+ updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
+ updateByQueryRequest.setRefresh(true);
+
+ // this following settings are supposed to halt reindexing after first document
+ updateByQueryRequest.setBatchSize(1);
+ updateByQueryRequest.setRequestsPerSecond(0.00001f);
+ final CountDownLatch taskFinished = new CountDownLatch(1);
+ highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<>() {
+
+ @Override
+ public void onResponse(BulkByScrollResponse response) {
+ taskFinished.countDown();
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ fail(e.toString());
+ }
+ });
+
+ TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME, updateByQueryRequest.getDescription());
+ float requestsPerSecond = 1000f;
+ ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
+ highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
+ assertThat(response.getTasks(), hasSize(1));
+ assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
+ assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
+ assertEquals(Float.toString(requestsPerSecond),
+ ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
+ assertTrue(taskFinished.await(10, TimeUnit.SECONDS));
+
+ // any rethrottling after the update-by-query is done performed with the same taskId should result in a failure
+ response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
+ highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
+ assertTrue(response.getTasks().isEmpty());
+ assertFalse(response.getNodeFailures().isEmpty());
+ assertEquals(1, response.getNodeFailures().size());
+ assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
+ response.getNodeFailures().get(0).getCause().getMessage());
+ }
+ }
+
+ public void testUpdateByQueryTask() throws Exception {
+ final String sourceIndex = "testupdatebyquerytask";
+ {
+ // Prepare
+ Settings settings = Settings.builder()
+ .put("number_of_shards", 1)
+ .put("number_of_replicas", 0)
+ .build();
+ createIndex(sourceIndex, settings);
+ assertEquals(
+ RestStatus.OK,
+ highLevelClient().bulk(
+ new BulkRequest()
+ .add(new IndexRequest(sourceIndex).id("1")
+ .source(Collections.singletonMap("foo", 1), XContentType.JSON))
+ .add(new IndexRequest(sourceIndex).id("2")
+ .source(Collections.singletonMap("foo", 2), XContentType.JSON))
+ .add(new IndexRequest(sourceIndex).id("3")
+ .source(Collections.singletonMap("foo", 3), XContentType.JSON))
+ .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
+ RequestOptions.DEFAULT
+ ).status()
+ );
+ }
+ {
+ // tag::submit-update_by_query-task
+ UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
+ updateByQueryRequest.indices(sourceIndex);
+ updateByQueryRequest.setScript(
+ new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
+ updateByQueryRequest.setRefresh(true);
+
+ TaskSubmissionResponse updateByQuerySubmission = highLevelClient()
+ .submitUpdateByQueryTask(updateByQueryRequest, RequestOptions.DEFAULT);
+
+ String taskId = updateByQuerySubmission.getTask();
+ // end::submit-update_by_query-task
+
+ assertBusy(checkTaskCompletionStatus(client(), taskId));
+ }
+ }
+
+ public void testUpdateByQueryConflict() throws IOException {
+ final String index = "testupdatebyqueryconflict";
+
+ final Settings settings = Settings.builder()
+ .put("number_of_shards", 1)
+ .put("number_of_replicas", 0)
+ .build();
+ createIndex(index, settings);
+ final BulkRequest bulkRequest = new BulkRequest()
+ .add(new IndexRequest(index).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
+ .add(new IndexRequest(index).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
+ .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+ assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK));
+
+ putConflictPipeline();
+
+ final UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
+ updateByQueryRequest.indices(index);
+ updateByQueryRequest.setRefresh(true);
+ updateByQueryRequest.setPipeline(CONFLICT_PIPELINE_ID);
+ final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
+
+ assertThat(response.getVersionConflicts(), equalTo(1L));
+ assertThat(response.getSearchFailures(), empty());
+ assertThat(response.getBulkFailures(), hasSize(1));
+ assertThat(
+ response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()),
+ everyItem(containsString("version conflict"))
+ );
+
+ assertThat(response.getTotal(), equalTo(2L));
+ assertThat(response.getCreated(), equalTo(0L));
+ assertThat(response.getUpdated(), equalTo(1L));
+ assertThat(response.getDeleted(), equalTo(0L));
+ assertThat(response.getNoops(), equalTo(0L));
+ assertThat(response.getBatches(), equalTo(1));
+ assertTrue(response.getTook().getMillis() > 0);
+ }
+}