Skip to content

Commit 0ba1855

Browse files
INGEST: Tests for Drop Processor (#33430)
* INGEST: Tests for Drop Processor * UT for behavior of dropped callback and drop processor * Moved drop processor to `server` project to enable this test * Simple IT * Relates #32278
1 parent 5840be6 commit 0ba1855

File tree

4 files changed

+100
-4
lines changed

4 files changed

+100
-4
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.unit.TimeValue;
3232
import org.elasticsearch.grok.Grok;
3333
import org.elasticsearch.grok.ThreadWatchdog;
34+
import org.elasticsearch.ingest.DropProcessor;
3435
import org.elasticsearch.ingest.PipelineProcessor;
3536
import org.elasticsearch.ingest.Processor;
3637
import org.elasticsearch.plugins.ActionPlugin;
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
---
2+
teardown:
3+
- do:
4+
ingest.delete_pipeline:
5+
id: "my_pipeline"
6+
ignore: 404
7+
8+
---
9+
"Test Drop Processor":
10+
- do:
11+
ingest.put_pipeline:
12+
id: "my_pipeline"
13+
body: >
14+
{
15+
"description" : "pipeline with drop",
16+
"processors" : [
17+
{
18+
"drop" : {
19+
"if": "ctx.foo == 'bar'"
20+
}
21+
}
22+
]
23+
}
24+
- match: { acknowledged: true }
25+
26+
- do:
27+
index:
28+
index: test
29+
type: test
30+
id: 1
31+
pipeline: "my_pipeline"
32+
body: {
33+
foo: "bar"
34+
}
35+
36+
- do:
37+
index:
38+
index: test
39+
type: test
40+
id: 2
41+
pipeline: "my_pipeline"
42+
body: {
43+
foo: "blub"
44+
}
45+
46+
- do:
47+
catch: missing
48+
get:
49+
index: test
50+
type: test
51+
id: 1
52+
- match: { found: false }
53+
54+
- do:
55+
get:
56+
index: test
57+
type: test
58+
id: 2
59+
- match: { _source.foo: "blub" }
Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,9 @@
1717
* under the License.
1818
*/
1919

20-
package org.elasticsearch.ingest.common;
20+
package org.elasticsearch.ingest;
2121

2222
import java.util.Map;
23-
import org.elasticsearch.ingest.AbstractProcessor;
24-
import org.elasticsearch.ingest.IngestDocument;
25-
import org.elasticsearch.ingest.Processor;
2623

2724
/**
2825
* Drop processor only returns {@code null} for the execution result to indicate that any document

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,45 @@ public void testUpdatingStatsWhenRemovingPipelineWorks() {
812812
assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id2")));
813813
}
814814

815+
public void testExecuteWithDrop() {
816+
Map<String, Processor.Factory> factories = new HashMap<>();
817+
factories.put("drop", new DropProcessor.Factory());
818+
factories.put("mock", (processorFactories, tag, config) -> new Processor() {
819+
@Override
820+
public IngestDocument execute(final IngestDocument ingestDocument) {
821+
throw new AssertionError("Document should have been dropped but reached this processor");
822+
}
823+
824+
@Override
825+
public String getType() {
826+
return null;
827+
}
828+
829+
@Override
830+
public String getTag() {
831+
return null;
832+
}
833+
});
834+
IngestService ingestService = createWithProcessors(factories);
835+
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
836+
new BytesArray("{\"processors\": [{\"drop\" : {}}, {\"mock\" : {}}]}"), XContentType.JSON);
837+
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
838+
ClusterState previousClusterState = clusterState;
839+
clusterState = IngestService.innerPut(putRequest, clusterState);
840+
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
841+
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
842+
@SuppressWarnings("unchecked")
843+
final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
844+
@SuppressWarnings("unchecked")
845+
final Consumer<Exception> completionHandler = mock(Consumer.class);
846+
@SuppressWarnings("unchecked")
847+
final Consumer<IndexRequest> dropHandler = mock(Consumer.class);
848+
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
849+
verify(failureHandler, never()).accept(any(), any());
850+
verify(completionHandler, times(1)).accept(null);
851+
verify(dropHandler, times(1)).accept(indexRequest);
852+
}
853+
815854
private IngestDocument eqIndexTypeId(final Map<String, Object> source) {
816855
return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source));
817856
}

0 commit comments

Comments
 (0)