@@ -306,6 +306,8 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
                 listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
             },
             exc -> {
+                auditor.warning(transform.getId(),
+                    "Failed to persist to cluster state while marking task as started. Failure: " + exc.getMessage());
                 logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
                 getIndexer().stop();
                 listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
@@ -412,7 +414,6 @@ void persistStateToClusterState(DataFrameTransformState state,
                 listener.onResponse(success);
             },
             failure -> {
-                auditor.warning(transform.getId(), "Failed to persist to state to cluster state: " + failure.getMessage());
                 logger.error(new ParameterizedMessage("[{}] failed to update cluster state for data frame transform.",
                     transform.getId()),
                     failure);
@@ -434,7 +435,6 @@ synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
         // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
         if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) {
             logger.info("[{}] attempt to fail transform with reason [{}] while it was stopping.", getTransformId(), reason);
-            auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state.");
             listener.onResponse(null);
             return;
         }
@@ -459,7 +459,10 @@ synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
         persistStateToClusterState(newState, ActionListener.wrap(
             r -> listener.onResponse(null),
             e -> {
-                logger.error(new ParameterizedMessage("[{}] failed to set task state as failed to cluster state.", getTransformId()),
+                String msg = "Failed to persist to cluster state while marking task as failed with reason [" + reason + "].";
+                auditor.warning(transform.getId(),
+                    msg + " Failure: " + e.getMessage());
+                logger.error(new ParameterizedMessage("[{}] {}", getTransformId(), msg),
                     e);
                 listener.onFailure(e);
             }
@@ -945,12 +948,6 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p
         protected void onFailure(Exception exc) {
             // the failure handler must not throw an exception due to internal problems
             try {
-                // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
-                // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
-                if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) {
-                    auditor.warning(transformTask.getTransformId(), "Data frame transform encountered an exception: " + exc.getMessage());
-                    lastAuditedExceptionMessage = exc.getMessage();
-                }
                 handleFailure(exc);
             } catch (Exception e) {
                 logger.error(
@@ -1052,13 +1049,17 @@ protected void createCheckpoint(ActionListener<DataFrameTransformCheckpoint> lis
                 createCheckpointException -> {
                     logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", transformId),
                         createCheckpointException);
-                    listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException));
+                    listener.onFailure(
+                        new RuntimeException("Failed to create checkpoint due to " + createCheckpointException.getMessage(),
+                            createCheckpointException));
                 }
             )),
             getCheckPointException -> {
                 logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", transformId),
                     getCheckPointException);
-                listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException));
+                listener.onFailure(
+                    new RuntimeException("Failed to retrieve checkpoint due to " + getCheckPointException.getMessage(),
+                        getCheckPointException));
             }
         ));
     }
@@ -1103,6 +1104,15 @@ synchronized void handleFailure(Exception e) {
11031104 "task encountered irrecoverable failure: " + e .getMessage () :
11041105 "task encountered more than " + transformTask .getNumFailureRetries () + " failures; latest failure: " + e .getMessage ();
11051106 failIndexer (failureMessage );
1107+ } else {
1108+ // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
1109+ // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
1110+ if (e .getMessage ().equals (lastAuditedExceptionMessage ) == false ) {
1111+ auditor .warning (transformTask .getTransformId (),
1112+ "Data frame transform encountered an exception: " + e .getMessage () +
1113+ " Will attempt again at next scheduled trigger." );
1114+ lastAuditedExceptionMessage = e .getMessage ();
1115+ }
11061116 }
11071117 }
11081118
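For reference, a minimal, self-contained sketch of the audit de-duplication pattern the last hunk moves into handleFailure: while retries remain, only the first occurrence of a given failure message is written to the audit log, and exhausted retries fall through to the failure path. Auditor and TransformFailureHandler here are hypothetical stand-ins for illustration, not the actual Elasticsearch classes.

```java
import java.util.Objects;

public class TransformFailureHandler {

    /** Hypothetical stand-in for the data frame audit log. */
    public interface Auditor {
        void warning(String transformId, String message);
    }

    private final Auditor auditor;
    private final String transformId;
    private final int numFailureRetries;

    private int failureCount = 0;
    private String lastAuditedExceptionMessage = null;

    public TransformFailureHandler(Auditor auditor, String transformId, int numFailureRetries) {
        this.auditor = auditor;
        this.transformId = transformId;
        this.numFailureRetries = numFailureRetries;
    }

    public synchronized void handleFailure(Exception e) {
        failureCount++;
        if (failureCount > numFailureRetries) {
            // Retries exhausted: audit once and let the caller mark the task as failed
            // (failIndexer in the real code).
            auditor.warning(transformId,
                "task encountered more than " + numFailureRetries + " failures; latest failure: " + e.getMessage());
            return;
        }
        // The scheduler fires again very quickly after a failure, so the same exception can
        // repeat many times in a row; audit only the first occurrence of a given message.
        if (Objects.equals(e.getMessage(), lastAuditedExceptionMessage) == false) {
            auditor.warning(transformId,
                "Data frame transform encountered an exception: " + e.getMessage()
                    + " Will attempt again at next scheduled trigger.");
            lastAuditedExceptionMessage = e.getMessage();
        }
    }
}
```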