Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9b6657d
Ingest: Add conditional per processor
original-brownbear Jul 24, 2018
e98feb8
Merge remote-tracking branch 'elastic/master' into 21248
original-brownbear Aug 19, 2018
8f06954
deep copy data instead of lazy wrap
original-brownbear Aug 19, 2018
713f604
start tests
original-brownbear Aug 20, 2018
8090a40
Merge remote-tracking branch 'elastic/master' into 21248
original-brownbear Aug 20, 2018
f898188
start tests
original-brownbear Aug 20, 2018
3884159
add tests
original-brownbear Aug 20, 2018
fc5d078
unmodifiable wrappers
original-brownbear Aug 20, 2018
288899a
unmodifiable wrappers
original-brownbear Aug 20, 2018
fe2d826
unmodifiable wrappers
original-brownbear Aug 20, 2018
8eb6424
Merge remote-tracking branch 'elastic/master' into 21248
original-brownbear Aug 21, 2018
d004181
CR: Remove date type from immutable wrapper
original-brownbear Aug 21, 2018
8d77313
Merge remote-tracking branch 'elastic/master' into 21248
original-brownbear Aug 21, 2018
2b328ff
Merge remote-tracking branch 'elastic/master' into 21248
original-brownbear Aug 27, 2018
9d64863
CR: renamings
original-brownbear Aug 27, 2018
51deb46
Merge remote-tracking branch 'elastic/master' into 21248
original-brownbear Aug 28, 2018
b63e03c
Merge remote-tracking branch 'elastic/master' into 21248
original-brownbear Aug 28, 2018
8618ac0
Merge remote-tracking branch 'elastic/master' into 21248
original-brownbear Aug 29, 2018
c22ea2e
Merge remote-tracking branch 'elastic/master' into 21248
original-brownbear Aug 29, 2018
0f88489
remove conditional processor factory
original-brownbear Aug 29, 2018
881d6c4
Add some tests for immutability
original-brownbear Aug 29, 2018
e2f7936
fix javadoc
original-brownbear Aug 29, 2018
cda2bad
Merge remote-tracking branch 'elastic/master' into 21248
original-brownbear Aug 29, 2018
69100ba
CR: Rename + unwrap else
original-brownbear Aug 29, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.elasticsearch.script.ScriptService;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
Expand Down Expand Up @@ -96,6 +97,13 @@ Processor getProcessor() {
}

public static final class Factory implements Processor.Factory {

private final ScriptService scriptService;

Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

@Override
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
Map<String, Object> config) throws Exception {
Expand All @@ -107,7 +115,8 @@ public ForEachProcessor create(Map<String, Processor.Factory> factories, String
throw newConfigurationException(TYPE, tag, "processor", "Must specify exactly one processor type");
}
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue());
Processor processor =
ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue());
return new ForEachProcessor(tag, field, processor, ignoreMissing);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(ConvertProcessor.TYPE, new ConvertProcessor.Factory());
processors.put(GsubProcessor.TYPE, new GsubProcessor.Factory());
processors.put(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService));
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory());
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService));
processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService));
processors.put(SortProcessor.TYPE, new SortProcessor.Factory());
processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

Expand All @@ -30,14 +31,17 @@
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class ForEachProcessorFactoryTests extends ESTestCase {

private final ScriptService scriptService = mock(ScriptService.class);

public void testCreate() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -53,7 +57,7 @@ public void testSetIgnoreMissing() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -71,7 +75,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_first", (r, t, c) -> processor);
registry.put("_second", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -84,7 +88,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
}

public void testCreateWithNonExistingProcessorType() throws Exception {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
Expand All @@ -97,15 +101,15 @@ public void testCreateWithMissingField() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config));
assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
}

public void testCreateWithMissingProcessor() {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test conditional processor fulfilled condition":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"if" : "ctx.conditional_field == 'bar'",
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {bytes_source_field: "1kb", conditional_field: "bar"}

- do:
get:
index: test
type: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.conditional_field: "bar" }
- match: { _source.bytes_target_field: 1024 }

---
"Test conditional processor unfulfilled condition":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"if" : "ctx.conditional_field == 'foo'",
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {bytes_source_field: "1kb", conditional_field: "bar"}

- do:
get:
index: test
type: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.conditional_field: "bar" }
- is_false: _source.bytes_target_field

Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config,
return new Parsed(pipeline, ingestDocumentList, verbose);
}

static Parsed parse(Map<String, Object> config, boolean verbose, IngestService pipelineStore) throws Exception {
static Parsed parse(Map<String, Object> config, boolean verbose, IngestService ingestService) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = Pipeline.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
Pipeline pipeline = Pipeline.create(
SIMULATED_PIPELINE_ID, pipelineConfig, ingestService.getProcessorFactories(), ingestService.getScriptService()
);
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}
Expand Down
Loading