Skip to content

Commit eef1ba3

Browse files
committed
Make ingest pipeline resolution logic unit testable (#47026)
Extracted ingest pipeline resolution logic into a static method and added unit tests for pipeline resolution logic. Followup from #46847
1 parent b234d2c commit eef1ba3

File tree

2 files changed

+266
-106
lines changed

2 files changed

+266
-106
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 109 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.elasticsearch.cluster.metadata.MetaData;
5555
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
5656
import org.elasticsearch.cluster.service.ClusterService;
57-
import org.elasticsearch.common.collect.ImmutableOpenMap;
5857
import org.elasticsearch.common.inject.Inject;
5958
import org.elasticsearch.common.settings.Settings;
6059
import org.elasticsearch.common.unit.TimeValue;
@@ -159,115 +158,13 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
159158

160159
boolean hasIndexRequestsWithPipelines = false;
161160
final MetaData metaData = clusterService.state().getMetaData();
162-
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
163161
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
164162
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
165-
166163
if (indexRequest != null) {
167-
if (indexRequest.isPipelineResolved() == false) {
168-
final String requestPipeline = indexRequest.getPipeline();
169-
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
170-
boolean requestCanOverridePipeline = true;
171-
String requiredPipeline = null;
172-
// start to look for default or required pipelines via settings found in the index meta data
173-
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
174-
// check the alias for the index request (this is how normal index requests are modeled)
175-
if (indexMetaData == null && indexRequest.index() != null) {
176-
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
177-
if (indexOrAlias != null && indexOrAlias.isAlias()) {
178-
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
179-
indexMetaData = alias.getWriteIndex();
180-
}
181-
}
182-
// check the alias for the action request (this is how upserts are modeled)
183-
if (indexMetaData == null && actionRequest.index() != null) {
184-
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index());
185-
if (indexOrAlias != null && indexOrAlias.isAlias()) {
186-
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
187-
indexMetaData = alias.getWriteIndex();
188-
}
189-
}
190-
if (indexMetaData != null) {
191-
final Settings indexSettings = indexMetaData.getSettings();
192-
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
193-
// find the required pipeline if one is defined from an existing index
194-
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
195-
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
196-
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
197-
indexRequest.setPipeline(requiredPipeline);
198-
requestCanOverridePipeline = false;
199-
} else {
200-
// find the default pipeline if one is defined from an existing index
201-
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
202-
indexRequest.setPipeline(defaultPipeline);
203-
}
204-
} else if (indexRequest.index() != null) {
205-
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
206-
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
207-
assert (templates != null);
208-
// order of templates are highest order first, we have to iterate through them all though
209-
String defaultPipeline = null;
210-
for (IndexTemplateMetaData template : templates) {
211-
final Settings settings = template.settings();
212-
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
213-
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
214-
requestCanOverridePipeline = false;
215-
// we can not break in case a lower-order template has a default pipeline that we need to reject
216-
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
217-
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
218-
// we can not break in case a lower-order template has a required pipeline that we need to reject
219-
}
220-
}
221-
if (requiredPipeline != null && defaultPipeline != null) {
222-
// we can not have picked up a required and a default pipeline from applying templates
223-
final String message = String.format(
224-
Locale.ROOT,
225-
"required pipeline [%s] and default pipeline [%s] can not both be set",
226-
requiredPipeline,
227-
defaultPipeline);
228-
throw new IllegalArgumentException(message);
229-
}
230-
final String pipeline;
231-
if (requiredPipeline != null) {
232-
pipeline = requiredPipeline;
233-
} else {
234-
pipeline = defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME;
235-
}
236-
indexRequest.setPipeline(pipeline);
237-
}
238-
239-
if (requestPipeline != null) {
240-
if (requestCanOverridePipeline == false) {
241-
final String message = String.format(
242-
Locale.ROOT,
243-
"request pipeline [%s] can not override required pipeline [%s]",
244-
requestPipeline,
245-
requiredPipeline);
246-
throw new IllegalArgumentException(message);
247-
} else {
248-
indexRequest.setPipeline(requestPipeline);
249-
}
250-
}
251-
252-
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
253-
hasIndexRequestsWithPipelines = true;
254-
}
255-
/*
256-
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
257-
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
258-
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
259-
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
260-
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
261-
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
262-
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
263-
* pipeline parameter too.
264-
*/
265-
indexRequest.isPipelineResolved(true);
266-
} else if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
267-
hasIndexRequestsWithPipelines = true;
268-
}
164+
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
165+
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
166+
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
269167
}
270-
271168
}
272169

273170
if (hasIndexRequestsWithPipelines) {
@@ -363,6 +260,112 @@ public void onFailure(Exception e) {
363260
}
364261
}
365262

263+
static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalRequest,
264+
IndexRequest indexRequest,
265+
MetaData metaData) {
266+
267+
if (indexRequest.isPipelineResolved() == false) {
268+
final String requestPipeline = indexRequest.getPipeline();
269+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
270+
boolean requestCanOverridePipeline = true;
271+
String requiredPipeline = null;
272+
// start to look for default or required pipelines via settings found in the index meta data
273+
IndexMetaData indexMetaData = metaData.indices().get(originalRequest.index());
274+
// check the alias for the index request (this is how normal index requests are modeled)
275+
if (indexMetaData == null && indexRequest.index() != null) {
276+
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
277+
if (indexOrAlias != null && indexOrAlias.isAlias()) {
278+
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
279+
indexMetaData = alias.getWriteIndex();
280+
}
281+
}
282+
// check the alias for the action request (this is how upserts are modeled)
283+
if (indexMetaData == null && originalRequest.index() != null) {
284+
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(originalRequest.index());
285+
if (indexOrAlias != null && indexOrAlias.isAlias()) {
286+
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
287+
indexMetaData = alias.getWriteIndex();
288+
}
289+
}
290+
if (indexMetaData != null) {
291+
final Settings indexSettings = indexMetaData.getSettings();
292+
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
293+
// find the required pipeline if one is defined from an existing index
294+
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
295+
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
296+
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
297+
indexRequest.setPipeline(requiredPipeline);
298+
requestCanOverridePipeline = false;
299+
} else {
300+
// find the default pipeline if one is defined from an existing index
301+
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
302+
indexRequest.setPipeline(defaultPipeline);
303+
}
304+
} else if (indexRequest.index() != null) {
305+
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
306+
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
307+
assert (templates != null);
308+
// order of templates are highest order first, we have to iterate through them all though
309+
String defaultPipeline = null;
310+
for (IndexTemplateMetaData template : templates) {
311+
final Settings settings = template.settings();
312+
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
313+
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
314+
requestCanOverridePipeline = false;
315+
// we can not break in case a lower-order template has a default pipeline that we need to reject
316+
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
317+
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
318+
// we can not break in case a lower-order template has a required pipeline that we need to reject
319+
}
320+
}
321+
if (requiredPipeline != null && defaultPipeline != null) {
322+
// we can not have picked up a required and a default pipeline from applying templates
323+
final String message = String.format(
324+
Locale.ROOT,
325+
"required pipeline [%s] and default pipeline [%s] can not both be set",
326+
requiredPipeline,
327+
defaultPipeline);
328+
throw new IllegalArgumentException(message);
329+
}
330+
final String pipeline;
331+
if (requiredPipeline != null) {
332+
pipeline = requiredPipeline;
333+
} else {
334+
pipeline = defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME;
335+
}
336+
indexRequest.setPipeline(pipeline);
337+
}
338+
339+
if (requestPipeline != null) {
340+
if (requestCanOverridePipeline == false) {
341+
final String message = String.format(
342+
Locale.ROOT,
343+
"request pipeline [%s] can not override required pipeline [%s]",
344+
requestPipeline,
345+
requiredPipeline);
346+
throw new IllegalArgumentException(message);
347+
} else {
348+
indexRequest.setPipeline(requestPipeline);
349+
}
350+
}
351+
352+
/*
353+
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
354+
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
355+
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
356+
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
357+
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
358+
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
359+
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
360+
* pipeline parameter too.
361+
*/
362+
indexRequest.isPipelineResolved(true);
363+
}
364+
365+
// Return whether this index request has a pipeline
366+
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false;
367+
}
368+
366369
boolean needToCheck() {
367370
return autoCreateIndex.needToCheck();
368371
}

0 commit comments

Comments
 (0)