From b66098fab67cb5aab48750f8961774e758652fed Mon Sep 17 00:00:00 2001 From: Tuan Le Date: Wed, 24 Jun 2020 23:19:04 -0400 Subject: [PATCH 1/4] Add submitUpdateByQueryTask --- .../client/RequestConverters.java | 20 ++++++++++++++----- .../client/RestHighLevelClient.java | 15 ++++++++++++++ 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index c12eb5011ed5c..d5ddbc2c17592 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -554,10 +554,22 @@ static Request submitReindex(ReindexRequest reindexRequest) throws IOException { return prepareReindexRequest(reindexRequest, false); } + static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException { + return prepareDeleteByQueryRequest(deleteByQueryRequest, true); + } + static Request submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException { return prepareDeleteByQueryRequest(deleteByQueryRequest, false); } + static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException { + return prepareUpdateByQueryRequest(updateByQueryRequest, true); + } + + static Request submitUpdateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException { + return prepareUpdateByQueryRequest(updateByQueryRequest, false); + } + private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException { String endpoint = new EndpointBuilder().addPathPart("_reindex").build(); Request request = new Request(HttpPost.METHOD_NAME, endpoint); @@ -608,7 +620,8 @@ private static Request prepareDeleteByQueryRequest(DeleteByQueryRequest deleteBy return request; } - static Request 
updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException { + static Request prepareUpdateByQueryRequest(UpdateByQueryRequest updateByQueryRequest, + boolean waitForCompletion) throws IOException { String endpoint = endpoint(updateByQueryRequest.indices(), "_update_by_query"); Request request = new Request(HttpPost.METHOD_NAME, endpoint); Params params = new Params() @@ -619,6 +632,7 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I .withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards()) .withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond()) .withIndicesOptions(updateByQueryRequest.indicesOptions()) + .withWaitForCompletion(waitForCompletion) .withSlices(updateByQueryRequest.getSlices()); if (updateByQueryRequest.isAbortOnVersionConflict() == false) { params.putParam("conflicts", "proceed"); @@ -637,10 +651,6 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I return request; } - static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException { - return prepareDeleteByQueryRequest(deleteByQueryRequest, true); - } - static Request rethrottleReindex(RethrottleRequest rethrottleRequest) { return rethrottle(rethrottleRequest, "_reindex"); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 4c21067d9519a..a3302594c3993 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -599,6 +599,21 @@ public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQue ); } + /** + * Submits an update by query task. + * See + * Update By Query API on elastic.co + * @param updateByQueryRequest the request + * @param options the request options (e.g. 
headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the submission response + */ + public final TaskSubmissionResponse submitUpdateByQueryTask(UpdateByQueryRequest updateByQueryRequest, + RequestOptions options) throws IOException { + return performRequestAndParseEntity( + updateByQueryRequest, RequestConverters::submitUpdateByQuery, options, TaskSubmissionResponse::fromXContent, emptySet() + ); + } + /** * Asynchronously executes an update by query request. * See From 47def3acb6bd5af1e85bcaa435e3270bf619931d Mon Sep 17 00:00:00 2001 From: Tuan Le Date: Wed, 24 Jun 2020 23:38:14 -0400 Subject: [PATCH 2/4] Move common methods to parent class --- .../client/ESRestHighLevelClientTestCase.java | 64 ++++- .../org/elasticsearch/client/ReindexIT.java | 226 +----------------- .../org/elasticsearch/client/TasksIT.java | 2 +- 3 files changed, 65 insertions(+), 227 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java index 174c98f3642df..2fde7c027750c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java @@ -21,6 +21,9 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.ingest.PutPipelineRequest; @@ -31,6 +34,7 @@ import 
org.elasticsearch.client.cluster.RemoteInfoResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -42,6 +46,7 @@ import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.rest.ESRestTestCase; import org.junit.AfterClass; import org.junit.Before; @@ -53,16 +58,17 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.*; public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase { + protected static final String CONFLICT_PIPELINE_ID = "conflict_pipeline"; + private static RestHighLevelClient restHighLevelClient; private static boolean async = Booleans.parseBoolean(System.getProperty("tests.rest.async", "false")); @@ -216,6 +222,29 @@ protected static void clusterUpdateSettings(Settings persistentSettings, request, highLevelClient().cluster()::putSettings, highLevelClient().cluster()::putSettingsAsync).isAcknowledged()); } + protected void putConflictPipeline() throws IOException { + final XContentBuilder pipelineBuilder = jsonBuilder() + .startObject() + .startArray("processors") + .startObject() + .startObject("set") + .field("field", "_version") + .field("value", 1) + .endObject() + .endObject() + .startObject() + .startObject("set") + 
.field("field", "_id") + .field("value", "1") + .endObject() + .endObject() + .endArray() + .endObject(); + final PutPipelineRequest putPipelineRequest = new PutPipelineRequest(CONFLICT_PIPELINE_ID, BytesReference.bytes(pipelineBuilder), + pipelineBuilder.contentType()); + assertTrue(highLevelClient().ingest().putPipeline(putPipelineRequest, RequestOptions.DEFAULT).isAcknowledged()); + } + @Override protected Settings restClientSettings() { final String user = Objects.requireNonNull(System.getProperty("tests.rest.cluster.username")); @@ -280,4 +309,33 @@ protected static void setupRemoteClusterConfig(String remoteClusterName) throws protected static Map toMap(Response response) throws IOException { return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false); } + + protected static TaskId findTaskToRethrottle(String actionName) throws IOException { + long start = System.nanoTime(); + ListTasksRequest request = new ListTasksRequest(); + request.setActions(actionName); + request.setDetailed(true); + do { + ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT); + list.rethrowFailures("Finding tasks to rethrottle"); + assertThat("tasks are left over from the last execution of this test", + list.getTaskGroups(), hasSize(lessThan(2))); + if (0 == list.getTaskGroups().size()) { + // The parent task hasn't started yet + continue; + } + TaskGroup taskGroup = list.getTaskGroups().get(0); + assertThat(taskGroup.getChildTasks(), empty()); + return taskGroup.getTaskInfo().getTaskId(); + } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10)); + throw new AssertionError("Couldn't find tasks to rethrottle. 
Here are the running tasks " + + highLevelClient().tasks().list(request, RequestOptions.DEFAULT)); + } + + protected static CheckedRunnable checkTaskCompletionStatus(RestClient client, String taskId) { + return () -> { + Response response = client.performRequest(new Request("GET", "/_tasks/" + taskId)); + assertTrue((boolean) entityAsMap(response).get("completed")); + }; + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java index 1c33a7e183e5a..2d623ce0515eb 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -20,31 +20,21 @@ package org.elasticsearch.client; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.tasks.TaskSubmissionResponse; -import org.elasticsearch.common.CheckedRunnable; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import 
org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; -import org.elasticsearch.index.reindex.UpdateByQueryAction; -import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.script.Script; import org.elasticsearch.tasks.RawTaskStatus; import org.elasticsearch.tasks.TaskId; @@ -54,19 +44,10 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.everyItem; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.*; public class ReindexIT extends ESRestHighLevelClientTestCase { - private static final String CONFLICT_PIPELINE_ID = "conflict_pipeline"; - public void testReindex() throws IOException { final String sourceIndex = "source1"; final String destinationIndex = "dest"; @@ -149,7 +130,7 @@ public void testReindexTask() throws Exception { String taskId = reindexSubmission.getTask(); // <3> // end::submit-reindex-task - assertBusy(checkCompletionStatus(client(), taskId)); + assertBusy(checkTaskCompletionStatus(client(), taskId)); } } @@ -195,155 +176,6 @@ public void testReindexConflict() throws IOException { assertTrue(response.getTook().getMillis() > 0); } - public void testUpdateByQuery() throws Exception { - final String sourceIndex = "source1"; - { - // Prepare - Settings settings = Settings.builder() - .put("number_of_shards", 1) - .put("number_of_replicas", 0) - .build(); - createIndex(sourceIndex, settings); - assertEquals( - RestStatus.OK, - highLevelClient().bulk( - new BulkRequest() - .add(new IndexRequest(sourceIndex).id("1") - 
.source(Collections.singletonMap("foo", 1), XContentType.JSON)) - .add(new IndexRequest(sourceIndex).id("2") - .source(Collections.singletonMap("foo", 2), XContentType.JSON)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), - RequestOptions.DEFAULT - ).status() - ); - } - { - // test1: create one doc in dest - UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); - updateByQueryRequest.indices(sourceIndex); - updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1")); - updateByQueryRequest.setRefresh(true); - BulkByScrollResponse bulkResponse = - execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); - assertEquals(1, bulkResponse.getTotal()); - assertEquals(1, bulkResponse.getUpdated()); - assertEquals(0, bulkResponse.getNoops()); - assertEquals(0, bulkResponse.getVersionConflicts()); - assertEquals(1, bulkResponse.getBatches()); - assertTrue(bulkResponse.getTook().getMillis() > 0); - assertEquals(1, bulkResponse.getBatches()); - assertEquals(0, bulkResponse.getBulkFailures().size()); - assertEquals(0, bulkResponse.getSearchFailures().size()); - } - { - // test2: update using script - UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); - updateByQueryRequest.indices(sourceIndex); - updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;")); - updateByQueryRequest.setRefresh(true); - BulkByScrollResponse bulkResponse = - execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); - assertEquals(2, bulkResponse.getTotal()); - assertEquals(2, bulkResponse.getUpdated()); - assertEquals(0, bulkResponse.getDeleted()); - assertEquals(0, bulkResponse.getNoops()); - assertEquals(0, bulkResponse.getVersionConflicts()); - assertEquals(1, bulkResponse.getBatches()); - assertTrue(bulkResponse.getTook().getMillis() > 0); - assertEquals(1, bulkResponse.getBatches()); - assertEquals(0, 
bulkResponse.getBulkFailures().size()); - assertEquals(0, bulkResponse.getSearchFailures().size()); - assertEquals( - 3, - (int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT) - .getSourceAsMap().get("foo")) - ); - } - { - // test update-by-query rethrottling - UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); - updateByQueryRequest.indices(sourceIndex); - updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1")); - updateByQueryRequest.setRefresh(true); - - // this following settings are supposed to halt reindexing after first document - updateByQueryRequest.setBatchSize(1); - updateByQueryRequest.setRequestsPerSecond(0.00001f); - final CountDownLatch taskFinished = new CountDownLatch(1); - highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener() { - - @Override - public void onResponse(BulkByScrollResponse response) { - taskFinished.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - }); - - TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME); - float requestsPerSecond = 1000f; - ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), - highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync); - assertThat(response.getTasks(), hasSize(1)); - assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId()); - assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class)); - assertEquals(Float.toString(requestsPerSecond), - ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString()); - assertTrue(taskFinished.await(10, TimeUnit.SECONDS)); - - // any rethrottling after the update-by-query is done performed with the same taskId should result in a failure - response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), - 
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync); - assertTrue(response.getTasks().isEmpty()); - assertFalse(response.getNodeFailures().isEmpty()); - assertEquals(1, response.getNodeFailures().size()); - assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]", - response.getNodeFailures().get(0).getCause().getMessage()); - } - } - - public void testUpdateByQueryConflict() throws IOException { - final String index = "testupdatebyqueryconflict"; - - final Settings settings = Settings.builder() - .put("number_of_shards", 1) - .put("number_of_replicas", 0) - .build(); - createIndex(index, settings); - final BulkRequest bulkRequest = new BulkRequest() - .add(new IndexRequest(index).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) - .add(new IndexRequest(index).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK)); - - putConflictPipeline(); - - final UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); - updateByQueryRequest.indices(index); - updateByQueryRequest.setRefresh(true); - updateByQueryRequest.setPipeline(CONFLICT_PIPELINE_ID); - final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); - - assertThat(response.getVersionConflicts(), equalTo(1L)); - assertThat(response.getSearchFailures(), empty()); - assertThat(response.getBulkFailures(), hasSize(1)); - assertThat( - response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()), - everyItem(containsString("version conflict")) - ); - - assertThat(response.getTotal(), equalTo(2L)); - assertThat(response.getCreated(), equalTo(0L)); - assertThat(response.getUpdated(), 
equalTo(1L)); - assertThat(response.getDeleted(), equalTo(0L)); - assertThat(response.getNoops(), equalTo(0L)); - assertThat(response.getBatches(), equalTo(1)); - assertTrue(response.getTook().getMillis() > 0); - } - public void testDeleteByQuery() throws Exception { final String sourceIndex = "source1"; { @@ -473,59 +305,7 @@ public void testDeleteByQueryTask() throws Exception { String taskId = deleteByQuerySubmission.getTask(); // end::submit-delete_by_query-task - assertBusy(checkCompletionStatus(client(), taskId)); + assertBusy(checkTaskCompletionStatus(client(), taskId)); } } - - private static TaskId findTaskToRethrottle(String actionName) throws IOException { - long start = System.nanoTime(); - ListTasksRequest request = new ListTasksRequest(); - request.setActions(actionName); - request.setDetailed(true); - do { - ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT); - list.rethrowFailures("Finding tasks to rethrottle"); - assertThat("tasks are left over from the last execution of this test", - list.getTaskGroups(), hasSize(lessThan(2))); - if (0 == list.getTaskGroups().size()) { - // The parent task hasn't started yet - continue; - } - TaskGroup taskGroup = list.getTaskGroups().get(0); - assertThat(taskGroup.getChildTasks(), empty()); - return taskGroup.getTaskInfo().getTaskId(); - } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10)); - throw new AssertionError("Couldn't find tasks to rethrottle. 
Here are the running tasks " + - highLevelClient().tasks().list(request, RequestOptions.DEFAULT)); - } - - static CheckedRunnable checkCompletionStatus(RestClient client, String taskId) { - return () -> { - Response response = client.performRequest(new Request("GET", "/_tasks/" + taskId)); - assertTrue((boolean) entityAsMap(response).get("completed")); - }; - } - - private void putConflictPipeline() throws IOException { - final XContentBuilder pipelineBuilder = jsonBuilder() - .startObject() - .startArray("processors") - .startObject() - .startObject("set") - .field("field", "_version") - .field("value", 1) - .endObject() - .endObject() - .startObject() - .startObject("set") - .field("field", "_id") - .field("value", "1") - .endObject() - .endObject() - .endArray() - .endObject(); - final PutPipelineRequest putPipelineRequest = new PutPipelineRequest(CONFLICT_PIPELINE_ID, BytesReference.bytes(pipelineBuilder), - pipelineBuilder.contentType()); - assertTrue(highLevelClient().ingest().putPipeline(putPipelineRequest, RequestOptions.DEFAULT).isAcknowledged()); - } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java index 59183dbf0c0f2..00cd6bc307d15 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java @@ -121,7 +121,7 @@ public void testGetValidTask() throws Exception { assertEquals("reindex from [source1] to [dest]", info.getDescription()); assertEquals("indices:data/write/reindex", info.getAction()); if (taskResponse.isCompleted() == false) { - assertBusy(ReindexIT.checkCompletionStatus(client(), taskId.toString())); + assertBusy(checkTaskCompletionStatus(client(), taskId.toString())); } } From 9e6672c5d8bfe9d0d47dd67fadc95508cca130ad Mon Sep 17 00:00:00 2001 From: Tuan Le Date: Wed, 24 Jun 2020 23:38:49 -0400 Subject: [PATCH 3/4] Add tests 
--- .../client/RequestConvertersTests.java | 1 + .../elasticsearch/client/UpdateByQueryIT.java | 240 ++++++++++++++++++ 2 files changed, 241 insertions(+) create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/UpdateByQueryIT.java diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index fcea9044856c5..58c6b998f56df 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -516,6 +516,7 @@ public void testUpdateByQuery() throws IOException { } setRandomIndicesOptions(updateByQueryRequest::setIndicesOptions, updateByQueryRequest::indicesOptions, expectedParams); setRandomTimeout(updateByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams); + expectedParams.put("wait_for_completion", Boolean.TRUE.toString()); Request request = RequestConverters.updateByQuery(updateByQueryRequest); StringJoiner joiner = new StringJoiner("/", "/", ""); joiner.add(String.join(",", updateByQueryRequest.indices())); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/UpdateByQueryIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/UpdateByQueryIT.java new file mode 100644 index 0000000000000..8f556faf3ed64 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/UpdateByQueryIT.java @@ -0,0 +1,240 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.UpdateByQueryAction; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.Script; +import org.elasticsearch.tasks.RawTaskStatus; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.*; + +public class UpdateByQueryIT extends ESRestHighLevelClientTestCase { + + public void testUpdateByQuery() throws Exception { + final String sourceIndex = "source1"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + 
createIndex(sourceIndex, settings); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + new BulkRequest() + .add(new IndexRequest(sourceIndex).id("1") + .source(Collections.singletonMap("foo", 1), XContentType.JSON)) + .add(new IndexRequest(sourceIndex).id("2") + .source(Collections.singletonMap("foo", 2), XContentType.JSON)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: create one doc in dest + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(sourceIndex); + updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1")); + updateByQueryRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = + execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); + assertEquals(1, bulkResponse.getTotal()); + assertEquals(1, bulkResponse.getUpdated()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + } + { + // test2: update using script + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(sourceIndex); + updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;")); + updateByQueryRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = + execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); + assertEquals(2, bulkResponse.getTotal()); + assertEquals(2, bulkResponse.getUpdated()); + assertEquals(0, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, 
bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + assertEquals( + 3, + (int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT) + .getSourceAsMap().get("foo")) + ); + } + { + // test update-by-query rethrottling + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(sourceIndex); + updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1")); + updateByQueryRequest.setRefresh(true); + + // the following settings are supposed to halt the update-by-query after the first document + updateByQueryRequest.setBatchSize(1); + updateByQueryRequest.setRequestsPerSecond(0.00001f); + final CountDownLatch taskFinished = new CountDownLatch(1); + highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener() { + + @Override + public void onResponse(BulkByScrollResponse response) { + taskFinished.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + }); + + TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME); + float requestsPerSecond = 1000f; + ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), + highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync); + assertThat(response.getTasks(), hasSize(1)); + assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId()); + assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class)); + assertEquals(Float.toString(requestsPerSecond), + ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString()); + assertTrue(taskFinished.await(10, TimeUnit.SECONDS)); + + // any rethrottling after the update-by-query is done 
performed with the same taskId should result in a failure + response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), + highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync); + assertTrue(response.getTasks().isEmpty()); + assertFalse(response.getNodeFailures().isEmpty()); + assertEquals(1, response.getNodeFailures().size()); + assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]", + response.getNodeFailures().get(0).getCause().getMessage()); + } + } + + public void testUpdateByQueryTask() throws Exception { + final String sourceIndex = "testupdatebyquerytask"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + new BulkRequest() + .add(new IndexRequest(sourceIndex).id("1") + .source(Collections.singletonMap("foo", 1), XContentType.JSON)) + .add(new IndexRequest(sourceIndex).id("2") + .source(Collections.singletonMap("foo", 2), XContentType.JSON)) + .add(new IndexRequest(sourceIndex).id("3") + .source(Collections.singletonMap("foo", 3), XContentType.JSON)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), + RequestOptions.DEFAULT + ).status() + ); + } + { + // tag::submit-update_by_query-task + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(sourceIndex); + updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;")); + updateByQueryRequest.setRefresh(true); + + TaskSubmissionResponse updateByQuerySubmission = highLevelClient() + .submitUpdateByQueryTask(updateByQueryRequest, RequestOptions.DEFAULT); + + String taskId = updateByQuerySubmission.getTask(); + // end::submit-update_by_query-task + + assertBusy(checkTaskCompletionStatus(client(), taskId)); + } + } + + 
/** Runs a synchronous update-by-query, routed through the pipeline named by CONFLICT_PIPELINE_ID, over an index holding two documents, and asserts the reported outcome: one version conflict, exactly one bulk failure whose message contains "version conflict", no search failures, total 2, updated 1, created, deleted and noops all 0, a single batch, and a took time above 0 ms. */ + public void testUpdateByQueryConflict() throws IOException { + final String index = "testupdatebyqueryconflict"; + + final Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(index, settings); + final BulkRequest bulkRequest = new BulkRequest() + .add(new IndexRequest(index).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .add(new IndexRequest(index).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK)); + + /* NOTE(review): putConflictPipeline() is defined outside this hunk; presumably it registers the pipeline that provokes the version conflict asserted below - confirm. */ + putConflictPipeline(); + + final UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(index); + updateByQueryRequest.setRefresh(true); + updateByQueryRequest.setPipeline(CONFLICT_PIPELINE_ID); + final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); + + assertThat(response.getVersionConflicts(), equalTo(1L)); + assertThat(response.getSearchFailures(), empty()); + assertThat(response.getBulkFailures(), hasSize(1)); + assertThat( + response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()), + everyItem(containsString("version conflict")) + ); + + assertThat(response.getTotal(), equalTo(2L)); + assertThat(response.getCreated(), equalTo(0L)); + assertThat(response.getUpdated(), equalTo(1L)); + assertThat(response.getDeleted(), equalTo(0L)); + assertThat(response.getNoops(), equalTo(0L)); + assertThat(response.getBatches(), equalTo(1)); + assertTrue(response.getTook().getMillis() > 0); + } +} From 3a0b64871ec69f120e89da86a11f7080d655243f Mon Sep 17 00:00:00 2001 From: Tuan Le Date: Thu, 25 Jun 2020 10:43:10 -0400 Subject: [PATCH 4/4] Fix checkstyle --- .../client/ESRestHighLevelClientTestCase.java | 7 ++++++- 
.../java/org/elasticsearch/client/ReindexIT.java | 7 ++++++- .../org/elasticsearch/client/UpdateByQueryIT.java | 12 +++++++++--- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java index 2fde7c027750c..e08c4c5ed76de 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java @@ -63,7 +63,12 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.notNullValue; public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java index 2d623ce0515eb..7e00d4e08ec24 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -44,7 +44,12 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; public class ReindexIT 
extends ESRestHighLevelClientTestCase { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/UpdateByQueryIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/UpdateByQueryIT.java index 8f556faf3ed64..0afadc6adc456 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/UpdateByQueryIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/UpdateByQueryIT.java @@ -44,7 +44,12 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; public class UpdateByQueryIT extends ESRestHighLevelClientTestCase { @@ -123,7 +128,7 @@ public void testUpdateByQuery() throws Exception { updateByQueryRequest.setBatchSize(1); updateByQueryRequest.setRequestsPerSecond(0.00001f); final CountDownLatch taskFinished = new CountDownLatch(1); - highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener() { + highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<>() { @Override public void onResponse(BulkByScrollResponse response) { @@ -186,7 +191,8 @@ public void testUpdateByQueryTask() throws Exception { // tag::submit-update_by_query-task UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); updateByQueryRequest.indices(sourceIndex); - updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;")); + updateByQueryRequest.setScript( + new Script("if (ctx._source.foo == 2) ctx._source.foo++;")); updateByQueryRequest.setRefresh(true); TaskSubmissionResponse updateByQuerySubmission = highLevelClient()