Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ teardown:
settings:
index:
default_pipeline: "my_pipeline"
aliases:
test_alias: {}

- do:
index:
Expand All @@ -49,25 +51,40 @@ teardown:

- do:
index:
index: test_alias
type: test
id: 2
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
type: test
id: 2
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }

- do:
index:
index: test
type: test
id: 3
pipeline: "_none"
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
type: test
id: 2
id: 3
- match: { _source.bytes_source_field: "1kb" }
- is_false: _source.bytes_target_field

- do:
catch: bad_request
index:
index: test
type: test
id: 3
id: 4
pipeline: ""
body: {bytes_source_field: "1kb"}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
Expand Down Expand Up @@ -203,13 +204,22 @@ private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, fina
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
boolean hasIndexRequestsWithPipelines = false;
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
final MetaData metaData = clusterService.state().getMetaData();
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
if (indexMetaData == null) {
//check the alias
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
if (indexMetaData == null) {
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
* Index for which mock settings contain a default pipeline.
*/
private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline";
private static final String WITH_DEFAULT_PIPELINE_ALIAS = "alias_for_index_with_default_pipeline";

private static final Settings SETTINGS =
Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build();
Expand Down Expand Up @@ -190,7 +192,7 @@ public void setupAction() {
IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings(
settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
.build()
).numberOfShards(1).numberOfReplicas(1).build()))
).putAlias(AliasMetaData.builder(WITH_DEFAULT_PIPELINE_ALIAS).build()).numberOfShards(1).numberOfReplicas(1).build()))
.build()).build();
when(state.getMetaData()).thenReturn(metaData);
when(state.metaData()).thenReturn(metaData);
Expand Down Expand Up @@ -399,15 +401,24 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
}

public void testUseDefaultPipeline() throws Exception {
validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id"));
}

public void testUseDefaultPipelineWithAlias() throws Exception {
validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE_ALIAS, "type", "id"));
}

public void testCreateIndexBeforeRunPipeline() throws Exception {
Exception exception = new Exception("fake exception");
IndexRequest indexRequest = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id");
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
indexRequest.setPipeline("testpipeline");
indexRequest.source(Collections.emptyMap());
AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
action.needToCheck = true;
action.indexCreated = false;
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
response -> {
responseCalled.set(true);
},
response -> responseCalled.set(true),
e -> {
assertThat(e, sameInstance(exception));
failureCalled.set(true);
Expand All @@ -429,17 +440,15 @@ public void testUseDefaultPipeline() throws Exception {
verifyZeroInteractions(transportService);
}

public void testCreateIndexBeforeRunPipeline() throws Exception {
private void validateDefaultPipeline(IndexRequest indexRequest) {
Exception exception = new Exception("fake exception");
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
indexRequest.setPipeline("testpipeline");
indexRequest.source(Collections.emptyMap());
AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
action.needToCheck = true;
action.indexCreated = false;
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
response -> responseCalled.set(true),
response -> {
responseCalled.set(true);
},
e -> {
assertThat(e, sameInstance(exception));
failureCalled.set(true);
Expand All @@ -460,5 +469,4 @@ public void testCreateIndexBeforeRunPipeline() throws Exception {
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
}

}