Skip to content

Commit fd4cd80

Browse files
authored
HLRC: migration api - upgrade (#34898)
Implement high level client for migration upgrade API. It should wrap RestHighLevelClient and expose high level IndexUpgradeRequest (new), IndexTaskResponse for submissions with wait_for_completion=false and BulkByScrollResponse (already used) objects. refers: #29827
1 parent 0487181 commit fd4cd80

File tree

10 files changed

+324
-7
lines changed

10 files changed

+324
-7
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/MigrationClient.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121

2222
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
2323
import org.elasticsearch.client.migration.IndexUpgradeInfoResponse;
24+
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
26+
import org.elasticsearch.index.reindex.BulkByScrollResponse;
27+
import org.elasticsearch.client.migration.IndexUpgradeRequest;
28+
2429

2530
import java.io.IOException;
2631
import java.util.Collections;
@@ -52,4 +57,19 @@ public IndexUpgradeInfoResponse getAssistance(IndexUpgradeInfoRequest request, R
5257
return restHighLevelClient.performRequestAndParseEntity(request, MigrationRequestConverters::getMigrationAssistance, options,
5358
IndexUpgradeInfoResponse::fromXContent, Collections.emptySet());
5459
}
60+
61+
public BulkByScrollResponse upgrade(IndexUpgradeRequest request, RequestOptions options) throws IOException {
62+
return restHighLevelClient.performRequestAndParseEntity(request, MigrationRequestConverters::migrate, options,
63+
BulkByScrollResponse::fromXContent, Collections.emptySet());
64+
}
65+
66+
public TaskSubmissionResponse submitUpgradeTask(IndexUpgradeRequest request, RequestOptions options) throws IOException {
67+
return restHighLevelClient.performRequestAndParseEntity(request, MigrationRequestConverters::submitMigrateTask, options,
68+
TaskSubmissionResponse::fromXContent, Collections.emptySet());
69+
}
70+
71+
public void upgradeAsync(IndexUpgradeRequest request, RequestOptions options, ActionListener<BulkByScrollResponse> listener) {
72+
restHighLevelClient.performRequestAsyncAndParseEntity(request, MigrationRequestConverters::migrate, options,
73+
BulkByScrollResponse::fromXContent, listener, Collections.emptySet());
74+
}
5575
}

client/rest-high-level/src/main/java/org/elasticsearch/client/MigrationRequestConverters.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
package org.elasticsearch.client;
2121

2222
import org.apache.http.client.methods.HttpGet;
23+
import org.apache.http.client.methods.HttpPost;
2324
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
25+
import org.elasticsearch.client.migration.IndexUpgradeRequest;
2426

2527
final class MigrationRequestConverters {
2628

27-
private MigrationRequestConverters() {}
29+
private MigrationRequestConverters() {
30+
}
2831

2932
static Request getMigrationAssistance(IndexUpgradeInfoRequest indexUpgradeInfoRequest) {
3033
RequestConverters.EndpointBuilder endpointBuilder = new RequestConverters.EndpointBuilder()
@@ -36,4 +39,26 @@ static Request getMigrationAssistance(IndexUpgradeInfoRequest indexUpgradeInfoRe
3639
parameters.withIndicesOptions(indexUpgradeInfoRequest.indicesOptions());
3740
return request;
3841
}
42+
43+
static Request migrate(IndexUpgradeRequest indexUpgradeRequest) {
44+
return prepareMigrateRequest(indexUpgradeRequest, true);
45+
}
46+
47+
static Request submitMigrateTask(IndexUpgradeRequest indexUpgradeRequest) {
48+
return prepareMigrateRequest(indexUpgradeRequest, false);
49+
}
50+
51+
private static Request prepareMigrateRequest(IndexUpgradeRequest indexUpgradeRequest, boolean waitForCompletion) {
52+
String endpoint = new RequestConverters.EndpointBuilder()
53+
.addPathPartAsIs("_xpack", "migration", "upgrade")
54+
.addPathPart(indexUpgradeRequest.index())
55+
.build();
56+
57+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
58+
59+
RequestConverters.Params params = new RequestConverters.Params(request)
60+
.withWaitForCompletion(waitForCompletion);
61+
62+
return request;
63+
}
3964
}

client/rest-high-level/src/main/java/org/elasticsearch/client/migration/IndexUpgradeInfoRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
import java.util.Arrays;
2727
import java.util.Objects;
2828

29+
/**
30+
* A request for retrieving upgrade information
31+
* Part of Migration API
32+
*/
2933
public class IndexUpgradeInfoRequest extends TimedRequest implements IndicesRequest.Replaceable {
3034

3135
private String[] indices = Strings.EMPTY_ARRAY;

client/rest-high-level/src/main/java/org/elasticsearch/client/migration/IndexUpgradeInfoResponse.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828

2929
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
3030

31+
/**
32+
* Response object that contains information about indices to be upgraded
33+
*/
3134
public class IndexUpgradeInfoResponse {
3235

3336
private static final ParseField INDICES = new ParseField("indices");
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.migration;
20+
21+
import org.elasticsearch.client.Validatable;
22+
23+
import java.util.Objects;
24+
25+
/**
26+
* A request for performing Upgrade on Index
27+
* Part of Migration API
28+
*/
29+
public class IndexUpgradeRequest implements Validatable {
30+
31+
private String index;
32+
33+
public IndexUpgradeRequest(String index) {
34+
this.index = index;
35+
}
36+
37+
public String index() {
38+
return index;
39+
}
40+
41+
@Override
42+
public boolean equals(Object o) {
43+
if (this == o) return true;
44+
if (o == null || getClass() != o.getClass()) return false;
45+
IndexUpgradeRequest request = (IndexUpgradeRequest) o;
46+
return Objects.equals(index, request.index);
47+
}
48+
49+
@Override
50+
public int hashCode() {
51+
return Objects.hash(index);
52+
}
53+
}

client/rest-high-level/src/test/java/org/elasticsearch/client/MigrationIT.java

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,73 @@
1919

2020
package org.elasticsearch.client;
2121

22-
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
22+
import org.elasticsearch.ElasticsearchStatusException;
2323
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
2424
import org.elasticsearch.client.migration.IndexUpgradeInfoResponse;
25+
import org.elasticsearch.client.migration.IndexUpgradeRequest;
26+
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
27+
import org.elasticsearch.common.settings.Settings;
2528

2629
import java.io.IOException;
30+
import java.util.function.BooleanSupplier;
31+
32+
import static org.hamcrest.Matchers.containsString;
2733

2834
public class MigrationIT extends ESRestHighLevelClientTestCase {
2935

3036
public void testGetAssistance() throws IOException {
31-
RestHighLevelClient client = highLevelClient();
3237
{
33-
IndexUpgradeInfoResponse response = client.migration().getAssistance(new IndexUpgradeInfoRequest(), RequestOptions.DEFAULT);
38+
IndexUpgradeInfoResponse response = highLevelClient().migration()
39+
.getAssistance(new IndexUpgradeInfoRequest(), RequestOptions.DEFAULT);
3440
assertEquals(0, response.getActions().size());
3541
}
3642
{
37-
client.indices().create(new CreateIndexRequest("test"), RequestOptions.DEFAULT);
38-
IndexUpgradeInfoResponse response = client.migration().getAssistance(
43+
createIndex("test", Settings.EMPTY);
44+
IndexUpgradeInfoResponse response = highLevelClient().migration().getAssistance(
3945
new IndexUpgradeInfoRequest("test"), RequestOptions.DEFAULT);
4046
assertEquals(0, response.getActions().size());
4147
}
4248
}
49+
50+
public void testUpgradeWhenIndexCannotBeUpgraded() throws IOException {
51+
createIndex("test", Settings.EMPTY);
52+
53+
ThrowingRunnable execute = () -> execute(new IndexUpgradeRequest("test"),
54+
highLevelClient().migration()::upgrade,
55+
highLevelClient().migration()::upgradeAsync);
56+
57+
ElasticsearchStatusException responseException = expectThrows(ElasticsearchStatusException.class, execute);
58+
59+
assertThat(responseException.getDetailedMessage(), containsString("cannot be upgraded"));
60+
}
61+
62+
public void testUpgradeWithTaskApi() throws IOException, InterruptedException {
63+
createIndex("test", Settings.EMPTY);
64+
65+
IndexUpgradeRequest request = new IndexUpgradeRequest("test");
66+
67+
TaskSubmissionResponse upgrade = highLevelClient().migration()
68+
.submitUpgradeTask(request, RequestOptions.DEFAULT);
69+
70+
assertNotNull(upgrade.getTask());
71+
72+
BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(upgrade);
73+
awaitBusy(hasUpgradeCompleted);
74+
}
75+
76+
/**
77+
* Using low-level api as high-level-rest-client's getTaskById work is in progress.
78+
* TODO revisit once that work is finished
79+
*/
80+
private BooleanSupplier checkCompletionStatus(TaskSubmissionResponse upgrade) {
81+
return () -> {
82+
try {
83+
Response response = client().performRequest(new Request("GET", "/_tasks/" + upgrade.getTask()));
84+
return (boolean) entityAsMap(response).get("completed");
85+
} catch (IOException e) {
86+
fail(e.getMessage());
87+
return false;
88+
}
89+
};
90+
}
4391
}

client/rest-high-level/src/test/java/org/elasticsearch/client/MigrationRequestConvertersTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
package org.elasticsearch.client;
2121

2222
import org.apache.http.client.methods.HttpGet;
23+
import org.apache.http.client.methods.HttpPost;
2324
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
25+
import org.elasticsearch.client.migration.IndexUpgradeRequest;
2426
import org.elasticsearch.test.ESTestCase;
2527

2628
import java.util.HashMap;
@@ -45,4 +47,20 @@ public void testGetMigrationAssistance() {
4547
assertNull(request.getEntity());
4648
assertEquals(expectedParams, request.getParameters());
4749
}
50+
51+
public void testUpgradeRequest() {
52+
String[] indices = RequestConvertersTests.randomIndicesNames(1, 1);
53+
IndexUpgradeRequest upgradeInfoRequest = new IndexUpgradeRequest(indices[0]);
54+
55+
String expectedEndpoint = "/_xpack/migration/upgrade/" + indices[0];
56+
Map<String, String> expectedParams = new HashMap<>();
57+
expectedParams.put("wait_for_completion", Boolean.TRUE.toString());
58+
59+
Request request = MigrationRequestConverters.migrate(upgradeInfoRequest);
60+
61+
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
62+
assertEquals(expectedEndpoint, request.getEndpoint());
63+
assertNull(request.getEntity());
64+
assertEquals(expectedParams, request.getParameters());
65+
}
4866
}

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MigrationClientDocumentationIT.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,30 @@
1919

2020
package org.elasticsearch.client.documentation;
2121

22+
import org.elasticsearch.ElasticsearchStatusException;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.LatchedActionListener;
2225
import org.elasticsearch.action.support.IndicesOptions;
2326
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
2427
import org.elasticsearch.client.RequestOptions;
2528
import org.elasticsearch.client.RestHighLevelClient;
26-
import org.elasticsearch.common.Strings;
2729
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
2830
import org.elasticsearch.client.migration.IndexUpgradeInfoResponse;
31+
import org.elasticsearch.client.migration.IndexUpgradeRequest;
2932
import org.elasticsearch.client.migration.UpgradeActionRequired;
33+
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
34+
import org.elasticsearch.common.Strings;
35+
import org.elasticsearch.common.settings.Settings;
36+
import org.elasticsearch.index.reindex.BulkByScrollResponse;
3037

3138
import java.io.IOException;
3239
import java.util.Map;
40+
import java.util.concurrent.CountDownLatch;
41+
import java.util.concurrent.TimeUnit;
42+
43+
import static org.hamcrest.Matchers.containsString;
44+
import static org.hamcrest.Matchers.isEmptyOrNullString;
45+
import static org.hamcrest.Matchers.not;
3346

3447
/**
3548
* This class is used to generate the Java Migration API documentation.
@@ -80,4 +93,66 @@ public void testGetAssistance() throws IOException {
8093
}
8194
// end::get-assistance-response
8295
}
96+
97+
public void testUpgrade() throws IOException {
98+
99+
RestHighLevelClient client = highLevelClient();
100+
createIndex("test", Settings.EMPTY);
101+
102+
103+
// tag::upgrade-request
104+
IndexUpgradeRequest request = new IndexUpgradeRequest("test"); // <1>
105+
// end::upgrade-request
106+
107+
try {
108+
109+
// tag::upgrade-execute
110+
BulkByScrollResponse response = client.migration().upgrade(request, RequestOptions.DEFAULT);
111+
// end::upgrade-execute
112+
113+
} catch (ElasticsearchStatusException e) {
114+
assertThat(e.getMessage(), containsString("cannot be upgraded"));
115+
}
116+
}
117+
118+
public void testUpgradeAsync() throws IOException, InterruptedException {
119+
RestHighLevelClient client = highLevelClient();
120+
createIndex("test", Settings.EMPTY);
121+
final CountDownLatch latch = new CountDownLatch(1);
122+
123+
// tag::upgrade-async-listener
124+
ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
125+
@Override
126+
public void onResponse(BulkByScrollResponse bulkResponse) {
127+
// <1>
128+
}
129+
130+
@Override
131+
public void onFailure(Exception e) {
132+
// <2>
133+
}
134+
};
135+
// end::upgrade-async-listener
136+
137+
listener = new LatchedActionListener<>(listener, latch);
138+
139+
// tag::upgrade-async-execute
140+
client.migration().upgradeAsync(new IndexUpgradeRequest("test"), RequestOptions.DEFAULT, listener); // <1>
141+
// end::upgrade-async-execute
142+
143+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
144+
}
145+
146+
public void testUpgradeWithTaskApi() throws IOException {
147+
createIndex("test", Settings.EMPTY);
148+
RestHighLevelClient client = highLevelClient();
149+
// tag::upgrade-task-api
150+
IndexUpgradeRequest request = new IndexUpgradeRequest("test");
151+
152+
TaskSubmissionResponse response = client.migration()
153+
.submitUpgradeTask(request, RequestOptions.DEFAULT);
154+
String taskId = response.getTask();
155+
// end::upgrade-task-api
156+
assertThat(taskId, not(isEmptyOrNullString()));
157+
}
83158
}

0 commit comments

Comments
 (0)