From 4ffe52fa05de2d0dfea6842eb9ed5da614c70925 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 25 Nov 2020 09:23:28 +0100 Subject: [PATCH] Add support for data stream APIs in transport client. Backporting #65433 to the 7.10 branch. --- .../elasticsearch/xpack/core/XPackClient.java | 7 ++ .../xpack/core/XPackClientPlugin.java | 11 ++- .../core/datastreams/DataStreamClient.java | 68 +++++++++++++++++++ .../xpack/DataStreamTransportClientIT.java | 41 +++++++++++ 4 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamClient.java create mode 100644 x-pack/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/DataStreamTransportClientIT.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClient.java index 439abc808cbbf..978c7829f5f57 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClient.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackInfoRequestBuilder; import org.elasticsearch.xpack.core.ccr.client.CcrClient; +import org.elasticsearch.xpack.core.datastreams.DataStreamClient; import org.elasticsearch.xpack.core.enrich.client.EnrichClient; import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction; import org.elasticsearch.xpack.core.ilm.client.ILMClient; @@ -45,6 +46,7 @@ public class XPackClient { private final MachineLearningClient machineLearning; private final ILMClient ilmClient; private final EnrichClient enrichClient; + private final DataStreamClient dataStreamClient; public XPackClient(Client client) { this.client = Objects.requireNonNull(client, "client"); @@ -56,6 +58,7 @@ public XPackClient(Client client) { this.machineLearning = new MachineLearningClient(client); this.ilmClient = new ILMClient(client); this.enrichClient = new EnrichClient(client); + this.dataStreamClient = new DataStreamClient(client); } public Client es() { @@ -94,6 +97,10 @@ public EnrichClient enrichClient() { return enrichClient; } + public DataStreamClient dataStreamClient() { + return dataStreamClient; + } + public XPackClient withHeaders(Map headers) { return new XPackClient(client.filterWithHeader(headers)); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 9551e9b23f597..75e6fca0a5376 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -38,6 +38,10 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.Transport; +import org.elasticsearch.xpack.core.action.CreateDataStreamAction; +import org.elasticsearch.xpack.core.action.DataStreamsStatsAction; +import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; +import org.elasticsearch.xpack.core.action.GetDataStreamAction; import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackUsageAction; import org.elasticsearch.xpack.core.analytics.AnalyticsFeatureSetUsage; @@ -489,7 +493,12 @@ public List> getClientActions() { DeleteAsyncResultAction.INSTANCE, // Point in time OpenPointInTimeAction.INSTANCE, - ClosePointInTimeAction.INSTANCE + ClosePointInTimeAction.INSTANCE, + // Data streams, + CreateDataStreamAction.INSTANCE, + GetDataStreamAction.INSTANCE, + DeleteDataStreamAction.INSTANCE, + DataStreamsStatsAction.INSTANCE ); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamClient.java new file mode 100644 index 0000000000000..ff6f8fb7c08e0 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/DataStreamClient.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.datastreams; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.xpack.core.action.CreateDataStreamAction; +import org.elasticsearch.xpack.core.action.DataStreamsStatsAction; +import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; +import org.elasticsearch.xpack.core.action.GetDataStreamAction; + +import java.util.Objects; + +public class DataStreamClient { + + private final ElasticsearchClient client; + + public DataStreamClient(ElasticsearchClient client) { + this.client = Objects.requireNonNull(client); + } + + public void createDataStream(CreateDataStreamAction.Request request, ActionListener listener) { + client.execute(CreateDataStreamAction.INSTANCE, request, listener); + } + + public ActionFuture createDataStream(CreateDataStreamAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); + client.execute(CreateDataStreamAction.INSTANCE, request, listener); + return listener; + } + + public void getDataStream(GetDataStreamAction.Request request, ActionListener listener) { + client.execute(GetDataStreamAction.INSTANCE, request, listener); + } + + public ActionFuture getDataStream(GetDataStreamAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); + client.execute(GetDataStreamAction.INSTANCE, request, listener); + return listener; + } + + public void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener listener) { + client.execute(DeleteDataStreamAction.INSTANCE, request, listener); + } + + public ActionFuture deleteDataStream(DeleteDataStreamAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); + client.execute(DeleteDataStreamAction.INSTANCE, request, listener); + return listener; + } + + public void dataStreamsStats(DataStreamsStatsAction.Request request, ActionListener listener) { + client.execute(DataStreamsStatsAction.INSTANCE, request, listener); + } + + public ActionFuture dataStreamsStats(DataStreamsStatsAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); + client.execute(DataStreamsStatsAction.INSTANCE, request, listener); + return listener; + } +} diff --git a/x-pack/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/DataStreamTransportClientIT.java b/x-pack/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/DataStreamTransportClientIT.java new file mode 100644 index 0000000000000..5600150110dca --- /dev/null +++ b/x-pack/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/DataStreamTransportClientIT.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack; + +import org.elasticsearch.client.Client; +import org.elasticsearch.xpack.core.XPackClient; +import org.elasticsearch.xpack.core.action.CreateDataStreamAction; +import org.elasticsearch.xpack.core.action.DataStreamsStatsAction; +import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; +import org.elasticsearch.xpack.core.action.GetDataStreamAction; +import org.elasticsearch.xpack.core.datastreams.DataStreamClient; + +import static org.hamcrest.Matchers.equalTo; + +public class DataStreamTransportClientIT extends ESXPackSmokeClientTestCase { + + public void testTransportClientUsage() { + Client client = getClient(); + XPackClient xPackClient = new XPackClient(client); + DataStreamClient dataStreamClient = xPackClient.dataStreamClient(); + + dataStreamClient.createDataStream(new CreateDataStreamAction.Request("logs-http-eu1")).actionGet(); + + GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[]{"logs-http-eu1"}); + GetDataStreamAction.Response response = dataStreamClient.getDataStream(getDataStreamRequest).actionGet(); + assertThat(response.getDataStreams().size(), equalTo(1)); + assertThat(response.getDataStreams().get(0).getDataStream().getName(), equalTo("logs-http-eu1")); + + DataStreamsStatsAction.Response dataStreamStatsResponse = + dataStreamClient.dataStreamsStats(new DataStreamsStatsAction.Request()).actionGet(); + assertThat(dataStreamStatsResponse.getDataStreamCount(), equalTo(1)); + + DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request(new String[]{"logs-http-eu1"}); + dataStreamClient.deleteDataStream(deleteDataStreamRequest).actionGet(); + } + +}