4747import org .elasticsearch .cluster .metadata .AliasOrIndex ;
4848import org .elasticsearch .cluster .metadata .IndexMetaData ;
4949import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
50+ import org .elasticsearch .cluster .metadata .IndexTemplateMetaData ;
5051import org .elasticsearch .cluster .metadata .MappingMetaData ;
5152import org .elasticsearch .cluster .metadata .MetaData ;
53+ import org .elasticsearch .cluster .metadata .MetaDataIndexTemplateService ;
5254import org .elasticsearch .cluster .service .ClusterService ;
5355import org .elasticsearch .common .collect .ImmutableOpenMap ;
5456import org .elasticsearch .common .inject .Inject ;
@@ -155,6 +157,72 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
155157 final long startTime = relativeTime ();
156158 final AtomicArray <BulkItemResponse > responses = new AtomicArray <>(bulkRequest .requests .size ());
157159
160+ boolean hasIndexRequestsWithPipelines = false ;
161+ final MetaData metaData = clusterService .state ().getMetaData ();
162+ ImmutableOpenMap <String , IndexMetaData > indicesMetaData = metaData .indices ();
163+ for (DocWriteRequest <?> actionRequest : bulkRequest .requests ) {
164+ IndexRequest indexRequest = getIndexWriteRequest (actionRequest );
165+ if (indexRequest != null ) {
166+ // get pipeline from request
167+ String pipeline = indexRequest .getPipeline ();
168+ if (pipeline == null ) {
169+ // start to look for default pipeline via settings found in the index meta data
170+ IndexMetaData indexMetaData = indicesMetaData .get (actionRequest .index ());
171+ if (indexMetaData == null && indexRequest .index () != null ) {
172+ // if the write request if through an alias use the write index's meta data
173+ AliasOrIndex indexOrAlias = metaData .getAliasAndIndexLookup ().get (indexRequest .index ());
174+ if (indexOrAlias != null && indexOrAlias .isAlias ()) {
175+ AliasOrIndex .Alias alias = (AliasOrIndex .Alias ) indexOrAlias ;
176+ indexMetaData = alias .getWriteIndex ();
177+ }
178+ }
179+ if (indexMetaData != null ) {
180+ // Find the the default pipeline if one is defined from and existing index.
181+ String defaultPipeline = IndexSettings .DEFAULT_PIPELINE .get (indexMetaData .getSettings ());
182+ indexRequest .setPipeline (defaultPipeline );
183+ if (IngestService .NOOP_PIPELINE_NAME .equals (defaultPipeline ) == false ) {
184+ hasIndexRequestsWithPipelines = true ;
185+ }
186+ } else if (indexRequest .index () != null ) {
187+ // No index exists yet (and is valid request), so matching index templates to look for a default pipeline
188+ List <IndexTemplateMetaData > templates = MetaDataIndexTemplateService .findTemplates (metaData , indexRequest .index ());
189+ assert (templates != null );
190+ String defaultPipeline = IngestService .NOOP_PIPELINE_NAME ;
191+ // order of templates are highest order first, break if we find a default_pipeline
192+ for (IndexTemplateMetaData template : templates ) {
193+ final Settings settings = template .settings ();
194+ if (IndexSettings .DEFAULT_PIPELINE .exists (settings )) {
195+ defaultPipeline = IndexSettings .DEFAULT_PIPELINE .get (settings );
196+ break ;
197+ }
198+ }
199+ indexRequest .setPipeline (defaultPipeline );
200+ if (IngestService .NOOP_PIPELINE_NAME .equals (defaultPipeline ) == false ) {
201+ hasIndexRequestsWithPipelines = true ;
202+ }
203+ }
204+ } else if (IngestService .NOOP_PIPELINE_NAME .equals (pipeline ) == false ) {
205+ hasIndexRequestsWithPipelines = true ;
206+ }
207+ }
208+ }
209+
210+ if (hasIndexRequestsWithPipelines ) {
211+ // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
212+ // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
213+ // this path is never taken.
214+ try {
215+ if (clusterService .localNode ().isIngestNode ()) {
216+ processBulkIndexIngestRequest (task , bulkRequest , listener );
217+ } else {
218+ ingestForwarder .forwardIngestRequest (BulkAction .INSTANCE , bulkRequest , listener );
219+ }
220+ } catch (Exception e ) {
221+ listener .onFailure (e );
222+ }
223+ return ;
224+ }
225+
158226 if (needToCheck ()) {
159227 // Attempt to create all the indices that we're going to need during the bulk before we start.
160228 // Step 1: collect all the indices in the request
@@ -185,15 +253,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
185253 }
186254 // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
187255 if (autoCreateIndices .isEmpty ()) {
188- executeIngestAndBulk (task , bulkRequest , startTime , listener , responses , indicesThatCannotBeCreated );
256+ executeBulk (task , bulkRequest , startTime , listener , responses , indicesThatCannotBeCreated );
189257 } else {
190258 final AtomicInteger counter = new AtomicInteger (autoCreateIndices .size ());
191259 for (String index : autoCreateIndices ) {
192260 createIndex (index , bulkRequest .timeout (), new ActionListener <CreateIndexResponse >() {
193261 @ Override
194262 public void onResponse (CreateIndexResponse result ) {
195263 if (counter .decrementAndGet () == 0 ) {
196- executeIngestAndBulk (task , bulkRequest , startTime , listener , responses , indicesThatCannotBeCreated );
264+ executeBulk (task , bulkRequest , startTime , listener , responses , indicesThatCannotBeCreated );
197265 }
198266 }
199267
@@ -209,7 +277,7 @@ public void onFailure(Exception e) {
209277 }
210278 }
211279 if (counter .decrementAndGet () == 0 ) {
212- executeIngestAndBulk (task , bulkRequest , startTime , ActionListener .wrap (listener ::onResponse , inner -> {
280+ executeBulk (task , bulkRequest , startTime , ActionListener .wrap (listener ::onResponse , inner -> {
213281 inner .addSuppressed (e );
214282 listener .onFailure (inner );
215283 }), responses , indicesThatCannotBeCreated );
@@ -219,56 +287,7 @@ public void onFailure(Exception e) {
219287 }
220288 }
221289 } else {
222- executeIngestAndBulk (task , bulkRequest , startTime , listener , responses , emptyMap ());
223- }
224- }
225-
226- private void executeIngestAndBulk (Task task , final BulkRequest bulkRequest , final long startTimeNanos ,
227- final ActionListener <BulkResponse > listener , final AtomicArray <BulkItemResponse > responses ,
228- Map <String , IndexNotFoundException > indicesThatCannotBeCreated ) {
229- boolean hasIndexRequestsWithPipelines = false ;
230- final MetaData metaData = clusterService .state ().getMetaData ();
231- ImmutableOpenMap <String , IndexMetaData > indicesMetaData = metaData .indices ();
232- for (DocWriteRequest <?> actionRequest : bulkRequest .requests ) {
233- IndexRequest indexRequest = getIndexWriteRequest (actionRequest );
234- if (indexRequest != null ){
235- String pipeline = indexRequest .getPipeline ();
236- if (pipeline == null ) {
237- IndexMetaData indexMetaData = indicesMetaData .get (actionRequest .index ());
238- if (indexMetaData == null && indexRequest .index () != null ) {
239- //check the alias
240- AliasOrIndex indexOrAlias = metaData .getAliasAndIndexLookup ().get (indexRequest .index ());
241- if (indexOrAlias != null && indexOrAlias .isAlias ()) {
242- AliasOrIndex .Alias alias = (AliasOrIndex .Alias ) indexOrAlias ;
243- indexMetaData = alias .getWriteIndex ();
244- }
245- }
246- if (indexMetaData == null ) {
247- indexRequest .setPipeline (IngestService .NOOP_PIPELINE_NAME );
248- } else {
249- String defaultPipeline = IndexSettings .DEFAULT_PIPELINE .get (indexMetaData .getSettings ());
250- indexRequest .setPipeline (defaultPipeline );
251- if (IngestService .NOOP_PIPELINE_NAME .equals (defaultPipeline ) == false ) {
252- hasIndexRequestsWithPipelines = true ;
253- }
254- }
255- } else if (IngestService .NOOP_PIPELINE_NAME .equals (pipeline ) == false ) {
256- hasIndexRequestsWithPipelines = true ;
257- }
258- }
259- }
260- if (hasIndexRequestsWithPipelines ) {
261- try {
262- if (clusterService .localNode ().isIngestNode ()) {
263- processBulkIndexIngestRequest (task , bulkRequest , listener );
264- } else {
265- ingestForwarder .forwardIngestRequest (BulkAction .INSTANCE , bulkRequest , listener );
266- }
267- } catch (Exception e ) {
268- listener .onFailure (e );
269- }
270- } else {
271- executeBulk (task , bulkRequest , startTimeNanos , listener , responses , indicesThatCannotBeCreated );
290+ executeBulk (task , bulkRequest , startTime , listener , responses , emptyMap ());
272291 }
273292 }
274293
0 commit comments