Skip to content

Commit 4ec68ec

Browse files
authored
Add ingest processor existence helper method (#45156)
This commit adds a helper method to the ingest service allowing it to inspect a pipeline by id and verify the existence of a processor in the pipeline. This work exposed a potential bug in that some processors contain inner processors that are passed in at instantiation. These processors needed a common way to expose their inner processors, so the WrappingProcessor was created in order to expose the inner processor.
1 parent 65b5020 commit 4ec68ec

File tree

9 files changed

+272
-43
lines changed

9 files changed

+272
-43
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.Set;
31+
32+
import org.elasticsearch.ingest.WrappingProcessor;
3133
import org.elasticsearch.script.ScriptService;
3234

3335
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
@@ -43,7 +45,7 @@
4345
*
4446
* Note that this processor is experimental.
4547
*/
46-
public final class ForEachProcessor extends AbstractProcessor {
48+
public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor {
4749

4850
public static final String TYPE = "foreach";
4951

@@ -97,7 +99,7 @@ String getField() {
9799
return field;
98100
}
99101

100-
Processor getProcessor() {
102+
public Processor getInnerProcessor() {
101103
return processor;
102104
}
103105

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void testCreate() throws Exception {
4949
ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
5050
assertThat(forEachProcessor, Matchers.notNullValue());
5151
assertThat(forEachProcessor.getField(), equalTo("_field"));
52-
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
52+
assertThat(forEachProcessor.getInnerProcessor(), Matchers.sameInstance(processor));
5353
assertFalse(forEachProcessor.isIgnoreMissing());
5454
}
5555

@@ -66,7 +66,7 @@ public void testSetIgnoreMissing() throws Exception {
6666
ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
6767
assertThat(forEachProcessor, Matchers.notNullValue());
6868
assertThat(forEachProcessor.getField(), equalTo("_field"));
69-
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
69+
assertThat(forEachProcessor.getInnerProcessor(), Matchers.sameInstance(processor));
7070
assertTrue(forEachProcessor.isIgnoreMissing());
7171
}
7272

server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import java.util.function.LongSupplier;
3838
import java.util.stream.Collectors;
3939

40-
public class ConditionalProcessor extends AbstractProcessor {
40+
public class ConditionalProcessor extends AbstractProcessor implements WrappingProcessor {
4141

4242
private static final Map<String, String> DEPRECATIONS =
4343
Map.of("_type", "[types removal] Looking up doc types [_type] in scripts is deprecated.");
@@ -90,7 +90,7 @@ boolean evaluate(IngestDocument ingestDocument) {
9090
new DeprecationMap(ingestDocument.getSourceAndMetadata(), DEPRECATIONS, "conditional-processor")));
9191
}
9292

93-
Processor getProcessor() {
93+
public Processor getInnerProcessor() {
9494
return processor;
9595
}
9696

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ public IngestStats stats() {
388388
static String getProcessorName(Processor processor){
389389
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
390390
if(processor instanceof ConditionalProcessor){
391-
processor = ((ConditionalProcessor) processor).getProcessor();
391+
processor = ((ConditionalProcessor) processor).getInnerProcessor();
392392
}
393393
StringBuilder sb = new StringBuilder(5);
394394
sb.append(processor.getType());
@@ -561,6 +561,40 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
561561
}
562562
}
563563

564+
/**
565+
* Determine if a pipeline contains a processor class within it by introspecting all of the processors within the pipeline.
566+
* @param pipelineId the pipeline to inspect
567+
* @param clazz the Processor class to look for
568+
* @return True if the pipeline contains an instance of the Processor class passed in
569+
*/
570+
public boolean hasProcessor(String pipelineId, Class<? extends Processor> clazz) {
571+
Pipeline pipeline = getPipeline(pipelineId);
572+
if (pipeline == null) {
573+
return false;
574+
}
575+
576+
for (Processor processor: pipeline.flattenAllProcessors()) {
577+
if (clazz.isAssignableFrom(processor.getClass())) {
578+
return true;
579+
}
580+
581+
while (processor instanceof WrappingProcessor) {
582+
WrappingProcessor wrappingProcessor = (WrappingProcessor) processor;
583+
if (clazz.isAssignableFrom(wrappingProcessor.getInnerProcessor().getClass())) {
584+
return true;
585+
}
586+
processor = wrappingProcessor.getInnerProcessor();
587+
// break in the case of self referencing processors in the event a processor author creates a
588+
// wrapping processor that has its inner processor refer to itself.
589+
if (wrappingProcessor == processor) {
590+
break;
591+
}
592+
}
593+
}
594+
595+
return false;
596+
}
597+
564598
private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
565599
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
566600
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";

server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
4949
if (conditionalProcessor.evaluate(ingestDocument) == false) {
5050
return ingestDocument;
5151
}
52-
if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) {
53-
processor = conditionalProcessor.getProcessor();
52+
if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) {
53+
processor = conditionalProcessor.getInnerProcessor();
5454
}
5555
}
5656
if (processor instanceof PipelineProcessor) {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest;
21+
22+
/**
23+
* A srapping processor is one that encapsulates an inner processor, or a processor that the wrapped processor enacts upon. All processors
24+
* that contain an "inner" processor should implement this interface, such that the actual processor can be obtained.
25+
*/
26+
public interface WrappingProcessor extends Processor {
27+
28+
/**
29+
* Method for retrieving the inner processor from a wrapped processor.
30+
* @return the inner processor
31+
*/
32+
Processor getInnerProcessor();
33+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest;
21+
22+
import java.util.function.Consumer;
23+
24+
class FakeProcessor implements Processor {
25+
private String type;
26+
private String tag;
27+
private Consumer<IngestDocument> executor;
28+
29+
FakeProcessor(String type, String tag, Consumer<IngestDocument> executor) {
30+
this.type = type;
31+
this.tag = tag;
32+
this.executor = executor;
33+
}
34+
35+
@Override
36+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
37+
executor.accept(ingestDocument);
38+
return ingestDocument;
39+
}
40+
41+
@Override
42+
public String getType() {
43+
return type;
44+
}
45+
46+
@Override
47+
public String getTag() {
48+
return tag;
49+
}
50+
}

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 93 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,16 @@
4343
import org.elasticsearch.cluster.service.ClusterService;
4444
import org.elasticsearch.common.bytes.BytesArray;
4545
import org.elasticsearch.common.logging.Loggers;
46+
import org.elasticsearch.common.settings.Settings;
4647
import org.elasticsearch.common.util.concurrent.EsExecutors;
4748
import org.elasticsearch.common.xcontent.XContentType;
4849
import org.elasticsearch.index.VersionType;
4950
import org.elasticsearch.plugins.IngestPlugin;
51+
import org.elasticsearch.script.MockScriptEngine;
52+
import org.elasticsearch.script.Script;
53+
import org.elasticsearch.script.ScriptModule;
54+
import org.elasticsearch.script.ScriptService;
55+
import org.elasticsearch.script.ScriptType;
5056
import org.elasticsearch.test.ESTestCase;
5157
import org.elasticsearch.test.MockLogAppender;
5258
import org.elasticsearch.threadpool.ThreadPool;
@@ -64,6 +70,7 @@
6470
import java.util.concurrent.ExecutorService;
6571
import java.util.function.BiConsumer;
6672
import java.util.function.Consumer;
73+
import java.util.function.LongSupplier;
6774

6875
import static java.util.Collections.emptyMap;
6976
import static java.util.Collections.emptySet;
@@ -263,6 +270,89 @@ public void testValidateNoIngestInfo() throws Exception {
263270
ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest);
264271
}
265272

273+
public void testHasProcessor() throws Exception {
274+
IngestService ingestService = createWithProcessors();
275+
String id = "_id";
276+
Pipeline pipeline = ingestService.getPipeline(id);
277+
assertThat(pipeline, nullValue());
278+
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
279+
280+
PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(
281+
"{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," +
282+
"{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"),
283+
XContentType.JSON);
284+
ClusterState previousClusterState = clusterState;
285+
clusterState = IngestService.innerPut(putRequest, clusterState);
286+
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
287+
pipeline = ingestService.getPipeline(id);
288+
assertThat(pipeline, notNullValue());
289+
290+
assertTrue(ingestService.hasProcessor(id, Processor.class));
291+
assertTrue(ingestService.hasProcessor(id, WrappingProcessorImpl.class));
292+
assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class));
293+
assertTrue(ingestService.hasProcessor(id, FakeProcessor.class));
294+
295+
assertFalse(ingestService.hasProcessor(id, ConditionalProcessor.class));
296+
}
297+
298+
public void testHasProcessorComplexConditional() throws Exception {
299+
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
300+
String scriptName = "conditionalScript";
301+
ScriptService scriptService = new ScriptService(Settings.builder().build(),
302+
Collections.singletonMap(
303+
Script.DEFAULT_SCRIPT_LANG,
304+
new MockScriptEngine(
305+
Script.DEFAULT_SCRIPT_LANG,
306+
Collections.singletonMap(
307+
scriptName, ctx -> {
308+
ctx.get("_type");
309+
return true;
310+
}
311+
),
312+
Collections.emptyMap()
313+
)
314+
),
315+
new HashMap<>(ScriptModule.CORE_CONTEXTS)
316+
);
317+
318+
Map<String, Processor.Factory> processors = new HashMap<>();
319+
processors.put("complexSet", (factories, tag, config) -> {
320+
String field = (String) config.remove("field");
321+
String value = (String) config.remove("value");
322+
323+
return new ConditionalProcessor(randomAlphaOfLength(10),
324+
new Script(
325+
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
326+
scriptName, Collections.emptyMap()), scriptService,
327+
new ConditionalProcessor(randomAlphaOfLength(10) + "-nested",
328+
new Script(
329+
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
330+
scriptName, Collections.emptyMap()), scriptService,
331+
new FakeProcessor("complexSet", tag, (ingestDocument) -> ingestDocument.setFieldValue(field, value))));
332+
});
333+
334+
IngestService ingestService = createWithProcessors(processors);
335+
String id = "_id";
336+
Pipeline pipeline = ingestService.getPipeline(id);
337+
assertThat(pipeline, nullValue());
338+
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
339+
340+
PutPipelineRequest putRequest = new PutPipelineRequest(id,
341+
new BytesArray("{\"processors\": [{\"complexSet\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON);
342+
ClusterState previousClusterState = clusterState;
343+
clusterState = IngestService.innerPut(putRequest, clusterState);
344+
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
345+
pipeline = ingestService.getPipeline(id);
346+
assertThat(pipeline, notNullValue());
347+
348+
assertTrue(ingestService.hasProcessor(id, Processor.class));
349+
assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class));
350+
assertTrue(ingestService.hasProcessor(id, FakeProcessor.class));
351+
assertTrue(ingestService.hasProcessor(id, ConditionalProcessor.class));
352+
353+
assertFalse(ingestService.hasProcessor(id, WrappingProcessorImpl.class));
354+
}
355+
266356
public void testCrud() throws Exception {
267357
IngestService ingestService = createWithProcessors();
268358
String id = "_id";
@@ -946,7 +1036,7 @@ public void testStatName(){
9461036
assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));
9471037

9481038
ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class);
949-
when(conditionalProcessor.getProcessor()).thenReturn(processor);
1039+
when(conditionalProcessor.getInnerProcessor()).thenReturn(processor);
9501040
assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag));
9511041

9521042
PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
@@ -1012,42 +1102,11 @@ private static IngestService createWithProcessors() {
10121102
processors.put("set", (factories, tag, config) -> {
10131103
String field = (String) config.remove("field");
10141104
String value = (String) config.remove("value");
1015-
return new Processor() {
1016-
@Override
1017-
public IngestDocument execute(IngestDocument ingestDocument) {
1018-
ingestDocument.setFieldValue(field, value);
1019-
return ingestDocument;
1020-
}
1021-
1022-
@Override
1023-
public String getType() {
1024-
return "set";
1025-
}
1026-
1027-
@Override
1028-
public String getTag() {
1029-
return tag;
1030-
}
1031-
};
1105+
return new FakeProcessor("set", tag, (ingestDocument) ->ingestDocument.setFieldValue(field, value));
10321106
});
10331107
processors.put("remove", (factories, tag, config) -> {
10341108
String field = (String) config.remove("field");
1035-
return new Processor() {
1036-
@Override
1037-
public IngestDocument execute(IngestDocument ingestDocument) {
1038-
ingestDocument.removeField(field);
1039-
return ingestDocument;
1040-
}
1041-
1042-
@Override
1043-
public String getType() {
1044-
return "remove";
1045-
}
1046-
1047-
@Override
1048-
public String getTag() {
1049-
return tag;
1050-
}
1109+
return new WrappingProcessorImpl("remove", tag, (ingestDocument -> ingestDocument.removeField(field))) {
10511110
};
10521111
});
10531112
return createWithProcessors(processors);

0 commit comments

Comments
 (0)