Skip to content

Commit 184a871

Browse files
authored
REST high-level client: add flush API (#28852)
Relates to #27205
1 parent 742e9f5 commit 184a871

File tree

15 files changed

+515
-158
lines changed

15 files changed

+515
-158
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
3131
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
3232
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
33+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
34+
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
3335
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
3436
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
3537
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
@@ -237,6 +239,26 @@ public void refreshAsync(RefreshRequest refreshRequest, ActionListener<RefreshRe
237239
listener, emptySet(), headers);
238240
}
239241

242+
/**
243+
* Flush one or more indices using the Flush API
244+
* <p>
245+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Flush API on elastic.co</a>
246+
*/
247+
public FlushResponse flush(FlushRequest flushRequest, Header... headers) throws IOException {
248+
return restHighLevelClient.performRequestAndParseEntity(flushRequest, Request::flush, FlushResponse::fromXContent,
249+
emptySet(), headers);
250+
}
251+
252+
/**
253+
* Asynchronously flush one or more indices using the Flush API
254+
* <p>
255+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Flush API on elastic.co</a>
256+
*/
257+
public void flushAsync(FlushRequest flushRequest, ActionListener<FlushResponse> listener, Header... headers) {
258+
restHighLevelClient.performRequestAsyncAndParseEntity(flushRequest, Request::flush, FlushResponse::fromXContent,
259+
listener, emptySet(), headers);
260+
}
261+
240262
/**
241263
* Checks if the index (indices) exists or not.
242264
* <p>

client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
3636
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
3737
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
38+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
3839
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
3940
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
4041
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
@@ -219,10 +220,17 @@ static Request putMapping(PutMappingRequest putMappingRequest) throws IOExceptio
219220

220221
static Request refresh(RefreshRequest refreshRequest) {
221222
String endpoint = endpoint(refreshRequest.indices(), "_refresh");
222-
223223
Params parameters = Params.builder();
224224
parameters.withIndicesOptions(refreshRequest.indicesOptions());
225+
return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), null);
226+
}
225227

228+
static Request flush(FlushRequest flushRequest) {
229+
String endpoint = endpoint(flushRequest.indices(), "_flush");
230+
Params parameters = Params.builder();
231+
parameters.withIndicesOptions(flushRequest.indicesOptions());
232+
parameters.putParam("wait_if_ongoing", Boolean.toString(flushRequest.waitIfOngoing()));
233+
parameters.putParam("force", Boolean.toString(flushRequest.force()));
226234
return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), null);
227235
}
228236

client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
3535
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
3636
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
37+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
38+
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
3739
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
3840
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
3941
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
@@ -410,6 +412,32 @@ public void testRefresh() throws IOException {
410412
}
411413
}
412414

415+
public void testFlush() throws IOException {
416+
{
417+
String index = "index";
418+
Settings settings = Settings.builder()
419+
.put("number_of_shards", 1)
420+
.put("number_of_replicas", 0)
421+
.build();
422+
createIndex(index, settings);
423+
FlushRequest flushRequest = new FlushRequest(index);
424+
FlushResponse flushResponse =
425+
execute(flushRequest, highLevelClient().indices()::flush, highLevelClient().indices()::flushAsync);
426+
assertThat(flushResponse.getTotalShards(), equalTo(1));
427+
assertThat(flushResponse.getSuccessfulShards(), equalTo(1));
428+
assertThat(flushResponse.getFailedShards(), equalTo(0));
429+
assertThat(flushResponse.getShardFailures(), equalTo(BroadcastResponse.EMPTY));
430+
}
431+
{
432+
String nonExistentIndex = "non_existent_index";
433+
assertFalse(indexExists(nonExistentIndex));
434+
FlushRequest refreshRequest = new FlushRequest(nonExistentIndex);
435+
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
436+
() -> execute(refreshRequest, highLevelClient().indices()::flush, highLevelClient().indices()::flushAsync));
437+
assertEquals(RestStatus.NOT_FOUND, exception.status());
438+
}
439+
}
440+
413441
public void testExistsAlias() throws IOException {
414442
GetAliasesRequest getAliasesRequest = new GetAliasesRequest("alias");
415443
assertFalse(execute(getAliasesRequest, highLevelClient().indices()::existsAlias, highLevelClient().indices()::existsAliasAsync));

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
3838
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
3939
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
40+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
4041
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
4142
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
4243
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
@@ -537,15 +538,43 @@ public void testIndex() throws IOException {
537538
}
538539

539540
public void testRefresh() {
540-
String[] indices = randomIndicesNames(1, 5);
541+
String[] indices = randomIndicesNames(0, 5);
541542
RefreshRequest refreshRequest = new RefreshRequest(indices);
542-
543543
Map<String, String> expectedParams = new HashMap<>();
544544
setRandomIndicesOptions(refreshRequest::indicesOptions, refreshRequest::indicesOptions, expectedParams);
545-
546545
Request request = Request.refresh(refreshRequest);
547-
StringJoiner endpoint = new StringJoiner("/", "/", "").add(String.join(",", indices)).add("_refresh");
548-
assertThat(endpoint.toString(), equalTo(request.getEndpoint()));
546+
StringJoiner endpoint = new StringJoiner("/", "/", "");
547+
if (indices.length > 0) {
548+
endpoint.add(String.join(",", indices));
549+
}
550+
endpoint.add("_refresh");
551+
assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
552+
assertThat(request.getParameters(), equalTo(expectedParams));
553+
assertThat(request.getEntity(), nullValue());
554+
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
555+
}
556+
557+
public void testFlush() {
558+
String[] indices = randomIndicesNames(0, 5);
559+
FlushRequest flushRequest = new FlushRequest(indices);
560+
Map<String, String> expectedParams = new HashMap<>();
561+
setRandomIndicesOptions(flushRequest::indicesOptions, flushRequest::indicesOptions, expectedParams);
562+
if (randomBoolean()) {
563+
flushRequest.force(randomBoolean());
564+
}
565+
expectedParams.put("force", Boolean.toString(flushRequest.force()));
566+
if (randomBoolean()) {
567+
flushRequest.waitIfOngoing(randomBoolean());
568+
}
569+
expectedParams.put("wait_if_ongoing", Boolean.toString(flushRequest.waitIfOngoing()));
570+
571+
Request request = Request.flush(flushRequest);
572+
StringJoiner endpoint = new StringJoiner("/", "/", "");
573+
if (indices.length > 0) {
574+
endpoint.add(String.join(",", indices));
575+
}
576+
endpoint.add("_flush");
577+
assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
549578
assertThat(request.getParameters(), equalTo(expectedParams));
550579
assertThat(request.getEntity(), nullValue());
551580
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
3434
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
3535
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
36+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
37+
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
3638
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
3739
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
3840
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
@@ -691,6 +693,82 @@ public void onFailure(Exception e) {
691693
}
692694
}
693695

696+
public void testFlushIndex() throws Exception {
697+
RestHighLevelClient client = highLevelClient();
698+
699+
{
700+
createIndex("index1", Settings.EMPTY);
701+
}
702+
703+
{
704+
// tag::flush-request
705+
FlushRequest request = new FlushRequest("index1"); // <1>
706+
FlushRequest requestMultiple = new FlushRequest("index1", "index2"); // <2>
707+
FlushRequest requestAll = new FlushRequest(); // <3>
708+
// end::flush-request
709+
710+
// tag::flush-request-indicesOptions
711+
request.indicesOptions(IndicesOptions.lenientExpandOpen()); // <1>
712+
// end::flush-request-indicesOptions
713+
714+
// tag::flush-request-wait
715+
request.waitIfOngoing(true); // <1>
716+
// end::flush-request-wait
717+
718+
// tag::flush-request-force
719+
request.force(true); // <1>
720+
// end::flush-request-force
721+
722+
// tag::flush-execute
723+
FlushResponse flushResponse = client.indices().flush(request);
724+
// end::flush-execute
725+
726+
// tag::flush-response
727+
int totalShards = flushResponse.getTotalShards(); // <1>
728+
int successfulShards = flushResponse.getSuccessfulShards(); // <2>
729+
int failedShards = flushResponse.getFailedShards(); // <3>
730+
DefaultShardOperationFailedException[] failures = flushResponse.getShardFailures(); // <4>
731+
// end::flush-response
732+
733+
// tag::flush-execute-listener
734+
ActionListener<FlushResponse> listener = new ActionListener<FlushResponse>() {
735+
@Override
736+
public void onResponse(FlushResponse refreshResponse) {
737+
// <1>
738+
}
739+
740+
@Override
741+
public void onFailure(Exception e) {
742+
// <2>
743+
}
744+
};
745+
// end::flush-execute-listener
746+
747+
// Replace the empty listener by a blocking listener in test
748+
final CountDownLatch latch = new CountDownLatch(1);
749+
listener = new LatchedActionListener<>(listener, latch);
750+
751+
// tag::flush-execute-async
752+
client.indices().flushAsync(request, listener); // <1>
753+
// end::flush-execute-async
754+
755+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
756+
}
757+
758+
{
759+
// tag::flush-notfound
760+
try {
761+
FlushRequest request = new FlushRequest("does_not_exist");
762+
client.indices().flush(request);
763+
} catch (ElasticsearchException exception) {
764+
if (exception.status() == RestStatus.NOT_FOUND) {
765+
// <1>
766+
}
767+
}
768+
// end::flush-notfound
769+
}
770+
}
771+
694772
public void testCloseIndex() throws Exception {
695773
RestHighLevelClient client = highLevelClient();
696774

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
[[java-rest-high-flush]]
2+
=== Flush API
3+
4+
[[java-rest-high-flush-request]]
5+
==== Flush Request
6+
7+
A `FlushRequest` can be applied to one or more indices, or even on `_all` the indices:
8+
9+
["source","java",subs="attributes,callouts,macros"]
10+
--------------------------------------------------
11+
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request]
12+
--------------------------------------------------
13+
<1> Flush one index
14+
<2> Flush multiple indices
15+
<3> Flush all the indices
16+
17+
==== Optional arguments
18+
19+
["source","java",subs="attributes,callouts,macros"]
20+
--------------------------------------------------
21+
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request-indicesOptions]
22+
--------------------------------------------------
23+
<1> Setting `IndicesOptions` controls how unavailable indices are resolved and
24+
how wildcard expressions are expanded
25+
26+
["source","java",subs="attributes,callouts,macros"]
27+
--------------------------------------------------
28+
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request-wait]
29+
--------------------------------------------------
30+
<1> Set the `wait_if_ongoing` flag to `true`
31+
32+
["source","java",subs="attributes,callouts,macros"]
33+
--------------------------------------------------
34+
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request-force]
35+
--------------------------------------------------
36+
<1> Set the `force` flag to `true`
37+
38+
[[java-rest-high-flush-sync]]
39+
==== Synchronous Execution
40+
41+
["source","java",subs="attributes,callouts,macros"]
42+
--------------------------------------------------
43+
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-execute]
44+
--------------------------------------------------
45+
46+
[[java-rest-high-flush-async]]
47+
==== Asynchronous Execution
48+
49+
The asynchronous execution of a flush request requires both the `FlushRequest`
50+
instance and an `ActionListener` instance to be passed to the asynchronous
51+
method:
52+
53+
["source","java",subs="attributes,callouts,macros"]
54+
--------------------------------------------------
55+
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-execute-async]
56+
--------------------------------------------------
57+
<1> The `FlushRequest` to execute and the `ActionListener` to use when
58+
the execution completes
59+
60+
The asynchronous method does not block and returns immediately. Once it is
61+
completed the `ActionListener` is called back using the `onResponse` method
62+
if the execution successfully completed or using the `onFailure` method if
63+
it failed.
64+
65+
A typical listener for `FlushResponse` looks like:
66+
67+
["source","java",subs="attributes,callouts,macros"]
68+
--------------------------------------------------
69+
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-execute-listener]
70+
--------------------------------------------------
71+
<1> Called when the execution is successfully completed. The response is
72+
provided as an argument
73+
<2> Called in case of failure. The raised exception is provided as an argument
74+
75+
[[java-rest-high-flush-response]]
76+
==== Flush Response
77+
78+
The returned `FlushResponse` allows to retrieve information about the
79+
executed operation as follows:
80+
81+
["source","java",subs="attributes,callouts,macros"]
82+
--------------------------------------------------
83+
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-response]
84+
--------------------------------------------------
85+
<1> Total number of shards hit by the flush request
86+
<2> Number of shards where the flush has succeeded
87+
<3> Number of shards where the flush has failed
88+
<4> A list of failures if the operation failed on one or more shards
89+
90+
By default, if the indices were not found, an `ElasticsearchException` will be thrown:
91+
92+
["source","java",subs="attributes,callouts,macros"]
93+
--------------------------------------------------
94+
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-notfound]
95+
--------------------------------------------------
96+
<1> Do something if the indices to be flushed were not found

docs/java-rest/high-level/supported-apis.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ Index Management::
5353
* <<java-rest-high-shrink-index>>
5454
* <<java-rest-high-split-index>>
5555
* <<java-rest-high-refresh>>
56+
* <<java-rest-high-flush>>
5657
* <<java-rest-high-rollover-index>>
5758

5859
Mapping Management::
@@ -70,6 +71,7 @@ include::indices/close_index.asciidoc[]
7071
include::indices/shrink_index.asciidoc[]
7172
include::indices/split_index.asciidoc[]
7273
include::indices/refresh.asciidoc[]
74+
include::indices/flush.asciidoc[]
7375
include::indices/rollover.asciidoc[]
7476
include::indices/put_mapping.asciidoc[]
7577
include::indices/update_aliases.asciidoc[]

0 commit comments

Comments
 (0)