@@ -497,67 +497,61 @@ private void executePipelines(
497497 final BiConsumer <Thread , Exception > onCompletion ,
498498 final Thread originalThread
499499 ) {
500- while (it .hasNext ()) {
501- final String pipelineId = it .next ();
502- try {
503- PipelineHolder holder = pipelines .get (pipelineId );
504- if (holder == null ) {
505- throw new IllegalArgumentException ("pipeline with id [" + pipelineId + "] does not exist" );
500+ assert it .hasNext ();
501+ final String pipelineId = it .next ();
502+ try {
503+ PipelineHolder holder = pipelines .get (pipelineId );
504+ if (holder == null ) {
505+ throw new IllegalArgumentException ("pipeline with id [" + pipelineId + "] does not exist" );
506+ }
507+ Pipeline pipeline = holder .pipeline ;
508+ String originalIndex = indexRequest .indices ()[0 ];
509+ innerExecute (slot , indexRequest , pipeline , onDropped , e -> {
510+ if (e != null ) {
511+ logger .debug (() -> new ParameterizedMessage ("failed to execute pipeline [{}] for document [{}/{}]" ,
512+ pipelineId , indexRequest .index (), indexRequest .id ()), e );
513+ onFailure .accept (slot , e );
506514 }
507- Pipeline pipeline = holder .pipeline ;
508- String originalIndex = indexRequest .indices ()[0 ];
509- innerExecute (slot , indexRequest , pipeline , onDropped , e -> {
510- if (e != null ) {
511- logger .debug (() -> new ParameterizedMessage ("failed to execute pipeline [{}] for document [{}/{}]" ,
512- pipelineId , indexRequest .index (), indexRequest .id ()), e );
513- onFailure .accept (slot , e );
514- }
515515
516- Iterator <String > newIt = it ;
517- boolean newHasFinalPipeline = hasFinalPipeline ;
518- String newIndex = indexRequest .indices ()[0 ];
516+ Iterator <String > newIt = it ;
517+ boolean newHasFinalPipeline = hasFinalPipeline ;
518+ String newIndex = indexRequest .indices ()[0 ];
519519
520- if (Objects .equals (originalIndex , newIndex ) == false ) {
521- if (hasFinalPipeline && it .hasNext () == false ) {
522- totalMetrics .ingestFailed ();
523- onFailure .accept (slot , new IllegalStateException ("final pipeline [" + pipelineId +
524- "] can't change the target index" ));
520+ if (Objects .equals (originalIndex , newIndex ) == false ) {
521+ if (hasFinalPipeline && it .hasNext () == false ) {
522+ totalMetrics .ingestFailed ();
523+ onFailure .accept (slot , new IllegalStateException ("final pipeline [" + pipelineId +
524+ "] can't change the target index" ));
525+ } else {
526+ indexRequest .isPipelineResolved (false );
527+ resolvePipelines (null , indexRequest , state .metadata ());
528+ if (IngestService .NOOP_PIPELINE_NAME .equals (indexRequest .getFinalPipeline ()) == false ) {
529+ newIt = Collections .singleton (indexRequest .getFinalPipeline ()).iterator ();
530+ newHasFinalPipeline = true ;
525531 } else {
526-
527- //Drain old it so it's not looped over
528- it .forEachRemaining ($ -> {
529- });
530- indexRequest .isPipelineResolved (false );
531- resolvePipelines (null , indexRequest , state .metadata ());
532- if (IngestService .NOOP_PIPELINE_NAME .equals (indexRequest .getFinalPipeline ()) == false ) {
533- newIt = Collections .singleton (indexRequest .getFinalPipeline ()).iterator ();
534- newHasFinalPipeline = true ;
535- } else {
536- newIt = Collections .emptyIterator ();
537- }
532+ newIt = Collections .emptyIterator ();
538533 }
539534 }
535+ }
540536
541- if (newIt .hasNext ()) {
542- executePipelines (slot , newIt , newHasFinalPipeline , indexRequest , onDropped , onFailure , counter , onCompletion ,
543- originalThread );
544- } else {
545- if (counter .decrementAndGet () == 0 ) {
546- onCompletion .accept (originalThread , null );
547- }
548- assert counter .get () >= 0 ;
537+ if (newIt .hasNext ()) {
538+ executePipelines (slot , newIt , newHasFinalPipeline , indexRequest , onDropped , onFailure , counter , onCompletion ,
539+ originalThread );
540+ } else {
541+ if (counter .decrementAndGet () == 0 ) {
542+ onCompletion .accept (originalThread , null );
549543 }
550- });
551- } catch (Exception e ) {
552- logger .debug (() -> new ParameterizedMessage ("failed to execute pipeline [{}] for document [{}/{}]" ,
553- pipelineId , indexRequest .index (), indexRequest .id ()), e );
554- onFailure .accept (slot , e );
555- if (counter .decrementAndGet () == 0 ) {
556- onCompletion .accept (originalThread , null );
544+ assert counter .get () >= 0 ;
557545 }
558- assert counter .get () >= 0 ;
559- break ;
546+ });
547+ } catch (Exception e ) {
548+ logger .debug (() -> new ParameterizedMessage ("failed to execute pipeline [{}] for document [{}/{}]" ,
549+ pipelineId , indexRequest .index (), indexRequest .id ()), e );
550+ onFailure .accept (slot , e );
551+ if (counter .decrementAndGet () == 0 ) {
552+ onCompletion .accept (originalThread , null );
560553 }
554+ assert counter .get () >= 0 ;
561555 }
562556 }
563557
0 commit comments