Skip to content

Commit ea08f95

Browse files
authored
Add support for data stream APIs in transport client. (#65433)
1 parent 14e1308 commit ea08f95

File tree

4 files changed

+126
-1
lines changed

4 files changed

+126
-1
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.xpack.core.action.XPackInfoAction;
1818
import org.elasticsearch.xpack.core.action.XPackInfoRequestBuilder;
1919
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
20+
import org.elasticsearch.xpack.core.datastreams.DataStreamClient;
2021
import org.elasticsearch.xpack.core.enrich.client.EnrichClient;
2122
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
2223
import org.elasticsearch.xpack.core.ilm.client.ILMClient;
@@ -45,6 +46,7 @@ public class XPackClient {
4546
private final MachineLearningClient machineLearning;
4647
private final ILMClient ilmClient;
4748
private final EnrichClient enrichClient;
49+
private final DataStreamClient dataStreamClient;
4850

4951
public XPackClient(Client client) {
5052
this.client = Objects.requireNonNull(client, "client");
@@ -56,6 +58,7 @@ public XPackClient(Client client) {
5658
this.machineLearning = new MachineLearningClient(client);
5759
this.ilmClient = new ILMClient(client);
5860
this.enrichClient = new EnrichClient(client);
61+
this.dataStreamClient = new DataStreamClient(client);
5962
}
6063

6164
public Client es() {
@@ -94,6 +97,10 @@ public EnrichClient enrichClient() {
9497
return enrichClient;
9598
}
9699

100+
public DataStreamClient dataStreamClient() {
101+
return dataStreamClient;
102+
}
103+
97104
public XPackClient withHeaders(Map<String, String> headers) {
98105
return new XPackClient(client.filterWithHeader(headers));
99106
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
import org.elasticsearch.threadpool.ThreadPool;
4040
import org.elasticsearch.transport.SharedGroupFactory;
4141
import org.elasticsearch.transport.Transport;
42+
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
43+
import org.elasticsearch.xpack.core.action.DataStreamsStatsAction;
44+
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
45+
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
4246
import org.elasticsearch.xpack.core.action.XPackInfoAction;
4347
import org.elasticsearch.xpack.core.action.XPackUsageAction;
4448
import org.elasticsearch.xpack.core.aggregatemetric.AggregateMetricFeatureSetUsage;
@@ -496,7 +500,12 @@ public List<ActionType<? extends ActionResponse>> getClientActions() {
496500
DeleteAsyncResultAction.INSTANCE,
497501
// Point in time
498502
OpenPointInTimeAction.INSTANCE,
499-
ClosePointInTimeAction.INSTANCE
503+
ClosePointInTimeAction.INSTANCE,
504+
// Data streams,
505+
CreateDataStreamAction.INSTANCE,
506+
GetDataStreamAction.INSTANCE,
507+
DeleteDataStreamAction.INSTANCE,
508+
DataStreamsStatsAction.INSTANCE
500509
));
501510

502511
// rollupV2
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.datastreams;
8+
9+
import org.elasticsearch.action.ActionFuture;
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.support.PlainActionFuture;
12+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
13+
import org.elasticsearch.client.ElasticsearchClient;
14+
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
15+
import org.elasticsearch.xpack.core.action.DataStreamsStatsAction;
16+
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
17+
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
18+
19+
import java.util.Objects;
20+
21+
public class DataStreamClient {
22+
23+
private final ElasticsearchClient client;
24+
25+
public DataStreamClient(ElasticsearchClient client) {
26+
this.client = Objects.requireNonNull(client);
27+
}
28+
29+
public void createDataStream(CreateDataStreamAction.Request request, ActionListener<AcknowledgedResponse> listener) {
30+
client.execute(CreateDataStreamAction.INSTANCE, request, listener);
31+
}
32+
33+
public ActionFuture<AcknowledgedResponse> createDataStream(CreateDataStreamAction.Request request) {
34+
final PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
35+
client.execute(CreateDataStreamAction.INSTANCE, request, listener);
36+
return listener;
37+
}
38+
39+
public void getDataStream(GetDataStreamAction.Request request, ActionListener<GetDataStreamAction.Response> listener) {
40+
client.execute(GetDataStreamAction.INSTANCE, request, listener);
41+
}
42+
43+
public ActionFuture<GetDataStreamAction.Response> getDataStream(GetDataStreamAction.Request request) {
44+
final PlainActionFuture<GetDataStreamAction.Response> listener = PlainActionFuture.newFuture();
45+
client.execute(GetDataStreamAction.INSTANCE, request, listener);
46+
return listener;
47+
}
48+
49+
public void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener<AcknowledgedResponse> listener) {
50+
client.execute(DeleteDataStreamAction.INSTANCE, request, listener);
51+
}
52+
53+
public ActionFuture<AcknowledgedResponse> deleteDataStream(DeleteDataStreamAction.Request request) {
54+
final PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
55+
client.execute(DeleteDataStreamAction.INSTANCE, request, listener);
56+
return listener;
57+
}
58+
59+
public void dataStreamsStats(DataStreamsStatsAction.Request request, ActionListener<DataStreamsStatsAction.Response> listener) {
60+
client.execute(DataStreamsStatsAction.INSTANCE, request, listener);
61+
}
62+
63+
public ActionFuture<DataStreamsStatsAction.Response> dataStreamsStats(DataStreamsStatsAction.Request request) {
64+
final PlainActionFuture<DataStreamsStatsAction.Response> listener = PlainActionFuture.newFuture();
65+
client.execute(DataStreamsStatsAction.INSTANCE, request, listener);
66+
return listener;
67+
}
68+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack;
8+
9+
import org.elasticsearch.client.Client;
10+
import org.elasticsearch.xpack.core.XPackClient;
11+
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
12+
import org.elasticsearch.xpack.core.action.DataStreamsStatsAction;
13+
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
14+
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
15+
import org.elasticsearch.xpack.core.datastreams.DataStreamClient;
16+
17+
import static org.hamcrest.Matchers.equalTo;
18+
19+
public class DataStreamTransportClientIT extends ESXPackSmokeClientTestCase {
20+
21+
public void testTransportClientUsage() {
22+
Client client = getClient();
23+
XPackClient xPackClient = new XPackClient(client);
24+
DataStreamClient dataStreamClient = xPackClient.dataStreamClient();
25+
26+
dataStreamClient.createDataStream(new CreateDataStreamAction.Request("logs-http-eu1")).actionGet();
27+
28+
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[]{"logs-http-eu1"});
29+
GetDataStreamAction.Response response = dataStreamClient.getDataStream(getDataStreamRequest).actionGet();
30+
assertThat(response.getDataStreams().size(), equalTo(1));
31+
assertThat(response.getDataStreams().get(0).getDataStream().getName(), equalTo("logs-http-eu1"));
32+
33+
DataStreamsStatsAction.Response dataStreamStatsResponse =
34+
dataStreamClient.dataStreamsStats(new DataStreamsStatsAction.Request()).actionGet();
35+
assertThat(dataStreamStatsResponse.getDataStreamCount(), equalTo(1));
36+
37+
DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request(new String[]{"logs-http-eu1"});
38+
dataStreamClient.deleteDataStream(deleteDataStreamRequest).actionGet();
39+
}
40+
41+
}

0 commit comments

Comments
 (0)