From 9758f39323ed6650a41a0de6829fa729c5d75097 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Fri, 10 Aug 2018 00:43:47 +0200 Subject: [PATCH 1/2] added update by query --- .../client/RequestConverters.java | 29 +++ .../client/RestHighLevelClient.java | 30 +++ .../java/org/elasticsearch/client/CrudIT.java | 67 +++++++ .../client/RequestConvertersTests.java | 56 ++++++ .../client/RestHighLevelClientTests.java | 3 +- .../documentation/CRUDDocumentationIT.java | 122 ++++++++++++ .../document/update-by-query.asciidoc | 182 ++++++++++++++++++ .../high-level/supported-apis.asciidoc | 2 + .../reindex/RestUpdateByQueryAction.java | 3 +- .../index/reindex/RoundTripTests.java | 2 +- .../reindex/UpdateByQueryMetadataTests.java | 3 +- .../reindex/UpdateByQueryWithScriptTests.java | 3 +- .../reindex/AbstractBulkByScrollRequest.java | 4 +- .../index/reindex/BulkByScrollTask.java | 79 +++----- .../index/reindex/UpdateByQueryRequest.java | 106 +++++++++- .../reindex/BulkByScrollResponseTests.java | 37 +++- ...ulkByScrollTaskStatusOrExceptionTests.java | 9 +- .../reindex/BulkByScrollTaskStatusTests.java | 34 +++- .../reindex/UpdateByQueryRequestTests.java | 9 +- 19 files changed, 699 insertions(+), 81 deletions(-) create mode 100644 docs/java-rest/high-level/document/update-by-query.asciidoc diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 15bbde6b5bfae..7dbfd6e8a4cac 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -106,7 +106,9 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.rankeval.RankEvalRequest; +import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.protocol.xpack.XPackInfoRequest; import org.elasticsearch.protocol.xpack.XPackUsageRequest; import org.elasticsearch.protocol.xpack.license.DeleteLicenseRequest; @@ -836,6 +838,33 @@ static Request reindex(ReindexRequest reindexRequest) throws IOException { return request; } + static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException { + String endpoint = + endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query"); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + Params params = new Params(request) + .withRouting(updateByQueryRequest.getRouting()) + .withPipeline(updateByQueryRequest.getPipeline()) + .withRefresh(updateByQueryRequest.isRefresh()) + .withTimeout(updateByQueryRequest.getTimeout()) + .withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards()) + .withIndicesOptions(updateByQueryRequest.indicesOptions()); + if (updateByQueryRequest.isAbortOnVersionConflict() == false) { + params.putParam("conflicts", "proceed"); + } + if (updateByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) { + params.putParam("scroll_size", Integer.toString(updateByQueryRequest.getBatchSize())); + } + if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) { + params.putParam("scroll", updateByQueryRequest.getScrollTime()); + } + if (updateByQueryRequest.getSize() > 0) { + params.putParam("size", Integer.toString(updateByQueryRequest.getSize())); + } + request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request rollover(RolloverRequest rolloverRequest) throws IOException { String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover") .addPathPart(rolloverRequest.getNewIndexName()).build(); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 4ac5bfd080f18..6e3c5a6fb831e 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -66,6 +66,7 @@ import org.elasticsearch.index.rankeval.RankEvalResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.plugins.spi.NamedXContentProvider; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; @@ -424,6 +425,35 @@ public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions opt ); } + /** + * Executes a update by query request. + * See + * Update By Query API on elastic.co + * @param updateByQueryRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, emptySet() + ); + } + + /** + * Asynchronously executes an update by query request. + * See + * Update By Query API on elastic.co + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + */ + public final void updateByQueryAsync(UpdateByQueryRequest reindexRequest, RequestOptions options, + ActionListener listener) { + performRequestAsyncAndParseEntity( + reindexRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet() + ); + } + /** * Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 7978d76c56d70..d76917348aa39 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; @@ -691,6 +692,72 @@ public void testReindex() throws IOException { } } + public void testUpdateByQuery() throws IOException { + final String sourceIndex = "source1"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + new BulkRequest() + .add(new IndexRequest(sourceIndex, "type", "1") + .source(Collections.singletonMap("foo", 1), XContentType.JSON)) + .add(new IndexRequest(sourceIndex, "type", "2") + .source(Collections.singletonMap("foo", 2), XContentType.JSON)) + .setRefreshPolicy(RefreshPolicy.IMMEDIATE), + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: create one doc in dest + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.setIndices(sourceIndex); + updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type")); + updateByQueryRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = + execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); + assertEquals(1, bulkResponse.getTotal()); + assertEquals(1, bulkResponse.getUpdated()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + } + { + // test2: update using script + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.setIndices(sourceIndex); + updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;")); + updateByQueryRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = + execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); + assertEquals(2, bulkResponse.getTotal()); + assertEquals(2, bulkResponse.getUpdated()); + assertEquals(0, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + assertEquals( + 3, + (int) (highLevelClient().get(new GetRequest(sourceIndex, "type", "2"), RequestOptions.DEFAULT) + .getSourceAsMap().get("foo")) + ); + } + } + public void testBulkProcessorIntegration() throws IOException { int nbItems = randomIntBetween(10, 100); boolean[] errors = new boolean[nbItems]; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 44b4ae05b57c7..dfe761e89e873 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -129,6 +129,7 @@ import org.elasticsearch.index.rankeval.RestRankEvalAction; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.RemoteInfo; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.protocol.xpack.XPackInfoRequest; import org.elasticsearch.protocol.xpack.migration.IndexUpgradeInfoRequest; import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest; @@ -137,6 +138,7 @@ import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.rest.action.search.RestSearchAction; +import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.script.mustache.MultiSearchTemplateRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest; @@ -470,6 +472,60 @@ public void testReindex() throws IOException { assertToXContentBody(reindexRequest, request.getEntity()); } + public void testUpdateByQuery() throws IOException { + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.setIndices(randomIndicesNames(1, 5)); + Map expectedParams = new HashMap<>(); + if (randomBoolean()) { + updateByQueryRequest.setDocTypes(generateRandomStringArray(5, 5, false, false)); + } + if (randomBoolean()) { + int batchSize = randomInt(100); + updateByQueryRequest.setBatchSize(batchSize); + expectedParams.put("scroll_size", Integer.toString(batchSize)); + } + if (randomBoolean()) { + updateByQueryRequest.setPipeline("my_pipeline"); + expectedParams.put("pipeline", "my_pipeline"); + } + if (randomBoolean()) { + updateByQueryRequest.setRouting("=cat"); + expectedParams.put("routing", "=cat"); + } + if (randomBoolean()) { + int size = randomIntBetween(100, 1000); + updateByQueryRequest.setSize(size); + expectedParams.put("size", Integer.toString(size)); + } + if (randomBoolean()) { + updateByQueryRequest.setAbortOnVersionConflict(false); + expectedParams.put("conflicts", "proceed"); + } + if (randomBoolean()) { + String ts = randomTimeValue(); + updateByQueryRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll")); + expectedParams.put("scroll", ts); + } + if (randomBoolean()) { + updateByQueryRequest.setQuery(new TermQueryBuilder("foo", "fooval")); + } + if (randomBoolean()) { + updateByQueryRequest.setScript(new Script("ctx._source.last = \"lastname\"")); + } + setRandomIndicesOptions(updateByQueryRequest::setIndicesOptions, updateByQueryRequest::indicesOptions, expectedParams); + setRandomTimeout(updateByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams); + Request request = RequestConverters.updateByQuery(updateByQueryRequest); + StringJoiner joiner = new StringJoiner("/", "/", ""); + joiner.add(String.join(",", updateByQueryRequest.indices())); + if (updateByQueryRequest.getDocTypes().length > 0) + joiner.add(String.join(",", updateByQueryRequest.getDocTypes())); + joiner.add("_update_by_query"); + assertEquals(joiner.toString(), request.getEndpoint()); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals(expectedParams, request.getParameters()); + assertToXContentBody(updateByQueryRequest, request.getEntity()); + } + public void testPutMapping() throws IOException { PutMappingRequest putMappingRequest = new PutMappingRequest(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index d2585b6f3f4c2..7df4e07257680 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -664,8 +664,7 @@ public void testApiNamingConventions() throws Exception { "render_search_template", "scripts_painless_execute", "tasks.get", - "termvectors", - "update_by_query" + "termvectors" }; //These API are not required for high-level client feature completeness String[] notRequiredApi = new String[] { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index 9c69a2a48361a..b2a61326aa7d7 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -67,6 +68,7 @@ import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.ScrollableHitSource; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; @@ -899,6 +901,126 @@ public void onFailure(Exception e) { } } + public void testUpdateByQuery() throws Exception { + RestHighLevelClient client = highLevelClient(); + { + String mapping = + "\"doc\": {\n" + + " \"properties\": {\n" + + " \"user\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"field1\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"field2\": {\n" + + " \"type\": \"integer\"\n" + + " }\n" + + " }\n" + + " }"; + createIndex("source1", Settings.EMPTY, mapping); + createIndex("source2", Settings.EMPTY, mapping); + createPipeline("my_pipeline"); + } + { + // tag::update-by-query-request + UpdateByQueryRequest request = new UpdateByQueryRequest(); // <1> + request.setIndices("source1", "source2"); // <2> + // end::update-by-query-request + // tag::update-by-query-request-conflicts + request.setConflicts("proceed"); // <1> + // end::update-by-query-request-conflicts + // tag::update-by-query-request-typeOrQuery + request.setDocTypes("doc"); // <1> + request.setQuery(new TermQueryBuilder("user", "kimchy")); // <2> + // end::update-by-query-request-typeOrQuery + // tag::update-by-query-request-size + request.setSize(10); // <1> + // end::update-by-query-request-size + // tag::update-by-query-request-scrollSize + request.setBatchSize(100); // <1> + // end::update-by-query-request-scrollSize + // tag::update-by-query-request-pipeline + request.setPipeline("my_pipeline"); // <1> + // end::update-by-query-request-pipeline + // tag::update-by-query-request-script + request.setScript( + new Script( + ScriptType.INLINE, "painless", + "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}", + Collections.emptyMap())); // <1> + // end::update-by-query-request-script + // tag::update-by-query-request-timeout + request.setTimeout(TimeValue.timeValueMinutes(2)); // <1> + // end::update-by-query-request-timeout + // tag::update-by-query-request-refresh + request.setRefresh(true); // <1> + // end::update-by-query-request-refresh + // tag::update-by-query-request-slices + request.setSlices(2); // <1> + // end::update-by-query-request-slices + // tag::update-by-query-request-scroll + request.setScroll(TimeValue.timeValueMinutes(10)); // <1> + // end::update-by-query-request-scroll + // tag::update-by-query-request-routing + request.setRouting("=cat"); // <1> + // end::update-by-query-request-routing + // tag::update-by-query-request-indicesOptions + request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // <1> + // end::update-by-query-request-indicesOptions + + // tag::update-by-query-execute + BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT); + // end::update-by-query-execute + assertSame(0, bulkResponse.getSearchFailures().size()); + assertSame(0, bulkResponse.getBulkFailures().size()); + // tag::update-by-query-response + TimeValue timeTaken = bulkResponse.getTook(); // <1> + boolean timedOut = bulkResponse.isTimedOut(); // <2> + long totalDocs = bulkResponse.getTotal(); // <3> + long updatedDocs = bulkResponse.getUpdated(); // <4> + long deletedDocs = bulkResponse.getDeleted(); // <5> + long batches = bulkResponse.getBatches(); // <6> + long noops = bulkResponse.getNoops(); // <7> + long versionConflicts = bulkResponse.getVersionConflicts(); // <8> + long bulkRetries = bulkResponse.getBulkRetries(); // <9> + long searchRetries = bulkResponse.getSearchRetries(); // <10> + TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <11> + TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <12> + List searchFailures = bulkResponse.getSearchFailures(); // <13> + List bulkFailures = bulkResponse.getBulkFailures(); // <14> + // end::update-by-query-response + } + { + UpdateByQueryRequest request = new UpdateByQueryRequest(); + request.setIndices("source1"); + + // tag::update-by-query-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkResponse) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::update-by-query-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::update-by-query-execute-async + client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::update-by-query-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + public void testGet() throws Exception { RestHighLevelClient client = highLevelClient(); { diff --git a/docs/java-rest/high-level/document/update-by-query.asciidoc b/docs/java-rest/high-level/document/update-by-query.asciidoc new file mode 100644 index 0000000000000..679b00c3291d2 --- /dev/null +++ b/docs/java-rest/high-level/document/update-by-query.asciidoc @@ -0,0 +1,182 @@ +[[java-rest-high-document-update-by-query]] +=== Update By Query API + +[[java-rest-high-document-update-by-query-request]] +==== Update By Query Request + +A `UpdateByQueryRequest` can be used to update documents in an index. + +It requires an existing index and a target index which may or may not exist pre-request. + +The simplest form of a `UpdateByQueryRequest` looks like follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request] +-------------------------------------------------- +<1> Creates the `UpdateByQueryRequest` +<2> Adds a list of sources to update + +By default version conflicts abort the `UpdateByQueryRequest` process but you can just count them by settings it to +`proceed` in the request body + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-conflicts] +-------------------------------------------------- +<1> Set `proceed` on version conflict + +You can limit the documents by adding a type to the source or by adding a query. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-typeOrQuery] +-------------------------------------------------- +<1> Only copy `doc` type +<2> Only copy documents which have field `user` set to `kimchy` + +It’s also possible to limit the number of processed documents by setting size. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-size] +-------------------------------------------------- +<1> Only copy 10 documents + +By default `UpdateByQueryRequest` uses batches of 1000. You can change the batch size with `setBatchSize`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-scrollSize] +-------------------------------------------------- +<1> Use batches of 100 documents + +Update by query can also use the ingest feature by specifying a `pipeline`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-pipeline] +-------------------------------------------------- +<1> set pipeline to `my_pipeline` + +`UpdateByQueryRequest` also supports a `script` that modifies the document. The following example illustrates that. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-script] +-------------------------------------------------- +<1> `setScript` to increment the `likes` field on all documents with user `kimchy`. + +`UpdateByQueryRequest` also helps in automatically parallelizing using `sliced-scroll` to +slice on `_uid`. Use `setSlices` to specify the number of slices to use. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-slices] +-------------------------------------------------- +<1> set number of slices to use + +`UpdateByQueryRequest` uses the `scroll` parameter to control how long it keeps the "search context" alive. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-scroll] +-------------------------------------------------- +<1> set scroll time + +If you provide routing then the routing is copied to the scroll query, limiting the process to the shards that match +that routing value. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-routing] +-------------------------------------------------- +<1> set routing + + +==== Optional arguments +In addition to the options above the following arguments can optionally be also provided: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-timeout] +-------------------------------------------------- +<1> Timeout to wait for the update by query request to be performed as a `TimeValue` + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-refresh] +-------------------------------------------------- +<1> Refresh index after calling update by query + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-indicesOptions] +-------------------------------------------------- +<1> Set indices options + + +[[java-rest-high-document-update-by-query-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-execute] +-------------------------------------------------- + +[[java-rest-high-document-update-by-query-async]] +==== Asynchronous Execution + +The asynchronous execution of an update by query request requires both the `UpdateByQueryRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-execute-async] +-------------------------------------------------- +<1> The `UpdateByQueryRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `BulkByScrollResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument and contains a list of individual results for each +operation that was executed. Note that one or more operations might have +failed while the others have been successfully executed. +<2> Called when the whole `UpdateByQueryRequest` fails. In this case the raised +exception is provided as an argument and no operation has been executed. + +[[java-rest-high-document-update-by-query-execute-listener-response]] +==== Update By Query Response + +The returned `BulkByScrollResponse` contains information about the executed operations and + allows to iterate over each result as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-response] +-------------------------------------------------- +<1> Get total time taken +<2> Check if the request timed out +<3> Get total number of docs processed +<4> Number of docs that were updated +<5> Number of docs that were deleted +<6> Number of batches that were executed +<7> Number of skipped docs +<8> Number of version conflicts +<9> Number of times request had to retry bulk index operations +<10> Number of times request had to retry search operations +<11> The total time this request has throttled itself not including the current throttle time if it is currently sleeping +<12> Remaining delay of any current throttle sleep or 0 if not sleeping +<13> Failures during search phase +<14> Failures during bulk index operation diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 64c95912b5ea9..961a4ed08667c 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -16,6 +16,7 @@ Multi-document APIs:: * <> * <> * <> +* <> include::document/index.asciidoc[] include::document/get.asciidoc[] @@ -25,6 +26,7 @@ include::document/update.asciidoc[] include::document/bulk.asciidoc[] include::document/multi-get.asciidoc[] include::document/reindex.asciidoc[] +include::document/update-by-query.asciidoc[] == Search APIs diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java index bf0adc6e1429f..72a2a0d73356b 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -63,7 +62,7 @@ protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOExcept * it to set its own defaults which differ from SearchRequest's * defaults. Then the parse can override them. */ - UpdateByQueryRequest internal = new UpdateByQueryRequest(new SearchRequest()); + UpdateByQueryRequest internal = new UpdateByQueryRequest(); Map> consumers = new HashMap<>(); consumers.put("conflicts", o -> internal.setConflicts((String) o)); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index cc848900b7815..30621ab607bf2 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -81,7 +81,7 @@ public void testReindexRequest() throws IOException { } public void testUpdateByQueryRequest() throws IOException { - UpdateByQueryRequest update = new UpdateByQueryRequest(new SearchRequest()); + UpdateByQueryRequest update = new UpdateByQueryRequest(); randomRequest(update); if (randomBoolean()) { update.setPipeline(randomAlphaOfLength(5)); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java index b688ce019e3df..d3f62af907d9f 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.index.reindex.ScrollableHitSource.Hit; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.settings.Settings; public class UpdateByQueryMetadataTests @@ -39,7 +38,7 @@ protected TestAction action() { @Override protected UpdateByQueryRequest request() { - return new UpdateByQueryRequest(new SearchRequest()); + return new UpdateByQueryRequest(); } private class TestAction extends TransportUpdateByQueryAction.AsyncIndexBySearchAction { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java index 4006d16fbcb11..8c9744aa0dd92 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.ScriptService; @@ -50,7 +49,7 @@ public void testModifyingCtxNotAllowed() { @Override protected UpdateByQueryRequest request() { - return new UpdateByQueryRequest(new SearchRequest()); + return new UpdateByQueryRequest(); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 3b635c8238784..b2e6f98f1268f 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -44,8 +44,8 @@ public abstract class AbstractBulkByScrollRequest> extends ActionRequest { public static final int SIZE_ALL_MATCHES = -1; - static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5); - static final int DEFAULT_SCROLL_SIZE = 1000; + public static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5); + public static final int DEFAULT_SCROLL_SIZE = 1000; public static final int AUTO_SLICES = 0; public static final String AUTO_SLICES_VALUE = "auto"; diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index 5beb86fae6ba8..7aa2c8a1b75a9 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -209,8 +209,8 @@ public boolean shouldCancelChildrenOnCancellation() { public static class StatusBuilder { private Integer sliceId = null; private Long total = null; - private Long updated = null; - private Long created = null; + private long updated = 0; // Not present during deleteByQuery + private long created = 0; // Not present during updateByQuery private Long deleted = null; private Integer batches = null; private Long versionConflicts = null; @@ -221,7 +221,7 @@ public static class StatusBuilder { private Float requestsPerSecond = null; private String reasonCancelled = null; private TimeValue throttledUntil = null; - private List sliceStatuses = emptyList(); + private List sliceStatuses = new ArrayList<>(); public void setSliceId(Integer sliceId) { this.sliceId = sliceId; @@ -295,10 +295,14 @@ public void setThrottledUntil(Long throttledUntil) { public void setSliceStatuses(List sliceStatuses) { if (sliceStatuses != null) { - this.sliceStatuses = sliceStatuses; + this.sliceStatuses.addAll(sliceStatuses); } } + public void addToSliceStatuses(StatusOrException statusOrException) { + this.sliceStatuses.add(statusOrException); + } + public Status buildStatus() { if (sliceStatuses.isEmpty()) { try { @@ -613,37 +617,20 @@ public static Status innerFromXContent(XContentParser parser) throws IOException Token token = parser.currentToken(); String fieldName = parser.currentName(); ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation); - Integer sliceId = null; - Long total = null; - Long updated = null; - Long created = null; - Long deleted = null; - Integer batches = null; - Long versionConflicts = null; - Long noOps = null; - Long bulkRetries = null; - Long searchRetries = null; - TimeValue throttled = null; - Float requestsPerSecond = null; - String reasonCancelled = null; - TimeValue throttledUntil = null; - List sliceStatuses = new ArrayList<>(); + StatusBuilder builder = new StatusBuilder(); while ((token = parser.nextToken()) != Token.END_OBJECT) { if (token == Token.FIELD_NAME) { fieldName = parser.currentName(); } else if (token == Token.START_OBJECT) { if (fieldName.equals(Status.RETRIES_FIELD)) { - Tuple retries = - Status.RETRIES_PARSER.parse(parser, null); - bulkRetries = retries.v1(); - searchRetries = retries.v2(); + builder.setRetries(Status.RETRIES_PARSER.parse(parser, null)); } else { parser.skipChildren(); } } else if (token == Token.START_ARRAY) { if (fieldName.equals(Status.SLICES_FIELD)) { while ((token = parser.nextToken()) != Token.END_ARRAY) { - sliceStatuses.add(StatusOrException.fromXContent(parser)); + builder.addToSliceStatuses(StatusOrException.fromXContent(parser)); } } else { parser.skipChildren(); @@ -651,57 +638,47 @@ public static Status innerFromXContent(XContentParser parser) throws IOException } else { // else if it is a value switch (fieldName) { case Status.SLICE_ID_FIELD: - sliceId = parser.intValue(); + builder.setSliceId(parser.intValue()); break; case Status.TOTAL_FIELD: - total = parser.longValue(); + builder.setTotal(parser.longValue()); break; case Status.UPDATED_FIELD: - updated = parser.longValue(); + builder.setUpdated(parser.longValue()); break; case Status.CREATED_FIELD: - created = parser.longValue(); + builder.setCreated(parser.longValue()); break; case Status.DELETED_FIELD: - deleted = parser.longValue(); + builder.setDeleted(parser.longValue()); break; case Status.BATCHES_FIELD: - batches = parser.intValue(); + builder.setBatches(parser.intValue()); break; case Status.VERSION_CONFLICTS_FIELD: - versionConflicts = parser.longValue(); + builder.setVersionConflicts(parser.longValue()); break; case Status.NOOPS_FIELD: - noOps = parser.longValue(); + builder.setNoops(parser.longValue()); break; case Status.THROTTLED_RAW_FIELD: - throttled = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS); + builder.setThrottled(parser.longValue()); break; case Status.REQUESTS_PER_SEC_FIELD: - requestsPerSecond = parser.floatValue(); - requestsPerSecond = requestsPerSecond == -1 ? Float.POSITIVE_INFINITY : requestsPerSecond; + builder.setRequestsPerSecond(parser.floatValue()); break; case Status.CANCELED_FIELD: - reasonCancelled = parser.text(); + builder.setReasonCancelled(parser.text()); break; case Status.THROTTLED_UNTIL_RAW_FIELD: - throttledUntil = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS); + builder.setThrottledUntil(parser.longValue()); break; default: break; } } } - if (sliceStatuses.isEmpty()) { - return - new Status( - sliceId, total, updated, created, deleted, batches, versionConflicts, noOps, bulkRetries, - searchRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil - ); - } else { - return new Status(sliceStatuses, reasonCancelled); - } - + return builder.buildStatus(); } @Override @@ -838,15 +815,15 @@ public int hashCode() { ); } - public boolean equalsWithoutSliceStatus(Object o) { + public boolean equalsWithoutSliceStatus(Object o, boolean includeUpdated, boolean includeCreated) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status other = (Status) o; return Objects.equals(sliceId, other.sliceId) && total == other.total && - updated == other.updated && - created == other.created && + (!includeUpdated || updated == other.updated) && + (!includeCreated || created == other.created) && deleted == other.deleted && batches == other.batches && versionConflicts == other.versionConflicts && @@ -861,7 +838,7 @@ public boolean equalsWithoutSliceStatus(Object o) { @Override public boolean equals(Object o) { - if (equalsWithoutSliceStatus(o)) { + if (equalsWithoutSliceStatus(o, true, true)) { return Objects.equals(sliceStatuses, ((Status) o).sliceStatuses); } else { return false; diff --git a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java index eb4fd59a7bc5f..696030e2ca8c2 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java @@ -24,6 +24,9 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -34,16 +37,18 @@ * representative set of subrequests. This is best-effort but better than {@linkplain ReindexRequest} because scripts can't change the * destination index and things. */ -public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest implements IndicesRequest.Replaceable { +public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest + implements IndicesRequest.Replaceable, ToXContentObject { /** * Ingest pipeline to set on index requests made by this action. */ private String pipeline; public UpdateByQueryRequest() { + this(new SearchRequest()); } - public UpdateByQueryRequest(SearchRequest search) { + UpdateByQueryRequest(SearchRequest search) { this(search, true); } @@ -59,8 +64,91 @@ private UpdateByQueryRequest(SearchRequest search, boolean setDefaults) { /** * Set the ingest pipeline to set on index requests made by this action. */ - public void setPipeline(String pipeline) { + public UpdateByQueryRequest setPipeline(String pipeline) { this.pipeline = pipeline; + return this; + } + + /** + * Set the query for selective update + */ + public UpdateByQueryRequest setQuery(QueryBuilder query) { + if (query != null) { + getSearchRequest().source().query(query); + } + return this; + } + + /** + * Set the document types for the update + */ + public UpdateByQueryRequest setDocTypes(String... types) { + if (types != null) { + getSearchRequest().types(types); + } + return this; + } + + /** + * Set the indices on which the update by query request is to be run + */ + public UpdateByQueryRequest setIndices(String... indices) { + if (indices != null) { + getSearchRequest().indices(indices); + } + return this; + } + + /** + * Set routing limiting the process to the shards that match that routing value + */ + public UpdateByQueryRequest setRouting(String routing) { + if (routing != null) { + getSearchRequest().routing(routing); + } + return this; + } + + /** + * The scroll size to control number of documents processed per batch + */ + public UpdateByQueryRequest setBatchSize(int size) { + getSearchRequest().source().size(size); + return this; + } + + /** + * Set the IndicesOptions for controlling unavailable indices + */ + public UpdateByQueryRequest setIndicesOptions(IndicesOptions indicesOptions) { + getSearchRequest().indicesOptions(indicesOptions); + return this; + } + + /** + * Gets the batch size for this request + */ + public int getBatchSize() { + return getSearchRequest().source().size(); + } + + /** + * Gets the routing value used for this request + */ + public String getRouting() { + return getSearchRequest().routing(); + } + + /** + * Gets the document types on which this request would be executed. Returns an empty array if all + * types are to be processed. + */ + public String[] getDocTypes() { + if (getSearchRequest().types() != null) { + return getSearchRequest().types(); + } else { + return new String[0]; + } } /** @@ -121,4 +209,16 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeOptionalString(pipeline); } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (getScript() != null) { + builder.field("script"); + getScript().toXContent(builder, params); + } + getSearchRequest().source().innerToXContent(builder, params); + builder.endObject(); + return builder; + } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java index 0dd4d6bc84974..71aab8ca9f9f6 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java @@ -24,11 +24,15 @@ import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractXContentTestCase; +import org.elasticsearch.index.reindex.BulkByScrollTask.Status; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -38,6 +42,9 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase { + private boolean includeUpdated; + private boolean includeCreated; + public void testRountTrip() throws IOException { BulkByScrollResponse response = new BulkByScrollResponse(timeValueMillis(randomNonNegativeLong()), BulkByScrollTaskStatusTests.randomStatus(), randomIndexingFailures(), randomSearchFailures(), randomBoolean()); @@ -97,10 +104,11 @@ private void assertResponseEquals(BulkByScrollResponse expected, BulkByScrollRes } } - @Override - protected void assertEqualInstances(BulkByScrollResponse expected, BulkByScrollResponse actual) { + public static void assertEqualBulkResponse(BulkByScrollResponse expected, BulkByScrollResponse actual, + boolean includeUpdated, boolean includeCreated) { assertEquals(expected.getTook(), actual.getTook()); - BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus()); + BulkByScrollTaskStatusTests + .assertEqualStatus(expected.getStatus(), actual.getStatus(), includeUpdated, includeCreated); assertEquals(expected.getBulkFailures().size(), actual.getBulkFailures().size()); for (int i = 0; i < expected.getBulkFailures().size(); i++) { Failure expectedFailure = expected.getBulkFailures().get(i); @@ -122,6 +130,11 @@ protected void assertEqualInstances(BulkByScrollResponse expected, BulkByScrollR } } + @Override + protected void assertEqualInstances(BulkByScrollResponse expected, BulkByScrollResponse actual) { + assertEqualBulkResponse(expected, actual, includeUpdated, includeCreated); + } + @Override protected BulkByScrollResponse createTestInstance() { // failures are tested separately, so we can test XContent equivalence at least when we have no failures @@ -141,4 +154,22 @@ protected BulkByScrollResponse doParseInstance(XContentParser parser) throws IOE protected boolean supportsUnknownFields() { return true; } + + @Override + protected ToXContent.Params getToXContentParams() { + Map params = new HashMap<>(); + if (randomBoolean()) { + includeUpdated = false; + params.put(Status.INCLUDE_UPDATED, "false"); + } else { + includeUpdated = true; + } + if (randomBoolean()) { + includeCreated = false; + params.put(Status.INCLUDE_CREATED, "false"); + } else { + includeCreated = true; + } + return new ToXContent.MapParams(params); + } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java index 33c56bacd9121..0d84b0e1412b1 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java @@ -55,11 +55,14 @@ protected StatusOrException doParseInstance(XContentParser parser) throws IOExce return StatusOrException.fromXContent(parser); } - public static void assertEqualStatusOrException(StatusOrException expected, StatusOrException actual) { + public static void assertEqualStatusOrException(StatusOrException expected, StatusOrException actual, + boolean includeUpdated, boolean includeCreated) { if (expected != null && actual != null) { assertNotSame(expected, actual); if (expected.getException() == null) { - BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus()); + BulkByScrollTaskStatusTests + // we test includeCreated params in the Status tests + .assertEqualStatus(expected.getStatus(), actual.getStatus(), includeUpdated, includeCreated); } else { assertThat( actual.getException().getMessage(), @@ -74,7 +77,7 @@ public static void assertEqualStatusOrException(StatusOrException expected, Stat @Override protected void assertEqualInstances(StatusOrException expected, StatusOrException actual) { - assertEqualStatusOrException(expected, actual); + assertEqualStatusOrException(expected, actual, true, true); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java index 368e1b3bdac08..13db9f4766e79 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java @@ -33,7 +33,9 @@ import org.elasticsearch.index.reindex.BulkByScrollTask.Status; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.IntStream; @@ -44,6 +46,10 @@ import static org.hamcrest.Matchers.equalTo; public class BulkByScrollTaskStatusTests extends AbstractXContentTestCase { + + private boolean includeUpdated; + private boolean includeCreated; + public void testBulkByTaskStatus() throws IOException { BulkByScrollTask.Status status = randomStatus(); BytesStreamOutput out = new BytesStreamOutput(); @@ -144,21 +150,21 @@ bulkRetries, searchRetries, throttled, abs(Randomness.get().nextFloat()), ); } - public static void assertEqualStatus(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) { + public static void assertEqualStatus(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual, + boolean includeUpdated, boolean includeCreated) { assertNotSame(expected, actual); - assertTrue(expected.equalsWithoutSliceStatus(actual)); + assertTrue(expected.equalsWithoutSliceStatus(actual, includeUpdated, includeCreated)); assertThat(expected.getSliceStatuses().size(), equalTo(actual.getSliceStatuses().size())); for (int i = 0; i< expected.getSliceStatuses().size(); i++) { BulkByScrollTaskStatusOrExceptionTests.assertEqualStatusOrException( - expected.getSliceStatuses().get(i), - actual.getSliceStatuses().get(i) + expected.getSliceStatuses().get(i), actual.getSliceStatuses().get(i), includeUpdated, includeCreated ); } } @Override protected void assertEqualInstances(BulkByScrollTask.Status first, BulkByScrollTask.Status second) { - assertEqualStatus(first, second); + assertEqualStatus(first, second, includeUpdated, includeCreated); } @Override @@ -193,4 +199,22 @@ public void testFromXContentWithFailures() throws IOException { getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS); } + + @Override + protected ToXContent.Params getToXContentParams() { + Map params = new HashMap<>(); + if (randomBoolean()) { + includeUpdated = false; + params.put(Status.INCLUDE_UPDATED, "false"); + } else { + includeUpdated = true; + } + if (randomBoolean()) { + includeCreated = false; + params.put(Status.INCLUDE_CREATED, "false"); + } else { + includeCreated = true; + } + return new ToXContent.MapParams(params); + } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java b/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java index b30968cf056b5..09ab76d9b8279 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import static org.apache.lucene.util.TestUtil.randomSimpleString; @@ -32,11 +31,11 @@ public void testUpdateByQueryRequestImplementsIndicesRequestReplaceable() { indices[i] = randomSimpleString(random(), 1, 30); } - SearchRequest searchRequest = new SearchRequest(indices); IndicesOptions indicesOptions = IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()); - searchRequest.indicesOptions(indicesOptions); - UpdateByQueryRequest request = new UpdateByQueryRequest(searchRequest); + UpdateByQueryRequest request = new UpdateByQueryRequest(); + request.setIndices(indices); + request.setIndicesOptions(indicesOptions); for (int i = 0; i < numIndices; i++) { assertEquals(indices[i], request.indices()[i]); } @@ -60,7 +59,7 @@ public void testUpdateByQueryRequestImplementsIndicesRequestReplaceable() { @Override protected UpdateByQueryRequest newRequest() { - return new UpdateByQueryRequest(new SearchRequest(randomAlphaOfLength(5))); + return new UpdateByQueryRequest().setIndices(randomAlphaOfLength(5)); } @Override From 7708d99e07f77a72dabed12b0ea68ee2dd1cdba6 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Fri, 10 Aug 2018 16:53:12 +0200 Subject: [PATCH 2/2] removed redundant setter and fixed docs --- .../test/java/org/elasticsearch/client/CrudIT.java | 4 ++-- .../client/RequestConvertersTests.java | 2 +- .../client/documentation/CRUDDocumentationIT.java | 5 ++--- .../high-level/document/update-by-query.asciidoc | 5 ++--- .../index/reindex/UpdateByQueryRequest.java | 14 ++++---------- .../index/reindex/UpdateByQueryRequestTests.java | 4 ++-- 6 files changed, 13 insertions(+), 21 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index d76917348aa39..e02d9f451ebe0 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -717,7 +717,7 @@ public void testUpdateByQuery() throws IOException { { // test1: create one doc in dest UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); - updateByQueryRequest.setIndices(sourceIndex); + updateByQueryRequest.indices(sourceIndex); updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type")); updateByQueryRequest.setRefresh(true); BulkByScrollResponse bulkResponse = @@ -735,7 +735,7 @@ public void testUpdateByQuery() throws IOException { { // test2: update using script UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); - updateByQueryRequest.setIndices(sourceIndex); + updateByQueryRequest.indices(sourceIndex); updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;")); updateByQueryRequest.setRefresh(true); BulkByScrollResponse bulkResponse = diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index dfe761e89e873..92930d14cf4a5 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -474,7 +474,7 @@ public void testReindex() throws IOException { public void testUpdateByQuery() throws IOException { UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); - updateByQueryRequest.setIndices(randomIndicesNames(1, 5)); + updateByQueryRequest.indices(randomIndicesNames(1, 5)); Map expectedParams = new HashMap<>(); if (randomBoolean()) { updateByQueryRequest.setDocTypes(generateRandomStringArray(5, 5, false, false)); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index b2a61326aa7d7..ac9d42c65ca59 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -924,8 +924,7 @@ public void testUpdateByQuery() throws Exception { } { // tag::update-by-query-request - UpdateByQueryRequest request = new UpdateByQueryRequest(); // <1> - request.setIndices("source1", "source2"); // <2> + UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2"); // <1> // end::update-by-query-request // tag::update-by-query-request-conflicts request.setConflicts("proceed"); // <1> @@ -993,7 +992,7 @@ public void testUpdateByQuery() throws Exception { } { UpdateByQueryRequest request = new UpdateByQueryRequest(); - request.setIndices("source1"); + request.indices("source1"); // tag::update-by-query-execute-listener ActionListener listener = new ActionListener() { diff --git a/docs/java-rest/high-level/document/update-by-query.asciidoc b/docs/java-rest/high-level/document/update-by-query.asciidoc index 679b00c3291d2..324385a442b5d 100644 --- a/docs/java-rest/high-level/document/update-by-query.asciidoc +++ b/docs/java-rest/high-level/document/update-by-query.asciidoc @@ -6,7 +6,7 @@ A `UpdateByQueryRequest` can be used to update documents in an index. -It requires an existing index and a target index which may or may not exist pre-request. +It requires an existing index (or a set of indices) on which the update is to be performed. The simplest form of a `UpdateByQueryRequest` looks like follows: @@ -14,8 +14,7 @@ The simplest form of a `UpdateByQueryRequest` looks like follows: -------------------------------------------------- include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request] -------------------------------------------------- -<1> Creates the `UpdateByQueryRequest` -<2> Adds a list of sources to update +<1> Creates the `UpdateByQueryRequest` on a set of indices. By default version conflicts abort the `UpdateByQueryRequest` process but you can just count them by settings it to `proceed` in the request body diff --git a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java index 696030e2ca8c2..71ffadc930327 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java @@ -48,6 +48,10 @@ public UpdateByQueryRequest() { this(new SearchRequest()); } + public UpdateByQueryRequest(String... indices) { + this(new SearchRequest(indices)); + } + UpdateByQueryRequest(SearchRequest search) { this(search, true); } @@ -89,16 +93,6 @@ public UpdateByQueryRequest setDocTypes(String... types) { return this; } - /** - * Set the indices on which the update by query request is to be run - */ - public UpdateByQueryRequest setIndices(String... indices) { - if (indices != null) { - getSearchRequest().indices(indices); - } - return this; - } - /** * Set routing limiting the process to the shards that match that routing value */ diff --git a/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java b/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java index 09ab76d9b8279..47449eb739199 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java @@ -34,7 +34,7 @@ public void testUpdateByQueryRequestImplementsIndicesRequestReplaceable() { IndicesOptions indicesOptions = IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()); UpdateByQueryRequest request = new UpdateByQueryRequest(); - request.setIndices(indices); + request.indices(indices); request.setIndicesOptions(indicesOptions); for (int i = 0; i < numIndices; i++) { assertEquals(indices[i], request.indices()[i]); @@ -59,7 +59,7 @@ public void testUpdateByQueryRequestImplementsIndicesRequestReplaceable() { @Override protected UpdateByQueryRequest newRequest() { - return new UpdateByQueryRequest().setIndices(randomAlphaOfLength(5)); + return new UpdateByQueryRequest(randomAlphaOfLength(5)); } @Override