Skip to content

Commit 80d20a9

Browse files
sohaibiftikharjavanna
authored andcommitted
REST high-level client: add get ingest pipeline API (#30847)
Relates to #27205
1 parent 70749e0 commit 80d20a9

File tree

11 files changed

+483
-29
lines changed

11 files changed

+483
-29
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
2525
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
26+
import org.elasticsearch.action.ingest.GetPipelineRequest;
27+
import org.elasticsearch.action.ingest.GetPipelineResponse;
2628
import org.elasticsearch.action.ingest.PutPipelineRequest;
2729
import org.elasticsearch.action.ingest.PutPipelineResponse;
2830

@@ -87,4 +89,26 @@ public void putPipelineAsync(PutPipelineRequest request, ActionListener<PutPipel
8789
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline,
8890
PutPipelineResponse::fromXContent, listener, emptySet(), headers);
8991
}
92+
93+
/**
94+
* Get an existing pipeline
95+
* <p>
96+
* See
97+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/get-pipeline-api.html"> Get Pipeline API on elastic.co</a>
98+
*/
99+
public GetPipelineResponse getPipeline(GetPipelineRequest request, Header... headers) throws IOException {
100+
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::getPipeline,
101+
GetPipelineResponse::fromXContent, emptySet(), headers);
102+
}
103+
104+
/**
105+
* Asynchronously get an existing pipeline
106+
* <p>
107+
* See
108+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/get-pipeline-api.html"> Get Pipeline API on elastic.co</a>
109+
*/
110+
public void getPipelineAsync(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener, Header... headers) {
111+
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::getPipeline,
112+
GetPipelineResponse::fromXContent, listener, emptySet(), headers);
113+
}
90114
}

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.elasticsearch.action.get.MultiGetRequest;
6262
import org.elasticsearch.action.index.IndexRequest;
6363
import org.elasticsearch.action.ingest.PutPipelineRequest;
64+
import org.elasticsearch.action.ingest.GetPipelineRequest;
6465
import org.elasticsearch.action.search.ClearScrollRequest;
6566
import org.elasticsearch.action.search.MultiSearchRequest;
6667
import org.elasticsearch.action.search.SearchRequest;
@@ -620,6 +621,18 @@ static Request clusterPutSettings(ClusterUpdateSettingsRequest clusterUpdateSett
620621
return request;
621622
}
622623

624+
static Request getPipeline(GetPipelineRequest getPipelineRequest) {
625+
String endpoint = new EndpointBuilder()
626+
.addPathPartAsIs("_ingest/pipeline")
627+
.addCommaSeparatedPathParts(getPipelineRequest.getIds())
628+
.build();
629+
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
630+
631+
Params parameters = new Params(request);
632+
parameters.withMasterTimeout(getPipelineRequest.masterNodeTimeout());
633+
return request;
634+
}
635+
623636
static Request putPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
624637
String endpoint = new EndpointBuilder()
625638
.addPathPartAsIs("_ingest/pipeline")

client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
2424
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
25+
import org.elasticsearch.action.ingest.GetPipelineRequest;
26+
import org.elasticsearch.action.ingest.GetPipelineResponse;
2527
import org.elasticsearch.action.ingest.PutPipelineRequest;
2628
import org.elasticsearch.action.ingest.PutPipelineResponse;
2729
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -32,7 +34,7 @@
3234
import org.elasticsearch.common.xcontent.XContentType;
3335
import org.elasticsearch.common.xcontent.support.XContentMapValues;
3436
import org.elasticsearch.indices.recovery.RecoverySettings;
35-
import org.elasticsearch.ingest.Pipeline;
37+
import org.elasticsearch.ingest.PipelineConfiguration;
3638
import org.elasticsearch.rest.RestStatus;
3739

3840
import java.io.IOException;
@@ -113,31 +115,7 @@ public void testClusterUpdateSettingNonExistent() {
113115

114116
public void testPutPipeline() throws IOException {
115117
String id = "some_pipeline_id";
116-
XContentType xContentType = randomFrom(XContentType.values());
117-
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
118-
pipelineBuilder.startObject();
119-
{
120-
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
121-
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
122-
{
123-
pipelineBuilder.startObject().startObject("set");
124-
{
125-
pipelineBuilder
126-
.field("field", "foo")
127-
.field("value", "bar");
128-
}
129-
pipelineBuilder.endObject().endObject();
130-
pipelineBuilder.startObject().startObject("convert");
131-
{
132-
pipelineBuilder
133-
.field("field", "rank")
134-
.field("type", "integer");
135-
}
136-
pipelineBuilder.endObject().endObject();
137-
}
138-
pipelineBuilder.endArray();
139-
}
140-
pipelineBuilder.endObject();
118+
XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
141119
PutPipelineRequest request = new PutPipelineRequest(
142120
id,
143121
BytesReference.bytes(pipelineBuilder),
@@ -147,4 +125,27 @@ public void testPutPipeline() throws IOException {
147125
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
148126
assertTrue(putPipelineResponse.isAcknowledged());
149127
}
128+
129+
public void testGetPipeline() throws IOException {
130+
String id = "some_pipeline_id";
131+
XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
132+
{
133+
PutPipelineRequest request = new PutPipelineRequest(
134+
id,
135+
BytesReference.bytes(pipelineBuilder),
136+
pipelineBuilder.contentType()
137+
);
138+
createPipeline(request);
139+
}
140+
141+
GetPipelineRequest request = new GetPipelineRequest(id);
142+
143+
GetPipelineResponse response =
144+
execute(request, highLevelClient().cluster()::getPipeline, highLevelClient().cluster()::getPipelineAsync);
145+
assertTrue(response.isFound());
146+
assertEquals(response.pipelines().get(0).getId(), id);
147+
PipelineConfiguration expectedConfig =
148+
new PipelineConfiguration(id, BytesReference.bytes(pipelineBuilder), pipelineBuilder.contentType());
149+
assertEquals(expectedConfig.getConfigAsMap(), response.pipelines().get(0).getConfigAsMap());
150+
}
150151
}

client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@
2121

2222
import org.apache.http.Header;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.ingest.PutPipelineRequest;
2425
import org.elasticsearch.action.support.PlainActionFuture;
26+
import org.elasticsearch.common.bytes.BytesReference;
27+
import org.elasticsearch.common.xcontent.XContentBuilder;
28+
import org.elasticsearch.common.xcontent.XContentType;
29+
import org.elasticsearch.ingest.Pipeline;
2530
import org.elasticsearch.test.rest.ESRestTestCase;
2631
import org.junit.AfterClass;
2732
import org.junit.Before;
@@ -80,4 +85,42 @@ private HighLevelClient(RestClient restClient) {
8085
super(restClient, (client) -> {}, Collections.emptyList());
8186
}
8287
}
88+
89+
protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
90+
XContentType xContentType = randomFrom(XContentType.values());
91+
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
92+
pipelineBuilder.startObject();
93+
{
94+
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
95+
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
96+
{
97+
pipelineBuilder.startObject().startObject("set");
98+
{
99+
pipelineBuilder
100+
.field("field", "foo")
101+
.field("value", "bar");
102+
}
103+
pipelineBuilder.endObject().endObject();
104+
pipelineBuilder.startObject().startObject("convert");
105+
{
106+
pipelineBuilder
107+
.field("field", "rank")
108+
.field("type", "integer");
109+
}
110+
pipelineBuilder.endObject().endObject();
111+
}
112+
pipelineBuilder.endArray();
113+
}
114+
pipelineBuilder.endObject();
115+
return pipelineBuilder;
116+
}
117+
118+
protected static void createPipeline(String pipelineId) throws IOException {
119+
XContentBuilder builder = buildRandomXContentPipeline();
120+
createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));
121+
}
122+
123+
protected static void createPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
124+
assertOK(client().performRequest(RequestConverters.putPipeline(putPipelineRequest)));
125+
}
83126
}

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.elasticsearch.action.get.GetRequest;
6464
import org.elasticsearch.action.get.MultiGetRequest;
6565
import org.elasticsearch.action.index.IndexRequest;
66+
import org.elasticsearch.action.ingest.GetPipelineRequest;
6667
import org.elasticsearch.action.ingest.PutPipelineRequest;
6768
import org.elasticsearch.action.search.ClearScrollRequest;
6869
import org.elasticsearch.action.search.MultiSearchRequest;
@@ -1450,6 +1451,20 @@ public void testPutPipeline() throws IOException {
14501451
assertEquals(expectedParams, expectedRequest.getParameters());
14511452
}
14521453

1454+
public void testGetPipeline() {
1455+
String pipelineId = "some_pipeline_id";
1456+
Map<String, String> expectedParams = new HashMap<>();
1457+
GetPipelineRequest request = new GetPipelineRequest("some_pipeline_id");
1458+
setRandomMasterTimeout(request, expectedParams);
1459+
Request expectedRequest = RequestConverters.getPipeline(request);
1460+
StringJoiner endpoint = new StringJoiner("/", "/", "");
1461+
endpoint.add("_ingest/pipeline");
1462+
endpoint.add(pipelineId);
1463+
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
1464+
assertEquals(HttpGet.METHOD_NAME, expectedRequest.getMethod());
1465+
assertEquals(expectedParams, expectedRequest.getParameters());
1466+
}
1467+
14531468
public void testRollover() throws IOException {
14541469
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
14551470
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.elasticsearch.action.LatchedActionListener;
2424
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
2525
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
26+
import org.elasticsearch.action.ingest.GetPipelineRequest;
27+
import org.elasticsearch.action.ingest.GetPipelineResponse;
2628
import org.elasticsearch.action.ingest.PutPipelineRequest;
2729
import org.elasticsearch.action.ingest.PutPipelineResponse;
2830
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
@@ -34,11 +36,13 @@
3436
import org.elasticsearch.common.unit.TimeValue;
3537
import org.elasticsearch.common.xcontent.XContentType;
3638
import org.elasticsearch.indices.recovery.RecoverySettings;
39+
import org.elasticsearch.ingest.PipelineConfiguration;
3740

3841
import java.io.IOException;
3942
import java.nio.charset.StandardCharsets;
4043
import java.util.HashMap;
4144
import java.util.Map;
45+
import java.util.List;
4246
import java.util.concurrent.CountDownLatch;
4347
import java.util.concurrent.TimeUnit;
4448

@@ -257,4 +261,74 @@ public void onFailure(Exception e) {
257261
assertTrue(latch.await(30L, TimeUnit.SECONDS));
258262
}
259263
}
264+
265+
public void testGetPipeline() throws IOException {
266+
RestHighLevelClient client = highLevelClient();
267+
268+
{
269+
createPipeline("my-pipeline-id");
270+
}
271+
272+
{
273+
// tag::get-pipeline-request
274+
GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id"); // <1>
275+
// end::get-pipeline-request
276+
277+
// tag::get-pipeline-request-masterTimeout
278+
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
279+
request.masterNodeTimeout("1m"); // <2>
280+
// end::get-pipeline-request-masterTimeout
281+
282+
// tag::get-pipeline-execute
283+
GetPipelineResponse response = client.cluster().getPipeline(request); // <1>
284+
// end::get-pipeline-execute
285+
286+
// tag::get-pipeline-response
287+
boolean successful = response.isFound(); // <1>
288+
List<PipelineConfiguration> pipelines = response.pipelines(); // <2>
289+
for(PipelineConfiguration pipeline: pipelines) {
290+
Map<String, Object> config = pipeline.getConfigAsMap(); // <3>
291+
}
292+
// end::get-pipeline-response
293+
294+
assertTrue(successful);
295+
}
296+
}
297+
298+
public void testGetPipelineAsync() throws Exception {
299+
RestHighLevelClient client = highLevelClient();
300+
301+
{
302+
createPipeline("my-pipeline-id");
303+
}
304+
305+
{
306+
GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id");
307+
308+
// tag::get-pipeline-execute-listener
309+
ActionListener<GetPipelineResponse> listener =
310+
new ActionListener<GetPipelineResponse>() {
311+
@Override
312+
public void onResponse(GetPipelineResponse response) {
313+
// <1>
314+
}
315+
316+
@Override
317+
public void onFailure(Exception e) {
318+
// <2>
319+
}
320+
};
321+
// end::get-pipeline-execute-listener
322+
323+
// Replace the empty listener by a blocking listener in test
324+
final CountDownLatch latch = new CountDownLatch(1);
325+
listener = new LatchedActionListener<>(listener, latch);
326+
327+
// tag::get-pipeline-execute-async
328+
client.cluster().getPipelineAsync(request, listener); // <1>
329+
// end::get-pipeline-execute-async
330+
331+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
332+
}
333+
}
260334
}

0 commit comments

Comments
 (0)