Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,17 @@ String getTargetField() {
}

@Override
public final void execute(IngestDocument document) {
public final IngestDocument execute(IngestDocument document) {
String val = document.getFieldValue(field, String.class, ignoreMissing);

if (val == null && ignoreMissing) {
return;
return document;
} else if (val == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
}

document.setFieldValue(targetField, process(val));
return document;
}

protected abstract T process(String value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ public ValueSource getValue() {
}

@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
ingestDocument.appendFieldValue(field, value);
return ingestDocument;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ boolean isIgnoreMissing() {
}

@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
Object oldValue = document.getFieldValue(field, Object.class, ignoreMissing);
Object newValue;

if (oldValue == null && ignoreMissing) {
return;
return document;
} else if (oldValue == null) {
throw new IllegalArgumentException("Field [" + field + "] is null, cannot be converted to type [" + convertType + "]");
}
Expand All @@ -194,6 +194,7 @@ public void execute(IngestDocument document) {
newValue = convertType.convert(oldValue);
}
document.setFieldValue(targetField, newValue);
return document;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public final class DateIndexNameProcessor extends AbstractProcessor {
}

@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
// Date can be specified as a string or long:
Object obj = ingestDocument.getFieldValue(field, Object.class);
String date = null;
Expand Down Expand Up @@ -101,6 +101,7 @@ public void execute(IngestDocument ingestDocument) throws Exception {
.append('>');
String dynamicIndexName = builder.toString();
ingestDocument.setFieldValue(IngestDocument.MetaData.INDEX.getFieldName(), dynamicIndexName);
return ingestDocument;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private Locale newLocale(Map<String, Object> params) {
}

@Override
public void execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) {
Object obj = ingestDocument.getFieldValue(field, Object.class);
String value = null;
if (obj != null) {
Expand All @@ -98,6 +98,7 @@ public void execute(IngestDocument ingestDocument) {
}

ingestDocument.setFieldValue(targetField, ISODateTimeFormat.dateTime().print(dateTime));
return ingestDocument;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ public final class DissectProcessor extends AbstractProcessor {
}

@Override
public void execute(IngestDocument ingestDocument) {
public IngestDocument execute(IngestDocument ingestDocument) {
String input = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
if (input == null && ignoreMissing) {
return;
return ingestDocument;
} else if (input == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
}
dissectParser.parse(input).forEach(ingestDocument::setFieldValue);
return ingestDocument;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public final class DotExpanderProcessor extends AbstractProcessor {

@Override
@SuppressWarnings("unchecked")
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String path;
Map<String, Object> map;
if (this.path != null) {
Expand Down Expand Up @@ -75,6 +75,7 @@ public void execute(IngestDocument ingestDocument) throws Exception {
Object value = map.remove(field);
ingestDocument.setFieldValue(path, value);
}
return ingestDocument;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.common;

import java.util.Map;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

/**
* Drop processor only returns {@code null} for the execution result to indicate that any document
* executed by it should not be indexed.
*/
public final class DropProcessor extends AbstractProcessor {

public static final String TYPE = "drop";

private DropProcessor(final String tag) {
super(tag);
}

@Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
return null;
}

@Override
public String getType() {
return TYPE;
}

public static final class Factory implements Processor.Factory {

@Override
public Processor create(final Map<String, Processor.Factory> processorFactories, final String tag,
final Map<String, Object> config) {
return new DropProcessor(tag);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public TemplateScript.Factory getMessage() {
}

@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
throw new FailProcessorException(document.renderTemplate(message));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,29 @@ boolean isIgnoreMissing() {
}

@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
List<?> values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
if (values == null) {
if (ignoreMissing) {
return;
return ingestDocument;
}
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
}
List<Object> newValues = new ArrayList<>(values.size());
IngestDocument document = ingestDocument;
for (Object value : values) {
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
try {
processor.execute(ingestDocument);
document = processor.execute(document);
if (document == null) {
return null;
}
} finally {
newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue));
}
}
ingestDocument.setFieldValue(field, newValues);
document.setFieldValue(field, newValues);
return document;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ public final class GrokProcessor extends AbstractProcessor {
}

@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String fieldValue = ingestDocument.getFieldValue(matchField, String.class, ignoreMissing);

if (fieldValue == null && ignoreMissing) {
return;
return ingestDocument;
} else if (fieldValue == null) {
throw new IllegalArgumentException("field [" + matchField + "] is null, cannot process it.");
}
Expand All @@ -81,6 +81,7 @@ public void execute(IngestDocument ingestDocument) throws Exception {
ingestDocument.setFieldValue(PATTERN_MATCH_KEY, "0");
}
}
return ingestDocument;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
processors.put(DissectProcessor.TYPE, new DissectProcessor.Factory());
processors.put(DropProcessor.TYPE, new DropProcessor.Factory());
return Collections.unmodifiableMap(processors);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ String getTargetField() {
}

@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
List<?> list = document.getFieldValue(field, List.class);
if (list == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot join.");
Expand All @@ -69,6 +69,7 @@ public void execute(IngestDocument document) {
.map(Object::toString)
.collect(Collectors.joining(separator));
document.setFieldValue(targetField, joined);
return document;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,13 @@ public static void apply(Map<String, Object> ctx, String fieldName) {
}

@Override
public void execute(IngestDocument document) throws Exception {
public IngestDocument execute(IngestDocument document) throws Exception {
if (addToRoot) {
apply(document.getSourceAndMetadata(), field);
} else {
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class)));
}
return document;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,9 @@ private static void append(IngestDocument document, String targetField, String v
}

@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
execution.accept(document);
return document;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ private PipelineProcessor(String tag, String pipelineName, IngestService ingestS
}

@Override
public void execute(IngestDocument ingestDocument) throws Exception {
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
Pipeline pipeline = ingestService.getPipeline(pipelineName);
if (pipeline == null) {
throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']');
}
ingestDocument.executePipeline(pipeline);
return ingestDocument.executePipeline(pipeline);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ public static void json(Map<String, Object> ctx, String field) {
public static String urlDecode(String value) {
return URLDecodeProcessor.apply(value);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public List<TemplateScript.Factory> getFields() {
}

@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
if (ignoreMissing) {
fields.forEach(field -> {
String path = document.renderTemplate(field);
Expand All @@ -63,6 +63,7 @@ public void execute(IngestDocument document) {
} else {
fields.forEach(document::removeField);
}
return document;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ boolean isIgnoreMissing() {
}

@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
String path = document.renderTemplate(field);
if (document.hasField(path, true) == false) {
if (ignoreMissing) {
return;
return document;
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
}
Expand All @@ -86,6 +86,7 @@ public void execute(IngestDocument document) {
document.setFieldValue(path, value);
throw e;
}
return document;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ public final class ScriptProcessor extends AbstractProcessor {
* @param document The Ingest document passed into the script context under the "ctx" object.
*/
@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
IngestScript.Factory factory = scriptService.compile(script, IngestScript.CONTEXT);
factory.newInstance(script.getParams()).execute(document.getSourceAndMetadata());
return document;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ public ValueSource getValue() {
}

@Override
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
if (overrideEnabled || document.hasField(field) == false || document.getFieldValue(field, Object.class) == null) {
document.setFieldValue(field, value);
}
return document;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ String getTargetField() {

@Override
@SuppressWarnings("unchecked")
public void execute(IngestDocument document) {
public IngestDocument execute(IngestDocument document) {
List<? extends Comparable<Object>> list = document.getFieldValue(field, List.class);

if (list == null) {
Expand All @@ -110,6 +110,7 @@ public void execute(IngestDocument document) {
}

document.setFieldValue(targetField, copy);
return document;
}

@Override
Expand Down
Loading