Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -155,115 +154,13 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk

boolean hasIndexRequestsWithPipelines = false;
final MetaData metaData = clusterService.state().getMetaData();
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);

if (indexRequest != null) {
if (indexRequest.isPipelineResolved() == false) {
final String requestPipeline = indexRequest.getPipeline();
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
boolean requestCanOverridePipeline = true;
String requiredPipeline = null;
// start to look for default or required pipelines via settings found in the index meta data
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
// check the alias for the action request (this is how upserts are modeled)
if (indexMetaData == null && actionRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
if (indexMetaData != null) {
final Settings indexSettings = indexMetaData.getSettings();
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
// find the required pipeline if one is defined from an existing index
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(requiredPipeline);
requestCanOverridePipeline = false;
} else {
// find the default pipeline if one is defined from an existing index
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
}
} else if (indexRequest.index() != null) {
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
// order of templates are highest order first, we have to iterate through them all though
String defaultPipeline = null;
for (IndexTemplateMetaData template : templates) {
final Settings settings = template.settings();
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
requestCanOverridePipeline = false;
// we can not break in case a lower-order template has a default pipeline that we need to reject
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
// we can not break in case a lower-order template has a required pipeline that we need to reject
}
}
if (requiredPipeline != null && defaultPipeline != null) {
// we can not have picked up a required and a default pipeline from applying templates
final String message = String.format(
Locale.ROOT,
"required pipeline [%s] and default pipeline [%s] can not both be set",
requiredPipeline,
defaultPipeline);
throw new IllegalArgumentException(message);
}
final String pipeline;
if (requiredPipeline != null) {
pipeline = requiredPipeline;
} else {
pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME);
}
indexRequest.setPipeline(pipeline);
}

if (requestPipeline != null) {
if (requestCanOverridePipeline == false) {
final String message = String.format(
Locale.ROOT,
"request pipeline [%s] can not override required pipeline [%s]",
requestPipeline,
requiredPipeline);
throw new IllegalArgumentException(message);
} else {
indexRequest.setPipeline(requestPipeline);
}
}

if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
/*
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
* pipeline parameter too.
*/
indexRequest.isPipelineResolved(true);
} else if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
}

}

if (hasIndexRequestsWithPipelines) {
Expand Down Expand Up @@ -359,6 +256,112 @@ public void onFailure(Exception e) {
}
}

static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalRequest,
IndexRequest indexRequest,
MetaData metaData) {

if (indexRequest.isPipelineResolved() == false) {
final String requestPipeline = indexRequest.getPipeline();
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
boolean requestCanOverridePipeline = true;
String requiredPipeline = null;
// start to look for default or required pipelines via settings found in the index meta data
IndexMetaData indexMetaData = metaData.indices().get(originalRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
// check the alias for the action request (this is how upserts are modeled)
if (indexMetaData == null && originalRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(originalRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
if (indexMetaData != null) {
final Settings indexSettings = indexMetaData.getSettings();
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
// find the required pipeline if one is defined from an existing index
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(requiredPipeline);
requestCanOverridePipeline = false;
} else {
// find the default pipeline if one is defined from an existing index
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
}
} else if (indexRequest.index() != null) {
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
// order of templates are highest order first, we have to iterate through them all though
String defaultPipeline = null;
for (IndexTemplateMetaData template : templates) {
final Settings settings = template.settings();
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
requestCanOverridePipeline = false;
// we can not break in case a lower-order template has a default pipeline that we need to reject
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
// we can not break in case a lower-order template has a required pipeline that we need to reject
}
}
if (requiredPipeline != null && defaultPipeline != null) {
// we can not have picked up a required and a default pipeline from applying templates
final String message = String.format(
Locale.ROOT,
"required pipeline [%s] and default pipeline [%s] can not both be set",
requiredPipeline,
defaultPipeline);
throw new IllegalArgumentException(message);
}
final String pipeline;
if (requiredPipeline != null) {
pipeline = requiredPipeline;
} else {
pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME);
}
indexRequest.setPipeline(pipeline);
}

if (requestPipeline != null) {
if (requestCanOverridePipeline == false) {
final String message = String.format(
Locale.ROOT,
"request pipeline [%s] can not override required pipeline [%s]",
requestPipeline,
requiredPipeline);
throw new IllegalArgumentException(message);
} else {
indexRequest.setPipeline(requestPipeline);
}
}

/*
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
* pipeline parameter too.
*/
indexRequest.isPipelineResolved(true);
}

// Return whether this index request has a pipeline
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false;
}

boolean needToCheck() {
return autoCreateIndex.needToCheck();
}
Expand Down
Loading