Skip to content

Commit 3b64194

Browse files
authored
Updating ingest pipeline without changes is no-op (#78196)
1 parent 5964ffe commit 3b64194

File tree

4 files changed

+186
-10
lines changed

4 files changed

+186
-10
lines changed

server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.inject.Inject;
2626
import org.elasticsearch.common.xcontent.XContentHelper;
2727
import org.elasticsearch.ingest.IngestInfo;
28+
import org.elasticsearch.ingest.IngestMetadata;
2829
import org.elasticsearch.ingest.IngestService;
2930
import org.elasticsearch.ingest.Pipeline;
3031
import org.elasticsearch.tasks.Task;
@@ -58,22 +59,43 @@ public PutPipelineTransportAction(ThreadPool threadPool, TransportService transp
5859
@Override
5960
protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
6061
throws Exception {
62+
63+
Map<String, Object> pipelineConfig = null;
64+
IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE);
65+
if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) {
66+
pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
67+
var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
68+
if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) {
69+
// existing pipeline matches request pipeline -- no need to update
70+
listener.onResponse(AcknowledgedResponse.TRUE);
71+
return;
72+
}
73+
}
74+
6175
if (state.getNodes().getMinNodeVersion().before(Version.V_7_15_0)) {
62-
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
76+
pipelineConfig = pipelineConfig == null
77+
? XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2()
78+
: pipelineConfig;
6379
if (pipelineConfig.containsKey(Pipeline.META_KEY)) {
6480
throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_7_15_0);
6581
}
6682
}
6783
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
68-
nodesInfoRequest.clear()
69-
.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
70-
client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(nodeInfos -> {
71-
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
72-
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
73-
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class));
74-
}
75-
ingestService.putPipeline(ingestInfos, request, listener);
76-
}, listener::onFailure));
84+
nodesInfoRequest.clear();
85+
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
86+
client.admin().cluster().nodesInfo(
87+
nodesInfoRequest,
88+
ActionListener.wrap(
89+
nodeInfos -> {
90+
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
91+
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
92+
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class));
93+
}
94+
ingestService.putPipeline(ingestInfos, request, listener);
95+
},
96+
listener::onFailure
97+
)
98+
);
7799
}
78100

79101
@Override
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.ingest;
10+
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.support.ActionFilters;
13+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14+
import org.elasticsearch.cluster.ClusterName;
15+
import org.elasticsearch.cluster.ClusterState;
16+
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.common.bytes.BytesArray;
18+
import org.elasticsearch.common.util.concurrent.EsExecutors;
19+
import org.elasticsearch.common.xcontent.XContentBuilder;
20+
import org.elasticsearch.common.xcontent.XContentType;
21+
import org.elasticsearch.core.Tuple;
22+
import org.elasticsearch.ingest.IngestMetadata;
23+
import org.elasticsearch.ingest.IngestService;
24+
import org.elasticsearch.ingest.PipelineConfiguration;
25+
import org.elasticsearch.test.ESTestCase;
26+
import org.elasticsearch.test.client.NoOpNodeClient;
27+
import org.elasticsearch.threadpool.ThreadPool;
28+
import org.elasticsearch.transport.TransportService;
29+
30+
import java.io.OutputStream;
31+
import java.util.Map;
32+
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.atomic.AtomicLong;
34+
35+
import static org.elasticsearch.core.Tuple.tuple;
36+
import static org.hamcrest.Matchers.equalTo;
37+
import static org.mockito.Matchers.anyString;
38+
import static org.mockito.Mockito.mock;
39+
import static org.mockito.Mockito.when;
40+
41+
public class PutPipelineTransportActionTests extends ESTestCase {
42+
43+
public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception {
44+
var randomMap = randomMap(10, 50, PutPipelineTransportActionTests::randomMapEntry);
45+
46+
XContentBuilder x = XContentBuilder.builder(XContentType.JSON.xContent())
47+
.startObject()
48+
.field("processors", randomMap)
49+
.endObject();
50+
51+
OutputStream os = x.getOutputStream();
52+
x.generator().close();
53+
testUpdatingPipeline(os.toString());
54+
}
55+
56+
public void testUpdatingPipelineWithoutChangesIsNoOp() throws Exception {
57+
var value = randomAlphaOfLength(5);
58+
var pipelineString = "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"" + value + "\"}}]}";
59+
testUpdatingPipeline(pipelineString);
60+
}
61+
62+
private void testUpdatingPipeline(String pipelineString) throws Exception {
63+
var threadPool = mock(ThreadPool.class);
64+
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
65+
when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
66+
var client = new NoOpNodeClient(threadPool);
67+
var action = new PutPipelineTransportAction(
68+
threadPool,
69+
mock(TransportService.class),
70+
mock(ActionFilters.class),
71+
null,
72+
mock(IngestService.class),
73+
client
74+
);
75+
76+
var pipelineId = randomAlphaOfLength(5);
77+
var value = randomAlphaOfLength(5);
78+
var existingPipeline = new PipelineConfiguration(pipelineId, new BytesArray(pipelineString), XContentType.JSON);
79+
var clusterState = ClusterState.builder(new ClusterName("test"))
80+
.metadata(Metadata.builder().putCustom(
81+
IngestMetadata.TYPE,
82+
new IngestMetadata(Map.of(pipelineId, existingPipeline))
83+
).build()
84+
).build();
85+
86+
CountDownLatch latch = new CountDownLatch(1);
87+
var listener = new ActionListener<AcknowledgedResponse>() {
88+
final AtomicLong successCount = new AtomicLong(0);
89+
final AtomicLong failureCount = new AtomicLong(0);
90+
91+
@Override
92+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
93+
successCount.incrementAndGet();
94+
latch.countDown();
95+
}
96+
97+
@Override
98+
public void onFailure(Exception e) {
99+
failureCount.incrementAndGet();
100+
latch.countDown();
101+
}
102+
103+
public long getSuccessCount() {
104+
return successCount.get();
105+
}
106+
107+
public long getFailureCount() {
108+
return failureCount.get();
109+
}
110+
};
111+
112+
var request = new PutPipelineRequest(pipelineId, new BytesArray(pipelineString), XContentType.JSON);
113+
action.masterOperation(null, request, clusterState, listener);
114+
latch.await();
115+
116+
assertThat(client.getExecutionCount(), equalTo(0L));
117+
assertThat(listener.getSuccessCount(), equalTo(1L));
118+
assertThat(listener.getFailureCount(), equalTo(0L));
119+
}
120+
121+
private static Tuple<String, Object> randomMapEntry() {
122+
return tuple(randomAlphaOfLength(5), randomObject());
123+
}
124+
125+
private static Object randomObject() {
126+
return randomFrom(
127+
random(),
128+
ESTestCase::randomLong,
129+
() -> generateRandomStringArray(10, 5, true),
130+
() -> randomMap(3, 5, PutPipelineTransportActionTests::randomMapEntry),
131+
() -> randomAlphaOfLength(5),
132+
ESTestCase::randomTimeValue,
133+
ESTestCase::randomDouble
134+
);
135+
}
136+
}

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,14 @@ public static <T> T randomFrom(Random random, T... array) {
729729
return RandomPicks.randomFrom(random, array);
730730
}
731731

732+
/** Pick a random object from the given array of suppliers. The array must not be empty. */
733+
@SafeVarargs
734+
@SuppressWarnings("varargs")
735+
public static <T> T randomFrom(Random random, Supplier<T>... array) {
736+
Supplier<T> supplier = RandomPicks.randomFrom(random, array);
737+
return supplier.get();
738+
}
739+
732740
/** Pick a random object from the given list. */
733741
public static <T> T randomFrom(List<T> list) {
734742
return RandomPicks.randomFrom(random(), list);

test/framework/src/main/java/org/elasticsearch/test/client/NoOpNodeClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.util.Map;
3030
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicLong;
3132
import java.util.function.Supplier;
3233

3334
/**
@@ -39,6 +40,8 @@
3940
*/
4041
public class NoOpNodeClient extends NodeClient {
4142

43+
private final AtomicLong executionCount = new AtomicLong(0);
44+
4245
/**
4346
* Build with {@link ThreadPool}. This {@linkplain ThreadPool} is terminated on {@link #close()}.
4447
*/
@@ -56,6 +59,7 @@ public NoOpNodeClient(String testName) {
5659
@Override
5760
public <Request extends ActionRequest, Response extends ActionResponse>
5861
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
62+
executionCount.incrementAndGet();
5963
listener.onResponse(null);
6064
}
6165

@@ -74,13 +78,15 @@ public void initialize(
7478
@Override
7579
public <Request extends ActionRequest, Response extends ActionResponse>
7680
Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {
81+
executionCount.incrementAndGet();
7782
listener.onResponse(null);
7883
return null;
7984
}
8085

8186
@Override
8287
public <Request extends ActionRequest, Response extends ActionResponse>
8388
Task executeLocally(ActionType<Response> action, Request request, TaskListener<Response> listener) {
89+
executionCount.incrementAndGet();
8490
listener.onResponse(null, null);
8591
return null;
8692
}
@@ -103,4 +109,8 @@ public void close() {
103109
throw new ElasticsearchException(e.getMessage(), e);
104110
}
105111
}
112+
113+
public long getExecutionCount() {
114+
return executionCount.get();
115+
}
106116
}

0 commit comments

Comments
 (0)