From ec3c93d42f01195fff36816d3e20f4de24049865 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Mon, 29 Oct 2018 17:24:50 +0100 Subject: [PATCH 1/9] HLRC: reindex API with wait_for_completion false Extend High Level Rest Client Reindex API to support requests with wait_for_completion=false. This method will return a TaskID and results can be queried with Task API refers: #27205 --- .../client/RequestConverters.java | 16 ++- .../client/RestHighLevelClient.java | 15 ++ .../reindex/ReindexSubmissionResponse.java | 89 ++++++++++++ .../org/elasticsearch/client/ReindexIT.java | 134 ++++++++++++++++++ 4 files changed, 249 insertions(+), 5 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 3acb8d8297306..f1d37691a796c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -474,9 +474,18 @@ static Request rankEval(RankEvalRequest rankEvalRequest) throws IOException { } static Request reindex(ReindexRequest reindexRequest) throws IOException { + return prepareReindexRequest(reindexRequest, true); + } + + static Request submitReindex(ReindexRequest reindexRequest) throws IOException { + return prepareReindexRequest(reindexRequest, false); + } + + private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException { String endpoint = new EndpointBuilder().addPathPart("_reindex").build(); Request request = new Request(HttpPost.METHOD_NAME, endpoint); Params params = new Params(request) + .withWaitForCompletion(waitForCompletion) .withRefresh(reindexRequest.isRefresh()) .withTimeout(reindexRequest.getTimeout()) .withWaitForActiveShards(reindexRequest.getWaitForActiveShards()) @@ -885,11 +894,8 @@ Params withDetailed(boolean detailed) { return this; } - Params withWaitForCompletion(boolean waitForCompletion) { - if (waitForCompletion) { - return putParam("wait_for_completion", Boolean.TRUE.toString()); - } - return this; + Params withWaitForCompletion(Boolean waitForCompletion) { + return putParam("wait_for_completion", waitForCompletion.toString()); } Params withNodes(String[] nodes) { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index bd1adf634a518..7f0d7644d675b 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -58,6 +58,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.core.TermVectorsResponse; import org.elasticsearch.client.core.TermVectorsRequest; +import org.elasticsearch.client.reindex.ReindexSubmissionResponse; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.ParseField; @@ -447,6 +448,20 @@ public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, Request ); } + /** + * Submits a reindex task. + * See Reindex API on elastic.co + * @param reindexRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the submission response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public final ReindexSubmissionResponse submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + reindexRequest, RequestConverters::submitReindex, options, ReindexSubmissionResponse::fromXContent, emptySet() + ); + } + /** * Asynchronously executes a reindex request. * See Reindex API on elastic.co diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java new file mode 100644 index 0000000000000..522e48f3c2e6b --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.reindex; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Objects; + +public class ReindexSubmissionResponse extends ActionResponse implements ToXContentObject { + private static final ParseField TASK = new ParseField("task"); + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "reindex_submission_response", + true, a -> new ReindexSubmissionResponse((TaskId) a[0])); + + static { + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); + } + + public static ReindexSubmissionResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private final TaskId task; + + ReindexSubmissionResponse(@Nullable TaskId task) { + this.task = task; + } + + /** + * Get the task id + * + * @return the id of the reindex task. + */ + public TaskId getTask() { + return task; + } + + @Override + public int hashCode() { + return Objects.hash(task); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + ReindexSubmissionResponse that = (ReindexSubmissionResponse) other; + return Objects.equals(task, that.task); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (task != null) { + builder.field(TASK.getPreferredName(), task.toString()); + } + builder.endObject(); + return builder; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java new file mode 100644 index 0000000000000..87b5c03a319a8 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -0,0 +1,134 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.reindex.ReindexSubmissionResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Collections; +import java.util.function.BooleanSupplier; + +public class ReindexIT extends ESRestHighLevelClientTestCase { + + //TODO taken from CrudIT - should these scenarios be moved here? + public void testReindex() throws IOException { + final String sourceIndex = "source1"; + final String destinationIndex = "dest"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + createIndex(destinationIndex, settings); + BulkRequest bulkRequest = new BulkRequest() + .add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + bulkRequest, + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: create one doc in dest + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(sourceIndex); + reindexRequest.setDestIndex(destinationIndex); + reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); + reindexRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); + assertEquals(1, bulkResponse.getCreated()); + assertEquals(1, bulkResponse.getTotal()); + assertEquals(0, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + } + } + + public void testReindexTask() throws IOException, InterruptedException { + final String sourceIndex = "source123"; + final String destinationIndex = "dest2"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + createIndex(destinationIndex, settings); + BulkRequest bulkRequest = new BulkRequest() + .add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + bulkRequest, + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: create one doc in dest + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(sourceIndex); + reindexRequest.setDestIndex(destinationIndex); + reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); + reindexRequest.setRefresh(true); + + ReindexSubmissionResponse reindexSubmission = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT); + + BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(reindexSubmission.getTask()); + awaitBusy(hasUpgradeCompleted); + } + } + + private BooleanSupplier checkCompletionStatus(TaskId taskId) { + return () -> { + try { + Response response = client().performRequest(new Request("GET", "/_tasks/" + taskId.toString())); + return (boolean) entityAsMap(response).get("completed"); + } catch (IOException e) { + fail(e.getMessage()); + return false; + } + }; + } +} From 3cc288dedb6ebafa742ec5506df2e31db5dfa8a8 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Sat, 3 Nov 2018 18:35:01 +0100 Subject: [PATCH 2/9] changes due ot wait for completion flag --- .../client/RequestConvertersTests.java | 1 + .../SnapshotRequestConvertersTests.java | 66 +++++++++---------- .../client/TasksRequestConvertersTests.java | 10 ++- 3 files changed, 37 insertions(+), 40 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 4640ab56599b9..367ca2858b426 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -347,6 +347,7 @@ public void testReindex() throws IOException { setRandomTimeout(reindexRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams); setRandomWaitForActiveShards(reindexRequest::setWaitForActiveShards, ActiveShardCount.DEFAULT, expectedParams); expectedParams.put("scroll", reindexRequest.getScrollTime().getStringRep()); + expectedParams.put("wait_for_completion", Boolean.TRUE.toString()); Request request = RequestConverters.reindex(reindexRequest); assertEquals("/_reindex", request.getEndpoint()); assertEquals(HttpPost.METHOD_NAME, request.getMethod()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/SnapshotRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/SnapshotRequestConvertersTests.java index efd321aa7ee34..ca86a9120422b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/SnapshotRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/SnapshotRequestConvertersTests.java @@ -51,7 +51,7 @@ import static org.hamcrest.Matchers.nullValue; public class SnapshotRequestConvertersTests extends ESTestCase { - + public void testGetRepositories() { Map expectedParams = new HashMap<>(); StringBuilder endpoint = new StringBuilder("/_snapshot"); @@ -61,14 +61,14 @@ public void testGetRepositories() { RequestConvertersTests.setRandomLocal(getRepositoriesRequest, expectedParams); if (randomBoolean()) { - String[] entries = new String[] { "a", "b", "c" }; + String[] entries = new String[]{"a", "b", "c"}; getRepositoriesRequest.repositories(entries); endpoint.append("/" + String.join(",", entries)); } Request request = SnapshotRequestConverters.getRepositories(getRepositoriesRequest); - assertThat(endpoint.toString(), equalTo(request.getEndpoint())); - assertThat(HttpGet.METHOD_NAME, equalTo(request.getMethod())); + assertThat(request.getEndpoint(), equalTo(endpoint.toString())); + assertThat(request.getMethod(), equalTo(HttpGet.METHOD_NAME)); assertThat(expectedParams, equalTo(request.getParameters())); } @@ -88,8 +88,8 @@ public void testCreateRepository() throws IOException { .build()); Request request = SnapshotRequestConverters.createRepository(putRepositoryRequest); - assertThat(endpoint, equalTo(request.getEndpoint())); - assertThat(HttpPut.METHOD_NAME, equalTo(request.getMethod())); + assertThat(request.getEndpoint(), equalTo(endpoint)); + assertThat(request.getMethod(), equalTo(HttpPut.METHOD_NAME)); RequestConvertersTests.assertToXContentBody(putRepositoryRequest, request.getEntity()); } @@ -105,9 +105,9 @@ public void testDeleteRepository() { RequestConvertersTests.setRandomTimeout(deleteRepositoryRequest::timeout, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, expectedParams); Request request = SnapshotRequestConverters.deleteRepository(deleteRepositoryRequest); - assertThat(endpoint.toString(), equalTo(request.getEndpoint())); - assertThat(HttpDelete.METHOD_NAME, equalTo(request.getMethod())); - assertThat(expectedParams, equalTo(request.getParameters())); + assertThat(request.getEndpoint(), equalTo(endpoint.toString())); + assertThat(request.getMethod(), equalTo(HttpDelete.METHOD_NAME)); + assertThat(request.getParameters(), equalTo(expectedParams)); assertNull(request.getEntity()); } @@ -121,9 +121,9 @@ public void testVerifyRepository() { RequestConvertersTests.setRandomTimeout(verifyRepositoryRequest::timeout, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, expectedParams); Request request = SnapshotRequestConverters.verifyRepository(verifyRepositoryRequest); - assertThat(endpoint, equalTo(request.getEndpoint())); - assertThat(HttpPost.METHOD_NAME, equalTo(request.getMethod())); - assertThat(expectedParams, equalTo(request.getParameters())); + assertThat(request.getEndpoint(), equalTo(endpoint)); + assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME)); + assertThat(request.getParameters(), equalTo(expectedParams)); } public void testCreateSnapshot() throws IOException { @@ -137,14 +137,12 @@ public void testCreateSnapshot() throws IOException { Boolean waitForCompletion = randomBoolean(); createSnapshotRequest.waitForCompletion(waitForCompletion); - if (waitForCompletion) { - expectedParams.put("wait_for_completion", waitForCompletion.toString()); - } + expectedParams.put("wait_for_completion", waitForCompletion.toString()); Request request = SnapshotRequestConverters.createSnapshot(createSnapshotRequest); - assertThat(endpoint, equalTo(request.getEndpoint())); - assertThat(HttpPut.METHOD_NAME, equalTo(request.getMethod())); - assertThat(expectedParams, equalTo(request.getParameters())); + assertThat(request.getEndpoint(), equalTo(endpoint)); + assertThat(request.getMethod(), equalTo(HttpPut.METHOD_NAME)); + assertThat(request.getParameters(), equalTo(expectedParams)); RequestConvertersTests.assertToXContentBody(createSnapshotRequest, request.getEntity()); } @@ -178,9 +176,9 @@ public void testGetSnapshots() { } Request request = SnapshotRequestConverters.getSnapshots(getSnapshotsRequest); - assertThat(endpoint, equalTo(request.getEndpoint())); - assertThat(HttpGet.METHOD_NAME, equalTo(request.getMethod())); - assertThat(expectedParams, equalTo(request.getParameters())); + assertThat(request.getEndpoint(), equalTo(endpoint)); + assertThat(request.getMethod(), equalTo(HttpGet.METHOD_NAME)); + assertThat(request.getParameters(), equalTo(expectedParams)); assertNull(request.getEntity()); } @@ -202,9 +200,9 @@ public void testGetAllSnapshots() { expectedParams.put("verbose", Boolean.toString(verbose)); Request request = SnapshotRequestConverters.getSnapshots(getSnapshotsRequest); - assertThat(endpoint, equalTo(request.getEndpoint())); - assertThat(HttpGet.METHOD_NAME, equalTo(request.getMethod())); - assertThat(expectedParams, equalTo(request.getParameters())); + assertThat(request.getEndpoint(), equalTo(endpoint)); + assertThat(request.getMethod(), equalTo(HttpGet.METHOD_NAME)); + assertThat(request.getParameters(), equalTo(expectedParams)); assertNull(request.getEntity()); } @@ -239,10 +237,10 @@ public void testRestoreSnapshot() throws IOException { RestoreSnapshotRequest restoreSnapshotRequest = new RestoreSnapshotRequest(repository, snapshot); RequestConvertersTests.setRandomMasterTimeout(restoreSnapshotRequest, expectedParams); - if (randomBoolean()) { - restoreSnapshotRequest.waitForCompletion(true); - expectedParams.put("wait_for_completion", "true"); - } + boolean waitForCompletion = randomBoolean(); + restoreSnapshotRequest.waitForCompletion(waitForCompletion); + expectedParams.put("wait_for_completion", Boolean.toString(waitForCompletion)); + if (randomBoolean()) { String timeout = randomTimeValue(); restoreSnapshotRequest.masterNodeTimeout(timeout); @@ -250,9 +248,9 @@ public void testRestoreSnapshot() throws IOException { } Request request = SnapshotRequestConverters.restoreSnapshot(restoreSnapshotRequest); - assertThat(endpoint, equalTo(request.getEndpoint())); - assertThat(HttpPost.METHOD_NAME, equalTo(request.getMethod())); - assertThat(expectedParams, equalTo(request.getParameters())); + assertThat(request.getEndpoint(), equalTo(endpoint)); + assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME)); + assertThat(request.getParameters(), equalTo(expectedParams)); RequestConvertersTests.assertToXContentBody(restoreSnapshotRequest, request.getEntity()); } @@ -269,9 +267,9 @@ public void testDeleteSnapshot() { RequestConvertersTests.setRandomMasterTimeout(deleteSnapshotRequest, expectedParams); Request request = SnapshotRequestConverters.deleteSnapshot(deleteSnapshotRequest); - assertThat(endpoint, equalTo(request.getEndpoint())); - assertThat(HttpDelete.METHOD_NAME, equalTo(request.getMethod())); - assertThat(expectedParams, equalTo(request.getParameters())); + assertThat(request.getEndpoint(), equalTo(endpoint)); + assertThat(request.getMethod(), equalTo(HttpDelete.METHOD_NAME)); + assertThat(request.getParameters(), equalTo(expectedParams)); assertNull(request.getEntity()); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java index ff6726faee18d..4b7889d3b7e7a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksRequestConvertersTests.java @@ -62,12 +62,10 @@ public void testListTasks() { expectedParams.put("detailed", "true"); } } - if (randomBoolean()) { - request.setWaitForCompletion(randomBoolean()); - if (request.getWaitForCompletion()) { - expectedParams.put("wait_for_completion", "true"); - } - } + + request.setWaitForCompletion(randomBoolean()); + expectedParams.put("wait_for_completion", Boolean.toString(request.getWaitForCompletion())); + if (randomBoolean()) { String timeout = randomTimeValue(); request.setTimeout(timeout); From 0aa129488b46b4b78d8740c367d01f1518e3ca41 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Mon, 5 Nov 2018 09:34:59 +0100 Subject: [PATCH 3/9] cleanup docs --- .../java/org/elasticsearch/client/CrudIT.java | 105 ------------------ .../org/elasticsearch/client/ReindexIT.java | 1 - 2 files changed, 106 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 1dd27cff0d92a..ee0fde29869a1 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -706,111 +706,6 @@ public void testBulk() throws IOException { validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest); } - public void testReindex() throws Exception { - final String sourceIndex = "source1"; - final String destinationIndex = "dest"; - { - // Prepare - Settings settings = Settings.builder() - .put("number_of_shards", 1) - .put("number_of_replicas", 0) - .build(); - createIndex(sourceIndex, settings); - createIndex(destinationIndex, settings); - BulkRequest bulkRequest = new BulkRequest() - .add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) - .add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)) - .setRefreshPolicy(RefreshPolicy.IMMEDIATE); - assertEquals( - RestStatus.OK, - highLevelClient().bulk( - bulkRequest, - RequestOptions.DEFAULT - ).status() - ); - } - { - // test1: create one doc in dest - ReindexRequest reindexRequest = new ReindexRequest(); - reindexRequest.setSourceIndices(sourceIndex); - reindexRequest.setDestIndex(destinationIndex); - reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); - reindexRequest.setRefresh(true); - BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); - assertEquals(1, bulkResponse.getCreated()); - assertEquals(1, bulkResponse.getTotal()); - assertEquals(0, bulkResponse.getDeleted()); - assertEquals(0, bulkResponse.getNoops()); - assertEquals(0, bulkResponse.getVersionConflicts()); - assertEquals(1, bulkResponse.getBatches()); - assertTrue(bulkResponse.getTook().getMillis() > 0); - assertEquals(1, bulkResponse.getBatches()); - assertEquals(0, bulkResponse.getBulkFailures().size()); - assertEquals(0, bulkResponse.getSearchFailures().size()); - } - { - // test2: create 1 and update 1 - ReindexRequest reindexRequest = new ReindexRequest(); - reindexRequest.setSourceIndices(sourceIndex); - reindexRequest.setDestIndex(destinationIndex); - BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); - assertEquals(1, bulkResponse.getCreated()); - assertEquals(2, bulkResponse.getTotal()); - assertEquals(1, bulkResponse.getUpdated()); - assertEquals(0, bulkResponse.getDeleted()); - assertEquals(0, bulkResponse.getNoops()); - assertEquals(0, bulkResponse.getVersionConflicts()); - assertEquals(1, bulkResponse.getBatches()); - assertTrue(bulkResponse.getTook().getMillis() > 0); - assertEquals(1, bulkResponse.getBatches()); - assertEquals(0, bulkResponse.getBulkFailures().size()); - assertEquals(0, bulkResponse.getSearchFailures().size()); - } - { - // test reindex rethrottling - ReindexRequest reindexRequest = new ReindexRequest(); - reindexRequest.setSourceIndices(sourceIndex); - reindexRequest.setDestIndex(destinationIndex); - - // this following settings are supposed to halt reindexing after first document - reindexRequest.setSourceBatchSize(1); - reindexRequest.setRequestsPerSecond(0.00001f); - final CountDownLatch reindexTaskFinished = new CountDownLatch(1); - highLevelClient().reindexAsync(reindexRequest, RequestOptions.DEFAULT, new ActionListener() { - - @Override - public void onResponse(BulkByScrollResponse response) { - reindexTaskFinished.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - }); - - TaskId taskIdToRethrottle = findTaskToRethrottle(ReindexAction.NAME); - float requestsPerSecond = 1000f; - ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), - highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync); - assertThat(response.getTasks(), hasSize(1)); - assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId()); - assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class)); - assertEquals(Float.toString(requestsPerSecond), - ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString()); - reindexTaskFinished.await(2, TimeUnit.SECONDS); - - // any rethrottling after the reindex is done performed with the same taskId should result in a failure - response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), - highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync); - assertTrue(response.getTasks().isEmpty()); - assertFalse(response.getNodeFailures().isEmpty()); - assertEquals(1, response.getNodeFailures().size()); - assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]", - response.getNodeFailures().get(0).getCause().getMessage()); - } - } - private TaskId findTaskToRethrottle(String actionName) throws IOException { long start = System.nanoTime(); ListTasksRequest request = new ListTasksRequest(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java index 87b5c03a319a8..c368b462a229e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -37,7 +37,6 @@ public class ReindexIT extends ESRestHighLevelClientTestCase { - //TODO taken from CrudIT - should these scenarios be moved here? public void testReindex() throws IOException { final String sourceIndex = "source1"; final String destinationIndex = "dest"; From 686f10ad26545e302d583f2224e7495b4b21901c Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Mon, 5 Nov 2018 09:37:44 +0100 Subject: [PATCH 4/9] refactor to return taskID from submission --- .../client/RestHighLevelClient.java | 26 +++--- .../reindex/ReindexSubmissionResponse.java | 89 ------------------- .../java/org/elasticsearch/client/CrudIT.java | 2 - .../org/elasticsearch/client/ReindexIT.java | 5 +- .../java/org/elasticsearch/tasks/TaskId.java | 17 ++++ 5 files changed, 32 insertions(+), 107 deletions(-) delete mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 7f0d7644d675b..5d15e3f31eaaa 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -56,9 +56,8 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.client.core.TermVectorsResponse; import org.elasticsearch.client.core.TermVectorsRequest; -import org.elasticsearch.client.reindex.ReindexSubmissionResponse; +import org.elasticsearch.client.core.TermVectorsResponse; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.ParseField; @@ -135,6 +134,7 @@ import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentileRanks; import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles; import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.MedianAbsoluteDeviationAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ParsedAvg; import org.elasticsearch.search.aggregations.metrics.ParsedCardinality; @@ -144,6 +144,7 @@ import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentileRanks; import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentiles; import org.elasticsearch.search.aggregations.metrics.ParsedMax; +import org.elasticsearch.search.aggregations.metrics.ParsedMedianAbsoluteDeviation; import org.elasticsearch.search.aggregations.metrics.ParsedMin; import org.elasticsearch.search.aggregations.metrics.ParsedScriptedMetric; import org.elasticsearch.search.aggregations.metrics.ParsedStats; @@ -157,20 +158,18 @@ import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.MedianAbsoluteDeviationAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedMedianAbsoluteDeviation; -import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue; +import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative; +import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket; import org.elasticsearch.search.aggregations.pipeline.ParsedPercentilesBucket; -import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue; import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket; +import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket; -import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; @@ -178,6 +177,7 @@ import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder; import org.elasticsearch.search.suggest.term.TermSuggestion; import org.elasticsearch.search.suggest.term.TermSuggestionBuilder; +import org.elasticsearch.tasks.TaskId; import java.io.Closeable; import java.io.IOException; @@ -456,9 +456,9 @@ public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, Request * @return the submission response * @throws IOException in case there is a problem sending the request or parsing back the response */ - public final ReindexSubmissionResponse submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException { + public final TaskId submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException { return performRequestAndParseEntity( - reindexRequest, RequestConverters::submitReindex, options, ReindexSubmissionResponse::fromXContent, emptySet() + reindexRequest, RequestConverters::submitReindex, options, TaskId::fromXContent, emptySet() ); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java deleted file mode 100644 index 522e48f3c2e6b..0000000000000 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.client.reindex; - -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.tasks.TaskId; - -import java.io.IOException; -import java.util.Objects; - -public class ReindexSubmissionResponse extends ActionResponse implements ToXContentObject { - private static final ParseField TASK = new ParseField("task"); - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "reindex_submission_response", - true, a -> new ReindexSubmissionResponse((TaskId) a[0])); - - static { - PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); - } - - public static ReindexSubmissionResponse fromXContent(XContentParser parser) throws IOException { - return PARSER.parse(parser, null); - } - - private final TaskId task; - - ReindexSubmissionResponse(@Nullable TaskId task) { - this.task = task; - } - - /** - * Get the task id - * - * @return the id of the reindex task. - */ - public TaskId getTask() { - return task; - } - - @Override - public int hashCode() { - return Objects.hash(task); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other == null || getClass() != other.getClass()) { - return false; - } - ReindexSubmissionResponse that = (ReindexSubmissionResponse) other; - return Objects.equals(task, that.task); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - if (task != null) { - builder.field(TASK.getPreferredName(), task.toString()); - } - builder.endObject(); - return builder; - } -} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index ee0fde29869a1..fed0e8921569c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -60,8 +60,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.index.reindex.ReindexAction; -import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryAction; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java index c368b462a229e..52b9f212c463c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -22,7 +22,6 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.client.reindex.ReindexSubmissionResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.IdsQueryBuilder; @@ -112,9 +111,9 @@ public void testReindexTask() throws IOException, InterruptedException { reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); reindexRequest.setRefresh(true); - ReindexSubmissionResponse reindexSubmission = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT); + TaskId task = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT); - BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(reindexSubmission.getTask()); + BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(task); awaitBusy(hasUpgradeCompleted); } } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskId.java b/server/src/main/java/org/elasticsearch/tasks/TaskId.java index f92997b047c13..7e6cd3dd78416 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskId.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskId.java @@ -20,11 +20,14 @@ package org.elasticsearch.tasks; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ContextParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; @@ -35,6 +38,16 @@ public final class TaskId implements Writeable { public static final TaskId EMPTY_TASK_ID = new TaskId(); + private static final ParseField TASK = new ParseField("task"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "task_id_within_object_response", + true, a -> (TaskId) a[0]); + + static { + PARSER.declareField(ConstructingObjectParser.constructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); + } + private final String nodeId; private final long id; @@ -147,4 +160,8 @@ public int hashCode() { result = 31 * result + (int) (id ^ (id >>> 32)); return result; } + + public static TaskId fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } } From 16f255053ec63a424d540c1b5e22576d890b6643 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Tue, 6 Nov 2018 09:10:58 +0100 Subject: [PATCH 5/9] Revert "refactor to return taskID from submission" This reverts commit 686f10ad26545e302d583f2224e7495b4b21901c. --- .../client/RestHighLevelClient.java | 26 +++--- .../reindex/ReindexSubmissionResponse.java | 89 +++++++++++++++++++ .../java/org/elasticsearch/client/CrudIT.java | 2 + .../org/elasticsearch/client/ReindexIT.java | 5 +- .../java/org/elasticsearch/tasks/TaskId.java | 17 ---- 5 files changed, 107 insertions(+), 32 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 5d15e3f31eaaa..7f0d7644d675b 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -56,8 +56,9 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.client.core.TermVectorsRequest; import org.elasticsearch.client.core.TermVectorsResponse; +import org.elasticsearch.client.core.TermVectorsRequest; +import org.elasticsearch.client.reindex.ReindexSubmissionResponse; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.ParseField; @@ -134,7 +135,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentileRanks; import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles; import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.MedianAbsoluteDeviationAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ParsedAvg; import org.elasticsearch.search.aggregations.metrics.ParsedCardinality; @@ -144,7 +144,6 @@ import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentileRanks; import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentiles; import org.elasticsearch.search.aggregations.metrics.ParsedMax; -import org.elasticsearch.search.aggregations.metrics.ParsedMedianAbsoluteDeviation; import org.elasticsearch.search.aggregations.metrics.ParsedMin; import org.elasticsearch.search.aggregations.metrics.ParsedScriptedMetric; import org.elasticsearch.search.aggregations.metrics.ParsedStats; @@ -158,18 +157,20 @@ import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue; +import org.elasticsearch.search.aggregations.metrics.MedianAbsoluteDeviationAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.ParsedMedianAbsoluteDeviation; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue; import org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue; -import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative; -import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket; import org.elasticsearch.search.aggregations.pipeline.ParsedPercentilesBucket; -import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket; import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket; import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket; +import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; @@ -177,7 +178,6 @@ import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder; import org.elasticsearch.search.suggest.term.TermSuggestion; import org.elasticsearch.search.suggest.term.TermSuggestionBuilder; -import org.elasticsearch.tasks.TaskId; import java.io.Closeable; import java.io.IOException; @@ -456,9 +456,9 @@ public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, Request * @return the submission response * @throws IOException in case there is a problem sending the request or parsing back the response */ - public final TaskId submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException { + public final ReindexSubmissionResponse submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException { return performRequestAndParseEntity( - reindexRequest, RequestConverters::submitReindex, options, TaskId::fromXContent, emptySet() + reindexRequest, RequestConverters::submitReindex, options, ReindexSubmissionResponse::fromXContent, emptySet() ); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java new file mode 100644 index 0000000000000..522e48f3c2e6b --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.reindex; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Objects; + +public class ReindexSubmissionResponse extends ActionResponse implements ToXContentObject { + private static final ParseField TASK = new ParseField("task"); + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "reindex_submission_response", + true, a -> new ReindexSubmissionResponse((TaskId) a[0])); + + static { + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); + } + + public static ReindexSubmissionResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private final TaskId task; + + ReindexSubmissionResponse(@Nullable TaskId task) { + this.task = task; + } + + /** + * Get the task id + * + * @return the id of the reindex task. + */ + public TaskId getTask() { + return task; + } + + @Override + public int hashCode() { + return Objects.hash(task); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + ReindexSubmissionResponse that = (ReindexSubmissionResponse) other; + return Objects.equals(task, that.task); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (task != null) { + builder.field(TASK.getPreferredName(), task.toString()); + } + builder.endObject(); + return builder; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index fed0e8921569c..ee0fde29869a1 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -60,6 +60,8 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryAction; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java index 52b9f212c463c..c368b462a229e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.reindex.ReindexSubmissionResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.IdsQueryBuilder; @@ -111,9 +112,9 @@ public void testReindexTask() throws IOException, InterruptedException { reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); reindexRequest.setRefresh(true); - TaskId task = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT); + ReindexSubmissionResponse reindexSubmission = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT); - BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(task); + BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(reindexSubmission.getTask()); awaitBusy(hasUpgradeCompleted); } } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskId.java b/server/src/main/java/org/elasticsearch/tasks/TaskId.java index 7e6cd3dd78416..f92997b047c13 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskId.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskId.java @@ -20,14 +20,11 @@ package org.elasticsearch.tasks; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ContextParser; -import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; @@ -38,16 +35,6 @@ public final class TaskId implements Writeable { public static final TaskId EMPTY_TASK_ID = new TaskId(); - private static final ParseField TASK = new ParseField("task"); - - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "task_id_within_object_response", - true, a -> (TaskId) a[0]); - - static { - PARSER.declareField(ConstructingObjectParser.constructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); - } - private final String nodeId; private final long id; @@ -160,8 +147,4 @@ public int hashCode() { result = 31 * result + (int) (id ^ (id >>> 32)); return result; } - - public static TaskId fromXContent(XContentParser parser) { - return PARSER.apply(parser, null); - } } From d3ca238c865bfdf64259677d11b37b605634dfdb Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Tue, 6 Nov 2018 10:27:26 +0100 Subject: [PATCH 6/9] return TaskSubmissionResponse make rest client test assert about submit task methods --- .../client/RestHighLevelClient.java | 6 +- .../TaskSubmissionResponse.java} | 28 ++--- .../java/org/elasticsearch/client/CrudIT.java | 2 - .../org/elasticsearch/client/ReindexIT.java | 4 +- .../client/RestHighLevelClientTests.java | 115 ++++++++++++------ .../tasks/TaskSubmissionResponseTests.java | 33 +++++ 6 files changed, 121 insertions(+), 67 deletions(-) rename client/rest-high-level/src/main/java/org/elasticsearch/client/{reindex/ReindexSubmissionResponse.java => tasks/TaskSubmissionResponse.java} (65%) create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 7f0d7644d675b..ab62a63df41be 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -58,7 +58,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.core.TermVectorsResponse; import org.elasticsearch.client.core.TermVectorsRequest; -import org.elasticsearch.client.reindex.ReindexSubmissionResponse; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.ParseField; @@ -456,9 +456,9 @@ public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, Request * @return the submission response * @throws IOException in case there is a problem sending the request or parsing back the response */ - public final ReindexSubmissionResponse submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException { + public final TaskSubmissionResponse submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException { return performRequestAndParseEntity( - reindexRequest, RequestConverters::submitReindex, options, ReindexSubmissionResponse::fromXContent, emptySet() + reindexRequest, RequestConverters::submitReindex, options, TaskSubmissionResponse::fromXContent, emptySet() ); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java similarity index 65% rename from client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java rename to client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java index 522e48f3c2e6b..d594ba6920f16 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java @@ -16,38 +16,35 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.client.reindex; +package org.elasticsearch.client.tasks; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.Objects; -public class ReindexSubmissionResponse extends ActionResponse implements ToXContentObject { +public class TaskSubmissionResponse extends ActionResponse { private static final ParseField TASK = new ParseField("task"); - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "reindex_submission_response", - true, a -> new ReindexSubmissionResponse((TaskId) a[0])); + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "task_submission_response", + true, a -> new TaskSubmissionResponse((TaskId) a[0])); static { PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); } - public static ReindexSubmissionResponse fromXContent(XContentParser parser) throws IOException { + public static TaskSubmissionResponse fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } private final TaskId task; - ReindexSubmissionResponse(@Nullable TaskId task) { + TaskSubmissionResponse(TaskId task) { this.task = task; } @@ -73,17 +70,8 @@ public boolean equals(Object other) { if (other == null || getClass() != other.getClass()) { return false; } - ReindexSubmissionResponse that = (ReindexSubmissionResponse) other; + TaskSubmissionResponse that = (TaskSubmissionResponse) other; return Objects.equals(task, that.task); } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - if (task != null) { - builder.field(TASK.getPreferredName(), task.toString()); - } - builder.endObject(); - return builder; - } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index ee0fde29869a1..fed0e8921569c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -60,8 +60,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.index.reindex.ReindexAction; -import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryAction; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java index c368b462a229e..e112214a8d3e9 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -22,7 +22,7 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.client.reindex.ReindexSubmissionResponse; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.IdsQueryBuilder; @@ -112,7 +112,7 @@ public void testReindexTask() throws IOException, InterruptedException { reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); reindexRequest.setRefresh(true); - ReindexSubmissionResponse reindexSubmission = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT); + TaskSubmissionResponse reindexSubmission = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT); BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(reindexSubmission.getTask()); awaitBusy(hasUpgradeCompleted); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index d40c3196e54f4..1e18954c7ce4d 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.client; import com.fasterxml.jackson.core.JsonParseException; - import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; @@ -76,6 +75,7 @@ import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestApi; import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestSpec; +import org.hamcrest.Matchers; import org.junit.Before; import java.io.IOException; @@ -107,6 +107,9 @@ public class RestHighLevelClientTests extends ESTestCase { + static final String SUBMIT_TASK_PREFIX = "submit_"; + static final String SUBMIT_TASK_SUFFIX = "_task"; + private static final ProtocolVersion HTTP_PROTOCOL = new ProtocolVersion("http", 1, 1); private static final RequestLine REQUEST_LINE = new BasicRequestLine(HttpGet.METHOD_NAME, "/", HTTP_PROTOCOL); @@ -715,46 +718,11 @@ public void testApiNamingConventions() throws Exception { //we convert all the method names to snake case, hence we need to look for the '_async' suffix rather than 'Async' if (apiName.endsWith("_async")) { - assertTrue("async method [" + method.getName() + "] doesn't have corresponding sync method", - methods.containsKey(apiName.substring(0, apiName.length() - 6))); - assertThat("async method [" + method + "] should return void", method.getReturnType(), equalTo(Void.TYPE)); - assertEquals("async method [" + method + "] should not throw any exceptions", 0, method.getExceptionTypes().length); - if (apiName.equals("security.get_ssl_certificates_async")) { - assertEquals(2, method.getParameterTypes().length); - assertThat(method.getParameterTypes()[0], equalTo(RequestOptions.class)); - assertThat(method.getParameterTypes()[1], equalTo(ActionListener.class)); - } else { - assertEquals("async method [" + method + "] has the wrong number of arguments", 3, method.getParameterTypes().length); - assertThat("the first parameter to async method [" + method + "] should be a request type", - method.getParameterTypes()[0].getSimpleName(), endsWith("Request")); - assertThat("the second parameter to async method [" + method + "] is the wrong type", - method.getParameterTypes()[1], equalTo(RequestOptions.class)); - assertThat("the third parameter to async method [" + method + "] is the wrong type", - method.getParameterTypes()[2], equalTo(ActionListener.class)); - } + assertAsyncMethod(methods, method, apiName); + } else if (isSubmitTaskMethod(apiName)) { + assertSubmitTaskMethod(methods, method, apiName, restSpec); } else { - //A few methods return a boolean rather than a response object - if (apiName.equals("ping") || apiName.contains("exist")) { - assertThat("the return type for method [" + method + "] is incorrect", - method.getReturnType().getSimpleName(), equalTo("boolean")); - } else { - assertThat("the return type for method [" + method + "] is incorrect", - method.getReturnType().getSimpleName(), endsWith("Response")); - } - - assertEquals("incorrect number of exceptions for method [" + method + "]", 1, method.getExceptionTypes().length); - //a few methods don't accept a request object as argument - if (apiName.equals("ping") || apiName.equals("info") || apiName.equals("security.get_ssl_certificates")) { - assertEquals("incorrect number of arguments for method [" + method + "]", 1, method.getParameterTypes().length); - assertThat("the parameter to method [" + method + "] is the wrong type", - method.getParameterTypes()[0], equalTo(RequestOptions.class)); - } else { - assertEquals("incorrect number of arguments for method [" + method + "]", 2, method.getParameterTypes().length); - assertThat("the first parameter to method [" + method + "] is the wrong type", - method.getParameterTypes()[0].getSimpleName(), endsWith("Request")); - assertThat("the second parameter to method [" + method + "] is the wrong type", - method.getParameterTypes()[1], equalTo(RequestOptions.class)); - } + assertSyncMethod(method, apiName); boolean remove = apiSpec.remove(apiName); if (remove == false) { @@ -788,6 +756,73 @@ public void testApiNamingConventions() throws Exception { assertThat("Some API are not supported but they should be: " + apiSpec, apiSpec.size(), equalTo(0)); } + private void assertAsyncMethod(Map methods, Method method, String apiName) { + assertTrue("async method [" + method.getName() + "] doesn't have corresponding sync method", + methods.containsKey(apiName.substring(0, apiName.length() - 6))); + assertThat("async method [" + method + "] should return void", method.getReturnType(), equalTo(Void.TYPE)); + assertEquals("async method [" + method + "] should not throw any exceptions", 0, method.getExceptionTypes().length); + if (apiName.equals("security.get_ssl_certificates_async")) { + assertEquals(2, method.getParameterTypes().length); + assertThat(method.getParameterTypes()[0], equalTo(RequestOptions.class)); + assertThat(method.getParameterTypes()[1], equalTo(ActionListener.class)); + } else { + assertEquals("async method [" + method + "] has the wrong number of arguments", 3, method.getParameterTypes().length); + assertThat("the first parameter to async method [" + method + "] should be a request type", + method.getParameterTypes()[0].getSimpleName(), endsWith("Request")); + assertThat("the second parameter to async method [" + method + "] is the wrong type", + method.getParameterTypes()[1], equalTo(RequestOptions.class)); + assertThat("the third parameter to async method [" + method + "] is the wrong type", + method.getParameterTypes()[2], equalTo(ActionListener.class)); + } + } + + private void assertSubmitTaskMethod(Map methods, Method method, String apiName, ClientYamlSuiteRestSpec restSpec) { + String methodName = extractMethodName(apiName); + assertTrue("submit task method [" + method.getName() + "] doesn't have corresponding sync method", + methods.containsKey(methodName)); + assertEquals("submit task method [" + method + "] has the wrong number of arguments", 2, method.getParameterTypes().length); + assertThat("the first parameter to submit task method [" + method + "] is the wrong type", + method.getParameterTypes()[0].getSimpleName(), endsWith("Request")); + assertThat("the second parameter to submit task method [" + method + "] is the wrong type", + method.getParameterTypes()[1], equalTo(RequestOptions.class)); + + assertThat("submit task method [" + method + "] must have wait_for_completion parameter in rest spec", + restSpec.getApi(methodName).getParams(), Matchers.hasKey("wait_for_completion")); + } + + private void assertSyncMethod(Method method, String apiName) { + //A few methods return a boolean rather than a response object + if (apiName.equals("ping") || apiName.contains("exist")) { + assertThat("the return type for method [" + method + "] is incorrect", + method.getReturnType().getSimpleName(), equalTo("boolean")); + } else { + assertThat("the return type for method [" + method + "] is incorrect", + method.getReturnType().getSimpleName(), endsWith("Response")); + } + + assertEquals("incorrect number of exceptions for method [" + method + "]", 1, method.getExceptionTypes().length); + //a few methods don't accept a request object as argument + if (apiName.equals("ping") || apiName.equals("info") || apiName.equals("security.get_ssl_certificates")) { + assertEquals("incorrect number of arguments for method [" + method + "]", 1, method.getParameterTypes().length); + assertThat("the parameter to method [" + method + "] is the wrong type", + method.getParameterTypes()[0], equalTo(RequestOptions.class)); + } else { + assertEquals("incorrect number of arguments for method [" + method + "]", 2, method.getParameterTypes().length); + assertThat("the first parameter to method [" + method + "] is the wrong type", + method.getParameterTypes()[0].getSimpleName(), endsWith("Request")); + assertThat("the second parameter to method [" + method + "] is the wrong type", + method.getParameterTypes()[1], equalTo(RequestOptions.class)); + } + } + + private String extractMethodName(String apiName) { + return apiName.substring(SUBMIT_TASK_PREFIX.length(), apiName.length() - SUBMIT_TASK_SUFFIX.length()); + } + + private boolean isSubmitTaskMethod(String apiName) { + return apiName.startsWith(SUBMIT_TASK_PREFIX) && apiName.endsWith(SUBMIT_TASK_SUFFIX); + } + private static Stream> getSubClientMethods(String namespace, Class clientClass) { return Arrays.stream(clientClass.getMethods()).filter(method -> method.getDeclaringClass().equals(clientClass)) .map(method -> Tuple.tuple(namespace + "." + toSnakeCase(method.getName()), method)) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java new file mode 100644 index 0000000000000..58edb92b1f643 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java @@ -0,0 +1,33 @@ +package org.elasticsearch.client.tasks; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; + +public class TaskSubmissionResponseTests extends ESTestCase { + + public void testFromXContent() throws IOException { + xContentTester( + this::createParser, + this::createTestInstance, + this::toXContent, + TaskSubmissionResponse::fromXContent) + .supportsUnknownFields(true) + .test(); + } + + private void toXContent(TaskSubmissionResponse response, XContentBuilder xContentBuilder) throws IOException { + xContentBuilder.startObject(); + xContentBuilder.field("task", response.getTask().toString()); + xContentBuilder.endObject(); + } + + private TaskSubmissionResponse createTestInstance() { + TaskId taskId = new TaskId(randomAlphaOfLength(5), randomLong()); + return new TaskSubmissionResponse(taskId); + } +} From ef40220c2eddc6ac67015125a53a25c218163e2f Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Tue, 6 Nov 2018 12:18:22 +0100 Subject: [PATCH 7/9] missing headers --- .../client/tasks/TaskSubmissionResponse.java | 1 + .../tasks/TaskSubmissionResponseTests.java | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java index d594ba6920f16..3a89950a4e213 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.client.tasks; import org.elasticsearch.action.ActionResponse; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java index 58edb92b1f643..9bd51426977fd 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java @@ -1,3 +1,22 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.client.tasks; import org.elasticsearch.common.xcontent.XContentBuilder; From 5df4cbb5307a57c7f9cdd81dfb25a00b28cd6133 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Tue, 6 Nov 2018 17:34:21 +0100 Subject: [PATCH 8/9] rest high level client tests after merge --- .../client/RestHighLevelClientTests.java | 115 ++++++++++++------ 1 file changed, 75 insertions(+), 40 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 38810285a5d1c..9cdad41cf19bc 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -83,6 +83,7 @@ import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestApi; import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestSpec; +import org.hamcrest.Matchers; import org.junit.Before; import java.io.IOException; @@ -114,6 +115,8 @@ public class RestHighLevelClientTests extends ESTestCase { + private static final String SUBMIT_TASK_PREFIX = "submit_"; + private static final String SUBMIT_TASK_SUFFIX = "_task"; private static final ProtocolVersion HTTP_PROTOCOL = new ProtocolVersion("http", 1, 1); private static final RequestLine REQUEST_LINE = new BasicRequestLine(HttpGet.METHOD_NAME, "/", HTTP_PROTOCOL); @@ -728,47 +731,11 @@ public void testApiNamingConventions() throws Exception { //we convert all the method names to snake case, hence we need to look for the '_async' suffix rather than 'Async' if (apiName.endsWith("_async")) { - assertTrue("async method [" + method.getName() + "] doesn't have corresponding sync method", - methods.containsKey(apiName.substring(0, apiName.length() - 6))); - assertThat("async method [" + method + "] should return void", method.getReturnType(), equalTo(Void.TYPE)); - assertEquals("async method [" + method + "] should not throw any exceptions", 0, method.getExceptionTypes().length); - if (apiName.equals("security.authenticate_async") || apiName.equals("security.get_ssl_certificates_async")) { - assertEquals(2, method.getParameterTypes().length); - assertThat(method.getParameterTypes()[0], equalTo(RequestOptions.class)); - assertThat(method.getParameterTypes()[1], equalTo(ActionListener.class)); - } else { - assertEquals("async method [" + method + "] has the wrong number of arguments", 3, method.getParameterTypes().length); - assertThat("the first parameter to async method [" + method + "] should be a request type", - method.getParameterTypes()[0].getSimpleName(), endsWith("Request")); - assertThat("the second parameter to async method [" + method + "] is the wrong type", - method.getParameterTypes()[1], equalTo(RequestOptions.class)); - assertThat("the third parameter to async method [" + method + "] is the wrong type", - method.getParameterTypes()[2], equalTo(ActionListener.class)); - } + assertAsyncMethod(methods, method, apiName); + } else if (isSubmitTaskMethod(apiName)) { + assertSubmitTaskMethod(methods, method, apiName, restSpec); } else { - //A few methods return a boolean rather than a response object - if (apiName.equals("ping") || apiName.contains("exist")) { - assertThat("the return type for method [" + method + "] is incorrect", - method.getReturnType().getSimpleName(), equalTo("boolean")); - } else { - assertThat("the return type for method [" + method + "] is incorrect", - method.getReturnType().getSimpleName(), endsWith("Response")); - } - - assertEquals("incorrect number of exceptions for method [" + method + "]", 1, method.getExceptionTypes().length); - //a few methods don't accept a request object as argument - if (apiName.equals("ping") || apiName.equals("info") || apiName.equals("security.get_ssl_certificates") - || apiName.equals("security.authenticate")) { - assertEquals("incorrect number of arguments for method [" + method + "]", 1, method.getParameterTypes().length); - assertThat("the parameter to method [" + method + "] is the wrong type", - method.getParameterTypes()[0], equalTo(RequestOptions.class)); - } else { - assertEquals("incorrect number of arguments for method [" + method + "]", 2, method.getParameterTypes().length); - assertThat("the first parameter to method [" + method + "] is the wrong type", - method.getParameterTypes()[0].getSimpleName(), endsWith("Request")); - assertThat("the second parameter to method [" + method + "] is the wrong type", - method.getParameterTypes()[1], equalTo(RequestOptions.class)); - } + assertSyncMethod(method, apiName); boolean remove = apiSpec.remove(apiName); if (remove == false) { @@ -803,6 +770,74 @@ public void testApiNamingConventions() throws Exception { assertThat("Some API are not supported but they should be: " + apiSpec, apiSpec.size(), equalTo(0)); } + private void assertSyncMethod(Method method, String apiName) { + //A few methods return a boolean rather than a response object + if (apiName.equals("ping") || apiName.contains("exist")) { + assertThat("the return type for method [" + method + "] is incorrect", + method.getReturnType().getSimpleName(), equalTo("boolean")); + } else { + assertThat("the return type for method [" + method + "] is incorrect", + method.getReturnType().getSimpleName(), endsWith("Response")); + } + + assertEquals("incorrect number of exceptions for method [" + method + "]", 1, method.getExceptionTypes().length); + //a few methods don't accept a request object as argument + if (apiName.equals("ping") || apiName.equals("info") || apiName.equals("security.get_ssl_certificates") + || apiName.equals("security.authenticate")) { + assertEquals("incorrect number of arguments for method [" + method + "]", 1, method.getParameterTypes().length); + assertThat("the parameter to method [" + method + "] is the wrong type", + method.getParameterTypes()[0], equalTo(RequestOptions.class)); + } else { + assertEquals("incorrect number of arguments for method [" + method + "]", 2, method.getParameterTypes().length); + assertThat("the first parameter to method [" + method + "] is the wrong type", + method.getParameterTypes()[0].getSimpleName(), endsWith("Request")); + assertThat("the second parameter to method [" + method + "] is the wrong type", + method.getParameterTypes()[1], equalTo(RequestOptions.class)); + } + } + + private void assertAsyncMethod(Map methods, Method method, String apiName) { + assertTrue("async method [" + method.getName() + "] doesn't have corresponding sync method", + methods.containsKey(apiName.substring(0, apiName.length() - 6))); + assertThat("async method [" + method + "] should return void", method.getReturnType(), equalTo(Void.TYPE)); + assertEquals("async method [" + method + "] should not throw any exceptions", 0, method.getExceptionTypes().length); + if (apiName.equals("security.authenticate_async") || apiName.equals("security.get_ssl_certificates_async")) { + assertEquals(2, method.getParameterTypes().length); + assertThat(method.getParameterTypes()[0], equalTo(RequestOptions.class)); + assertThat(method.getParameterTypes()[1], equalTo(ActionListener.class)); + } else { + assertEquals("async method [" + method + "] has the wrong number of arguments", 3, method.getParameterTypes().length); + assertThat("the first parameter to async method [" + method + "] should be a request type", + method.getParameterTypes()[0].getSimpleName(), endsWith("Request")); + assertThat("the second parameter to async method [" + method + "] is the wrong type", + method.getParameterTypes()[1], equalTo(RequestOptions.class)); + assertThat("the third parameter to async method [" + method + "] is the wrong type", + method.getParameterTypes()[2], equalTo(ActionListener.class)); + } + } + + private void assertSubmitTaskMethod(Map methods, Method method, String apiName, ClientYamlSuiteRestSpec restSpec) { + String methodName = extractMethodName(apiName); + assertTrue("submit task method [" + method.getName() + "] doesn't have corresponding sync method", + methods.containsKey(methodName)); + assertEquals("submit task method [" + method + "] has the wrong number of arguments", 2, method.getParameterTypes().length); + assertThat("the first parameter to submit task method [" + method + "] is the wrong type", + method.getParameterTypes()[0].getSimpleName(), endsWith("Request")); + assertThat("the second parameter to submit task method [" + method + "] is the wrong type", + method.getParameterTypes()[1], equalTo(RequestOptions.class)); + + assertThat("submit task method [" + method + "] must have wait_for_completion parameter in rest spec", + restSpec.getApi(methodName).getParams(), Matchers.hasKey("wait_for_completion")); + } + + private String extractMethodName(String apiName) { + return apiName.substring(SUBMIT_TASK_PREFIX.length(), apiName.length() - SUBMIT_TASK_SUFFIX.length()); + } + + private boolean isSubmitTaskMethod(String apiName) { + return apiName.startsWith(SUBMIT_TASK_PREFIX) && apiName.endsWith(SUBMIT_TASK_SUFFIX); + } + private static Stream> getSubClientMethods(String namespace, Class clientClass) { return Arrays.stream(clientClass.getMethods()).filter(method -> method.getDeclaringClass().equals(clientClass)) .map(method -> Tuple.tuple(namespace + "." + toSnakeCase(method.getName()), method)) From f66f7b0eb1c630b7b2f579cd2c59dbacafd2b108 Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Wed, 7 Nov 2018 10:51:20 +0100 Subject: [PATCH 9/9] refactor taskid to be a string --- .../client/tasks/TaskSubmissionResponse.java | 22 +++++++++---------- .../org/elasticsearch/client/ReindexIT.java | 10 ++++----- .../tasks/TaskSubmissionResponseTests.java | 5 ++--- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java index 3a89950a4e213..7bc104c9bbf27 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskSubmissionResponse.java @@ -22,30 +22,26 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.Objects; public class TaskSubmissionResponse extends ActionResponse { + private static final ParseField TASK = new ParseField("task"); + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "task_submission_response", - true, a -> new TaskSubmissionResponse((TaskId) a[0])); + true, a -> new TaskSubmissionResponse((String) a[0])); static { - PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); - } - - public static TaskSubmissionResponse fromXContent(XContentParser parser) throws IOException { - return PARSER.parse(parser, null); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TASK); } - private final TaskId task; + private final String task; - TaskSubmissionResponse(TaskId task) { + TaskSubmissionResponse(String task) { this.task = task; } @@ -54,7 +50,7 @@ public static TaskSubmissionResponse fromXContent(XContentParser parser) throws * * @return the id of the reindex task. */ - public TaskId getTask() { + public String getTask() { return task; } @@ -75,4 +71,8 @@ public boolean equals(Object other) { return Objects.equals(task, that.task); } + public static TaskSubmissionResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java index e112214a8d3e9..afc5e99b5f03a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -29,7 +29,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.Collections; @@ -61,13 +60,15 @@ public void testReindex() throws IOException { ); } { - // test1: create one doc in dest + // reindex one document with id 1 from source to destination ReindexRequest reindexRequest = new ReindexRequest(); reindexRequest.setSourceIndices(sourceIndex); reindexRequest.setDestIndex(destinationIndex); reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); reindexRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); + assertEquals(1, bulkResponse.getCreated()); assertEquals(1, bulkResponse.getTotal()); assertEquals(0, bulkResponse.getDeleted()); @@ -105,7 +106,6 @@ public void testReindexTask() throws IOException, InterruptedException { ); } { - // test1: create one doc in dest ReindexRequest reindexRequest = new ReindexRequest(); reindexRequest.setSourceIndices(sourceIndex); reindexRequest.setDestIndex(destinationIndex); @@ -119,10 +119,10 @@ public void testReindexTask() throws IOException, InterruptedException { } } - private BooleanSupplier checkCompletionStatus(TaskId taskId) { + private BooleanSupplier checkCompletionStatus(String taskId) { return () -> { try { - Response response = client().performRequest(new Request("GET", "/_tasks/" + taskId.toString())); + Response response = client().performRequest(new Request("GET", "/_tasks/" + taskId)); return (boolean) entityAsMap(response).get("completed"); } catch (IOException e) { fail(e.getMessage()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java index 9bd51426977fd..4e21b28dd8181 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/TaskSubmissionResponseTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.client.tasks; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -41,12 +40,12 @@ public void testFromXContent() throws IOException { private void toXContent(TaskSubmissionResponse response, XContentBuilder xContentBuilder) throws IOException { xContentBuilder.startObject(); - xContentBuilder.field("task", response.getTask().toString()); + xContentBuilder.field("task", response.getTask()); xContentBuilder.endObject(); } private TaskSubmissionResponse createTestInstance() { - TaskId taskId = new TaskId(randomAlphaOfLength(5), randomLong()); + String taskId = randomAlphaOfLength(5) + ":" + randomLong(); return new TaskSubmissionResponse(taskId); } }