From 9b6657d9b918a67fd8064d1e7aeed91739f9d35d Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 24 Jul 2018 13:22:31 +0200 Subject: [PATCH 01/14] Ingest: Add conditional per processor --- .../ingest/common/ConditionalProcessor.java | 118 ++++++++++++++++++ .../ingest/common/IngestCommonPlugin.java | 1 + .../ingest/ConfigurationUtils.java | 24 +++- .../script/ProcessorConditionalScript.java | 52 ++++++++ .../elasticsearch/script/ScriptModule.java | 1 + 5 files changed, 192 insertions(+), 4 deletions(-) create mode 100644 modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java create mode 100644 server/src/main/java/org/elasticsearch/script/ProcessorConditionalScript.java diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java new file mode 100644 index 0000000000000..20381450f31a6 --- /dev/null +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.common; + +import java.io.InputStream; +import java.util.Collections; +import java.util.Map; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.script.ProcessorConditionalScript; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptException; +import org.elasticsearch.script.ScriptService; + +import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; +import static org.elasticsearch.ingest.ConfigurationUtils.readMap; + +public class ConditionalProcessor extends AbstractProcessor { + + static final String TYPE = "conditional"; + + private final Script condition; + + private final ScriptService scriptService; + + private final Processor processor; + + private ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) { + super(tag); + this.condition = script; + this.scriptService = scriptService; + this.processor = processor; + } + + @Override + public void execute(IngestDocument ingestDocument) throws Exception { + if (scriptService.compile(condition, ProcessorConditionalScript.CONTEXT) + .newInstance(condition.getParams()).execute(ingestDocument.getSourceAndMetadata())) { + processor.execute(ingestDocument); + } + } + + @Override + public String getType() { + return TYPE; + } + + public static final class Factory implements Processor.Factory { + + private final ScriptService scriptService; + + public Factory(ScriptService scriptService) { + this.scriptService = scriptService; + } + + @Override + public ConditionalProcessor create(Map factories, String tag, + Map config) throws Exception { + Map> processorConfig = readMap(TYPE, tag, config, "processor"); + final Script script; + try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent) + .map(normalizeScript(config.get("script"))); + InputStream stream = BytesReference.bytes(builder).streamInput(); + XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, stream)) { + script = Script.parse(parser); + config.remove("script"); + // verify script is able to be compiled before successfully creating processor. + try { + scriptService.compile(script, ProcessorConditionalScript.CONTEXT); + } catch (ScriptException e) { + throw newConfigurationException(TYPE, tag, null, e); + } + } + Map.Entry> entry = processorConfig.entrySet().iterator().next(); + Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue()); + return new ConditionalProcessor(tag, script, scriptService, processor); + } + + @SuppressWarnings("unchecked") + private static Map normalizeScript(Object scriptConfig) { + if (scriptConfig instanceof Map) { + return (Map) scriptConfig; + } else if (scriptConfig instanceof String) { + return Collections.singletonMap("source", scriptConfig); + } else { + throw newConfigurationException(TYPE, null, "script", + "property isn't a map or string, but of type [" + scriptConfig.getClass().getName() + "]"); + } + } + } +} diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index bc475a2a00539..7eea0871225ff 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -82,6 +82,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory()); processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()); processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory()); + processors.put(ConditionalProcessor.TYPE, new ConditionalProcessor.Factory(parameters.scriptService)); return Collections.unmodifiableMap(processors); } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index 78dc0ec6bfef1..ba4e9a67c7a93 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -372,11 +372,15 @@ public static Processor readProcessor(Map processorFa public static Processor readProcessor(Map processorFactories, String type, Map config) throws Exception { String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); + Map parsedConfig = maybeExtractConditional(config, type); + if (parsedConfig != config) { + type = "conditional"; + } Processor.Factory factory = processorFactories.get(type); if (factory != null) { boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false); List> onFailureProcessorConfigs = - ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY); + ConfigurationUtils.readOptionalList(null, null, parsedConfig, Pipeline.ON_FAILURE_KEY); List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories); @@ -386,10 +390,10 @@ public static Processor readProcessor(Map processorFa } try { - Processor processor = factory.create(processorFactories, tag, config); - if (config.isEmpty() == false) { + Processor processor = factory.create(processorFactories, tag, parsedConfig); + if (parsedConfig.isEmpty() == false) { throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}", - type, Arrays.toString(config.keySet().toArray())); + type, Arrays.toString(parsedConfig.keySet().toArray())); } if (onFailureProcessors.size() > 0 || ignoreFailure) { return new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors); @@ -402,4 +406,16 @@ public static Processor readProcessor(Map processorFa } throw newConfigurationException(type, tag, null, "No processor type exists with name [" + type + "]"); } + + private static Map maybeExtractConditional(Map config, String type) { + if (config.containsKey("if")) { + Map rewrittenConfig = new HashMap<>(); + Map rewrittenProcessorConfig = new HashMap<>(config); + rewrittenConfig.put("script", rewrittenProcessorConfig.remove("if")); + rewrittenConfig.put("processor", Collections.singletonMap(type, rewrittenProcessorConfig)); + return rewrittenConfig; + } else { + return config; + } + } } diff --git a/server/src/main/java/org/elasticsearch/script/ProcessorConditionalScript.java b/server/src/main/java/org/elasticsearch/script/ProcessorConditionalScript.java new file mode 100644 index 0000000000000..bcadf8024024e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/script/ProcessorConditionalScript.java @@ -0,0 +1,52 @@ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.script; + +import java.util.Map; + +/** + * A script used by the Ingest Script Processor. + */ +public abstract class ProcessorConditionalScript { + + public static final String[] PARAMETERS = { "ctx" }; + + /** The context used to compile {@link ProcessorConditionalScript} factories. */ + public static final ScriptContext CONTEXT = new ScriptContext<>("processor_conditional", Factory.class); + + /** The generic runtime parameters for the script. */ + private final Map params; + + public ProcessorConditionalScript(Map params) { + this.params = params; + } + + /** Return the parameters for this script. */ + public Map getParams() { + return params; + } + + public abstract boolean execute(Map ctx); + + public interface Factory { + ProcessorConditionalScript newInstance(Map params); + } +} diff --git a/server/src/main/java/org/elasticsearch/script/ScriptModule.java b/server/src/main/java/org/elasticsearch/script/ScriptModule.java index a3da1dafe48d7..df8469b92dfeb 100644 --- a/server/src/main/java/org/elasticsearch/script/ScriptModule.java +++ b/server/src/main/java/org/elasticsearch/script/ScriptModule.java @@ -50,6 +50,7 @@ public class ScriptModule { ExecutableScript.AGGS_CONTEXT, ExecutableScript.UPDATE_CONTEXT, IngestScript.CONTEXT, + ProcessorConditionalScript.CONTEXT, FilterScript.CONTEXT, SimilarityScript.CONTEXT, SimilarityWeightScript.CONTEXT, From 8f069543beac3aa639f9685c28ca1536498a1546 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 19 Aug 2018 22:22:02 +0200 Subject: [PATCH 02/14] deep copy data instead of lazy wrap --- .../org/elasticsearch/ingest/common/ConditionalProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java index 20381450f31a6..1cd4ebcb3c9b5 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java @@ -61,7 +61,7 @@ private ConditionalProcessor(String tag, Script script, ScriptService scriptServ @Override public void execute(IngestDocument ingestDocument) throws Exception { if (scriptService.compile(condition, ProcessorConditionalScript.CONTEXT) - .newInstance(condition.getParams()).execute(ingestDocument.getSourceAndMetadata())) { + .newInstance(condition.getParams()).execute(IngestDocument.deepCopyMap(ingestDocument.getSourceAndMetadata()))) { processor.execute(ingestDocument); } } From 713f6047820a39fccfbdc75192050bdec61cfbb2 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 20 Aug 2018 08:03:06 +0200 Subject: [PATCH 03/14] start tests --- .../ingest/common/ConditionalProcessorTests.java | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java new file mode 100644 index 0000000000000..c189719456e0e --- /dev/null +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java @@ -0,0 +1,7 @@ +package org.elasticsearch.ingest.common; + +import org.elasticsearch.test.ESTestCase; + +public class ConditionalProcessorTests extends ESTestCase { + +} From f8981886ab91cb4e786b1e9e86f136d55e23dfd8 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 20 Aug 2018 08:42:29 +0200 Subject: [PATCH 04/14] start tests --- .../common/ConditionalProcessorTests.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java index c189719456e0e..6d596fb3e9d29 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java @@ -1,3 +1,22 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.ingest.common; import org.elasticsearch.test.ESTestCase; From 3884159914df32c13bd8936ed9721b26a27ea3cc Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 20 Aug 2018 11:30:03 +0200 Subject: [PATCH 05/14] add tests --- .../common/ConditionalProcessorTests.java | 55 +++++++++++++ .../test/ingest/210_conditional_processor.yml | 81 +++++++++++++++++++ .../script/MockScriptEngine.java | 8 ++ 3 files changed, 144 insertions(+) create mode 100644 modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_conditional_processor.yml diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java index 6d596fb3e9d29..fe45c3cab06d7 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java @@ -19,8 +19,63 @@ package org.elasticsearch.ingest.common; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.script.MockScriptEngine; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.core.Is.is; + public class ConditionalProcessorTests extends ESTestCase { + public void testChecksCondition() throws Exception { + String conditionalField = "field1"; + String scriptSource = "conditionalScript"; + String trueValue = "truthy"; + ScriptService scriptService = new ScriptService(Settings.builder().build(), + Collections.singletonMap( + Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine( + Script.DEFAULT_SCRIPT_LANG, + Collections.singletonMap( + scriptSource, ctx -> trueValue.equals(ctx.get(conditionalField)) + ) + ) + ), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + Map document = new HashMap<>(); + Map setConfig = new HashMap<>(); + setConfig.put("field", "foo"); + setConfig.put("value", "bar"); + Map config = new HashMap<>(); + config.put("processor", Collections.singletonMap("set", setConfig)); + config.put("script", Collections.singletonMap("source", scriptSource)); + ConditionalProcessor processor = new ConditionalProcessor.Factory(scriptService).create( + Collections.singletonMap("set", new SetProcessor.Factory(scriptService)), + ConditionalProcessor.TYPE, config + ); + + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue(conditionalField, trueValue); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue)); + assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar")); + + String falseValue = "falsy"; + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue(conditionalField, falseValue); + processor.execute(ingestDocument); + assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); + assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); + } } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_conditional_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_conditional_processor.yml new file mode 100644 index 0000000000000..532519c4ca073 --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_conditional_processor.yml @@ -0,0 +1,81 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test conditional processor fulfilled condition": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "bytes" : { + "if" : "ctx.conditional_field == 'bar'", + "field" : "bytes_source_field", + "target_field" : "bytes_target_field" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {bytes_source_field: "1kb", conditional_field: "bar"} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.conditional_field: "bar" } + - match: { _source.bytes_target_field: 1024 } + +--- +"Test conditional processor unfulfilled condition": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "bytes" : { + "if" : "ctx.conditional_field == 'foo'", + "field" : "bytes_source_field", + "target_field" : "bytes_target_field" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {bytes_source_field: "1kb", conditional_field: "bar"} + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.conditional_field: "bar" } + - is_false: _source.bytes_target_field + diff --git a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java index 0d340a91d4cea..d0f527d851acc 100644 --- a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java @@ -96,6 +96,14 @@ public void execute(Map ctx) { } }; return context.factoryClazz.cast(factory); + } else if (context.instanceClazz.equals(ProcessorConditionalScript.class)) { + ProcessorConditionalScript.Factory factory = parameters -> new ProcessorConditionalScript(parameters) { + @Override + public boolean execute(Map ctx) { + return (boolean) script.apply(ctx); + } + }; + return context.factoryClazz.cast(factory); } else if (context.instanceClazz.equals(UpdateScript.class)) { UpdateScript.Factory factory = parameters -> new UpdateScript(parameters) { @Override From fc5d078714309f8c0106c8029400c1b5724ff0b4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 20 Aug 2018 13:50:13 +0200 Subject: [PATCH 06/14] unmodifiable wrappers --- .../ingest/common/ConditionalProcessor.java | 241 +++++++++++++++++- 1 file changed, 240 insertions(+), 1 deletion(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java index 1cd4ebcb3c9b5..0385239a048aa 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java @@ -20,8 +20,16 @@ package org.elasticsearch.ingest.common; import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; import java.util.Map; +import java.util.Set; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -61,7 +69,7 @@ private ConditionalProcessor(String tag, Script script, ScriptService scriptServ @Override public void execute(IngestDocument ingestDocument) throws Exception { if (scriptService.compile(condition, ProcessorConditionalScript.CONTEXT) - .newInstance(condition.getParams()).execute(IngestDocument.deepCopyMap(ingestDocument.getSourceAndMetadata()))) { + .newInstance(condition.getParams()).execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { processor.execute(ingestDocument); } } @@ -115,4 +123,235 @@ private static Map normalizeScript(Object scriptConfig) { } } } + + private static Object wrapUnmodifiable(Object raw) { + if (raw instanceof Map) { + return new UnmodifiableIngestData((Map) raw); + } else if (raw instanceof List) { + return new UnmodifiableIngestList((List) raw); + } else if (raw instanceof byte[]) { + return ((byte[]) raw).clone(); + } else if (raw instanceof Date) { + return ((Date) raw).clone(); + } + return raw; + } + + private static final class UnmodifiableIngestData implements Map { + + private final Map data; + + UnmodifiableIngestData(Map data) { + this.data = data; + } + + @Override + public int size() { + return data.size(); + } + + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + + @Override + public boolean containsKey(final Object key) { + return data.containsKey(key); + } + + @Override + public boolean containsValue(final Object value) { + return data.containsValue(value); + } + + @Override + public Object get(final Object key) { + return wrapUnmodifiable(data.get(key)); + } + + @Override + public Object put(final String key, final Object value) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public Object remove(final Object key) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public void putAll(final Map m) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public void clear() { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public Set keySet() { + return Collections.unmodifiableSet(data.keySet()); + } + + @Override + public Collection values() { + return new UnmodifiableIngestList(new ArrayList<>(data.values())); + } + + @Override + public Set> entrySet() { + throw new UnsupportedOperationException("Getting EntrySet for ingest documents in conditionals is not supported"); + } + } + + private static final class UnmodifiableIngestList implements List { + + private final List data; + + UnmodifiableIngestList(List data) { + this.data = data; + } + + @Override + public int size() { + return data.size(); + } + + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + + @Override + public boolean contains(final Object o) { + return data.contains(o); + } + + @Override + public Iterator iterator() { + Iterator wrapped = data.iterator(); + return new Iterator() { + @Override + public boolean hasNext() { + return wrapped.hasNext(); + } + + @Override + public Object next() { + return wrapped.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + }; + } + + @Override + public Object[] toArray() { + Object[] wrapped = data.toArray(new Object[0]); + for (int i = 0; i < wrapped.length; i++) { + wrapped[i] = wrapUnmodifiable(wrapped[i]); + } + return wrapped; + } + + @Override + public T[] toArray(final T[] a) { + Object[] raw = data.toArray(new Object[0]); + T[] wrapped = (T[]) Arrays.copyOf(raw, a.length, a.getClass()); + for (int i = 0; i < wrapped.length; i++) { + wrapped[i] = (T) wrapUnmodifiable(wrapped[i]); + } + return wrapped; + } + + @Override + public boolean add(final Object o) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public boolean remove(final Object o) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public boolean containsAll(final Collection c) { + return data.contains(c); + } + + @Override + public boolean addAll(final Collection c) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public boolean addAll(final int index, final Collection c) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public boolean removeAll(final Collection c) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public boolean retainAll(final Collection c) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public void clear() { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public Object get(final int index) { + return wrapUnmodifiable(data.get(index)); + } + + @Override + public Object set(final int index, final Object element) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public void add(final int index, final Object element) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public Object remove(final int index) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public int indexOf(final Object o) { + return data.indexOf(o); + } + + @Override + public int lastIndexOf(final Object o) { + return data.lastIndexOf(o); + } + + @Override + public ListIterator listIterator() { + return null; + } + + @Override + public ListIterator listIterator(final int index) { + return null; + } + + @Override + public List subList(final int fromIndex, final int toIndex) { + return new UnmodifiableIngestList(data.subList(fromIndex, toIndex)); + } + } } From 288899aeb7934104c071a06877527c125e678464 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 20 Aug 2018 13:56:23 +0200 Subject: [PATCH 07/14] unmodifiable wrappers --- .../ingest/common/ConditionalProcessor.java | 58 ++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java index 0385239a048aa..35a06362f966d 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java @@ -341,17 +341,71 @@ public int lastIndexOf(final Object o) { @Override public ListIterator listIterator() { - return null; + return new UnmodifiableListIterator(data.listIterator()); } @Override public ListIterator listIterator(final int index) { - return null; + return new UnmodifiableListIterator(data.listIterator(index)); } @Override public List subList(final int fromIndex, final int toIndex) { return new UnmodifiableIngestList(data.subList(fromIndex, toIndex)); } + + private static final class UnmodifiableListIterator implements ListIterator { + + private final ListIterator data; + + UnmodifiableListIterator(ListIterator data) { + this.data = data; + } + + @Override + public boolean hasNext() { + return data.hasNext(); + } + + @Override + public Object next() { + return wrapUnmodifiable(data.next()); + } + + @Override + public boolean hasPrevious() { + return data.hasPrevious(); + } + + @Override + public Object previous() { + return wrapUnmodifiable(data.previous()); + } + + @Override + public int nextIndex() { + return data.nextIndex(); + } + + @Override + public int previousIndex() { + return data.previousIndex(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public void set(final Object o) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + + @Override + public void add(final Object o) { + throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + } } } From fe2d826d1563bb8678d3f6f76c9b61eb20902a63 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 20 Aug 2018 14:10:00 +0200 Subject: [PATCH 08/14] unmodifiable wrappers --- .../ingest/common/ConditionalProcessor.java | 69 ++++++++++++++----- 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java index 35a06362f966d..0daa6aabf6b47 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java @@ -30,6 +30,7 @@ import java.util.ListIterator; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -137,6 +138,10 @@ private static Object wrapUnmodifiable(Object raw) { return raw; } + private static UnsupportedOperationException unmodifiableException() { + return new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + } + private static final class UnmodifiableIngestData implements Map { private final Map data; @@ -172,22 +177,22 @@ public Object get(final Object key) { @Override public Object put(final String key, final Object value) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public Object remove(final Object key) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public void putAll(final Map m) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public void clear() { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override @@ -202,7 +207,33 @@ public Collection values() { @Override public Set> entrySet() { - throw new UnsupportedOperationException("Getting EntrySet for ingest documents in conditionals is not supported"); + return data.entrySet().stream().map(entry -> + new Entry() { + @Override + public String getKey() { + return entry.getKey(); + } + + @Override + public Object getValue() { + return wrapUnmodifiable(entry.getValue()); + } + + @Override + public Object setValue(final Object value) { + throw unmodifiableException(); + } + + @Override + public boolean equals(final Object o) { + return entry.equals(o); + } + + @Override + public int hashCode() { + return entry.hashCode(); + } + }).collect(Collectors.toSet()); } } @@ -245,7 +276,7 @@ public Object next() { @Override public void remove() { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } }; } @@ -271,12 +302,12 @@ public T[] toArray(final T[] a) { @Override public boolean add(final Object o) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public boolean remove(final Object o) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override @@ -286,27 +317,27 @@ public boolean containsAll(final Collection c) { @Override public boolean addAll(final Collection c) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public boolean addAll(final int index, final Collection c) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public boolean removeAll(final Collection c) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public boolean retainAll(final Collection c) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public void clear() { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override @@ -316,17 +347,17 @@ public Object get(final int index) { @Override public Object set(final int index, final Object element) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public void add(final int index, final Object element) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public Object remove(final int index) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override @@ -394,17 +425,17 @@ public int previousIndex() { @Override public void remove() { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public void set(final Object o) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } @Override public void add(final Object o) { - throw new UnsupportedOperationException("Mutating ingest documents in conditionals is not supported"); + throw unmodifiableException(); } } } From d0041813b9254670248105936b1c0b189da7f047 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 21 Aug 2018 14:34:15 +0200 Subject: [PATCH 09/14] CR: Remove date type from immutable wrapper --- .../org/elasticsearch/ingest/common/ConditionalProcessor.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java index 0daa6aabf6b47..194fad13c9f82 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.ListIterator; @@ -132,8 +131,6 @@ private static Object wrapUnmodifiable(Object raw) { return new UnmodifiableIngestList((List) raw); } else if (raw instanceof byte[]) { return ((byte[]) raw).clone(); - } else if (raw instanceof Date) { - return ((Date) raw).clone(); } return raw; } From 9d64863a9f6591e57eb3baf2a40fce56927c2586 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 27 Aug 2018 23:43:29 +0200 Subject: [PATCH 10/14] CR: renamings --- .../ingest/common/ConditionalProcessor.java | 11 +++++++---- ...tionalScript.java => IngestConditionalScript.java} | 8 ++++---- .../java/org/elasticsearch/script/ScriptModule.java | 2 +- .../org/elasticsearch/script/MockScriptEngine.java | 4 ++-- 4 files changed, 14 insertions(+), 11 deletions(-) rename server/src/main/java/org/elasticsearch/script/{ProcessorConditionalScript.java => IngestConditionalScript.java} (83%) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java index 194fad13c9f82..af84333bb2ce6 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java @@ -41,7 +41,7 @@ import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; -import org.elasticsearch.script.ProcessorConditionalScript; +import org.elasticsearch.script.IngestConditionalScript; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptException; import org.elasticsearch.script.ScriptService; @@ -68,8 +68,9 @@ private ConditionalProcessor(String tag, Script script, ScriptService scriptServ @Override public void execute(IngestDocument ingestDocument) throws Exception { - if (scriptService.compile(condition, ProcessorConditionalScript.CONTEXT) - .newInstance(condition.getParams()).execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { + IngestConditionalScript script = + scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); + if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { processor.execute(ingestDocument); } } @@ -101,7 +102,7 @@ public ConditionalProcessor create(Map factories, Str config.remove("script"); // verify script is able to be compiled before successfully creating processor. try { - scriptService.compile(script, ProcessorConditionalScript.CONTEXT); + scriptService.compile(script, IngestConditionalScript.CONTEXT); } catch (ScriptException e) { throw newConfigurationException(TYPE, tag, null, e); } @@ -125,6 +126,8 @@ private static Map normalizeScript(Object scriptConfig) { } private static Object wrapUnmodifiable(Object raw) { + // Wraps all mutable types that the JSON parser can create by immutable wrappers. + // Any inputs not wrapped are assumed to be immutable if (raw instanceof Map) { return new UnmodifiableIngestData((Map) raw); } else if (raw instanceof List) { diff --git a/server/src/main/java/org/elasticsearch/script/ProcessorConditionalScript.java b/server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java similarity index 83% rename from server/src/main/java/org/elasticsearch/script/ProcessorConditionalScript.java rename to server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java index bcadf8024024e..1f770c42be6fc 100644 --- a/server/src/main/java/org/elasticsearch/script/ProcessorConditionalScript.java +++ b/server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java @@ -25,17 +25,17 @@ /** * A script used by the Ingest Script Processor. */ -public abstract class ProcessorConditionalScript { +public abstract class IngestConditionalScript { public static final String[] PARAMETERS = { "ctx" }; - /** The context used to compile {@link ProcessorConditionalScript} factories. */ + /** The context used to compile {@link IngestConditionalScript} factories. */ public static final ScriptContext CONTEXT = new ScriptContext<>("processor_conditional", Factory.class); /** The generic runtime parameters for the script. */ private final Map params; - public ProcessorConditionalScript(Map params) { + public IngestConditionalScript(Map params) { this.params = params; } @@ -47,6 +47,6 @@ public Map getParams() { public abstract boolean execute(Map ctx); public interface Factory { - ProcessorConditionalScript newInstance(Map params); + IngestConditionalScript newInstance(Map params); } } diff --git a/server/src/main/java/org/elasticsearch/script/ScriptModule.java b/server/src/main/java/org/elasticsearch/script/ScriptModule.java index af8b206931b5c..1788d8c792bf0 100644 --- a/server/src/main/java/org/elasticsearch/script/ScriptModule.java +++ b/server/src/main/java/org/elasticsearch/script/ScriptModule.java @@ -51,7 +51,7 @@ public class ScriptModule { BucketAggregationSelectorScript.CONTEXT, SignificantTermsHeuristicScoreScript.CONTEXT, IngestScript.CONTEXT, - ProcessorConditionalScript.CONTEXT, + IngestConditionalScript.CONTEXT, FilterScript.CONTEXT, SimilarityScript.CONTEXT, SimilarityWeightScript.CONTEXT, diff --git a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java index d0f527d851acc..33eb50fc28174 100644 --- a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java @@ -96,8 +96,8 @@ public void execute(Map ctx) { } }; return context.factoryClazz.cast(factory); - } else if (context.instanceClazz.equals(ProcessorConditionalScript.class)) { - ProcessorConditionalScript.Factory factory = parameters -> new ProcessorConditionalScript(parameters) { + } else if (context.instanceClazz.equals(IngestConditionalScript.class)) { + IngestConditionalScript.Factory factory = parameters -> new IngestConditionalScript(parameters) { @Override public boolean execute(Map ctx) { return (boolean) script.apply(ctx); From 0f88489544c32c9f6f7321f69e94526f89f0988d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 29 Aug 2018 14:32:55 +0200 Subject: [PATCH 11/14] remove conditional processor factory --- .../ingest/common/ForEachProcessor.java | 11 ++- .../ingest/common/IngestCommonPlugin.java | 3 +- .../common/ForEachProcessorFactoryTests.java | 16 +++-- .../ingest/SimulatePipelineRequest.java | 6 +- .../ingest}/ConditionalProcessor.java | 65 +---------------- .../ingest/ConfigurationUtils.java | 72 +++++++++++++------ .../elasticsearch/ingest/IngestService.java | 16 ++++- .../org/elasticsearch/ingest/Pipeline.java | 8 ++- .../ingest}/ConditionalProcessorTests.java | 40 +++++++---- .../ingest/ConfigurationUtilsTests.java | 19 +++-- .../ingest/PipelineFactoryTests.java | 30 +++++--- 11 files changed, 153 insertions(+), 133 deletions(-) rename {modules/ingest-common/src/main/java/org/elasticsearch/ingest/common => server/src/main/java/org/elasticsearch/ingest}/ConditionalProcessor.java (76%) rename {modules/ingest-common/src/test/java/org/elasticsearch/ingest/common => server/src/test/java/org/elasticsearch/ingest}/ConditionalProcessorTests.java (75%) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index f5bf9cc959105..31c0ae8cc3dc8 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.elasticsearch.script.ScriptService; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -96,6 +97,13 @@ Processor getProcessor() { } public static final class Factory implements Processor.Factory { + + private final ScriptService scriptService; + + Factory(ScriptService scriptService) { + this.scriptService = scriptService; + } + @Override public ForEachProcessor create(Map factories, String tag, Map config) throws Exception { @@ -107,7 +115,8 @@ public ForEachProcessor create(Map factories, String throw newConfigurationException(TYPE, tag, "processor", "Must specify exactly one processor type"); } Map.Entry> entry = entries.iterator().next(); - Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue()); + Processor processor = + ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue()); return new ForEachProcessor(tag, field, processor, ignoreMissing); } } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index be282b9d264f7..8b048282814ea 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -72,7 +72,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(ConvertProcessor.TYPE, new ConvertProcessor.Factory()); processors.put(GsubProcessor.TYPE, new GsubProcessor.Factory()); processors.put(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)); - processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory()); + processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)); processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)); processors.put(SortProcessor.TYPE, new SortProcessor.Factory()); processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))); @@ -82,7 +82,6 @@ public Map getProcessors(Processor.Parameters paramet processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory()); processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()); processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory()); - processors.put(ConditionalProcessor.TYPE, new ConditionalProcessor.Factory(parameters.scriptService)); processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)); processors.put(DissectProcessor.TYPE, new DissectProcessor.Factory()); return Collections.unmodifiableMap(processors); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index f382ad8dcfb6a..7ab19c4147eab 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TestProcessor; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -30,14 +31,17 @@ import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; public class ForEachProcessorFactoryTests extends ESTestCase { + private final ScriptService scriptService = mock(ScriptService.class); + public void testCreate() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -53,7 +57,7 @@ public void testSetIgnoreMissing() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -71,7 +75,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { Map registry = new HashMap<>(); registry.put("_first", (r, t, c) -> processor); registry.put("_second", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -84,7 +88,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { } public void testCreateWithNonExistingProcessorType() throws Exception { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); @@ -97,7 +101,7 @@ public void testCreateWithMissingField() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config)); @@ -105,7 +109,7 @@ public void testCreateWithMissingField() throws Exception { } public void testCreateWithMissingProcessor() { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config)); diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index fecee5f265fe9..7514a41f5756b 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -171,9 +171,11 @@ static Parsed parseWithPipelineId(String pipelineId, Map config, return new Parsed(pipeline, ingestDocumentList, verbose); } - static Parsed parse(Map config, boolean verbose, IngestService pipelineStore) throws Exception { + static Parsed parse(Map config, boolean verbose, IngestService ingestService) throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); - Pipeline pipeline = Pipeline.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories()); + Pipeline pipeline = Pipeline.create( + SIMULATED_PIPELINE_ID, pipelineConfig, ingestService.getProcessorFactories(), ingestService.getScriptService() + ); List ingestDocumentList = parseDocs(config); return new Parsed(pipeline, ingestDocumentList, verbose); } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java similarity index 76% rename from modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index af84333bb2ce6..d1eb651acae03 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -17,9 +17,8 @@ * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; -import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -30,25 +29,10 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.ingest.AbstractProcessor; -import org.elasticsearch.ingest.ConfigurationUtils; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Processor; import org.elasticsearch.script.IngestConditionalScript; import org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptException; import org.elasticsearch.script.ScriptService; -import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; -import static org.elasticsearch.ingest.ConfigurationUtils.readMap; - public class ConditionalProcessor extends AbstractProcessor { static final String TYPE = "conditional"; @@ -59,7 +43,7 @@ public class ConditionalProcessor extends AbstractProcessor { private final Processor processor; - private ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) { + ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) { super(tag); this.condition = script; this.scriptService = scriptService; @@ -80,51 +64,6 @@ public String getType() { return TYPE; } - public static final class Factory implements Processor.Factory { - - private final ScriptService scriptService; - - public Factory(ScriptService scriptService) { - this.scriptService = scriptService; - } - - @Override - public ConditionalProcessor create(Map factories, String tag, - Map config) throws Exception { - Map> processorConfig = readMap(TYPE, tag, config, "processor"); - final Script script; - try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent) - .map(normalizeScript(config.get("script"))); - InputStream stream = BytesReference.bytes(builder).streamInput(); - XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, stream)) { - script = Script.parse(parser); - config.remove("script"); - // verify script is able to be compiled before successfully creating processor. - try { - scriptService.compile(script, IngestConditionalScript.CONTEXT); - } catch (ScriptException e) { - throw newConfigurationException(TYPE, tag, null, e); - } - } - Map.Entry> entry = processorConfig.entrySet().iterator().next(); - Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue()); - return new ConditionalProcessor(tag, script, scriptService, processor); - } - - @SuppressWarnings("unchecked") - private static Map normalizeScript(Object scriptConfig) { - if (scriptConfig instanceof Map) { - return (Map) scriptConfig; - } else if (scriptConfig instanceof String) { - return Collections.singletonMap("source", scriptConfig); - } else { - throw newConfigurationException(TYPE, null, "script", - "property isn't a map or string, but of type [" + scriptConfig.getClass().getName() + "]"); - } - } - } - private static Object wrapUnmodifiable(Object raw) { // Wraps all mutable types that the JSON parser can create by immutable wrappers. // Any inputs not wrapped are assumed to be immutable diff --git a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index 319ab649c1e4d..ee1906e1de0f3 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -19,9 +19,18 @@ package org.elasticsearch.ingest; +import java.io.IOException; +import java.io.InputStream; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptType; @@ -296,6 +305,7 @@ public static ElasticsearchException newConfigurationException(String processorT } public static List readProcessorConfigs(List> processorConfigs, + ScriptService scriptService, Map processorFactories) throws Exception { Exception exception = null; List processors = new ArrayList<>(); @@ -303,7 +313,7 @@ public static List readProcessorConfigs(List> pro for (Map processorConfigWithKey : processorConfigs) { for (Map.Entry entry : processorConfigWithKey.entrySet()) { try { - processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue())); + processors.add(readProcessor(processorFactories, scriptService, entry.getKey(), entry.getValue())); } catch (Exception e) { exception = ExceptionsHelper.useOrSuppress(exception, e); } @@ -356,13 +366,14 @@ private static void addMetadataToException(ElasticsearchException exception, Str @SuppressWarnings("unchecked") public static Processor readProcessor(Map processorFactories, + ScriptService scriptService, String type, Object config) throws Exception { if (config instanceof Map) { - return readProcessor(processorFactories, type, (Map) config); + return readProcessor(processorFactories, scriptService, type, (Map) config); } else if (config instanceof String && "script".equals(type)) { Map normalizedScript = new HashMap<>(1); normalizedScript.put(ScriptType.INLINE.getParseField().getPreferredName(), config); - return readProcessor(processorFactories, type, normalizedScript); + return readProcessor(processorFactories, scriptService, type, normalizedScript); } else { throw newConfigurationException(type, null, null, "property isn't a map, but of type [" + config.getClass().getName() + "]"); @@ -370,19 +381,17 @@ public static Processor readProcessor(Map processorFa } public static Processor readProcessor(Map processorFactories, + ScriptService scriptService, String type, Map config) throws Exception { String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); - Map parsedConfig = maybeExtractConditional(config, type); - if (parsedConfig != config) { - type = "conditional"; - } + Script conditionalScript = maybeExtractConditional(config); Processor.Factory factory = processorFactories.get(type); if (factory != null) { boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false); List> onFailureProcessorConfigs = - ConfigurationUtils.readOptionalList(null, null, parsedConfig, Pipeline.ON_FAILURE_KEY); + ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY); - List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories); + List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories); if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { throw newConfigurationException(type, tag, Pipeline.ON_FAILURE_KEY, @@ -390,16 +399,18 @@ public static Processor readProcessor(Map processorFa } try { - Processor processor = factory.create(processorFactories, tag, parsedConfig); - if (parsedConfig.isEmpty() == false) { + Processor processor = factory.create(processorFactories, tag, config); + if (config.isEmpty() == false) { throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}", - type, Arrays.toString(parsedConfig.keySet().toArray())); + type, Arrays.toString(config.keySet().toArray())); } if (onFailureProcessors.size() > 0 || ignoreFailure) { - return new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors); - } else { - return processor; + processor = new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors); + } + if (conditionalScript != null) { + processor = new ConditionalProcessor(tag, conditionalScript, scriptService, processor); } + return processor; } catch (Exception e) { throw newConfigurationException(type, tag, null, e); } @@ -407,15 +418,30 @@ public static Processor readProcessor(Map processorFa throw newConfigurationException(type, tag, null, "No processor type exists with name [" + type + "]"); } - private static Map maybeExtractConditional(Map config, String type) { - if (config.containsKey("if")) { - Map rewrittenConfig = new HashMap<>(); - Map rewrittenProcessorConfig = new HashMap<>(config); - rewrittenConfig.put("script", rewrittenProcessorConfig.remove("if")); - rewrittenConfig.put("processor", Collections.singletonMap(type, rewrittenProcessorConfig)); - return rewrittenConfig; + private static Script maybeExtractConditional(Map config) throws IOException { + Object scriptSource = config.remove("if"); + if (scriptSource != null) { + try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent) + .map(normalizeScript(scriptSource)); + InputStream stream = BytesReference.bytes(builder).streamInput(); + XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, stream)) { + return Script.parse(parser); + } + } else { + return null; + } + } + + @SuppressWarnings("unchecked") + private static Map normalizeScript(Object scriptConfig) { + if (scriptConfig instanceof Map) { + return (Map) scriptConfig; + } else if (scriptConfig instanceof String) { + return Collections.singletonMap("source", scriptConfig); } else { - return config; + throw newConfigurationException("conditional", null, "script", + "property isn't a map or string, but of type [" + scriptConfig.getClass().getName() + "]"); } } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index eee14e958699f..f0f5d76caaba8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -71,6 +71,7 @@ public class IngestService implements ClusterStateApplier { public static final String NOOP_PIPELINE_NAME = "_none"; private final ClusterService clusterService; + private final ScriptService scriptService; private final Map processorFactories; // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. // We know of all the processor factories when a node with all its plugin have been initialized. Also some @@ -85,6 +86,7 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, List ingestPlugins) { this.clusterService = clusterService; + this.scriptService = scriptService; this.processorFactories = processorFactories( ingestPlugins, new Processor.Parameters( @@ -116,6 +118,10 @@ public ClusterService getClusterService() { return clusterService; } + public ScriptService getScriptService() { + return scriptService; + } + /** * Deletes the pipeline specified by id in the request. */ @@ -300,11 +306,12 @@ void validatePipeline(Map ingestInfos, PutPipelineReq } Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories); + Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService); List exceptions = new ArrayList<>(); for (Processor processor : pipeline.flattenAllProcessors()) { for (Map.Entry entry : ingestInfos.entrySet()) { - if (entry.getValue().containsProcessor(processor.getType()) == false) { + String type = processor.getType(); + if (entry.getValue().containsProcessor(type) == false && ConditionalProcessor.TYPE.equals(type) == false) { String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; exceptions.add( ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message) @@ -452,7 +459,10 @@ private void innerUpdatePipelines(ClusterState previousState, ClusterState state List exceptions = new ArrayList<>(); for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { try { - pipelines.put(pipeline.getId(), Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); + pipelines.put( + pipeline.getId(), + Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories, scriptService) + ); } catch (ElasticsearchParseException e) { pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); exceptions.add(e); diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 37dd3f52cb7d3..0a8f9fbc0d894 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.elasticsearch.script.ScriptService; /** * A pipeline is a list of {@link Processor} instances grouped under a unique id. @@ -52,14 +53,15 @@ public Pipeline(String id, @Nullable String description, @Nullable Integer versi } public static Pipeline create(String id, Map config, - Map processorFactories) throws Exception { + Map processorFactories, ScriptService scriptService) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); - List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories); + List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, scriptService, processorFactories); List> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); - List onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories); + List onFailureProcessors = + ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, scriptService, processorFactories); if (config.isEmpty() == false) { throw new ElasticsearchParseException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java similarity index 75% rename from modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java rename to server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index fe45c3cab06d7..2498ce3f6eb02 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -17,18 +17,17 @@ * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.ScriptType; import org.elasticsearch.test.ESTestCase; import static org.hamcrest.Matchers.hasKey; @@ -39,7 +38,7 @@ public class ConditionalProcessorTests extends ESTestCase { public void testChecksCondition() throws Exception { String conditionalField = "field1"; - String scriptSource = "conditionalScript"; + String scriptName = "conditionalScript"; String trueValue = "truthy"; ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap( @@ -47,23 +46,34 @@ public void testChecksCondition() throws Exception { new MockScriptEngine( Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap( - scriptSource, ctx -> trueValue.equals(ctx.get(conditionalField)) + scriptName, ctx -> trueValue.equals(ctx.get(conditionalField)) ) ) ), new HashMap<>(ScriptModule.CORE_CONTEXTS) ); Map document = new HashMap<>(); - Map setConfig = new HashMap<>(); - setConfig.put("field", "foo"); - setConfig.put("value", "bar"); - Map config = new HashMap<>(); - config.put("processor", Collections.singletonMap("set", setConfig)); - config.put("script", Collections.singletonMap("source", scriptSource)); - ConditionalProcessor processor = new ConditionalProcessor.Factory(scriptService).create( - Collections.singletonMap("set", new SetProcessor.Factory(scriptService)), - ConditionalProcessor.TYPE, config - ); + ConditionalProcessor processor = new ConditionalProcessor( + randomAlphaOfLength(10), + new Script( + ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, + scriptName, Collections.emptyMap()), scriptService, + new Processor() { + @Override + public void execute(final IngestDocument ingestDocument) throws Exception { + ingestDocument.setFieldValue("foo", "bar"); + } + + @Override + public String getType() { + return null; + } + + @Override + public String getTag() { + return null; + } + }); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, trueValue); diff --git a/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java b/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java index 61afd9ce2a473..f3a11a86e54e5 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -38,6 +39,9 @@ import static org.mockito.Mockito.mock; public class ConfigurationUtilsTests extends ESTestCase { + + private final ScriptService scriptService = mock(ScriptService.class); + private Map config; @Before @@ -120,7 +124,7 @@ public void testReadProcessors() throws Exception { config.add(Collections.singletonMap("test_processor", emptyConfig)); config.add(Collections.singletonMap("test_processor", emptyConfig)); - List result = ConfigurationUtils.readProcessorConfigs(config, registry); + List result = ConfigurationUtils.readProcessorConfigs(config, scriptService, registry); assertThat(result.size(), equalTo(2)); assertThat(result.get(0), sameInstance(processor)); assertThat(result.get(1), sameInstance(processor)); @@ -129,7 +133,7 @@ public void testReadProcessors() throws Exception { unknownTaggedConfig.put("tag", "my_unknown"); config.add(Collections.singletonMap("unknown_processor", unknownTaggedConfig)); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, - () -> ConfigurationUtils.readProcessorConfigs(config, registry)); + () -> ConfigurationUtils.readProcessorConfigs(config, scriptService, registry)); assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]")); assertThat(e.getMetadata("es.processor_tag"), equalTo(Collections.singletonList("my_unknown"))); assertThat(e.getMetadata("es.processor_type"), equalTo(Collections.singletonList("unknown_processor"))); @@ -142,7 +146,10 @@ public void testReadProcessors() throws Exception { Map secondUnknonwTaggedConfig = new HashMap<>(); secondUnknonwTaggedConfig.put("tag", "my_second_unknown"); config2.add(Collections.singletonMap("second_unknown_processor", secondUnknonwTaggedConfig)); - e = expectThrows(ElasticsearchParseException.class, () -> ConfigurationUtils.readProcessorConfigs(config2, registry)); + e = expectThrows( + ElasticsearchParseException.class, + () -> ConfigurationUtils.readProcessorConfigs(config2, scriptService, registry) + ); assertThat(e.getMessage(), equalTo("No processor type exists with name [unknown_processor]")); assertThat(e.getMetadata("es.processor_tag"), equalTo(Collections.singletonList("my_unknown"))); assertThat(e.getMetadata("es.processor_type"), equalTo(Collections.singletonList("unknown_processor"))); @@ -166,17 +173,17 @@ public void testReadProcessorFromObjectOrMap() throws Exception { }); Object emptyConfig = Collections.emptyMap(); - Processor processor1 = ConfigurationUtils.readProcessor(registry, "script", emptyConfig); + Processor processor1 = ConfigurationUtils.readProcessor(registry, scriptService, "script", emptyConfig); assertThat(processor1, sameInstance(processor)); Object inlineScript = "test_script"; - Processor processor2 = ConfigurationUtils.readProcessor(registry, "script", inlineScript); + Processor processor2 = ConfigurationUtils.readProcessor(registry, scriptService, "script", inlineScript); assertThat(processor2, sameInstance(processor)); Object invalidConfig = 12L; ElasticsearchParseException ex = expectThrows(ElasticsearchParseException.class, - () -> ConfigurationUtils.readProcessor(registry, "unknown_processor", invalidConfig)); + () -> ConfigurationUtils.readProcessor(registry, scriptService, "unknown_processor", invalidConfig)); assertThat(ex.getMessage(), equalTo("property isn't a map, but of type [" + invalidConfig.getClass().getName() + "]")); } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index cafdbcfb44690..d6d7b4ffa816b 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; @@ -32,11 +33,13 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; public class PipelineFactoryTests extends ESTestCase { private final Integer version = randomBoolean() ? randomInt() : null; private final String versionString = version != null ? Integer.toString(version) : null; + private final ScriptService scriptService = mock(ScriptService.class); public void testCreate() throws Exception { Map processorConfig0 = new HashMap<>(); @@ -48,7 +51,7 @@ public void testCreate() throws Exception { pipelineConfig.put(Pipeline.PROCESSORS_KEY, Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1))); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -64,7 +67,7 @@ public void testCreateWithNoProcessorsField() throws Exception { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); try { - Pipeline.create("_id", pipelineConfig, Collections.emptyMap()); + Pipeline.create("_id", pipelineConfig, Collections.emptyMap(), scriptService); fail("should fail, missing required [processors] field"); } catch (ElasticsearchParseException e) { assertThat(e.getMessage(), equalTo("[processors] required property is missing")); @@ -76,7 +79,7 @@ public void testCreateWithEmptyProcessorsField() throws Exception { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.emptyList()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null, scriptService); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -91,7 +94,7 @@ public void testCreateWithPipelineOnFailure() throws Exception { pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -109,7 +112,10 @@ public void testCreateWithPipelineEmptyOnFailure() throws Exception { pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList()); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows( + ElasticsearchParseException.class, + () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService) + ); assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined")); } @@ -121,7 +127,10 @@ public void testCreateWithPipelineEmptyOnFailureInProcessor() throws Exception { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows( + ElasticsearchParseException.class, + () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService) + ); assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty")); } @@ -136,7 +145,7 @@ public void testCreateWithPipelineIgnoreFailure() throws Exception { pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -156,7 +165,10 @@ public void testCreateUnusedProcessorOptions() throws Exception { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows( + ElasticsearchParseException.class, + () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService) + ); assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } @@ -169,7 +181,7 @@ public void testCreateProcessorsWithOnFailureProperties() throws Exception { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); From 881d6c40cc8657c3b490558d7729e7075c9a55f9 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 29 Aug 2018 14:50:16 +0200 Subject: [PATCH 12/14] Add some tests for immutability --- .../ingest/ConditionalProcessorTests.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index 2498ce3f6eb02..2cb13af7a2808 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -19,9 +19,13 @@ package org.elasticsearch.ingest; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.Script; @@ -31,6 +35,7 @@ import org.elasticsearch.test.ESTestCase; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.core.Is.is; @@ -88,4 +93,49 @@ public String getTag() { assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); } + + @SuppressWarnings("unchecked") + public void testActsOnImmutableData() throws Exception { + assertMutatingCtxThrows(ctx -> ctx.remove("foo")); + assertMutatingCtxThrows(ctx -> ctx.put("foo", "bar")); + assertMutatingCtxThrows(ctx -> ((List)ctx.get("listField")).add("bar")); + assertMutatingCtxThrows(ctx -> ((List)ctx.get("listField")).remove("bar")); + } + + private static void assertMutatingCtxThrows(Consumer> mutation) throws Exception { + String scriptName = "conditionalScript"; + CompletableFuture expectedException = new CompletableFuture<>(); + ScriptService scriptService = new ScriptService(Settings.builder().build(), + Collections.singletonMap( + Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine( + Script.DEFAULT_SCRIPT_LANG, + Collections.singletonMap( + scriptName, ctx -> { + try { + mutation.accept(ctx); + } catch (Exception e) { + expectedException.complete(e); + } + return false; + } + ) + ) + ), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + Map document = new HashMap<>(); + ConditionalProcessor processor = new ConditionalProcessor( + randomAlphaOfLength(10), + new Script( + ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, + scriptName, Collections.emptyMap()), scriptService, null + ); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + ingestDocument.setFieldValue("listField", new ArrayList<>()); + processor.execute(ingestDocument); + Exception e = expectedException.get(); + assertThat(e, instanceOf(UnsupportedOperationException.class)); + assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); + } } From e2f7936c4cfac40a82a34ce835e4ae872e67882a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 29 Aug 2018 14:56:50 +0200 Subject: [PATCH 13/14] fix javadoc --- .../java/org/elasticsearch/script/IngestConditionalScript.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java b/server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java index 1f770c42be6fc..27ce29b95dc50 100644 --- a/server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java +++ b/server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java @@ -1,4 +1,3 @@ - /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with @@ -23,7 +22,7 @@ import java.util.Map; /** - * A script used by the Ingest Script Processor. + * A script used by {@link org.elasticsearch.ingest.ConditionalProcessor}. */ public abstract class IngestConditionalScript { From 69100ba8086d773eb8fb5cd858a5e9eb7f8d251c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 29 Aug 2018 22:15:05 +0200 Subject: [PATCH 14/14] CR: Rename + unwrap else --- .../java/org/elasticsearch/ingest/ConfigurationUtils.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index ee1906e1de0f3..d4f27f47eb8f2 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -384,7 +384,7 @@ public static Processor readProcessor(Map processorFa ScriptService scriptService, String type, Map config) throws Exception { String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); - Script conditionalScript = maybeExtractConditional(config); + Script conditionalScript = extractConditional(config); Processor.Factory factory = processorFactories.get(type); if (factory != null) { boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false); @@ -418,7 +418,7 @@ public static Processor readProcessor(Map processorFa throw newConfigurationException(type, tag, null, "No processor type exists with name [" + type + "]"); } - private static Script maybeExtractConditional(Map config) throws IOException { + private static Script extractConditional(Map config) throws IOException { Object scriptSource = config.remove("if"); if (scriptSource != null) { try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent) @@ -428,9 +428,8 @@ private static Script maybeExtractConditional(Map config) throws LoggingDeprecationHandler.INSTANCE, stream)) { return Script.parse(parser); } - } else { - return null; } + return null; } @SuppressWarnings("unchecked")