@@ -928,10 +928,8 @@ private void innerExecute(
928928 boolean ensureNoSelfReferences = ingestDocument .doNoSelfReferencesCheck ();
929929 indexRequest .source (ingestDocument .getSource (), indexRequest .getContentType (), ensureNoSelfReferences );
930930 } catch (IllegalArgumentException ex ) {
931- // An IllegalArgumentException can be thrown when an ingest
932- // processor creates a source map that is self-referencing.
933- // In that case, we catch and wrap the exception so we can
934- // include which pipeline failed.
931+ // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
932+ // In that case, we catch and wrap the exception, so we can include which pipeline failed.
935933 totalMetrics .ingestFailed ();
936934 handler .accept (
937935 new IllegalArgumentException (
@@ -940,6 +938,19 @@ private void innerExecute(
940938 )
941939 );
942940 return ;
941+ } catch (Exception ex ) {
942+ // If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example,
943+ // *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable
944+ // collection, and another indexing thread modifies the shared reference while we're trying to ensure it has
945+ // no self references.
946+ totalMetrics .ingestFailed ();
947+ handler .accept (
948+ new RuntimeException (
949+ "Failed to generate the source document for ingest pipeline [" + pipeline .getId () + "]" ,
950+ ex
951+ )
952+ );
953+ return ;
943954 }
944955 Map <String , String > map ;
945956 if ((map = metadata .getDynamicTemplates ()) != null ) {
0 commit comments