@@ -317,6 +317,47 @@ public String getType() {
317317 assertThat (onFailureProcessor .getInvokedCounter (), equalTo (1 ));
318318 }
319319
320+ public void testNewCompoundProcessorException () {
321+ TestProcessor processor = new TestProcessor ("my_tag" , "my_type" , new RuntimeException ());
322+ IngestProcessorException ingestProcessorException1 =
323+ CompoundProcessor .newCompoundProcessorException (new RuntimeException (), processor , ingestDocument );
324+ assertThat (ingestProcessorException1 .getHeader ("processor_tag" ), equalTo (List .of ("my_tag" )));
325+ assertThat (ingestProcessorException1 .getHeader ("processor_type" ), equalTo (List .of ("my_type" )));
326+ assertThat (ingestProcessorException1 .getHeader ("pipeline_origin" ), nullValue ());
327+
328+ IngestProcessorException ingestProcessorException2 =
329+ CompoundProcessor .newCompoundProcessorException (ingestProcessorException1 , processor , ingestDocument );
330+ assertThat (ingestProcessorException2 , sameInstance (ingestProcessorException1 ));
331+ }
332+
333+ public void testNewCompoundProcessorExceptionPipelineOrigin () {
334+ Pipeline pipeline2 = new Pipeline ("2" , null , null ,
335+ new CompoundProcessor (new TestProcessor ("my_tag" , "my_type" , new RuntimeException ())));
336+ Pipeline pipeline1 = new Pipeline ("1" , null , null , new CompoundProcessor (new AbstractProcessor (null ) {
337+ @ Override
338+ public IngestDocument execute (IngestDocument ingestDocument ) throws Exception {
339+ throw new UnsupportedOperationException ();
340+ }
341+
342+ @ Override
343+ public void execute (IngestDocument ingestDocument , BiConsumer <IngestDocument , Exception > handler ) {
344+ ingestDocument .executePipeline (pipeline2 , handler );
345+ }
346+
347+ @ Override
348+ public String getType () {
349+ return "my_type2" ;
350+ }
351+ }));
352+
353+ Exception [] holder = new Exception [1 ];
354+ ingestDocument .executePipeline (pipeline1 , (document , e ) -> holder [0 ] = e );
355+ IngestProcessorException ingestProcessorException = (IngestProcessorException ) holder [0 ];
356+ assertThat (ingestProcessorException .getHeader ("processor_tag" ), equalTo (List .of ("my_tag" )));
357+ assertThat (ingestProcessorException .getHeader ("processor_type" ), equalTo (List .of ("my_type" )));
358+ assertThat (ingestProcessorException .getHeader ("pipeline_origin" ), equalTo (List .of ("2" , "1" )));
359+ }
360+
320361 private void assertStats (CompoundProcessor compoundProcessor , long count , long failed , long time ) {
321362 assertStats (0 , compoundProcessor , 0L , count , failed , time );
322363 }
0 commit comments