Skip to content

Commit c53b2ee

Browse files
authored
introduce KV Processor in Ingest Node (#22272)
Now you can parse field values of the `key=value` variety and have `key` be inserted as a field name in an ingest document. Closes #22222.
1 parent 5a90d9d commit c53b2ee

File tree

7 files changed

+411
-9
lines changed

7 files changed

+411
-9
lines changed

docs/reference/ingest/ingest-node.asciidoc

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1515,6 +1515,38 @@ Converts a JSON string into a structured JSON object.
15151515
}
15161516
--------------------------------------------------
15171517

1518+
[[kv-processor]]
1519+
=== KV Processor
1520+
This processor helps automatically parse messages (or specific event fields) which are of the foo=bar variety.
1521+
1522+
For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`, you can parse those automatically by configuring:
1523+
1524+
1525+
[source,js]
1526+
--------------------------------------------------
1527+
{
1528+
"kv": {
1529+
"field": "message",
1530+
"field_split": " ",
1531+
"value_split": "="
1532+
}
1533+
}
1534+
--------------------------------------------------
1535+
1536+
[[kv-options]]
1537+
.Kv Options
1538+
[options="header"]
1539+
|======
1540+
| Name | Required | Default | Description
1541+
| `field` | yes | - | The field to be parsed
1542+
| `field_split` | yes | - | Regex pattern to use for splitting key-value pairs
1543+
| `value_split` | yes | - | Regex pattern to use for splitting the key from the value within a key-value pair
1544+
| `target_field` | no | `null` | The field to insert the extracted keys into. Defaults to the root of the document
1545+
| `include_keys` | no | `null` | List of keys to filter and insert into document. Defaults to including all keys
1546+
| `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
1547+
|======
1548+
1549+
15181550
[[lowercase-processor]]
15191551
=== Lowercase Processor
15201552
Converts a string to its lowercase equivalent.

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
6363
processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService));
6464
processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory());
6565
processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory());
66+
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
6667
return Collections.unmodifiableMap(processors);
6768
}
6869

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest.common;
21+
22+
import org.elasticsearch.ingest.AbstractProcessor;
23+
import org.elasticsearch.ingest.ConfigurationUtils;
24+
import org.elasticsearch.ingest.IngestDocument;
25+
import org.elasticsearch.ingest.Processor;
26+
27+
import java.util.Arrays;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.Map;
31+
32+
/**
33+
* The KeyValueProcessor parses and extracts messages of the `key=value` variety into fields with values of the keys.
34+
*/
35+
public final class KeyValueProcessor extends AbstractProcessor {
36+
37+
public static final String TYPE = "kv";
38+
39+
private final String field;
40+
private final String fieldSplit;
41+
private final String valueSplit;
42+
private final List<String> includeKeys;
43+
private final String targetField;
44+
private final boolean ignoreMissing;
45+
46+
KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, List<String> includeKeys,
47+
String targetField, boolean ignoreMissing) {
48+
super(tag);
49+
this.field = field;
50+
this.targetField = targetField;
51+
this.fieldSplit = fieldSplit;
52+
this.valueSplit = valueSplit;
53+
this.includeKeys = includeKeys;
54+
this.ignoreMissing = ignoreMissing;
55+
}
56+
57+
String getField() {
58+
return field;
59+
}
60+
61+
String getFieldSplit() {
62+
return fieldSplit;
63+
}
64+
65+
String getValueSplit() {
66+
return valueSplit;
67+
}
68+
69+
List<String> getIncludeKeys() {
70+
return includeKeys;
71+
}
72+
73+
String getTargetField() {
74+
return targetField;
75+
}
76+
77+
boolean isIgnoreMissing() {
78+
return ignoreMissing;
79+
}
80+
81+
public void append(IngestDocument document, String targetField, String value) {
82+
if (document.hasField(targetField)) {
83+
document.appendFieldValue(targetField, value);
84+
} else {
85+
document.setFieldValue(targetField, value);
86+
}
87+
}
88+
89+
@Override
90+
public void execute(IngestDocument document) {
91+
String oldVal = document.getFieldValue(field, String.class, ignoreMissing);
92+
93+
if (oldVal == null && ignoreMissing) {
94+
return;
95+
} else if (oldVal == null) {
96+
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
97+
}
98+
99+
String fieldPathPrefix = (targetField == null) ? "" : targetField + ".";
100+
Arrays.stream(oldVal.split(fieldSplit))
101+
.map((f) -> f.split(valueSplit, 2))
102+
.filter((p) -> includeKeys == null || includeKeys.contains(p[0]))
103+
.forEach((p) -> append(document, fieldPathPrefix + p[0], p[1]));
104+
}
105+
106+
@Override
107+
public String getType() {
108+
return TYPE;
109+
}
110+
111+
public static class Factory implements Processor.Factory {
112+
@Override
113+
public KeyValueProcessor create(Map<String, Processor.Factory> registry, String processorTag,
114+
Map<String, Object> config) throws Exception {
115+
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
116+
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
117+
String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
118+
String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
119+
List<String> includeKeys = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
120+
if (includeKeys != null) {
121+
includeKeys = Collections.unmodifiableList(includeKeys);
122+
}
123+
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
124+
return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, targetField, ignoreMissing);
125+
}
126+
}
127+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest.common;
21+
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.ElasticsearchParseException;
24+
import org.elasticsearch.test.ESTestCase;
25+
26+
import java.util.Arrays;
27+
import java.util.HashMap;
28+
import java.util.Map;
29+
30+
import static org.hamcrest.CoreMatchers.equalTo;
31+
import static org.hamcrest.Matchers.is;
32+
import static org.hamcrest.Matchers.nullValue;
33+
34+
public class KeyValueProcessorFactoryTests extends ESTestCase {
35+
36+
public void testCreateWithDefaults() throws Exception {
37+
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
38+
Map<String, Object> config = new HashMap<>();
39+
config.put("field", "field1");
40+
config.put("field_split", "&");
41+
config.put("value_split", "=");
42+
String processorTag = randomAsciiOfLength(10);
43+
KeyValueProcessor processor = factory.create(null, processorTag, config);
44+
assertThat(processor.getTag(), equalTo(processorTag));
45+
assertThat(processor.getField(), equalTo("field1"));
46+
assertThat(processor.getFieldSplit(), equalTo("&"));
47+
assertThat(processor.getValueSplit(), equalTo("="));
48+
assertThat(processor.getIncludeKeys(), is(nullValue()));
49+
assertThat(processor.getTargetField(), is(nullValue()));
50+
assertFalse(processor.isIgnoreMissing());
51+
}
52+
53+
public void testCreateWithAllFieldsSet() throws Exception {
54+
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
55+
Map<String, Object> config = new HashMap<>();
56+
config.put("field", "field1");
57+
config.put("field_split", "&");
58+
config.put("value_split", "=");
59+
config.put("target_field", "target");
60+
config.put("include_keys", Arrays.asList("a", "b"));
61+
config.put("ignore_missing", true);
62+
String processorTag = randomAsciiOfLength(10);
63+
KeyValueProcessor processor = factory.create(null, processorTag, config);
64+
assertThat(processor.getTag(), equalTo(processorTag));
65+
assertThat(processor.getField(), equalTo("field1"));
66+
assertThat(processor.getFieldSplit(), equalTo("&"));
67+
assertThat(processor.getValueSplit(), equalTo("="));
68+
assertThat(processor.getIncludeKeys(), equalTo(Arrays.asList("a", "b")));
69+
assertThat(processor.getTargetField(), equalTo("target"));
70+
assertTrue(processor.isIgnoreMissing());
71+
}
72+
73+
public void testCreateWithMissingField() {
74+
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
75+
Map<String, Object> config = new HashMap<>();
76+
String processorTag = randomAsciiOfLength(10);
77+
ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
78+
() -> factory.create(null, processorTag, config));
79+
assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
80+
}
81+
82+
public void testCreateWithMissingFieldSplit() {
83+
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
84+
Map<String, Object> config = new HashMap<>();
85+
config.put("field", "field1");
86+
String processorTag = randomAsciiOfLength(10);
87+
ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
88+
() -> factory.create(null, processorTag, config));
89+
assertThat(exception.getMessage(), equalTo("[field_split] required property is missing"));
90+
}
91+
92+
public void testCreateWithMissingValueSplit() {
93+
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
94+
Map<String, Object> config = new HashMap<>();
95+
config.put("field", "field1");
96+
config.put("field_split", "&");
97+
String processorTag = randomAsciiOfLength(10);
98+
ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
99+
() -> factory.create(null, processorTag, config));
100+
assertThat(exception.getMessage(), equalTo("[value_split] required property is missing"));
101+
}
102+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest.common;
21+
22+
import org.elasticsearch.ingest.IngestDocument;
23+
import org.elasticsearch.ingest.Processor;
24+
import org.elasticsearch.ingest.RandomDocumentPicks;
25+
import org.elasticsearch.test.ESTestCase;
26+
27+
import java.util.Arrays;
28+
import java.util.Collections;
29+
import java.util.List;
30+
31+
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
32+
import static org.hamcrest.Matchers.equalTo;
33+
34+
public class KeyValueProcessorTests extends ESTestCase {
35+
36+
public void test() throws Exception {
37+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
38+
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
39+
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), fieldName, "&", "=", null, "target", false);
40+
processor.execute(ingestDocument);
41+
assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
42+
assertThat(ingestDocument.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe")));
43+
}
44+
45+
public void testRootTarget() throws Exception {
46+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
47+
ingestDocument.setFieldValue("myField", "first=hello&second=world&second=universe");
48+
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "myField", "&", "=", null, null, false);
49+
processor.execute(ingestDocument);
50+
assertThat(ingestDocument.getFieldValue("first", String.class), equalTo("hello"));
51+
assertThat(ingestDocument.getFieldValue("second", List.class), equalTo(Arrays.asList("world", "universe")));
52+
}
53+
54+
public void testKeySameAsSourceField() throws Exception {
55+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
56+
ingestDocument.setFieldValue("first", "first=hello");
57+
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "first", "&", "=", null, null, false);
58+
processor.execute(ingestDocument);
59+
assertThat(ingestDocument.getFieldValue("first", List.class), equalTo(Arrays.asList("first=hello", "hello")));
60+
}
61+
62+
public void testIncludeKeys() throws Exception {
63+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
64+
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
65+
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), fieldName, "&", "=",
66+
Collections.singletonList("first"), "target", false);
67+
processor.execute(ingestDocument);
68+
assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
69+
assertFalse(ingestDocument.hasField("target.second"));
70+
}
71+
72+
public void testMissingField() {
73+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
74+
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "unknown", "&", "=", null, "target", false);
75+
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
76+
assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
77+
}
78+
79+
public void testNullValueWithIgnoreMissing() throws Exception {
80+
String fieldName = RandomDocumentPicks.randomFieldName(random());
81+
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
82+
Collections.singletonMap(fieldName, null));
83+
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
84+
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), fieldName, "", "", null, "target", true);
85+
processor.execute(ingestDocument);
86+
assertIngestDocument(originalIngestDocument, ingestDocument);
87+
}
88+
89+
public void testNonExistentWithIgnoreMissing() throws Exception {
90+
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
91+
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
92+
Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "unknown", "", "", null, "target", true);
93+
processor.execute(ingestDocument);
94+
assertIngestDocument(originalIngestDocument, ingestDocument);
95+
}
96+
}

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yaml

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020
- match: { nodes.$master.ingest.processors.8.type: gsub }
2121
- match: { nodes.$master.ingest.processors.9.type: join }
2222
- match: { nodes.$master.ingest.processors.10.type: json }
23-
- match: { nodes.$master.ingest.processors.11.type: lowercase }
24-
- match: { nodes.$master.ingest.processors.12.type: remove }
25-
- match: { nodes.$master.ingest.processors.13.type: rename }
26-
- match: { nodes.$master.ingest.processors.14.type: script }
27-
- match: { nodes.$master.ingest.processors.15.type: set }
28-
- match: { nodes.$master.ingest.processors.16.type: sort }
29-
- match: { nodes.$master.ingest.processors.17.type: split }
30-
- match: { nodes.$master.ingest.processors.18.type: trim }
31-
- match: { nodes.$master.ingest.processors.19.type: uppercase }
23+
- match: { nodes.$master.ingest.processors.11.type: kv }
24+
- match: { nodes.$master.ingest.processors.12.type: lowercase }
25+
- match: { nodes.$master.ingest.processors.13.type: remove }
26+
- match: { nodes.$master.ingest.processors.14.type: rename }
27+
- match: { nodes.$master.ingest.processors.15.type: script }
28+
- match: { nodes.$master.ingest.processors.16.type: set }
29+
- match: { nodes.$master.ingest.processors.17.type: sort }
30+
- match: { nodes.$master.ingest.processors.18.type: split }
31+
- match: { nodes.$master.ingest.processors.19.type: trim }
32+
- match: { nodes.$master.ingest.processors.20.type: uppercase }

0 commit comments

Comments
 (0)