Skip to content

Commit 190ac8e

Browse files
authored
ingest: support default pipeline through an alias (#36231)
This commit allows writes that go through an alias to use the default pipeline defined on the backing index. Fixes #35817
1 parent 2d03eeb commit 190ac8e

File tree

3 files changed

+51
-16
lines changed

3 files changed

+51
-16
lines changed

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ teardown:
3131
settings:
3232
index:
3333
default_pipeline: "my_pipeline"
34+
aliases:
35+
test_alias: {}
3436

3537
- do:
3638
index:
@@ -49,25 +51,40 @@ teardown:
4951

5052
- do:
5153
index:
54+
index: test_alias
55+
type: test
56+
id: 2
57+
body: {bytes_source_field: "1kb"}
58+
59+
- do:
60+
get:
5261
index: test
5362
type: test
5463
id: 2
64+
- match: { _source.bytes_source_field: "1kb" }
65+
- match: { _source.bytes_target_field: 1024 }
66+
67+
- do:
68+
index:
69+
index: test
70+
type: test
71+
id: 3
5572
pipeline: "_none"
5673
body: {bytes_source_field: "1kb"}
5774

5875
- do:
5976
get:
6077
index: test
6178
type: test
62-
id: 2
79+
id: 3
6380
- match: { _source.bytes_source_field: "1kb" }
6481
- is_false: _source.bytes_target_field
65-
82+
6683
- do:
6784
catch: bad_request
6885
index:
6986
index: test
7087
type: test
71-
id: 3
88+
id: 4
7289
pipeline: ""
7390
body: {bytes_source_field: "1kb"}

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.cluster.ClusterStateObserver;
4545
import org.elasticsearch.cluster.block.ClusterBlockException;
4646
import org.elasticsearch.cluster.block.ClusterBlockLevel;
47+
import org.elasticsearch.cluster.metadata.AliasOrIndex;
4748
import org.elasticsearch.cluster.metadata.IndexMetaData;
4849
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
4950
import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -203,13 +204,22 @@ private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, fina
203204
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
204205
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
205206
boolean hasIndexRequestsWithPipelines = false;
206-
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
207+
final MetaData metaData = clusterService.state().getMetaData();
208+
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
207209
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
208210
if (actionRequest instanceof IndexRequest) {
209211
IndexRequest indexRequest = (IndexRequest) actionRequest;
210212
String pipeline = indexRequest.getPipeline();
211213
if (pipeline == null) {
212214
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
215+
if (indexMetaData == null) {
216+
//check the alias
217+
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
218+
if (indexOrAlias != null && indexOrAlias.isAlias()) {
219+
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
220+
indexMetaData = alias.getWriteIndex();
221+
}
222+
}
213223
if (indexMetaData == null) {
214224
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
215225
} else {

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.cluster.ClusterChangedEvent;
3232
import org.elasticsearch.cluster.ClusterState;
3333
import org.elasticsearch.cluster.ClusterStateApplier;
34+
import org.elasticsearch.cluster.metadata.AliasMetaData;
3435
import org.elasticsearch.cluster.metadata.IndexMetaData;
3536
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3637
import org.elasticsearch.cluster.metadata.MetaData;
@@ -80,6 +81,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
8081
* Index for which mock settings contain a default pipeline.
8182
*/
8283
private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline";
84+
private static final String WITH_DEFAULT_PIPELINE_ALIAS = "alias_for_index_with_default_pipeline";
8385

8486
private static final Settings SETTINGS =
8587
Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build();
@@ -190,7 +192,7 @@ public void setupAction() {
190192
IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings(
191193
settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
192194
.build()
193-
).numberOfShards(1).numberOfReplicas(1).build()))
195+
).putAlias(AliasMetaData.builder(WITH_DEFAULT_PIPELINE_ALIAS).build()).numberOfShards(1).numberOfReplicas(1).build()))
194196
.build()).build();
195197
when(state.getMetaData()).thenReturn(metaData);
196198
when(state.metaData()).thenReturn(metaData);
@@ -399,15 +401,24 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
399401
}
400402

401403
public void testUseDefaultPipeline() throws Exception {
404+
validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id"));
405+
}
406+
407+
public void testUseDefaultPipelineWithAlias() throws Exception {
408+
validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE_ALIAS, "type", "id"));
409+
}
410+
411+
public void testCreateIndexBeforeRunPipeline() throws Exception {
402412
Exception exception = new Exception("fake exception");
403-
IndexRequest indexRequest = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id");
413+
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
414+
indexRequest.setPipeline("testpipeline");
404415
indexRequest.source(Collections.emptyMap());
405416
AtomicBoolean responseCalled = new AtomicBoolean(false);
406417
AtomicBoolean failureCalled = new AtomicBoolean(false);
418+
action.needToCheck = true;
419+
action.indexCreated = false;
407420
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
408-
response -> {
409-
responseCalled.set(true);
410-
},
421+
response -> responseCalled.set(true),
411422
e -> {
412423
assertThat(e, sameInstance(exception));
413424
failureCalled.set(true);
@@ -429,17 +440,15 @@ public void testUseDefaultPipeline() throws Exception {
429440
verifyZeroInteractions(transportService);
430441
}
431442

432-
public void testCreateIndexBeforeRunPipeline() throws Exception {
443+
private void validateDefaultPipeline(IndexRequest indexRequest) {
433444
Exception exception = new Exception("fake exception");
434-
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
435-
indexRequest.setPipeline("testpipeline");
436445
indexRequest.source(Collections.emptyMap());
437446
AtomicBoolean responseCalled = new AtomicBoolean(false);
438447
AtomicBoolean failureCalled = new AtomicBoolean(false);
439-
action.needToCheck = true;
440-
action.indexCreated = false;
441448
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
442-
response -> responseCalled.set(true),
449+
response -> {
450+
responseCalled.set(true);
451+
},
443452
e -> {
444453
assertThat(e, sameInstance(exception));
445454
failureCalled.set(true);
@@ -460,5 +469,4 @@ public void testCreateIndexBeforeRunPipeline() throws Exception {
460469
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
461470
verifyZeroInteractions(transportService);
462471
}
463-
464472
}

0 commit comments

Comments
 (0)