diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index ad93298c646e1..d56a2731d35b9 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Set; + +import org.elasticsearch.ingest.WrappingProcessor; import org.elasticsearch.script.ScriptService; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; @@ -43,7 +45,7 @@ * * Note that this processor is experimental. */ -public final class ForEachProcessor extends AbstractProcessor { +public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor { public static final String TYPE = "foreach"; @@ -97,7 +99,7 @@ String getField() { return field; } - Processor getProcessor() { + public Processor getInnerProcessor() { return processor; } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 7ab19c4147eab..65fee8ce19a7d 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -49,7 +49,7 @@ public void testCreate() throws Exception { ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config); assertThat(forEachProcessor, Matchers.notNullValue()); assertThat(forEachProcessor.getField(), equalTo("_field")); - assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor)); + assertThat(forEachProcessor.getInnerProcessor(), Matchers.sameInstance(processor)); assertFalse(forEachProcessor.isIgnoreMissing()); } @@ -66,7 +66,7 @@ public void testSetIgnoreMissing() throws Exception { ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config); assertThat(forEachProcessor, Matchers.notNullValue()); assertThat(forEachProcessor.getField(), equalTo("_field")); - assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor)); + assertThat(forEachProcessor.getInnerProcessor(), Matchers.sameInstance(processor)); assertTrue(forEachProcessor.isIgnoreMissing()); } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index e5333485db52b..e0a054df2db5a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -37,7 +37,7 @@ import java.util.function.LongSupplier; import java.util.stream.Collectors; -public class ConditionalProcessor extends AbstractProcessor { +public class ConditionalProcessor extends AbstractProcessor implements WrappingProcessor { private static final Map DEPRECATIONS = Map.of("_type", "[types removal] Looking up doc types [_type] in scripts is deprecated."); @@ -90,7 +90,7 @@ boolean evaluate(IngestDocument ingestDocument) { new DeprecationMap(ingestDocument.getSourceAndMetadata(), DEPRECATIONS, "conditional-processor"))); } - Processor getProcessor() { + public Processor getInnerProcessor() { return processor; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 9e7d1b7b5bdbd..68fb8284d5a1f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -388,7 +388,7 @@ public IngestStats stats() { static String getProcessorName(Processor processor){ // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name if(processor instanceof ConditionalProcessor){ - processor = ((ConditionalProcessor) processor).getProcessor(); + processor = ((ConditionalProcessor) processor).getInnerProcessor(); } StringBuilder sb = new StringBuilder(5); sb.append(processor.getType()); @@ -561,6 +561,40 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) { } } + /** + * Determine if a pipeline contains a processor class within it by introspecting all of the processors within the pipeline. + * @param pipelineId the pipeline to inspect + * @param clazz the Processor class to look for + * @return True if the pipeline contains an instance of the Processor class passed in + */ + public boolean hasProcessor(String pipelineId, Class clazz) { + Pipeline pipeline = getPipeline(pipelineId); + if (pipeline == null) { + return false; + } + + for (Processor processor: pipeline.flattenAllProcessors()) { + if (clazz.isAssignableFrom(processor.getClass())) { + return true; + } + + while (processor instanceof WrappingProcessor) { + WrappingProcessor wrappingProcessor = (WrappingProcessor) processor; + if (clazz.isAssignableFrom(wrappingProcessor.getInnerProcessor().getClass())) { + return true; + } + processor = wrappingProcessor.getInnerProcessor(); + // break in the case of self referencing processors in the event a processor author creates a + // wrapping processor that has its inner processor refer to itself. + if (wrappingProcessor == processor) { + break; + } + } + } + + return false; + } + private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 4b78715144649..6c1127a33f801 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -49,8 +49,8 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { if (conditionalProcessor.evaluate(ingestDocument) == false) { return ingestDocument; } - if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) { - processor = conditionalProcessor.getProcessor(); + if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) { + processor = conditionalProcessor.getInnerProcessor(); } } if (processor instanceof PipelineProcessor) { diff --git a/server/src/main/java/org/elasticsearch/ingest/WrappingProcessor.java b/server/src/main/java/org/elasticsearch/ingest/WrappingProcessor.java new file mode 100644 index 0000000000000..8783f65e2d79e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/WrappingProcessor.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +/** + * A srapping processor is one that encapsulates an inner processor, or a processor that the wrapped processor enacts upon. All processors + * that contain an "inner" processor should implement this interface, such that the actual processor can be obtained. + */ +public interface WrappingProcessor extends Processor { + + /** + * Method for retrieving the inner processor from a wrapped processor. + * @return the inner processor + */ + Processor getInnerProcessor(); +} diff --git a/server/src/test/java/org/elasticsearch/ingest/FakeProcessor.java b/server/src/test/java/org/elasticsearch/ingest/FakeProcessor.java new file mode 100644 index 0000000000000..90c14a20a8f00 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/FakeProcessor.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import java.util.function.Consumer; + +class FakeProcessor implements Processor { + private String type; + private String tag; + private Consumer executor; + + FakeProcessor(String type, String tag, Consumer executor) { + this.type = type; + this.tag = tag; + this.executor = executor; + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + executor.accept(ingestDocument); + return ingestDocument; + } + + @Override + public String getType() { + return type; + } + + @Override + public String getTag() { + return tag; + } +} diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 43e2a8a584979..1d48347a5fa41 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -43,10 +43,16 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.script.MockScriptEngine; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.ScriptType; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.threadpool.ThreadPool; @@ -64,6 +70,7 @@ import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.LongSupplier; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -263,6 +270,89 @@ public void testValidateNoIngestInfo() throws Exception { ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest); } + public void testHasProcessor() throws Exception { + IngestService ingestService = createWithProcessors(); + String id = "_id"; + Pipeline pipeline = ingestService.getPipeline(id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + + PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," + + "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"), + XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = ingestService.getPipeline(id); + assertThat(pipeline, notNullValue()); + + assertTrue(ingestService.hasProcessor(id, Processor.class)); + assertTrue(ingestService.hasProcessor(id, WrappingProcessorImpl.class)); + assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class)); + assertTrue(ingestService.hasProcessor(id, FakeProcessor.class)); + + assertFalse(ingestService.hasProcessor(id, ConditionalProcessor.class)); + } + + public void testHasProcessorComplexConditional() throws Exception { + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + String scriptName = "conditionalScript"; + ScriptService scriptService = new ScriptService(Settings.builder().build(), + Collections.singletonMap( + Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine( + Script.DEFAULT_SCRIPT_LANG, + Collections.singletonMap( + scriptName, ctx -> { + ctx.get("_type"); + return true; + } + ), + Collections.emptyMap() + ) + ), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + + Map processors = new HashMap<>(); + processors.put("complexSet", (factories, tag, config) -> { + String field = (String) config.remove("field"); + String value = (String) config.remove("value"); + + return new ConditionalProcessor(randomAlphaOfLength(10), + new Script( + ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, + scriptName, Collections.emptyMap()), scriptService, + new ConditionalProcessor(randomAlphaOfLength(10) + "-nested", + new Script( + ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, + scriptName, Collections.emptyMap()), scriptService, + new FakeProcessor("complexSet", tag, (ingestDocument) -> ingestDocument.setFieldValue(field, value)))); + }); + + IngestService ingestService = createWithProcessors(processors); + String id = "_id"; + Pipeline pipeline = ingestService.getPipeline(id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + + PutPipelineRequest putRequest = new PutPipelineRequest(id, + new BytesArray("{\"processors\": [{\"complexSet\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = ingestService.getPipeline(id); + assertThat(pipeline, notNullValue()); + + assertTrue(ingestService.hasProcessor(id, Processor.class)); + assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class)); + assertTrue(ingestService.hasProcessor(id, FakeProcessor.class)); + assertTrue(ingestService.hasProcessor(id, ConditionalProcessor.class)); + + assertFalse(ingestService.hasProcessor(id, WrappingProcessorImpl.class)); + } + public void testCrud() throws Exception { IngestService ingestService = createWithProcessors(); String id = "_id"; @@ -946,7 +1036,7 @@ public void testStatName(){ assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag)); ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class); - when(conditionalProcessor.getProcessor()).thenReturn(processor); + when(conditionalProcessor.getInnerProcessor()).thenReturn(processor); assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag)); PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class); @@ -1012,42 +1102,11 @@ private static IngestService createWithProcessors() { processors.put("set", (factories, tag, config) -> { String field = (String) config.remove("field"); String value = (String) config.remove("value"); - return new Processor() { - @Override - public IngestDocument execute(IngestDocument ingestDocument) { - ingestDocument.setFieldValue(field, value); - return ingestDocument; - } - - @Override - public String getType() { - return "set"; - } - - @Override - public String getTag() { - return tag; - } - }; + return new FakeProcessor("set", tag, (ingestDocument) ->ingestDocument.setFieldValue(field, value)); }); processors.put("remove", (factories, tag, config) -> { String field = (String) config.remove("field"); - return new Processor() { - @Override - public IngestDocument execute(IngestDocument ingestDocument) { - ingestDocument.removeField(field); - return ingestDocument; - } - - @Override - public String getType() { - return "remove"; - } - - @Override - public String getTag() { - return tag; - } + return new WrappingProcessorImpl("remove", tag, (ingestDocument -> ingestDocument.removeField(field))) { }; }); return createWithProcessors(processors); diff --git a/server/src/test/java/org/elasticsearch/ingest/WrappingProcessorImpl.java b/server/src/test/java/org/elasticsearch/ingest/WrappingProcessorImpl.java new file mode 100644 index 0000000000000..db2b827c0144b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/WrappingProcessorImpl.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import java.util.function.Consumer; + +class WrappingProcessorImpl extends FakeProcessor implements WrappingProcessor { + + WrappingProcessorImpl(String type, String tag, Consumer executor) { + super(type, tag, executor); + } + + @Override + public Processor getInnerProcessor() { + String theType = getType(); + String theTag = getTag(); + return new Processor() { + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + return ingestDocument; + } + + @Override + public String getType() { + return theType; + } + + @Override + public String getTag() { + return theTag; + } + }; + } +}