Skip to content

Commit be31cc6

Browse files
INGEST: Enable default pipelines (#32286)
* INGEST: Enable default pipelines * Add `default_pipeline` index setting * `_none` is interpreted as no pipeline * closes #21101
1 parent db6e8c7 commit be31cc6

File tree

11 files changed

+200
-23
lines changed

11 files changed

+200
-23
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
---
2+
teardown:
3+
- do:
4+
ingest.delete_pipeline:
5+
id: "my_pipeline"
6+
ignore: 404
7+
8+
---
9+
"Test index with default pipeline":
10+
- do:
11+
ingest.put_pipeline:
12+
id: "my_pipeline"
13+
body: >
14+
{
15+
"description": "_description",
16+
"processors": [
17+
{
18+
"bytes" : {
19+
"field" : "bytes_source_field",
20+
"target_field" : "bytes_target_field"
21+
}
22+
}
23+
]
24+
}
25+
- match: { acknowledged: true }
26+
27+
- do:
28+
indices.create:
29+
index: test
30+
body:
31+
settings:
32+
index:
33+
default_pipeline: "my_pipeline"
34+
35+
- do:
36+
index:
37+
index: test
38+
type: test
39+
id: 1
40+
body: {bytes_source_field: "1kb"}
41+
42+
- do:
43+
get:
44+
index: test
45+
type: test
46+
id: 1
47+
- match: { _source.bytes_source_field: "1kb" }
48+
- match: { _source.bytes_target_field: 1024 }
49+
50+
- do:
51+
index:
52+
index: test
53+
type: test
54+
id: 2
55+
pipeline: "_none"
56+
body: {bytes_source_field: "1kb"}
57+
58+
- do:
59+
get:
60+
index: test
61+
type: test
62+
id: 2
63+
- match: { _source.bytes_source_field: "1kb" }
64+
- is_false: _source.bytes_target_field
65+
66+
- do:
67+
catch: bad_request
68+
index:
69+
index: test
70+
type: test
71+
id: 3
72+
pipeline: ""
73+
body: {bytes_source_field: "1kb"}

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -523,22 +523,6 @@ private int findNextMarker(byte marker, int from, BytesReference data, int lengt
523523
return -1;
524524
}
525525

526-
/**
527-
* @return Whether this bulk request contains index request with an ingest pipeline enabled.
528-
*/
529-
public boolean hasIndexRequestsWithPipelines() {
530-
for (DocWriteRequest<?> actionRequest : requests) {
531-
if (actionRequest instanceof IndexRequest) {
532-
IndexRequest indexRequest = (IndexRequest) actionRequest;
533-
if (Strings.hasText(indexRequest.getPipeline())) {
534-
return true;
535-
}
536-
}
537-
}
538-
539-
return false;
540-
}
541-
542526
@Override
543527
public ActionRequestValidationException validate() {
544528
ActionRequestValidationException validationException = null;

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,15 @@
4747
import org.elasticsearch.cluster.metadata.MappingMetaData;
4848
import org.elasticsearch.cluster.metadata.MetaData;
4949
import org.elasticsearch.cluster.service.ClusterService;
50+
import org.elasticsearch.common.collect.ImmutableOpenMap;
5051
import org.elasticsearch.common.inject.Inject;
5152
import org.elasticsearch.common.settings.Settings;
5253
import org.elasticsearch.common.unit.TimeValue;
5354
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
5455
import org.elasticsearch.common.util.concurrent.AtomicArray;
5556
import org.elasticsearch.index.Index;
5657
import org.elasticsearch.index.IndexNotFoundException;
58+
import org.elasticsearch.index.IndexSettings;
5759
import org.elasticsearch.index.VersionType;
5860
import org.elasticsearch.index.shard.ShardId;
5961
import org.elasticsearch.indices.IndexClosedException;
@@ -125,7 +127,29 @@ public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportSe
125127

126128
@Override
127129
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
128-
if (bulkRequest.hasIndexRequestsWithPipelines()) {
130+
boolean hasIndexRequestsWithPipelines = false;
131+
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
132+
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
133+
if (actionRequest instanceof IndexRequest) {
134+
IndexRequest indexRequest = (IndexRequest) actionRequest;
135+
String pipeline = indexRequest.getPipeline();
136+
if (pipeline == null) {
137+
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
138+
if (indexMetaData == null) {
139+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
140+
} else {
141+
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
142+
indexRequest.setPipeline(defaultPipeline);
143+
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
144+
hasIndexRequestsWithPipelines = true;
145+
}
146+
}
147+
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
148+
hasIndexRequestsWithPipelines = true;
149+
}
150+
}
151+
}
152+
if (hasIndexRequestsWithPipelines) {
129153
if (clusterService.localNode().isIngestNode()) {
130154
processBulkIndexIngestRequest(task, bulkRequest, listener);
131155
} else {

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ public ActionRequestValidationException validate() {
185185
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
186186
}
187187

188+
if (pipeline != null && pipeline.isEmpty()) {
189+
validationException = addValidationError("pipeline cannot be an empty string", validationException);
190+
}
191+
188192
return validationException;
189193
}
190194

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
155155
EngineConfig.INDEX_CODEC_SETTING,
156156
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
157157
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
158+
IndexSettings.DEFAULT_PIPELINE,
158159

159160
// validate that built-in similarities don't get redefined
160161
Setting.groupSetting("index.similarity.", (s) -> {

server/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.unit.ByteSizeValue;
3232
import org.elasticsearch.common.unit.TimeValue;
3333
import org.elasticsearch.index.translog.Translog;
34+
import org.elasticsearch.ingest.IngestService;
3435
import org.elasticsearch.node.Node;
3536

3637
import java.util.Collections;
@@ -254,6 +255,14 @@ public final class IndexSettings {
254255
public static final Setting<Integer> MAX_REGEX_LENGTH_SETTING = Setting.intSetting("index.max_regex_length",
255256
1000, 1, Property.Dynamic, Property.IndexScope);
256257

258+
public static final Setting<String> DEFAULT_PIPELINE =
259+
new Setting<>("index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, s -> {
260+
if (s == null || s.isEmpty()) {
261+
throw new IllegalArgumentException("Value for [index.default_pipeline] must be a non-empty string.");
262+
}
263+
return s;
264+
}, Property.Dynamic, Property.IndexScope);
265+
257266
private final Index index;
258267
private final Version version;
259268
private final Logger logger;
@@ -293,6 +302,7 @@ public final class IndexSettings {
293302
private volatile TimeValue searchIdleAfter;
294303
private volatile int maxAnalyzedOffset;
295304
private volatile int maxTermsCount;
305+
private volatile String defaultPipeline;
296306

297307
/**
298308
* The maximum number of refresh listeners allows on this shard.
@@ -408,6 +418,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
408418
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
409419
this.indexSortConfig = new IndexSortConfig(this);
410420
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
421+
defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);
411422

412423
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
413424
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
@@ -446,6 +457,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
446457
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
447458
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
448459
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
460+
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
449461
}
450462

451463
private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
@@ -821,4 +833,12 @@ public boolean isExplicitRefresh() {
821833
* Returns the time that an index shard becomes search idle unless it's accessed in between
822834
*/
823835
public TimeValue getSearchIdleAfter() { return searchIdleAfter; }
836+
837+
public String getDefaultPipeline() {
838+
return defaultPipeline;
839+
}
840+
841+
public void setDefaultPipeline(String defaultPipeline) {
842+
this.defaultPipeline = defaultPipeline;
843+
}
824844
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
* Holder class for several ingest related services.
4040
*/
4141
public class IngestService {
42+
43+
public static final String NOOP_PIPELINE_NAME = "_none";
44+
4245
private final PipelineStore pipelineStore;
4346
private final PipelineExecutionService pipelineExecutionService;
4447

server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.action.update.UpdateRequest;
2525
import org.elasticsearch.cluster.ClusterChangedEvent;
2626
import org.elasticsearch.cluster.ClusterStateApplier;
27-
import org.elasticsearch.common.Strings;
2827
import org.elasticsearch.common.metrics.CounterMetric;
2928
import org.elasticsearch.common.metrics.MeanMetric;
3029
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -73,12 +72,16 @@ protected void doRun() throws Exception {
7372
UpdateRequest updateRequest = (UpdateRequest) actionRequest;
7473
indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
7574
}
76-
if (indexRequest != null && Strings.hasText(indexRequest.getPipeline())) {
75+
if (indexRequest == null) {
76+
continue;
77+
}
78+
String pipeline = indexRequest.getPipeline();
79+
if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
7780
try {
7881
innerExecute(indexRequest, getPipeline(indexRequest.getPipeline()));
7982
//this shouldn't be needed here but we do it for consistency with index api
8083
// which requires it to prevent double execution
81-
indexRequest.setPipeline(null);
84+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
8285
} catch (Exception e) {
8386
itemFailureHandler.accept(indexRequest, e);
8487
}

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.action.support.ActionFilters;
2727
import org.elasticsearch.action.update.UpdateRequest;
2828
import org.elasticsearch.cluster.ClusterState;
29+
import org.elasticsearch.cluster.metadata.MetaData;
2930
import org.elasticsearch.cluster.service.ClusterService;
3031
import org.elasticsearch.common.settings.Settings;
3132
import org.elasticsearch.common.unit.TimeValue;
@@ -45,6 +46,7 @@
4546
import static java.util.Collections.emptySet;
4647
import static java.util.Collections.singleton;
4748
import static org.mockito.Mockito.mock;
49+
import static org.mockito.Mockito.when;
4850

4951
public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCase {
5052
public void testNonExceptional() {
@@ -97,7 +99,11 @@ public void testSomeFail() {
9799

98100
private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
99101
BulkRequest bulkRequest, Function<String, Boolean> shouldAutoCreate) {
100-
TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), mock(ClusterService.class),
102+
ClusterService clusterService = mock(ClusterService.class);
103+
ClusterState state = mock(ClusterState.class);
104+
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
105+
when(clusterService.state()).thenReturn(state);
106+
TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), clusterService,
101107
null, null, null, mock(ActionFilters.class), null, null) {
102108
@Override
103109
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.DocWriteRequest;
2425
import org.elasticsearch.action.index.IndexAction;
@@ -28,13 +29,16 @@
2829
import org.elasticsearch.cluster.ClusterChangedEvent;
2930
import org.elasticsearch.cluster.ClusterState;
3031
import org.elasticsearch.cluster.ClusterStateApplier;
32+
import org.elasticsearch.cluster.metadata.IndexMetaData;
33+
import org.elasticsearch.cluster.metadata.MetaData;
3134
import org.elasticsearch.cluster.node.DiscoveryNode;
3235
import org.elasticsearch.cluster.node.DiscoveryNodes;
3336
import org.elasticsearch.cluster.service.ClusterService;
3437
import org.elasticsearch.common.collect.ImmutableOpenMap;
3538
import org.elasticsearch.common.settings.Settings;
3639
import org.elasticsearch.common.util.concurrent.AtomicArray;
3740
import org.elasticsearch.index.IndexNotFoundException;
41+
import org.elasticsearch.index.IndexSettings;
3842
import org.elasticsearch.ingest.IngestService;
3943
import org.elasticsearch.ingest.PipelineExecutionService;
4044
import org.elasticsearch.tasks.Task;
@@ -68,6 +72,11 @@
6872

6973
public class TransportBulkActionIngestTests extends ESTestCase {
7074

75+
/**
76+
* Index for which mock settings contain a default pipeline.
77+
*/
78+
private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline";
79+
7180
/** Services needed by bulk action */
7281
TransportService transportService;
7382
ClusterService clusterService;
@@ -153,6 +162,15 @@ public void setupAction() {
153162
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
154163
ClusterState state = mock(ClusterState.class);
155164
when(state.getNodes()).thenReturn(nodes);
165+
when(state.getMetaData()).thenReturn(MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
166+
.putAll(
167+
Collections.singletonMap(
168+
WITH_DEFAULT_PIPELINE,
169+
IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings(
170+
settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
171+
.build()
172+
).numberOfShards(1).numberOfReplicas(1).build()))
173+
.build()).build());
156174
when(clusterService.state()).thenReturn(state);
157175
doAnswer(invocation -> {
158176
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
@@ -227,7 +245,7 @@ public void testIngestLocal() throws Exception {
227245
// now check success
228246
Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
229247
failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request
230-
indexRequest2.setPipeline(null); // this is done by the real pipeline execution service when processing
248+
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
231249
completionHandler.getValue().accept(null);
232250
assertTrue(action.isExecuted);
233251
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@@ -259,7 +277,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
259277
assertTrue(failureCalled.get());
260278

261279
// now check success
262-
indexRequest.setPipeline(null); // this is done by the real pipeline execution service when processing
280+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
263281
completionHandler.getValue().accept(null);
264282
assertTrue(action.isExecuted);
265283
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@@ -359,4 +377,35 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
359377
}
360378
}
361379

380+
public void testUseDefaultPipeline() throws Exception {
381+
Exception exception = new Exception("fake exception");
382+
IndexRequest indexRequest = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id");
383+
indexRequest.source(Collections.emptyMap());
384+
AtomicBoolean responseCalled = new AtomicBoolean(false);
385+
AtomicBoolean failureCalled = new AtomicBoolean(false);
386+
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
387+
response -> {
388+
responseCalled.set(true);
389+
},
390+
e -> {
391+
assertThat(e, sameInstance(exception));
392+
failureCalled.set(true);
393+
}));
394+
395+
// check failure works, and passes through to the listener
396+
assertFalse(action.isExecuted); // haven't executed yet
397+
assertFalse(responseCalled.get());
398+
assertFalse(failureCalled.get());
399+
verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
400+
completionHandler.getValue().accept(exception);
401+
assertTrue(failureCalled.get());
402+
403+
// now check success
404+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
405+
completionHandler.getValue().accept(null);
406+
assertTrue(action.isExecuted);
407+
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
408+
verifyZeroInteractions(transportService);
409+
}
410+
362411
}

0 commit comments

Comments
 (0)