From 8874e7d9b312a003e1ee8654d75c52c5d52beca2 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Tue, 7 Aug 2018 14:33:40 +0200 Subject: [PATCH 1/7] added reindex API --- .../client/RequestConverters.java | 12 + .../client/RestHighLevelClient.java | 29 ++ .../java/org/elasticsearch/client/CrudIT.java | 68 ++++ .../client/RequestConvertersTests.java | 58 +++ .../client/RestHighLevelClientTests.java | 1 - .../documentation/CRUDDocumentationIT.java | 149 ++++++++ .../high-level/document/reindex.asciidoc | 211 +++++++++++ .../high-level/supported-apis.asciidoc | 2 + .../index/reindex/RestReindexAction.java | 3 +- .../index/reindex/ReindexMetadataTests.java | 3 +- .../index/reindex/ReindexScriptTests.java | 3 +- .../index/reindex/RestReindexActionTests.java | 4 +- .../index/reindex/RoundTripTests.java | 3 +- .../action/bulk/BulkItemResponse.java | 35 +- .../index/reindex/BulkByScrollResponse.java | 129 ++++++- .../index/reindex/BulkByScrollTask.java | 348 +++++++++++++++++- .../index/reindex/ReindexRequest.java | 132 ++++++- .../index/reindex/RemoteInfo.java | 25 +- .../index/reindex/ScrollableHitSource.java | 13 +- .../search/builder/SearchSourceBuilder.java | 11 +- .../reindex/BulkByScrollResponseTests.java | 51 ++- ...ulkByScrollTaskStatusOrExceptionTests.java | 101 +++++ .../reindex/BulkByScrollTaskStatusTests.java | 92 ++++- .../index/reindex/ReindexRequestTests.java | 8 +- .../xpack/upgrade/InternalIndexReindexer.java | 10 +- 25 files changed, 1436 insertions(+), 65 deletions(-) create mode 100644 docs/java-rest/high-level/document/reindex.asciidoc create mode 100644 server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 1430f407135ea..2b74bbb2c1bf3 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -106,6 +106,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.rankeval.RankEvalRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.protocol.xpack.XPackInfoRequest; import org.elasticsearch.protocol.xpack.XPackUsageRequest; import org.elasticsearch.protocol.xpack.license.GetLicenseRequest; @@ -812,6 +813,17 @@ static Request clusterHealth(ClusterHealthRequest healthRequest) { return request; } + static Request reindex(ReindexRequest reindexRequest) throws IOException { + String endpoint = new EndpointBuilder().addPathPart("_reindex").build(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + Params params = new Params(request); + if (reindexRequest.getScrollTime() != null) { + params.putParam("scroll", reindexRequest.getScrollTime()); + } + request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request rollover(RolloverRequest rolloverRequest) throws IOException { String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover") .addPathPart(rolloverRequest.getNewIndexName()).build(); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 4fc3dd87df1a5..44797a9ac56cb 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -65,6 +65,8 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.rankeval.RankEvalRequest; import org.elasticsearch.index.rankeval.RankEvalResponse; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.plugins.spi.NamedXContentProvider; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; @@ -371,6 +373,33 @@ public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, Act performRequestAsyncAndParseEntity(bulkRequest, RequestConverters::bulk, options, BulkResponse::fromXContent, listener, emptySet()); } + /** + * Executes a bulk request using the Bulk API. + * See Reindex API on elastic.co + * @param reindexRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, emptySet() + ); + } + + /** + * Asynchronously executes a bulk request using the Bulk API. + * See Reindex API on elastic.co + * @param reindexRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + */ + public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions options, ActionListener listener) { + performRequestAsyncAndParseEntity( + reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, emptySet() + ); + } + /** * Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 89f357477fa06..c79a6ae24c9bc 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -41,12 +41,16 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; @@ -58,6 +62,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonMap; @@ -624,6 +629,69 @@ public void testBulk() throws IOException { validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest); } + public void testReindex() throws IOException, InterruptedException { + final String sourceIndex = "source1"; + final String destinationIndex = "dest"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + createIndex(destinationIndex, settings); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + new BulkRequest() + .add(new IndexRequest(sourceIndex, "type", "1") + .source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .add(new IndexRequest(sourceIndex, "type", "2") + .source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)), + RequestOptions.DEFAULT + ).status() + ); + } + { + TimeUnit.SECONDS.sleep(1); + // test1: create one doc in dest + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(sourceIndex); + reindexRequest.setDestIndex(destinationIndex); + reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); + BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); + assertEquals(1, bulkResponse.getCreated()); + assertEquals(1, bulkResponse.getTotal()); + assertEquals(0, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + } + { + TimeUnit.SECONDS.sleep(1); + // test2: create 1 and update 1 + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(sourceIndex); + reindexRequest.setDestIndex(destinationIndex); + BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); + assertEquals(1, bulkResponse.getCreated()); + assertEquals(2, bulkResponse.getTotal()); + assertEquals(1, bulkResponse.getUpdated()); + assertEquals(0, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + } + } + public void testBulkProcessorIntegration() throws IOException { int nbItems = randomIntBetween(10, 100); boolean[] errors = new boolean[nbItems]; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index e4aa690acb617..3a27527477415 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -116,6 +116,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.RandomCreateIndexGenerator; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.query.QueryBuilders; @@ -125,6 +126,8 @@ import org.elasticsearch.index.rankeval.RankEvalSpec; import org.elasticsearch.index.rankeval.RatedRequest; import org.elasticsearch.index.rankeval.RestRankEvalAction; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.protocol.xpack.XPackInfoRequest; import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest; import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest; @@ -168,6 +171,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE; import static org.elasticsearch.client.RequestConverters.enforceSameContentType; @@ -175,6 +179,7 @@ import static org.elasticsearch.index.RandomCreateIndexGenerator.randomCreateIndexRequest; import static org.elasticsearch.index.RandomCreateIndexGenerator.randomIndexSettings; import static org.elasticsearch.index.alias.RandomAliasActionsGenerator.randomAliasAction; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; import static org.hamcrest.CoreMatchers.equalTo; @@ -403,6 +408,59 @@ public void testUpdateAliases() throws IOException { assertToXContentBody(indicesAliasesRequest, request.getEntity()); } + public void testReindex() throws IOException { + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices("source_idx"); + reindexRequest.setDestIndex("dest_idx"); + Map expectedParams = new HashMap<>(); + if (randomBoolean()) { + XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); + RemoteInfo remoteInfo = new RemoteInfo("http", "remote-host", 9200, null, + BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS)), + "user", + "pass", + emptyMap(), + RemoteInfo.DEFAULT_SOCKET_TIMEOUT, + RemoteInfo.DEFAULT_CONNECT_TIMEOUT + ); + reindexRequest.setRemoteInfo(remoteInfo); + } + if (randomBoolean()) { + reindexRequest.setSourceDocTypes("doc", "tweet"); + } + if (randomBoolean()) { + reindexRequest.setSourceBatchSize(randomInt(100)); + } + if (randomBoolean()) { + reindexRequest.setDestDocType("tweet_and_doc"); + } + if (randomBoolean()) { + reindexRequest.setDestOpType("create"); + } + if (randomBoolean()) { + reindexRequest.setDestPipeline("my_pipeline"); + } + if (randomBoolean()) { + reindexRequest.setDestRouting("=cat"); + } + if (randomBoolean()) { + reindexRequest.setSize(randomIntBetween(100, 1000)); + } + if (randomBoolean()) { + reindexRequest.setAbortOnVersionConflict(false); + } + if (randomBoolean()) { + String ts = randomTimeValue(); + reindexRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll")); + } + expectedParams.put("scroll", reindexRequest.getScrollTime().getStringRep()); + Request request = RequestConverters.reindex(reindexRequest); + assertEquals("/_reindex", request.getEndpoint()); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals(expectedParams, request.getParameters()); + assertToXContentBody(reindexRequest, request.getEntity()); + } + public void testPutMapping() throws IOException { PutMappingRequest putMappingRequest = new PutMappingRequest(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index d04224bacf406..0c90d5b33c2e7 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -660,7 +660,6 @@ public void testApiNamingConventions() throws Exception { "indices.put_alias", "mtermvectors", "put_script", - "reindex", "reindex_rethrottle", "render_search_template", "scripts_painless_execute", diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index ad41c139ddc37..bae66e4e83674 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -50,6 +50,8 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -59,13 +61,22 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.RemoteInfo; +import org.elasticsearch.index.reindex.ScrollableHitSource; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.search.sort.SortOrder; +import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -750,6 +761,144 @@ public void onFailure(Exception e) { } } + public void testReindex() throws Exception { + RestHighLevelClient client = highLevelClient(); + { + String mapping = + "\"doc\": {\n" + + " \"properties\": {\n" + + " \"user\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"field1\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"field2\": {\n" + + " \"type\": \"integer\"\n" + + " }\n" + + " }\n" + + " }"; + createIndex("source1", Settings.EMPTY, mapping); + createIndex("source2", Settings.EMPTY, mapping); + createPipeline("my_pipeline"); + } + { + // tag::reindex-request + ReindexRequest request = new ReindexRequest(); // <1> + request.setSourceIndices("source1", "source2"); // <2> + request.setDestIndex("dest"); // <3> + // end::reindex-request + // tag::reindex-request-versionType + request.setDestVersionType(VersionType.EXTERNAL); // <1> + // end::reindex-request-versionType + // tag::reindex-request-opType + request.setDestOpType("create"); // <1> + // end::reindex-request-opType + // tag::reindex-request-conflicts + request.setConflicts("proceed"); // <1> + // end::reindex-request-conflicts + // tag::reindex-request-typeOrQuery + request.setSourceDocTypes("doc"); // <1> + request.setSourceQuery(new TermQueryBuilder("user", "kimchy")); // <2> + // end::reindex-request-typeOrQuery + // tag::reindex-request-size + request.setSize(10); // <1> + // end::reindex-request-size + // tag::reindex-request-sourceSize + request.setSourceBatchSize(100); // <1> + // end::reindex-request-sourceSize + // tag::reindex-request-pipeline + request.setDestPipeline("my_pipeline"); // <1> + // end::reindex-request-pipeline + // tag::reindex-request-sort + request.addSortField("field1", SortOrder.DESC); // <1> + request.addSortField("field2", SortOrder.ASC); // <2> + // end::reindex-request-sort + // tag::reindex-request-script + request.setScript( + new Script( + ScriptType.INLINE, "painless", + "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}", + Collections.emptyMap())); // <1> + // end::reindex-request-script + // tag::reindex-request-remote + request.setRemoteInfo( + new RemoteInfo( + "https", "localhost", 9002, null, new BytesArray(new MatchAllQueryBuilder().toString()), + "user", "pass", Collections.emptyMap(), new TimeValue(100, TimeUnit.MILLISECONDS), + new TimeValue(100, TimeUnit.SECONDS) + ) + ); // <1> + // end::reindex-request-remote + request.setRemoteInfo(null); // Remove it for tests + // tag::reindex-request-timeout + request.setTimeout(TimeValue.timeValueMinutes(2)); // <1> + // end::reindex-request-timeout + // tag::reindex-request-refresh + request.setRefresh(true); // <1> + // end::reindex-request-refresh + // tag::reindex-request-slices + request.setSlices(2); // <1> + // end::reindex-request-slices + // tag::reindex-request-scroll + request.setScroll(TimeValue.timeValueMinutes(10)); // <1> + // end::reindex-request-scroll + + + // tag::reindex-execute + BulkByScrollResponse bulkResponse = client.reindex(request, RequestOptions.DEFAULT); + // end::reindex-execute + assertSame(0, bulkResponse.getSearchFailures().size()); + assertSame(0, bulkResponse.getBulkFailures().size()); + // tag::reindex-response + TimeValue timeTaken = bulkResponse.getTook(); // <1> + boolean timedOut = bulkResponse.isTimedOut(); // <2> + long totalDocs = bulkResponse.getTotal(); // <3> + long updatedDocs = bulkResponse.getUpdated(); // <4> + long createdDocs = bulkResponse.getCreated(); // <5> + long deletedDocs = bulkResponse.getDeleted(); // <6> + long batches = bulkResponse.getBatches(); // <7> + long noops = bulkResponse.getNoops(); // <8> + long versionConflicts = bulkResponse.getVersionConflicts(); // <9> + long bulkRetries = bulkResponse.getBulkRetries(); // <10> + long searchRetries = bulkResponse.getSearchRetries(); // <11> + TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <12> + TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <13> + List searchFailures = bulkResponse.getSearchFailures(); // <14> + List bulkFailures = bulkResponse.getBulkFailures(); // <15> + // end::reindex-response + } + { + ReindexRequest request = new ReindexRequest(); + request.setSourceIndices("source1"); + request.setDestIndex("dest"); + + // tag::reindex-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkResponse) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::reindex-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::reindex-execute-async + client.reindexAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::reindex-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + public void testGet() throws Exception { RestHighLevelClient client = highLevelClient(); { diff --git a/docs/java-rest/high-level/document/reindex.asciidoc b/docs/java-rest/high-level/document/reindex.asciidoc new file mode 100644 index 0000000000000..7c6a190ceae68 --- /dev/null +++ b/docs/java-rest/high-level/document/reindex.asciidoc @@ -0,0 +1,211 @@ +[[java-rest-high-document-reindex]] +=== Reindex API + +[[java-rest-high-document-reindex-request]] +==== Reindex Request + +A `ReindexRequest` can be used to copy documents from one or more indexes into a destination index. + +It requires an existing source index and a target index which may or may not exist pre-request. Reindex does not attempt +to set up the destination index. It does not copy the settings of the source index. You should set up the destination +index prior to running a _reindex action, including setting up mappings, shard counts, replicas, etc. + +The simplest form of a `ReindexRequest` looks like follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request] +-------------------------------------------------- +<1> Creates the `ReindexRequest` +<2> Adds a list of sources to copy from +<3> Adds the destination index + +The `dest` element can be configured like the index API to control optimistic concurrency control. Just leaving out +`versionType` (as above) or setting it to internal will cause Elasticsearch to blindly dump documents into the target. +Setting `versionType` to external will cause Elasticsearch to preserve the version from the source, create any documents +that are missing, and update any documents that have an older version in the destination index than they do in the +source index. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-versionType] +-------------------------------------------------- +<1> Set the versionType to `EXTERNAL` + +Settings `opType` to `create` will cause `_reindex` to only create missing documents in the target index. All existing +documents will cause a version conflict. The default `opType` is `index`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-opType] +-------------------------------------------------- +<1> Set the opType to `create` + +By default version conflicts abort the `_reindex` process but you can just count them by settings it to `proceed` +in the request body + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-conflicts] +-------------------------------------------------- +<1> Set `proceed` on version conflict + +You can limit the documents by adding a type to the source or by adding a query. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-typeOrQuery] +-------------------------------------------------- +<1> Only copy `doc` type +<2> Only copy documents which have field `user` set to `kimchy` + +It’s also possible to limit the number of processed documents by setting size. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-size] +-------------------------------------------------- +<1> Only copy 10 documents + +By default `_reindex` uses batches of 1000. You can change the batch size with `sourceBatchSize`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-sourceSize] +-------------------------------------------------- +<1> Use batches of 100 documents + +Reindex can also use the ingest feature by specifying a `pipeline`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-pipeline] +-------------------------------------------------- +<1> set pipeline to `my_pipeline` + +If you want a particular set of documents from the source index you’ll need to use sort. If possible, prefer a more +selective query to size and sort. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-sort] +-------------------------------------------------- +<1> add descending sort to`field1` +<2> add ascending sort to `field2` + +`ReindexRequest` also supports a `script` that modifies the document. It allows you to also change the document's +metadata. The following example illustrates that. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-script] +-------------------------------------------------- +<1> `setScript` to bump the version of the source document + +`ReindexRequest` supports reindexing from a remote Elasticsearch cluster. When using a remote cluster the query should be +specified inside the `RemoteInfo` object and not using `setSourceQuery`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-remote] +-------------------------------------------------- +<1> set remote elastic cluster + +`ReindexRequest` also helps in automatically parallelizing using `sliced-scroll` to +slice on `_uid`. Use `setSlices` to specify the number of slices to use. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-slices] +-------------------------------------------------- +<1> set number of slices to use + +`ReindexRequest` uses the `scroll` parameter to control how long it keeps the "search context" alive. +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-scroll] +-------------------------------------------------- +<1> set scroll time + + +==== Optional arguments +In addition to the options above the following arguments can optionally be also provided: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-timeout] +-------------------------------------------------- +<1> Timeout to wait for the reindex request to be performed as a `TimeValue` + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-refresh] +-------------------------------------------------- +<1> Refresh index after calling reindex + + +[[java-rest-high-document-reindex-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute] +-------------------------------------------------- + +[[java-rest-high-document-reindex-async]] +==== Asynchronous Execution + +The asynchronous execution of a reindex request requires both the `ReindexRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute-async] +-------------------------------------------------- +<1> The `ReindexRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `BulkByScrollResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument and contains a list of individual results for each +operation that was executed. Note that one or more operations might have +failed while the others have been successfully executed. +<2> Called when the whole `ReindexRequest` fails. In this case the raised +exception is provided as an argument and no operation has been executed. + +[[java-rest-high-document-reindex-response]] +==== Reindex Response + +The returned `BulkByScrollResponse` contains information about the executed operations and + allows to iterate over each result as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-response] +-------------------------------------------------- +<1> Get total time taken +<2> Check if the request timed out +<3> Get total number of docs processed +<4> Number of docs that were updated +<5> Number of docs that were created +<6> Number of docs that were deleted +<7> Number of batches that were executed +<8> Number of skipped docs +<9> Number of version conflicts +<10> Number of times request had to retry bulk index operations +<11> Number of times request had to retry search operations +<12> The total time this request has throttled itself not including the current throttle time if it is currently sleeping +<13> Remaining delay of any current throttle sleep or 0 if not sleeping +<14> Failures during search phase +<15> Failures during bulk index operation diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 1acba88222641..2377e9f989757 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -15,6 +15,7 @@ Single document APIs:: Multi-document APIs:: * <> * <> +* <> include::document/index.asciidoc[] include::document/get.asciidoc[] @@ -23,6 +24,7 @@ include::document/delete.asciidoc[] include::document/update.asciidoc[] include::document/bulk.asciidoc[] include::document/multi-get.asciidoc[] +include::document/reindex.asciidoc[] == Search APIs diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java index a5520c90b0ff5..50d01535d7ff0 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; @@ -118,7 +117,7 @@ protected ReindexRequest buildRequest(RestRequest request) throws IOException { throw new IllegalArgumentException("_reindex doesn't support [pipeline] as a query parameter. " + "Specify it in the [dest] object instead."); } - ReindexRequest internal = new ReindexRequest(new SearchRequest(), new IndexRequest()); + ReindexRequest internal = new ReindexRequest(); try (XContentParser parser = request.contentParser()) { PARSER.parse(parser, internal, null); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java index 4611f9dcbcddb..ec34da777b533 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.index.reindex.ScrollableHitSource.Hit; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.settings.Settings; /** @@ -73,7 +72,7 @@ protected TestAction action() { @Override protected ReindexRequest request() { - return new ReindexRequest(new SearchRequest(), new IndexRequest()); + return new ReindexRequest(); } private class TestAction extends TransportReindexAction.AsyncIndexBySearchAction { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java index 6d3ce558c7567..a90b60357c4ff 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.ScriptService; @@ -100,7 +99,7 @@ public void testSetRouting() throws Exception { @Override protected ReindexRequest request() { - return new ReindexRequest(new SearchRequest(), new IndexRequest()); + return new ReindexRequest(); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java index b06948b90581a..70e29ed12c5b4 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java @@ -19,8 +19,6 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -144,7 +142,7 @@ public void testReindexFromRemoteRequestParsing() throws IOException { request = BytesReference.bytes(b); } try (XContentParser p = createParser(JsonXContent.jsonXContent, request)) { - ReindexRequest r = new ReindexRequest(new SearchRequest(), new IndexRequest()); + ReindexRequest r = new ReindexRequest(); RestReindexAction.PARSER.parse(p, r, null); assertEquals("localhost", r.getRemoteInfo().getHost()); assertArrayEquals(new String[] {"source"}, r.getSearchRequest().indices()); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index 97809c9bc8dc3..46aa6df120f3e 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.Version; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -47,7 +46,7 @@ */ public class RoundTripTests extends ESTestCase { public void testReindexRequest() throws IOException { - ReindexRequest reindex = new ReindexRequest(new SearchRequest(), new IndexRequest()); + ReindexRequest reindex = new ReindexRequest(); randomRequest(reindex); reindex.getDestination().version(randomFrom(Versions.MATCH_ANY, Versions.MATCH_DELETED, 12L, 1L, 123124L, 12L)); reindex.getDestination().index("test"); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index fb535d312cf65..c0615064de1be 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -28,11 +28,13 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.StatusToXContentObject; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -42,6 +44,8 @@ import java.io.IOException; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField; @@ -161,11 +165,11 @@ public static BulkItemResponse fromXContent(XContentParser parser, int id) throw * Represents a failure. */ public static class Failure implements Writeable, ToXContentFragment { - static final String INDEX_FIELD = "index"; - static final String TYPE_FIELD = "type"; - static final String ID_FIELD = "id"; - static final String CAUSE_FIELD = "cause"; - static final String STATUS_FIELD = "status"; + public static final String INDEX_FIELD = "index"; + public static final String TYPE_FIELD = "type"; + public static final String ID_FIELD = "id"; + public static final String CAUSE_FIELD = "cause"; + public static final String STATUS_FIELD = "status"; private final String index; private final String type; @@ -175,6 +179,23 @@ public static class Failure implements Writeable, ToXContentFragment { private final long seqNo; private final boolean aborted; + public static ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "bulk_failures", + true, + a -> + new Failure( + (String)a[0], (String)a[1], (String)a[2], (Exception)a[3], RestStatus.fromCode((int)a[4]) + ) + ); + static { + PARSER.declareString(constructorArg(), new ParseField(INDEX_FIELD)); + PARSER.declareString(constructorArg(), new ParseField(TYPE_FIELD)); + PARSER.declareString(optionalConstructorArg(), new ParseField(ID_FIELD)); + PARSER.declareObject(constructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), new ParseField(CAUSE_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(STATUS_FIELD)); + } + /** * For write failures before operation was assigned a sequence number. * @@ -322,6 +343,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public static Failure fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + @Override public String toString() { return Strings.toString(this); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java index ac206c2c44f06..e37bd57d08b43 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java @@ -19,23 +19,35 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.index.reindex.BulkByScrollTask.Status; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; +import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import static java.lang.Math.max; import static java.lang.Math.min; import static java.util.Objects.requireNonNull; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; /** * Response used for actions that index many documents using a scroll request. @@ -47,6 +59,41 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr private List searchFailures; private boolean timedOut; + private static final String TOOK_FIELD = "took"; + private static final String TIMED_OUT_FIELD = "timed_out"; + private static final String FAILURES_FIELD = "failures"; + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "bulk_by_scroll_response", + true, + a -> { + TimeValue took = new TimeValue((long)a[0], TimeUnit.MILLISECONDS); + boolean timedOut = (Boolean)a[1]; + // start deciphering failures + ArrayList failures = a[2] != null ? (ArrayList) a[2] : new ArrayList<>(); + Status status = Status.constructFromObjectArray(a, 3); + List bulkFailures = new ArrayList<>(); + List searchFailures = new ArrayList<>(); + for (Object object: failures) { + if (object instanceof Failure) { + bulkFailures.add((Failure) object); + } else if (object instanceof SearchFailure) { + searchFailures.add((SearchFailure) object); + } + } + return new BulkByScrollResponse(took, status, bulkFailures, searchFailures, timedOut); + } + ); + static { + PARSER.declareLong(constructorArg(), new ParseField(TOOK_FIELD)); + PARSER.declareBoolean(constructorArg(), new ParseField(TIMED_OUT_FIELD)); + PARSER.declareObjectArray(constructorArg(), (p, c) -> parseFailure(p), new ParseField(FAILURES_FIELD)); + // since the result of BulkByScrollResponse.Status are mixed we also parse that in this + Status.declareFields(PARSER); + } + public BulkByScrollResponse() { } @@ -87,6 +134,10 @@ public long getCreated() { return status.getCreated(); } + public long getTotal() { + return status.getTotal(); + } + public long getDeleted() { return status.getDeleted(); } @@ -171,8 +222,8 @@ public void readFrom(StreamInput in) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field("took", took.millis()); - builder.field("timed_out", timedOut); + builder.field(TOOK_FIELD, took.millis()); + builder.field(TIMED_OUT_FIELD, timedOut); status.innerXContent(builder, params); builder.startArray("failures"); for (Failure failure: bulkFailures) { @@ -187,6 +238,80 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public static BulkByScrollResponse fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + private static Object parseFailure(XContentParser parser) throws IOException { + ensureExpectedToken(Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + Token token; + String index = null; + String type = null; + String id = null; + Integer status = null; + Integer shardId = null; + String nodeId = null; + ElasticsearchException bulkExc = null; + ElasticsearchException searchExc = null; + while ((token = parser.nextToken()) != Token.END_OBJECT) { + ensureExpectedToken(Token.FIELD_NAME, token, parser::getTokenLocation); + String name = parser.currentName(); + token = parser.nextToken(); + if (token == Token.START_ARRAY) { + parser.skipChildren(); + } else if (token == Token.START_OBJECT) { + switch (name) { + case SearchFailure.REASON_FIELD: + bulkExc = ElasticsearchException.fromXContent(parser); + break; + case Failure.CAUSE_FIELD: + searchExc = ElasticsearchException.fromXContent(parser); + break; + default: + parser.skipChildren(); + } + } else if (token == Token.VALUE_STRING) { + switch (name) { + // This field is the same as SearchFailure.index + case Failure.INDEX_FIELD: + index = parser.text(); + break; + case Failure.TYPE_FIELD: + type = parser.text(); + break; + case Failure.ID_FIELD: + id = parser.text(); + break; + case SearchFailure.NODE_FIELD: + nodeId = parser.text(); + break; + default: + // Do nothing + break; + } + } else if (token == Token.VALUE_NUMBER) { + switch (name) { + case Failure.STATUS_FIELD: + status = parser.intValue(); + break; + case SearchFailure.SHARD_FIELD: + shardId = parser.intValue(); + break; + default: + // Do nothing + break; + } + } + } + if (bulkExc != null) { + return new Failure(index, type, id, bulkExc, RestStatus.fromCode(status)); + } else if (searchExc != null) { + return new SearchFailure(searchExc, index, shardId, nodeId); + } else { + throw new ElasticsearchParseException("failed to parse failures array. At least one of {reason,cause} must be present"); + } + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index 9ff26b13212c7..2b2bf853474aa 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -22,27 +22,40 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParseException; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; import static java.lang.Math.min; import static java.util.Collections.emptyList; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * Task storing information about a currently running BulkByScroll request. @@ -204,6 +217,115 @@ public static class Status implements Task.Status, SuccessfullyProcessed { */ public static final String INCLUDE_UPDATED = "include_updated"; + public static final String SLICE_ID_FIELD = "slice_id"; + public static final String TOTAL_FIELD = "total"; + public static final String UPDATED_FIELD = "updated"; + public static final String CREATED_FIELD = "created"; + public static final String DELETED_FIELD = "deleted"; + public static final String BATCHES_FIELD = "batches"; + public static final String VERSION_CONFLICTS_FIELD = "version_conflicts"; + public static final String NOOPS_FIELD = "noops"; + public static final String RETRIES_FIELD = "retries"; + public static final String RETRIES_BULK_FIELD = "bulk"; + public static final String RETRIES_SEARCH_FIELD = "search"; + public static final String THROTTLED_RAW_FIELD = "throttled_millis"; + public static final String THROTTLED_HR_FIELD = "throttled"; + public static final String REQUESTS_PER_SEC_FIELD = "requests_per_second"; + public static final String CANCELED_FIELD = "canceled"; + public static final String THROTTLED_UNTIL_RAW_FIELD = "throttled_until_millis"; + public static final String THROTTLED_UNTIL_HR_FIELD = "throttled_until"; + public static final String SLICES_FIELD = "slices"; + + public static Set FIELDS_SET = new HashSet<>(); + static { + FIELDS_SET.add(SLICE_ID_FIELD); + FIELDS_SET.add(TOTAL_FIELD); + FIELDS_SET.add(UPDATED_FIELD); + FIELDS_SET.add(CREATED_FIELD); + FIELDS_SET.add(DELETED_FIELD); + FIELDS_SET.add(BATCHES_FIELD); + FIELDS_SET.add(VERSION_CONFLICTS_FIELD); + FIELDS_SET.add(NOOPS_FIELD); + FIELDS_SET.add(RETRIES_FIELD); + // No need for inner level fields for retries in the set of outer level fields + FIELDS_SET.add(THROTTLED_RAW_FIELD); + FIELDS_SET.add(THROTTLED_HR_FIELD); + FIELDS_SET.add(REQUESTS_PER_SEC_FIELD); + FIELDS_SET.add(CANCELED_FIELD); + FIELDS_SET.add(THROTTLED_UNTIL_RAW_FIELD); + FIELDS_SET.add(THROTTLED_UNTIL_HR_FIELD); + FIELDS_SET.add(SLICES_FIELD); + } + + @SuppressWarnings("unchecked") + static ConstructingObjectParser, Void> RETRIES_PARSER = new ConstructingObjectParser<>( + "bulk_by_scroll_task_status_retries", + true, + a -> new Tuple(a[0], a[1]) + ); + static { + RETRIES_PARSER.declareLong(constructorArg(), new ParseField(RETRIES_BULK_FIELD)); + RETRIES_PARSER.declareLong(constructorArg(), new ParseField(RETRIES_SEARCH_FIELD)); + } + + /** + * Tries to construct the object based on the order on which fields were declared + * in {@link #declareFields} + * @param a the array of objects returned by ObjectParser + * @param startIndex the startIndex from which to start reading in this array + * @return the constructed Status object + */ + @SuppressWarnings("unchecked") + public static Status constructFromObjectArray(Object[] a, int startIndex) { + Integer sliceId = (Integer) a[startIndex]; + Long total = (Long) a[startIndex + 1]; + Long updated = (Long) a[startIndex + 2]; + Long created = (Long) a[startIndex + 3]; + Long deleted = (Long) a[startIndex + 4]; + Integer batches = (Integer) a[startIndex + 5]; + Long versionConflicts = (Long) a[startIndex + 6]; + Long noops = (Long) a[startIndex + 7]; + Tuple retries = (Tuple) a[startIndex + 8]; + Long bulkRetries = retries.v1(); + Long searchRetries = retries.v2(); + TimeValue throttled = + a[startIndex + 9] != null ? new TimeValue((Long)a[startIndex + 9], TimeUnit.MILLISECONDS) : null; + Float requestsPerSecond = (Float) a[startIndex + 10]; + requestsPerSecond = requestsPerSecond == -1 ? Float.POSITIVE_INFINITY : requestsPerSecond; + String reasonCancelled = (String) a[startIndex + 11]; + TimeValue throttledUntil = + a[startIndex + 12] != null ? new TimeValue((Long) a[startIndex + 12], TimeUnit.MILLISECONDS) : null; + List sliceStatuses = + a[startIndex + 13] != null ? (ArrayList) a[startIndex + 13] : emptyList(); + if (sliceStatuses.isEmpty()) { + return new Status( + sliceId, total, updated, created, deleted, batches, versionConflicts, noops, bulkRetries, + searchRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil + ); + } else { + return new Status(sliceStatuses, reasonCancelled); + } + } + + public static void declareFields(ConstructingObjectParser parser) { + parser.declareInt(optionalConstructorArg(), new ParseField(SLICE_ID_FIELD)); + parser.declareLong(constructorArg(), new ParseField(TOTAL_FIELD)); + parser.declareLong(optionalConstructorArg(), new ParseField(UPDATED_FIELD)); + parser.declareLong(optionalConstructorArg(), new ParseField(CREATED_FIELD)); + parser.declareLong(constructorArg(), new ParseField(DELETED_FIELD)); + parser.declareInt(constructorArg(), new ParseField(BATCHES_FIELD)); + parser.declareLong(constructorArg(), new ParseField(VERSION_CONFLICTS_FIELD)); + parser.declareLong(constructorArg(), new ParseField(NOOPS_FIELD)); + parser.declareObject(constructorArg(), RETRIES_PARSER, new ParseField(RETRIES_FIELD)); + parser.declareLong(constructorArg(), new ParseField(THROTTLED_RAW_FIELD)); + parser.declareFloat(constructorArg(), new ParseField(REQUESTS_PER_SEC_FIELD)); + parser.declareString(optionalConstructorArg(), new ParseField(CANCELED_FIELD)); + parser.declareLong(constructorArg(), new ParseField(THROTTLED_UNTIL_RAW_FIELD)); + parser.declareObjectArray( + optionalConstructorArg(), (p, c) -> StatusOrException.fromXContent(p), new ParseField(SLICES_FIELD) + ); + } + private final Integer sliceId; private final long total; private final long updated; @@ -369,32 +491,32 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public XContentBuilder innerXContent(XContentBuilder builder, Params params) throws IOException { if (sliceId != null) { - builder.field("slice_id", sliceId); + builder.field(SLICE_ID_FIELD, sliceId); } - builder.field("total", total); + builder.field(TOTAL_FIELD, total); if (params.paramAsBoolean(INCLUDE_UPDATED, true)) { - builder.field("updated", updated); + builder.field(UPDATED_FIELD, updated); } if (params.paramAsBoolean(INCLUDE_CREATED, true)) { - builder.field("created", created); + builder.field(CREATED_FIELD, created); } - builder.field("deleted", deleted); - builder.field("batches", batches); - builder.field("version_conflicts", versionConflicts); - builder.field("noops", noops); - builder.startObject("retries"); { - builder.field("bulk", bulkRetries); - builder.field("search", searchRetries); + builder.field(DELETED_FIELD, deleted); + builder.field(BATCHES_FIELD, batches); + builder.field(VERSION_CONFLICTS_FIELD, versionConflicts); + builder.field(NOOPS_FIELD, noops); + builder.startObject(RETRIES_FIELD); { + builder.field(RETRIES_BULK_FIELD, bulkRetries); + builder.field(RETRIES_SEARCH_FIELD, searchRetries); } builder.endObject(); - builder.humanReadableField("throttled_millis", "throttled", throttled); - builder.field("requests_per_second", requestsPerSecond == Float.POSITIVE_INFINITY ? -1 : requestsPerSecond); + builder.humanReadableField(THROTTLED_RAW_FIELD, THROTTLED_HR_FIELD, throttled); + builder.field(REQUESTS_PER_SEC_FIELD, requestsPerSecond == Float.POSITIVE_INFINITY ? -1 : requestsPerSecond); if (reasonCancelled != null) { - builder.field("canceled", reasonCancelled); + builder.field(CANCELED_FIELD, reasonCancelled); } - builder.humanReadableField("throttled_until_millis", "throttled_until", throttledUntil); + builder.humanReadableField(THROTTLED_UNTIL_RAW_FIELD, THROTTLED_UNTIL_HR_FIELD, throttledUntil); if (false == sliceStatuses.isEmpty()) { - builder.startArray("slices"); + builder.startArray(SLICES_FIELD); for (StatusOrException slice : sliceStatuses) { if (slice == null) { builder.nullValue(); @@ -407,6 +529,114 @@ public XContentBuilder innerXContent(XContentBuilder builder, Params params) return builder; } + public static Status fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token; + if (parser.currentToken() == Token.START_OBJECT) { + token = parser.nextToken(); + } else { + token = parser.nextToken(); + } + ensureExpectedToken(Token.START_OBJECT, token, parser::getTokenLocation); + token = parser.nextToken(); + ensureExpectedToken(Token.FIELD_NAME, token, parser::getTokenLocation); + return innerFromXContent(parser); + } + + public static Status innerFromXContent(XContentParser parser) throws IOException { + Token token = parser.currentToken(); + String fieldName = parser.currentName(); + ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation); + Integer sliceId = null; + Long total = null; + Long updated = null; + Long created = null; + Long deleted = null; + Integer batches = null; + Long versionConflicts = null; + Long noOps = null; + Long bulkRetries = null; + Long searchRetries = null; + TimeValue throttled = null; + Float requestsPerSecond = null; + String reasonCancelled = null; + TimeValue throttledUntil = null; + List sliceStatuses = new ArrayList<>(); + while ((token = parser.nextToken()) != Token.END_OBJECT) { + if (token == Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == Token.START_OBJECT) { + if (fieldName.equals(Status.RETRIES_FIELD)) { + Tuple retries = + Status.RETRIES_PARSER.parse(parser, null); + bulkRetries = retries.v1(); + searchRetries = retries.v2(); + } else { + parser.skipChildren(); + } + } else if (token == Token.START_ARRAY) { + if (fieldName.equals(Status.SLICES_FIELD)) { + while ((token = parser.nextToken()) != Token.END_ARRAY) { + sliceStatuses.add(StatusOrException.fromXContent(parser)); + } + } else { + parser.skipChildren(); + } + } else { // else if it is a value + switch (fieldName) { + case Status.SLICE_ID_FIELD: + sliceId = parser.intValue(); + break; + case Status.TOTAL_FIELD: + total = parser.longValue(); + break; + case Status.UPDATED_FIELD: + updated = parser.longValue(); + break; + case Status.CREATED_FIELD: + created = parser.longValue(); + break; + case Status.DELETED_FIELD: + deleted = parser.longValue(); + break; + case Status.BATCHES_FIELD: + batches = parser.intValue(); + break; + case Status.VERSION_CONFLICTS_FIELD: + versionConflicts = parser.longValue(); + break; + case Status.NOOPS_FIELD: + noOps = parser.longValue(); + break; + case Status.THROTTLED_RAW_FIELD: + throttled = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS); + break; + case Status.REQUESTS_PER_SEC_FIELD: + requestsPerSecond = parser.floatValue(); + requestsPerSecond = requestsPerSecond == -1 ? Float.POSITIVE_INFINITY : requestsPerSecond; + break; + case Status.CANCELED_FIELD: + reasonCancelled = parser.text(); + break; + case Status.THROTTLED_UNTIL_RAW_FIELD: + throttledUntil = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS); + break; + default: + break; + } + } + } + if (sliceStatuses.isEmpty()) { + return + new Status( + sliceId, total, updated, created, deleted, batches, versionConflicts, noOps, bulkRetries, + searchRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil + ); + } else { + return new Status(sliceStatuses, reasonCancelled); + } + + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); @@ -533,6 +763,44 @@ public List getSliceStatuses() { return sliceStatuses; } + @Override + public int hashCode() { + return Objects.hash( + sliceId, total, updated, created, deleted, batches, versionConflicts, noops, searchRetries, + bulkRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil, sliceStatuses + ); + } + + public boolean equalsWithoutSliceStatus(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status other = (Status) o; + return + Objects.equals(sliceId, other.sliceId) && + total == other.total && + updated == other.updated && + created == other.created && + deleted == other.deleted && + batches == other.batches && + versionConflicts == other.versionConflicts && + noops == other.noops && + searchRetries == other.searchRetries && + bulkRetries == other.bulkRetries && + Objects.equals(throttled, other.throttled) && + requestsPerSecond == other.requestsPerSecond && + Objects.equals(reasonCancelled, other.reasonCancelled) && + Objects.equals(throttledUntil, other.throttledUntil); + } + + @Override + public boolean equals(Object o) { + if (equalsWithoutSliceStatus(o)) { + return Objects.equals(sliceStatuses, ((Status) o).sliceStatuses); + } else { + return false; + } + } + private int checkPositive(int value, String name) { if (value < 0) { throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]"); @@ -556,6 +824,19 @@ public static class StatusOrException implements Writeable, ToXContentObject { private final Status status; private final Exception exception; + public static Set EXPECTED_EXCEPTION_FIELDS = new HashSet<>(); + static { + EXPECTED_EXCEPTION_FIELDS.add("type"); + EXPECTED_EXCEPTION_FIELDS.add("reason"); + EXPECTED_EXCEPTION_FIELDS.add("caused_by"); + EXPECTED_EXCEPTION_FIELDS.add("suppressed"); + EXPECTED_EXCEPTION_FIELDS.add("stack_trace"); + EXPECTED_EXCEPTION_FIELDS.add("header"); + EXPECTED_EXCEPTION_FIELDS.add("error"); + EXPECTED_EXCEPTION_FIELDS.add("root_cause"); + } + + public StatusOrException(Status status) { this.status = status; exception = null; @@ -610,6 +891,41 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public static StatusOrException fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token = parser.currentToken(); + if (token == null) { + token = parser.nextToken(); + } + if (token == Token.VALUE_NULL) { + return null; + } else { + ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation); + token = parser.nextToken(); + // This loop is present only to ignore unknown tokens. It breaks as soon as we find a field + // that is allowed. + while (token != Token.END_OBJECT) { + ensureExpectedToken(Token.FIELD_NAME, token, parser::getTokenLocation); + String fieldName = parser.currentName(); + // weird way to ignore unknown tokens + if (Status.FIELDS_SET.contains(fieldName)) { + return new StatusOrException( + Status.innerFromXContent(parser) + ); + } else if (EXPECTED_EXCEPTION_FIELDS.contains(fieldName)){ + return new StatusOrException(ElasticsearchException.innerFromXContent(parser, false)); + } else { + // Ignore unknown tokens + token = parser.nextToken(); + if (token == Token.START_OBJECT || token == Token.START_ARRAY) { + parser.skipChildren(); + } + token = parser.nextToken(); + } + } + throw new XContentParseException("Unable to parse StatusFromException. Expected fields not found."); + } + } + @Override public boolean equals(Object obj) { if (obj == null) { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java index e45d039edaeae..7214003789df1 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java @@ -26,6 +26,11 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -39,7 +44,8 @@ * of reasons, not least of which that scripts are allowed to change the destination request in drastic ways, including changing the index * to which documents are written. */ -public class ReindexRequest extends AbstractBulkIndexByScrollRequest implements CompositeIndicesRequest { +public class ReindexRequest extends AbstractBulkIndexByScrollRequest + implements CompositeIndicesRequest, ToXContentObject { /** * Prototype for index requests. */ @@ -48,9 +54,10 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest 0) { + builder.field("size", getSize()); + } + if (getScript() != null) { + builder.field("script", getScript()); + } + if (isAbortOnVersionConflict() == false) { + builder.field("conflicts", "proceed"); + } + } + builder.endObject(); + return builder; + } } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/RemoteInfo.java b/server/src/main/java/org/elasticsearch/index/reindex/RemoteInfo.java index 70f79a9def605..e30afcb8da2a2 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/RemoteInfo.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/RemoteInfo.java @@ -26,6 +26,8 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; import java.util.HashMap; @@ -35,7 +37,7 @@ import static java.util.Objects.requireNonNull; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; -public class RemoteInfo implements Writeable { +public class RemoteInfo implements Writeable, ToXContentObject { /** * Default {@link #socketTimeout} for requests that don't have one set. */ @@ -197,4 +199,25 @@ public String toString() { } return b.toString(); } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (username != null) { + builder.field("username", username); + } + if (password != null) { + builder.field("password", password); + } + builder.field("host", scheme + "://" + host + ":" + port + + (pathPrefix == null ? "" : "/" + pathPrefix)); + if (headers.size() >0 ) { + builder.field("headers", headers); + } + builder.field("socket_timeout", socketTimeout.getStringRep()); + builder.field("connect_timeout", connectTimeout.getStringRep()); + builder.field("query", query); + builder.endObject(); + return builder; + } } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java index 917b57a9c9745..a3901bb7a568b 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java @@ -284,6 +284,11 @@ public static class SearchFailure implements Writeable, ToXContentObject { @Nullable private final String nodeId; + public static final String INDEX_FIELD = "index"; + public static final String SHARD_FIELD = "shard"; + public static final String NODE_FIELD = "node"; + public static final String REASON_FIELD = "reason"; + public SearchFailure(Throwable reason, @Nullable String index, @Nullable Integer shardId, @Nullable String nodeId) { this.index = index; this.shardId = shardId; @@ -337,15 +342,15 @@ public String getNodeId() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (index != null) { - builder.field("index", index); + builder.field(INDEX_FIELD, index); } if (shardId != null) { - builder.field("shard", shardId); + builder.field(SHARD_FIELD, shardId); } if (nodeId != null) { - builder.field("node", nodeId); + builder.field(NODE_FIELD, nodeId); } - builder.field("reason"); + builder.field(REASON_FIELD); { builder.startObject(); ElasticsearchException.generateThrowableXContent(builder, params, reason); diff --git a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index c42a1a12a1877..706999327dbc6 100644 --- a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -1154,9 +1154,7 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th } } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); + public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException { if (from != -1) { builder.field(FROM_FIELD.getPreferredName(), from); } @@ -1294,6 +1292,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (collapse != null) { builder.field(COLLAPSE.getPreferredName(), collapse); } + return builder; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + innerToXContent(builder, params); builder.endObject(); return builder; } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java index a288328391a9d..0dd4d6bc84974 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java @@ -24,7 +24,8 @@ import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; import java.io.IOException; import java.util.List; @@ -33,8 +34,9 @@ import static java.util.Collections.singletonList; import static org.apache.lucene.util.TestUtil.randomSimpleString; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static org.hamcrest.Matchers.containsString; -public class BulkByScrollResponseTests extends ESTestCase { +public class BulkByScrollResponseTests extends AbstractXContentTestCase { public void testRountTrip() throws IOException { BulkByScrollResponse response = new BulkByScrollResponse(timeValueMillis(randomNonNegativeLong()), @@ -94,4 +96,49 @@ private void assertResponseEquals(BulkByScrollResponse expected, BulkByScrollRes assertEquals(expectedFailure.getReason().getMessage(), actualFailure.getReason().getMessage()); } } + + @Override + protected void assertEqualInstances(BulkByScrollResponse expected, BulkByScrollResponse actual) { + assertEquals(expected.getTook(), actual.getTook()); + BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus()); + assertEquals(expected.getBulkFailures().size(), actual.getBulkFailures().size()); + for (int i = 0; i < expected.getBulkFailures().size(); i++) { + Failure expectedFailure = expected.getBulkFailures().get(i); + Failure actualFailure = actual.getBulkFailures().get(i); + assertEquals(expectedFailure.getIndex(), actualFailure.getIndex()); + assertEquals(expectedFailure.getType(), actualFailure.getType()); + assertEquals(expectedFailure.getId(), actualFailure.getId()); + assertThat(expectedFailure.getMessage(), containsString(actualFailure.getMessage())); + assertEquals(expectedFailure.getStatus(), actualFailure.getStatus()); + } + assertEquals(expected.getSearchFailures().size(), actual.getSearchFailures().size()); + for (int i = 0; i < expected.getSearchFailures().size(); i++) { + ScrollableHitSource.SearchFailure expectedFailure = expected.getSearchFailures().get(i); + ScrollableHitSource.SearchFailure actualFailure = actual.getSearchFailures().get(i); + assertEquals(expectedFailure.getIndex(), actualFailure.getIndex()); + assertEquals(expectedFailure.getShardId(), actualFailure.getShardId()); + assertEquals(expectedFailure.getNodeId(), actualFailure.getNodeId()); + assertThat(expectedFailure.getReason().getMessage(), containsString(actualFailure.getReason().getMessage())); + } + } + + @Override + protected BulkByScrollResponse createTestInstance() { + // failures are tested separately, so we can test XContent equivalence at least when we have no failures + return + new BulkByScrollResponse( + timeValueMillis(randomNonNegativeLong()), BulkByScrollTaskStatusTests.randomStatusWithoutException(), + emptyList(), emptyList(), randomBoolean() + ); + } + + @Override + protected BulkByScrollResponse doParseInstance(XContentParser parser) throws IOException { + return BulkByScrollResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java new file mode 100644 index 0000000000000..33c56bacd9121 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.reindex; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; +import org.elasticsearch.index.reindex.BulkByScrollTask.StatusOrException; + +import java.io.IOException; +import java.util.function.Supplier; + +import static org.hamcrest.Matchers.containsString; + +public class BulkByScrollTaskStatusOrExceptionTests extends AbstractXContentTestCase { + @Override + protected StatusOrException createTestInstance() { + // failures are tested separately, so we can test XContent equivalence at least when we have no failures + return createTestInstanceWithoutExceptions(); + } + + static StatusOrException createTestInstanceWithoutExceptions() { + return new StatusOrException(BulkByScrollTaskStatusTests.randomStatusWithoutException()); + } + + static StatusOrException createTestInstanceWithExceptions() { + if (randomBoolean()) { + return new StatusOrException(new ElasticsearchException("test_exception")); + } else { + return new StatusOrException(BulkByScrollTaskStatusTests.randomStatus()); + } + } + + @Override + protected StatusOrException doParseInstance(XContentParser parser) throws IOException { + return StatusOrException.fromXContent(parser); + } + + public static void assertEqualStatusOrException(StatusOrException expected, StatusOrException actual) { + if (expected != null && actual != null) { + assertNotSame(expected, actual); + if (expected.getException() == null) { + BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus()); + } else { + assertThat( + actual.getException().getMessage(), + containsString(expected.getException().getMessage()) + ); + } + } else { + // If one of them is null both of them should be null + assertSame(expected, actual); + } + } + + @Override + protected void assertEqualInstances(StatusOrException expected, StatusOrException actual) { + assertEqualStatusOrException(expected, actual); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + /** + * Test parsing {@link StatusOrException} with inner failures as they don't support asserting on xcontent equivalence, given that + * exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()} + * without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end. + */ + public void testFromXContentWithFailures() throws IOException { + Supplier instanceSupplier = BulkByScrollTaskStatusOrExceptionTests::createTestInstanceWithExceptions; + //with random fields insertion in the inner exceptions, some random stuff may be parsed back as metadata, + //but that does not bother our assertions, as we only want to test that we don't break. + boolean supportsUnknownFields = true; + //exceptions are not of the same type whenever parsed back + boolean assertToXContentEquivalence = false; + AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY, + getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, + this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java index 9e5383a259adc..88d20820b56ee 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java @@ -23,22 +23,29 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; import org.hamcrest.Matchers; +import org.elasticsearch.index.reindex.BulkByScrollTask.Status; import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.IntStream; import static java.lang.Math.abs; import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; import static org.apache.lucene.util.TestUtil.randomSimpleString; -import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; +import static org.hamcrest.Matchers.equalTo; -public class BulkByScrollTaskStatusTests extends ESTestCase { +public class BulkByScrollTaskStatusTests extends AbstractXContentTestCase { public void testBulkByTaskStatus() throws IOException { BulkByScrollTask.Status status = randomStatus(); BytesStreamOutput out = new BytesStreamOutput(); @@ -113,6 +120,22 @@ public static BulkByScrollTask.Status randomStatus() { return new BulkByScrollTask.Status(statuses, randomBoolean() ? "test" : null); } + public static BulkByScrollTask.Status randomStatusWithoutException() { + if (randomBoolean()) { + return randomWorkingStatus(null); + } + boolean canHaveNullStatues = randomBoolean(); + List statuses = IntStream.range(0, between(0, 10)) + .mapToObj(i -> { + if (canHaveNullStatues && LuceneTestCase.rarely()) { + return null; + } + return new BulkByScrollTask.StatusOrException(randomWorkingStatus(i)); + }) + .collect(toList()); + return new BulkByScrollTask.Status(statuses, randomBoolean() ? "test" : null); + } + private static BulkByScrollTask.Status randomWorkingStatus(Integer sliceId) { // These all should be believably small because we sum them if we have multiple workers int total = between(0, 10000000); @@ -124,8 +147,65 @@ private static BulkByScrollTask.Status randomWorkingStatus(Integer sliceId) { long versionConflicts = between(0, total); long bulkRetries = between(0, 10000000); long searchRetries = between(0, 100000); - return new BulkByScrollTask.Status(sliceId, total, updated, created, deleted, batches, versionConflicts, noops, bulkRetries, - searchRetries, parseTimeValue(randomPositiveTimeValue(), "test"), abs(Randomness.get().nextFloat()), - randomBoolean() ? null : randomSimpleString(Randomness.get()), parseTimeValue(randomPositiveTimeValue(), "test")); + // smallest unit of time during toXContent is Milliseconds + TimeUnit[] timeUnits = {TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS}; + TimeValue throttled = new TimeValue(randomIntBetween(0, 1000), randomFrom(timeUnits)); + TimeValue throttledUntil = new TimeValue(randomIntBetween(0, 1000), randomFrom(timeUnits)); + return + new BulkByScrollTask.Status( + sliceId, total, updated, created, deleted, batches, versionConflicts, noops, + bulkRetries, searchRetries, throttled, abs(Randomness.get().nextFloat()), + randomBoolean() ? null : randomSimpleString(Randomness.get()), throttledUntil + ); + } + + public static void assertEqualStatus(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) { + assertNotSame(expected, actual); + assertTrue(expected.equalsWithoutSliceStatus(actual)); + assertThat(expected.getSliceStatuses().size(), equalTo(actual.getSliceStatuses().size())); + for (int i = 0; i< expected.getSliceStatuses().size(); i++) { + BulkByScrollTaskStatusOrExceptionTests.assertEqualStatusOrException( + expected.getSliceStatuses().get(i), + actual.getSliceStatuses().get(i) + ); + } + } + + @Override + protected void assertEqualInstances(BulkByScrollTask.Status first, BulkByScrollTask.Status second) { + assertEqualStatus(first, second); + } + + @Override + protected BulkByScrollTask.Status createTestInstance() { + // failures are tested separately, so we can test xcontent equivalence at least when we have no failures + return randomStatusWithoutException(); + } + + @Override + protected BulkByScrollTask.Status doParseInstance(XContentParser parser) throws IOException { + return BulkByScrollTask.Status.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + /** + * Test parsing {@link Status} with inner failures as they don't support asserting on xcontent equivalence, given that + * exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()} + * without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end. + */ + public void testFromXContentWithFailures() throws IOException { + Supplier instanceSupplier = BulkByScrollTaskStatusTests::randomStatus; + //with random fields insertion in the inner exceptions, some random stuff may be parsed back as metadata, + //but that does not bother our assertions, as we only want to test that we don't break. + boolean supportsUnknownFields = true; + //exceptions are not of the same type whenever parsed back + boolean assertToXContentEquivalence = false; + AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY, + getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, + this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS); } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java b/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java index 6c1988a1440e9..1c3d539263e7d 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java @@ -20,8 +20,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.search.slice.SliceBuilder; @@ -89,9 +87,9 @@ protected void extraForSliceAssertions(ReindexRequest original, ReindexRequest f @Override protected ReindexRequest newRequest() { - ReindexRequest reindex = new ReindexRequest(new SearchRequest(), new IndexRequest()); - reindex.getSearchRequest().indices("source"); - reindex.getDestination().index("dest"); + ReindexRequest reindex = new ReindexRequest(); + reindex.setSourceIndices("source"); + reindex.setDestIndex("dest"); return reindex; } } diff --git a/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java b/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java index 7e1e31919b1fc..e905d93d0ed35 100644 --- a/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java +++ b/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java @@ -7,8 +7,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; @@ -116,10 +114,10 @@ private void removeReadOnlyBlock(ParentTaskAssigningClient parentAwareClient, St private void reindex(ParentTaskAssigningClient parentAwareClient, String index, String newIndex, ActionListener listener) { - SearchRequest sourceRequest = new SearchRequest(index); - sourceRequest.types(types); - IndexRequest destinationRequest = new IndexRequest(newIndex); - ReindexRequest reindexRequest = new ReindexRequest(sourceRequest, destinationRequest); + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(index); + reindexRequest.setSourceDocTypes(types); + reindexRequest.setDestIndex(newIndex); reindexRequest.setRefresh(true); reindexRequest.setScript(transformScript); parentAwareClient.execute(ReindexAction.INSTANCE, reindexRequest, listener); From 7c09364fc02cf7a32eb2dffff78521ae37d55088 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Tue, 7 Aug 2018 15:21:48 +0200 Subject: [PATCH 2/7] Fixed CrudIT to not use sleep --- .../src/test/java/org/elasticsearch/client/CrudIT.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index c79a6ae24c9bc..7978d76c56d70 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -62,7 +62,6 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonMap; @@ -629,7 +628,7 @@ public void testBulk() throws IOException { validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest); } - public void testReindex() throws IOException, InterruptedException { + public void testReindex() throws IOException { final String sourceIndex = "source1"; final String destinationIndex = "dest"; { @@ -647,18 +646,19 @@ public void testReindex() throws IOException, InterruptedException { .add(new IndexRequest(sourceIndex, "type", "1") .source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) .add(new IndexRequest(sourceIndex, "type", "2") - .source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)), + .source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)) + .setRefreshPolicy(RefreshPolicy.IMMEDIATE), RequestOptions.DEFAULT ).status() ); } { - TimeUnit.SECONDS.sleep(1); // test1: create one doc in dest ReindexRequest reindexRequest = new ReindexRequest(); reindexRequest.setSourceIndices(sourceIndex); reindexRequest.setDestIndex(destinationIndex); reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); + reindexRequest.setRefresh(true); BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); assertEquals(1, bulkResponse.getCreated()); assertEquals(1, bulkResponse.getTotal()); @@ -672,7 +672,6 @@ public void testReindex() throws IOException, InterruptedException { assertEquals(0, bulkResponse.getSearchFailures().size()); } { - TimeUnit.SECONDS.sleep(1); // test2: create 1 and update 1 ReindexRequest reindexRequest = new ReindexRequest(); reindexRequest.setSourceIndices(sourceIndex); From a42a4192ba37671e2050a2bd112b99e0919a7396 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Wed, 8 Aug 2018 01:48:20 +0200 Subject: [PATCH 3/7] Added statusbuilder object --- .../index/reindex/BulkByScrollTask.java | 153 ++++++++++++++---- 1 file changed, 125 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index 2b2bf853474aa..388f6fea12bde 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -202,6 +202,115 @@ public boolean shouldCancelChildrenOnCancellation() { return true; } + private static class StatusBuilder { + private Integer sliceId = null; + private Long total = null; + private Long updated = null; + private Long created = null; + private Long deleted = null; + private Integer batches = null; + private Long versionConflicts = null; + private Long noops = null; + private Long bulkRetries = null; + private Long searchRetries = null; + private TimeValue throttled = null; + private Float requestsPerSecond = null; + private String reasonCancelled = null; + private TimeValue throttledUntil = null; + private List sliceStatuses = emptyList(); + + public void setSliceId(Integer sliceId) { + this.sliceId = sliceId; + } + + public void setTotal(Long total) { + this.total = total; + } + + public void setUpdated(Long updated) { + this.updated = updated; + } + + public void setCreated(Long created) { + this.created = created; + } + + public void setDeleted(Long deleted) { + this.deleted = deleted; + } + + public void setBatches(Integer batches) { + this.batches = batches; + } + + public void setVersionConflicts(Long versionConflicts) { + this.versionConflicts = versionConflicts; + } + + public void setNoops(Long noops) { + this.noops = noops; + } + + public void setRetries(Tuple retries) { + if (retries != null) { + setBulkRetries(retries.v1()); + setSearchRetries(retries.v2()); + } + } + + public void setBulkRetries(Long bulkRetries) { + this.bulkRetries = bulkRetries; + } + + public void setSearchRetries(Long searchRetries) { + this.searchRetries = searchRetries; + } + + public void setThrottled(Long throttled) { + if (throttled != null) { + this.throttled = new TimeValue(throttled, TimeUnit.MILLISECONDS); + } + } + + public void setRequestsPerSecond(Float requestsPerSecond) { + if (requestsPerSecond != null) { + requestsPerSecond = requestsPerSecond == -1 ? Float.POSITIVE_INFINITY : requestsPerSecond; + this.requestsPerSecond = requestsPerSecond; + } + } + + public void setReasonCancelled(String reasonCancelled) { + this.reasonCancelled = reasonCancelled; + } + + public void setThrottledUntil(Long throttledUntil) { + if (throttledUntil != null) { + this.throttledUntil = new TimeValue(throttledUntil, TimeUnit.MILLISECONDS); + } + } + + public void setSliceStatuses(List sliceStatuses) { + if (sliceStatuses != null) { + this.sliceStatuses = sliceStatuses; + } + } + + public Status build() { + if (sliceStatuses.isEmpty()) { + try { + return new Status( + sliceId, total, updated, created, deleted, batches, versionConflicts, noops, bulkRetries, + searchRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil + ); + } catch (NullPointerException npe) { + throw new IllegalArgumentException("a required field is null when building Status"); + } + } else { + return new Status(sliceStatuses, reasonCancelled); + } + } + } + public static class Status implements Task.Status, SuccessfullyProcessed { public static final String NAME = "bulk-by-scroll"; @@ -277,34 +386,22 @@ public static class Status implements Task.Status, SuccessfullyProcessed { */ @SuppressWarnings("unchecked") public static Status constructFromObjectArray(Object[] a, int startIndex) { - Integer sliceId = (Integer) a[startIndex]; - Long total = (Long) a[startIndex + 1]; - Long updated = (Long) a[startIndex + 2]; - Long created = (Long) a[startIndex + 3]; - Long deleted = (Long) a[startIndex + 4]; - Integer batches = (Integer) a[startIndex + 5]; - Long versionConflicts = (Long) a[startIndex + 6]; - Long noops = (Long) a[startIndex + 7]; - Tuple retries = (Tuple) a[startIndex + 8]; - Long bulkRetries = retries.v1(); - Long searchRetries = retries.v2(); - TimeValue throttled = - a[startIndex + 9] != null ? new TimeValue((Long)a[startIndex + 9], TimeUnit.MILLISECONDS) : null; - Float requestsPerSecond = (Float) a[startIndex + 10]; - requestsPerSecond = requestsPerSecond == -1 ? Float.POSITIVE_INFINITY : requestsPerSecond; - String reasonCancelled = (String) a[startIndex + 11]; - TimeValue throttledUntil = - a[startIndex + 12] != null ? new TimeValue((Long) a[startIndex + 12], TimeUnit.MILLISECONDS) : null; - List sliceStatuses = - a[startIndex + 13] != null ? (ArrayList) a[startIndex + 13] : emptyList(); - if (sliceStatuses.isEmpty()) { - return new Status( - sliceId, total, updated, created, deleted, batches, versionConflicts, noops, bulkRetries, - searchRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil - ); - } else { - return new Status(sliceStatuses, reasonCancelled); - } + StatusBuilder builder = new StatusBuilder(); + builder.setSliceId((Integer) a[startIndex]); + builder.setTotal((Long) a[startIndex + 1]); + builder.setUpdated((Long) a[startIndex + 2]); + builder.setCreated((Long) a[startIndex + 3]); + builder.setDeleted((Long) a[startIndex + 4]); + builder.setBatches((Integer) a[startIndex + 5]); + builder.setVersionConflicts((Long) a[startIndex + 6]); + builder.setNoops((Long) a[startIndex + 7]); + builder.setRetries((Tuple) a[startIndex + 8]); + builder.setThrottled((Long)a[startIndex + 9]); + builder.setRequestsPerSecond((Float) a[startIndex + 10]); + builder.setReasonCancelled((String) a[startIndex + 11]); + builder.setThrottledUntil((Long) a[startIndex + 12]); + builder.setSliceStatuses((ArrayList) a[startIndex + 13]); + return builder.build(); } public static void declareFields(ConstructingObjectParser parser) { From 40b108fe86ec06e377a3d1ac2fc92bd8891543a9 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Wed, 8 Aug 2018 12:52:38 +0200 Subject: [PATCH 4/7] Changed to ObjectParser from ConstructingObjectParser --- .../index/reindex/BulkByScrollResponse.java | 36 +++------- .../reindex/BulkByScrollResponseBuilder.java | 72 +++++++++++++++++++ .../index/reindex/BulkByScrollTask.java | 63 +++++----------- 3 files changed, 100 insertions(+), 71 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java index e37bd57d08b43..7fe60db2ddda7 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponse.java @@ -23,13 +23,13 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.index.reindex.BulkByScrollTask.Status; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -40,14 +40,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import static java.lang.Math.max; import static java.lang.Math.min; import static java.util.Objects.requireNonNull; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; -import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; /** * Response used for actions that index many documents using a scroll request. @@ -64,32 +62,18 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr private static final String FAILURES_FIELD = "failures"; @SuppressWarnings("unchecked") - private static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>( + private static final ObjectParser PARSER = + new ObjectParser<>( "bulk_by_scroll_response", true, - a -> { - TimeValue took = new TimeValue((long)a[0], TimeUnit.MILLISECONDS); - boolean timedOut = (Boolean)a[1]; - // start deciphering failures - ArrayList failures = a[2] != null ? (ArrayList) a[2] : new ArrayList<>(); - Status status = Status.constructFromObjectArray(a, 3); - List bulkFailures = new ArrayList<>(); - List searchFailures = new ArrayList<>(); - for (Object object: failures) { - if (object instanceof Failure) { - bulkFailures.add((Failure) object); - } else if (object instanceof SearchFailure) { - searchFailures.add((SearchFailure) object); - } - } - return new BulkByScrollResponse(took, status, bulkFailures, searchFailures, timedOut); - } + BulkByScrollResponseBuilder::new ); static { - PARSER.declareLong(constructorArg(), new ParseField(TOOK_FIELD)); - PARSER.declareBoolean(constructorArg(), new ParseField(TIMED_OUT_FIELD)); - PARSER.declareObjectArray(constructorArg(), (p, c) -> parseFailure(p), new ParseField(FAILURES_FIELD)); + PARSER.declareLong(BulkByScrollResponseBuilder::setTook, new ParseField(TOOK_FIELD)); + PARSER.declareBoolean(BulkByScrollResponseBuilder::setTimedOut, new ParseField(TIMED_OUT_FIELD)); + PARSER.declareObjectArray( + BulkByScrollResponseBuilder::setFailures, (p, c) -> parseFailure(p), new ParseField(FAILURES_FIELD) + ); // since the result of BulkByScrollResponse.Status are mixed we also parse that in this Status.declareFields(PARSER); } @@ -239,7 +223,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } public static BulkByScrollResponse fromXContent(XContentParser parser) { - return PARSER.apply(parser, null); + return PARSER.apply(parser, null).buildResponse(); } private static Object parseFailure(XContentParser parser) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java new file mode 100644 index 0000000000000..f542ca3ddf1ad --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java @@ -0,0 +1,72 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.reindex; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; +import org.elasticsearch.index.reindex.BulkByScrollTask.StatusBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class BulkByScrollResponseBuilder extends StatusBuilder { + private TimeValue took; + private BulkByScrollTask.Status status; + private List bulkFailures = new ArrayList<>(); + private List searchFailures = new ArrayList<>(); + private boolean timedOut; + + public BulkByScrollResponseBuilder() {} + + public void setTook(long took) { + setTook(new TimeValue(took, TimeUnit.MILLISECONDS)); + } + + public void setTook(TimeValue took) { + this.took = took; + } + + public void setStatus(BulkByScrollTask.Status status) { + this.status = status; + } + + public void setFailures(List failures) { + if (failures != null) { + for (Object object: failures) { + if (object instanceof Failure) { + bulkFailures.add((Failure) object); + } else if (object instanceof SearchFailure) { + searchFailures.add((SearchFailure) object); + } + } + } + } + + public void setTimedOut(boolean timedOut) { + this.timedOut = timedOut; + } + + public BulkByScrollResponse buildResponse() { + status = super.buildStatus(); + return new BulkByScrollResponse(took, status, bulkFailures, searchFailures, timedOut); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index 388f6fea12bde..f009ba9cd58b4 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParseException; @@ -54,7 +55,6 @@ import static java.util.Collections.emptyList; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; -import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** @@ -202,7 +202,7 @@ public boolean shouldCancelChildrenOnCancellation() { return true; } - private static class StatusBuilder { + public static class StatusBuilder { private Integer sliceId = null; private Long total = null; private Long updated = null; @@ -295,7 +295,7 @@ public void setSliceStatuses(List sliceStatuses) { } } - public Status build() { + public Status buildStatus() { if (sliceStatuses.isEmpty()) { try { return new Status( @@ -377,49 +377,22 @@ public static class Status implements Task.Status, SuccessfullyProcessed { RETRIES_PARSER.declareLong(constructorArg(), new ParseField(RETRIES_SEARCH_FIELD)); } - /** - * Tries to construct the object based on the order on which fields were declared - * in {@link #declareFields} - * @param a the array of objects returned by ObjectParser - * @param startIndex the startIndex from which to start reading in this array - * @return the constructed Status object - */ - @SuppressWarnings("unchecked") - public static Status constructFromObjectArray(Object[] a, int startIndex) { - StatusBuilder builder = new StatusBuilder(); - builder.setSliceId((Integer) a[startIndex]); - builder.setTotal((Long) a[startIndex + 1]); - builder.setUpdated((Long) a[startIndex + 2]); - builder.setCreated((Long) a[startIndex + 3]); - builder.setDeleted((Long) a[startIndex + 4]); - builder.setBatches((Integer) a[startIndex + 5]); - builder.setVersionConflicts((Long) a[startIndex + 6]); - builder.setNoops((Long) a[startIndex + 7]); - builder.setRetries((Tuple) a[startIndex + 8]); - builder.setThrottled((Long)a[startIndex + 9]); - builder.setRequestsPerSecond((Float) a[startIndex + 10]); - builder.setReasonCancelled((String) a[startIndex + 11]); - builder.setThrottledUntil((Long) a[startIndex + 12]); - builder.setSliceStatuses((ArrayList) a[startIndex + 13]); - return builder.build(); - } - - public static void declareFields(ConstructingObjectParser parser) { - parser.declareInt(optionalConstructorArg(), new ParseField(SLICE_ID_FIELD)); - parser.declareLong(constructorArg(), new ParseField(TOTAL_FIELD)); - parser.declareLong(optionalConstructorArg(), new ParseField(UPDATED_FIELD)); - parser.declareLong(optionalConstructorArg(), new ParseField(CREATED_FIELD)); - parser.declareLong(constructorArg(), new ParseField(DELETED_FIELD)); - parser.declareInt(constructorArg(), new ParseField(BATCHES_FIELD)); - parser.declareLong(constructorArg(), new ParseField(VERSION_CONFLICTS_FIELD)); - parser.declareLong(constructorArg(), new ParseField(NOOPS_FIELD)); - parser.declareObject(constructorArg(), RETRIES_PARSER, new ParseField(RETRIES_FIELD)); - parser.declareLong(constructorArg(), new ParseField(THROTTLED_RAW_FIELD)); - parser.declareFloat(constructorArg(), new ParseField(REQUESTS_PER_SEC_FIELD)); - parser.declareString(optionalConstructorArg(), new ParseField(CANCELED_FIELD)); - parser.declareLong(constructorArg(), new ParseField(THROTTLED_UNTIL_RAW_FIELD)); + public static void declareFields(ObjectParser parser) { + parser.declareInt(StatusBuilder::setSliceId, new ParseField(SLICE_ID_FIELD)); + parser.declareLong(StatusBuilder::setTotal, new ParseField(TOTAL_FIELD)); + parser.declareLong(StatusBuilder::setUpdated, new ParseField(UPDATED_FIELD)); + parser.declareLong(StatusBuilder::setCreated, new ParseField(CREATED_FIELD)); + parser.declareLong(StatusBuilder::setDeleted, new ParseField(DELETED_FIELD)); + parser.declareInt(StatusBuilder::setBatches, new ParseField(BATCHES_FIELD)); + parser.declareLong(StatusBuilder::setVersionConflicts, new ParseField(VERSION_CONFLICTS_FIELD)); + parser.declareLong(StatusBuilder::setNoops, new ParseField(NOOPS_FIELD)); + parser.declareObject(StatusBuilder::setRetries, RETRIES_PARSER, new ParseField(RETRIES_FIELD)); + parser.declareLong(StatusBuilder::setThrottled, new ParseField(THROTTLED_RAW_FIELD)); + parser.declareFloat(StatusBuilder::setRequestsPerSecond, new ParseField(REQUESTS_PER_SEC_FIELD)); + parser.declareString(StatusBuilder::setReasonCancelled, new ParseField(CANCELED_FIELD)); + parser.declareLong(StatusBuilder::setThrottledUntil, new ParseField(THROTTLED_UNTIL_RAW_FIELD)); parser.declareObjectArray( - optionalConstructorArg(), (p, c) -> StatusOrException.fromXContent(p), new ParseField(SLICES_FIELD) + StatusBuilder::setSliceStatuses, (p, c) -> StatusOrException.fromXContent(p), new ParseField(SLICES_FIELD) ); } From 3346b40352b2f7bcb7f874732c4a954c1b3ac03c Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Wed, 8 Aug 2018 13:15:42 +0200 Subject: [PATCH 5/7] changed documentation --- .../client/documentation/CRUDDocumentationIT.java | 2 +- docs/java-rest/high-level/document/reindex.asciidoc | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index bae66e4e83674..9c69a2a48361a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -818,7 +818,7 @@ public void testReindex() throws Exception { request.setScript( new Script( ScriptType.INLINE, "painless", - "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}", + "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}", Collections.emptyMap())); // <1> // end::reindex-request-script // tag::reindex-request-remote diff --git a/docs/java-rest/high-level/document/reindex.asciidoc b/docs/java-rest/high-level/document/reindex.asciidoc index 7c6a190ceae68..b6d98b42dc509 100644 --- a/docs/java-rest/high-level/document/reindex.asciidoc +++ b/docs/java-rest/high-level/document/reindex.asciidoc @@ -32,7 +32,7 @@ include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-versionType -------------------------------------------------- <1> Set the versionType to `EXTERNAL` -Settings `opType` to `create` will cause `_reindex` to only create missing documents in the target index. All existing +Setting `opType` to `create` will cause `_reindex` to only create missing documents in the target index. All existing documents will cause a version conflict. The default `opType` is `index`. ["source","java",subs="attributes,callouts,macros"] @@ -100,10 +100,14 @@ metadata. The following example illustrates that. -------------------------------------------------- include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-script] -------------------------------------------------- -<1> `setScript` to bump the version of the source document +<1> `setScript` to increment the `likes` field on all documents with user `kimchy`. `ReindexRequest` supports reindexing from a remote Elasticsearch cluster. When using a remote cluster the query should be -specified inside the `RemoteInfo` object and not using `setSourceQuery`. +specified inside the `RemoteInfo` object and not using `setSourceQuery`. If both the remote info and the source query are +set it results in a validation error during the request. The reason for this is that the remote Elasticsearch may not +understand queries built by the modern query builders. The remote cluster support works all the way back to Elasticsearch +0.90 and the query language has changed since then. When reaching older versions, it is safer to write the query by hand +in JSON. ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- From 34cbed697ef81a54259cd73dac5d15cf46ca5749 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Fri, 10 Aug 2018 01:16:44 +0200 Subject: [PATCH 6/7] Added javadoc --- .../client/RestHighLevelClient.java | 4 +- .../reindex/BulkByScrollResponseBuilder.java | 8 +++- .../index/reindex/BulkByScrollTask.java | 17 +++++++ .../index/reindex/ReindexRequest.java | 48 +++++++++++++++++-- 4 files changed, 68 insertions(+), 9 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 44797a9ac56cb..61ee0854fcab0 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -374,7 +374,7 @@ public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, Act } /** - * Executes a bulk request using the Bulk API. + * Executes a reindex request. * See Reindex API on elastic.co * @param reindexRequest the request * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized @@ -388,7 +388,7 @@ public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, Request } /** - * Asynchronously executes a bulk request using the Bulk API. + * Asynchronously executes a reindex request. * See Reindex API on elastic.co * @param reindexRequest the request * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java index f542ca3ddf1ad..ad5bfd6e03cdf 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.index.reindex.BulkByScrollTask.StatusBuilder; @@ -28,14 +29,17 @@ import java.util.List; import java.util.concurrent.TimeUnit; -public class BulkByScrollResponseBuilder extends StatusBuilder { +/** + * Helps build a {@link BulkByScrollResponse}. Used by an instance of {@link ObjectParser} when parsing from XContent. + */ +class BulkByScrollResponseBuilder extends StatusBuilder { private TimeValue took; private BulkByScrollTask.Status status; private List bulkFailures = new ArrayList<>(); private List searchFailures = new ArrayList<>(); private boolean timedOut; - public BulkByScrollResponseBuilder() {} + BulkByScrollResponseBuilder() {} public void setTook(long took) { setTook(new TimeValue(took, TimeUnit.MILLISECONDS)); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index f009ba9cd58b4..77c4635094b8b 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -202,6 +202,11 @@ public boolean shouldCancelChildrenOnCancellation() { return true; } + /** + * This class acts as a builder for {@link Status}. Once the {@link Status} object is built by calling + * {@link #buildStatus()} it is immutable. Used by an instance of {@link ObjectParser} when parsing from + * XContent. + */ public static class StatusBuilder { private Integer sliceId = null; private Long total = null; @@ -558,6 +563,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder.endObject(); } + /** + * We need to write a manual parser for this because of {@link StatusOrException}. Since + * {@link StatusOrException#fromXContent(XContentParser)} tries to peek at a field first before deciding + * what needs to be it cannot use an {@link ObjectParser}. + */ public XContentBuilder innerXContent(XContentBuilder builder, Params params) throws IOException { if (sliceId != null) { @@ -961,6 +971,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + /** + * Since {@link StatusOrException} can contain either an {@link Exception} or a {@link Status} we need to peek + * at a field first before deciding what needs to be parsed since the same object could contains either. + * The {@link #EXPECTED_EXCEPTION_FIELDS} contains the fields that are expected when the serialised object + * was an instance of exception and the {@link Status#FIELDS_SET} is the set of fields expected when the + * serialized object was an instance of Status. + */ public static StatusOrException fromXContent(XContentParser parser) throws IOException { XContentParser.Token token = parser.currentToken(); if (token == null) { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java index 7214003789df1..52a1c89d4b3f5 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java @@ -45,7 +45,7 @@ * to which documents are written. */ public class ReindexRequest extends AbstractBulkIndexByScrollRequest - implements CompositeIndicesRequest, ToXContentObject { + implements CompositeIndicesRequest, ToXContentObject { /** * Prototype for index requests. */ @@ -128,6 +128,9 @@ private boolean routingIsValid() { } } + /** + * Set the indices which will act as the source for the ReindexRequest + */ public ReindexRequest setSourceIndices(String... sourceIndices) { if (sourceIndices != null) { this.getSearchRequest().indices(sourceIndices); @@ -135,6 +138,9 @@ public ReindexRequest setSourceIndices(String... sourceIndices) { return this; } + /** + * Set the document types which need to be copied from the source indices + */ public ReindexRequest setSourceDocTypes(String... docTypes) { if (docTypes != null) { this.getSearchRequest().types(docTypes); @@ -142,11 +148,17 @@ public ReindexRequest setSourceDocTypes(String... docTypes) { return this; } + /** + * Sets the scroll size for setting how many documents are to be processed in one batch during reindex + */ public ReindexRequest setSourceBatchSize(int size) { this.getSearchRequest().source().size(size); return this; } + /** + * Set the query for selecting documents from the source indices + */ public ReindexRequest setSourceQuery(QueryBuilder queryBuilder) { if (queryBuilder != null) { this.getSearchRequest().source().query(queryBuilder); @@ -158,12 +170,16 @@ public ReindexRequest setSourceQuery(QueryBuilder queryBuilder) { * Add a sort against the given field name. * * @param name The name of the field to sort by + * @param order The order in which to sort */ public ReindexRequest addSortField(String name, SortOrder order) { this.getSearchRequest().source().sort(name, order); return this; } + /** + * Set the target index for the ReindexRequest + */ public ReindexRequest setDestIndex(String destIndex) { if (destIndex != null) { this.getDestination().index(destIndex); @@ -171,21 +187,34 @@ public ReindexRequest setDestIndex(String destIndex) { return this; } + /** + * Set the document type for the destination index + */ public ReindexRequest setDestDocType(String docType) { this.getDestination().type(docType); return this; } + /** + * Set the routing to decide which shard the documents need to be routed to + */ public ReindexRequest setDestRouting(String routing) { this.getDestination().routing(routing); return this; } + /** + * Set the version type for the target index. A {@link VersionType#EXTERNAL} helps preserve the version + * if the document already existed in the target index. + */ public ReindexRequest setDestVersionType(VersionType versionType) { this.getDestination().versionType(versionType); return this; } + /** + * Allows to set the ingest pipeline for the target index. + */ public void setDestPipeline(String pipelineName) { this.getDestination().setPipeline(pipelineName); } @@ -199,15 +228,24 @@ public ReindexRequest setDestOpType(String opType) { return this; } - public IndexRequest getDestination() { - return destination; - } - + /** + * Set the {@link RemoteInfo} if the source indices are in a remote cluster. + */ public ReindexRequest setRemoteInfo(RemoteInfo remoteInfo) { this.remoteInfo = remoteInfo; return this; } + /** + * Gets the target for this reindex request in the for of an {@link IndexRequest} + */ + public IndexRequest getDestination() { + return destination; + } + + /** + * Get the {@link RemoteInfo} if it was set for this request. + */ public RemoteInfo getRemoteInfo() { return remoteInfo; } From 4fa987c3bd6f47074b6cb5619aa257f3314f6a1c Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Fri, 10 Aug 2018 16:14:16 +0200 Subject: [PATCH 7/7] Added options for request converters --- .../java/org/elasticsearch/client/RequestConverters.java | 6 +++++- .../org/elasticsearch/client/RequestConvertersTests.java | 5 +++++ .../index/reindex/AbstractBulkByScrollRequest.java | 8 ++++++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 2b74bbb2c1bf3..cd60d78108ff6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -816,7 +816,11 @@ static Request clusterHealth(ClusterHealthRequest healthRequest) { static Request reindex(ReindexRequest reindexRequest) throws IOException { String endpoint = new EndpointBuilder().addPathPart("_reindex").build(); Request request = new Request(HttpPost.METHOD_NAME, endpoint); - Params params = new Params(request); + Params params = new Params(request) + .withRefresh(reindexRequest.isRefresh()) + .withTimeout(reindexRequest.getTimeout()) + .withWaitForActiveShards(reindexRequest.getWaitForActiveShards()); + if (reindexRequest.getScrollTime() != null) { params.putParam("scroll", reindexRequest.getScrollTime()); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 3a27527477415..ac26378736636 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -453,6 +453,11 @@ public void testReindex() throws IOException { String ts = randomTimeValue(); reindexRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll")); } + if (reindexRequest.getRemoteInfo() == null && randomBoolean()) { + reindexRequest.setSourceQuery(new TermQueryBuilder("foo", "fooval")); + } + setRandomTimeout(reindexRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams); + setRandomWaitForActiveShards(reindexRequest::setWaitForActiveShards, ActiveShardCount.DEFAULT, expectedParams); expectedParams.put("scroll", reindexRequest.getScrollTime().getStringRep()); Request request = RequestConverters.reindex(reindexRequest); assertEquals("/_reindex", request.getEndpoint()); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 8536337bfdbc2..3b635c8238784 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -252,6 +252,14 @@ public Self setTimeout(TimeValue timeout) { return self(); } + /** + * Timeout to wait for the shards on to be available for each bulk request? + */ + public Self setTimeout(String timeout) { + this.timeout = TimeValue.parseTimeValue(timeout, this.timeout, getClass().getSimpleName() + ".timeout"); + return self(); + } + /** * The number of shard copies that must be active before proceeding with the write. */