Skip to content

Commit 4677409

Browse files
INGEST: Implement Drop Processor (#32278)
* INGEST: Implement Drop Processor * Adjust Processor API * Implement Drop Processor * Closes #23726
1 parent a296829 commit 4677409

File tree

43 files changed

+235
-115
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+235
-115
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,17 @@ String getTargetField() {
5757
}
5858

5959
@Override
60-
public final void execute(IngestDocument document) {
60+
public final IngestDocument execute(IngestDocument document) {
6161
String val = document.getFieldValue(field, String.class, ignoreMissing);
6262

6363
if (val == null && ignoreMissing) {
64-
return;
64+
return document;
6565
} else if (val == null) {
6666
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
6767
}
6868

6969
document.setFieldValue(targetField, process(val));
70+
return document;
7071
}
7172

7273
protected abstract T process(String value);

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@ public ValueSource getValue() {
5656
}
5757

5858
@Override
59-
public void execute(IngestDocument ingestDocument) throws Exception {
59+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
6060
ingestDocument.appendFieldValue(field, value);
61+
return ingestDocument;
6162
}
6263

6364
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,12 @@ boolean isIgnoreMissing() {
173173
}
174174

175175
@Override
176-
public void execute(IngestDocument document) {
176+
public IngestDocument execute(IngestDocument document) {
177177
Object oldValue = document.getFieldValue(field, Object.class, ignoreMissing);
178178
Object newValue;
179179

180180
if (oldValue == null && ignoreMissing) {
181-
return;
181+
return document;
182182
} else if (oldValue == null) {
183183
throw new IllegalArgumentException("Field [" + field + "] is null, cannot be converted to type [" + convertType + "]");
184184
}
@@ -194,6 +194,7 @@ public void execute(IngestDocument document) {
194194
newValue = convertType.convert(oldValue);
195195
}
196196
document.setFieldValue(targetField, newValue);
197+
return document;
197198
}
198199

199200
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public final class DateIndexNameProcessor extends AbstractProcessor {
6363
}
6464

6565
@Override
66-
public void execute(IngestDocument ingestDocument) throws Exception {
66+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
6767
// Date can be specified as a string or long:
6868
Object obj = ingestDocument.getFieldValue(field, Object.class);
6969
String date = null;
@@ -101,6 +101,7 @@ public void execute(IngestDocument ingestDocument) throws Exception {
101101
.append('>');
102102
String dynamicIndexName = builder.toString();
103103
ingestDocument.setFieldValue(IngestDocument.MetaData.INDEX.getFieldName(), dynamicIndexName);
104+
return ingestDocument;
104105
}
105106

106107
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private Locale newLocale(Map<String, Object> params) {
7474
}
7575

7676
@Override
77-
public void execute(IngestDocument ingestDocument) {
77+
public IngestDocument execute(IngestDocument ingestDocument) {
7878
Object obj = ingestDocument.getFieldValue(field, Object.class);
7979
String value = null;
8080
if (obj != null) {
@@ -98,6 +98,7 @@ public void execute(IngestDocument ingestDocument) {
9898
}
9999

100100
ingestDocument.setFieldValue(targetField, ISODateTimeFormat.dateTime().print(dateTime));
101+
return ingestDocument;
101102
}
102103

103104
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DissectProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,15 @@ public final class DissectProcessor extends AbstractProcessor {
4747
}
4848

4949
@Override
50-
public void execute(IngestDocument ingestDocument) {
50+
public IngestDocument execute(IngestDocument ingestDocument) {
5151
String input = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
5252
if (input == null && ignoreMissing) {
53-
return;
53+
return ingestDocument;
5454
} else if (input == null) {
5555
throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
5656
}
5757
dissectParser.parse(input).forEach(ingestDocument::setFieldValue);
58+
return ingestDocument;
5859
}
5960

6061
@Override

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DotExpanderProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public final class DotExpanderProcessor extends AbstractProcessor {
4141

4242
@Override
4343
@SuppressWarnings("unchecked")
44-
public void execute(IngestDocument ingestDocument) throws Exception {
44+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
4545
String path;
4646
Map<String, Object> map;
4747
if (this.path != null) {
@@ -75,6 +75,7 @@ public void execute(IngestDocument ingestDocument) throws Exception {
7575
Object value = map.remove(field);
7676
ingestDocument.setFieldValue(path, value);
7777
}
78+
return ingestDocument;
7879
}
7980

8081
@Override
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest.common;
21+
22+
import java.util.Map;
23+
import org.elasticsearch.ingest.AbstractProcessor;
24+
import org.elasticsearch.ingest.IngestDocument;
25+
import org.elasticsearch.ingest.Processor;
26+
27+
/**
28+
* Drop processor only returns {@code null} for the execution result to indicate that any document
29+
* executed by it should not be indexed.
30+
*/
31+
public final class DropProcessor extends AbstractProcessor {
32+
33+
public static final String TYPE = "drop";
34+
35+
private DropProcessor(final String tag) {
36+
super(tag);
37+
}
38+
39+
@Override
40+
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
41+
return null;
42+
}
43+
44+
@Override
45+
public String getType() {
46+
return TYPE;
47+
}
48+
49+
public static final class Factory implements Processor.Factory {
50+
51+
@Override
52+
public Processor create(final Map<String, Processor.Factory> processorFactories, final String tag,
53+
final Map<String, Object> config) {
54+
return new DropProcessor(tag);
55+
}
56+
}
57+
}

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public TemplateScript.Factory getMessage() {
4848
}
4949

5050
@Override
51-
public void execute(IngestDocument document) {
51+
public IngestDocument execute(IngestDocument document) {
5252
throw new FailProcessorException(document.renderTemplate(message));
5353
}
5454

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,24 +63,29 @@ boolean isIgnoreMissing() {
6363
}
6464

6565
@Override
66-
public void execute(IngestDocument ingestDocument) throws Exception {
66+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
6767
List<?> values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
6868
if (values == null) {
6969
if (ignoreMissing) {
70-
return;
70+
return ingestDocument;
7171
}
7272
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
7373
}
7474
List<Object> newValues = new ArrayList<>(values.size());
75+
IngestDocument document = ingestDocument;
7576
for (Object value : values) {
7677
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
7778
try {
78-
processor.execute(ingestDocument);
79+
document = processor.execute(document);
80+
if (document == null) {
81+
return null;
82+
}
7983
} finally {
8084
newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue));
8185
}
8286
}
83-
ingestDocument.setFieldValue(field, newValues);
87+
document.setFieldValue(field, newValues);
88+
return document;
8489
}
8590

8691
@Override

0 commit comments

Comments
 (0)