From 30e7c57b29601cff91f4c48cc0fb4bbbfa43ebb5 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 21 Aug 2019 08:05:02 -0500 Subject: [PATCH 1/3] [ML][Transforms] unifying logging, adding some more logging --- .../transforms/DataFrameTransformTask.java | 83 ++++++++++--------- .../ClientDataFrameIndexerTests.java | 28 +++---- 2 files changed, 60 insertions(+), 51 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 641e3a0d1d777..068e1c0314bc5 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -21,7 +21,6 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; @@ -246,7 +245,7 @@ public synchronized void setTaskStateStopped() { * @param listener Started listener */ public synchronized void start(Long startingCheckpoint, boolean force, ActionListener listener) { - logger.debug("[{}] start called with force [{}] and state [{}]", getTransformId(), force, getState()); + logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState()); if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) { listener.onFailure(new ElasticsearchStatusException( DataFrameMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM, @@ -288,7 +287,7 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis null, getIndexer().getProgress()); - logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString()); + logger.info("[{}] updating state for data frame transform to [{}].", transform.getId(), state.toString()); // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate // This keeps track of STARTED, FAILED, STOPPED // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that @@ -304,6 +303,7 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); }, exc -> { + logger.error("[" + transform.getId() + "] failed updating state to [" + state + "].", exc); getIndexer().stop(); listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform [" + transform.getId() + "] to [" + state.getIndexerState() + "].", exc)); @@ -352,12 +352,12 @@ public synchronized void triggered(Event event) { } if (getIndexer() == null) { - logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId()); + logger.warn("[{}] data frame task triggered with an unintialized indexer.", getTransformId()); return; } if (taskState.get() == DataFrameTransformTaskState.FAILED) { - logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getTransformId()); + logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getTransformId()); return; } @@ -366,15 +366,15 @@ public synchronized void triggered(Event event) { if (IndexerState.INDEXING.equals(indexerState) || IndexerState.STOPPING.equals(indexerState) || IndexerState.STOPPED.equals(indexerState)) { - logger.debug("Indexer for transform [{}] has state [{}], ignoring trigger", getTransformId(), indexerState); + logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getTransformId(), indexerState); return; } - logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), indexerState); + logger.debug("[{}] data frame indexer schedule has triggered, state: [{}].", event.getJobName(), indexerState); // if it runs for the 1st time we just do it, if not we check for changes if (currentCheckpoint.get() == 0) { - logger.debug("Trigger initial run"); + logger.debug("Trigger initial run."); getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); } else if (getIndexer().isContinuous()) { getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); @@ -405,12 +405,12 @@ void persistStateToClusterState(DataFrameTransformState state, ActionListener> listener) { updatePersistentTaskState(state, ActionListener.wrap( success -> { - logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(), state.toString()); + logger.debug("[{}] successfully updated state for data frame transform to [{}].", transform.getId(), state.toString()); listener.onResponse(success); }, failure -> { auditor.warning(transform.getId(), "Failed to persist to state to cluster state: " + failure.getMessage()); - logger.error("Failed to update state for data frame transform [" + transform.getId() + "]", failure); + logger.error("[" + transform.getId() + "] failed to update cluster state for data frame transform.", failure); listener.onFailure(failure); } )); @@ -420,7 +420,7 @@ synchronized void markAsFailed(String reason, ActionListener listener) { // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed. if (taskState.get() == DataFrameTransformTaskState.FAILED) { - logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason); + logger.warn("[{}] is already failed but encountered new failure; reason [{}].", getTransformId(), reason); listener.onResponse(null); return; } @@ -428,7 +428,7 @@ synchronized void markAsFailed(String reason, ActionListener listener) { // the indexer to fail. Since `ClientDataFrameIndexer#doSaveState` will persist the state to the index once the indexer stops, // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue. if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) { - logger.info("Attempt to fail transform [" + getTransformId() + "] with reason [" + reason + "] while it was stopping."); + logger.info("[{}] attempt to fail transform with reason [{}] while it was stopping.", getTransformId(), reason); auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state."); listener.onResponse(null); return; @@ -436,7 +436,7 @@ synchronized void markAsFailed(String reason, ActionListener listener) { // If we are stopped, this means that between the failure occurring and being handled, somebody called stop // We should just allow that stop to continue if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPED) { - logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}]", getTransformId(), reason); + logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}].", getTransformId(), reason); listener.onResponse(null); return; } @@ -454,7 +454,7 @@ synchronized void markAsFailed(String reason, ActionListener listener) { persistStateToClusterState(newState, ActionListener.wrap( r -> listener.onResponse(null), e -> { - logger.error("Failed to set task state as failed to cluster state", e); + logger.error("[" + getTransformId() + "] failed to set task state as failed to cluster state.", e); listener.onFailure(e); } )); @@ -467,8 +467,9 @@ synchronized void markAsFailed(String reason, ActionListener listener) { */ @Override public synchronized void onCancelled() { - logger.info( - "Received cancellation request for data frame transform [" + transform.getId() + "], state: [" + taskState.get() + "]"); + logger.info("[{}] received cancellation request for data frame transform, state: [{}].", + getTransformId(), + taskState.get()); if (getIndexer() != null && getIndexer().abort()) { // there is no background transform running, we can shutdown safely shutdown(); @@ -692,13 +693,13 @@ protected void onStart(long now, ActionListener listener) { } TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap( newProgress -> { - logger.trace("[{}] reset the progress from [{}] to [{}]", transformId, progress, newProgress); + logger.trace("[{}] reset the progress from [{}] to [{}].", transformId, progress, newProgress); progress = newProgress; super.onStart(now, listener); }, failure -> { progress = null; - logger.warn("Unable to load progress information for task [" + transformId + "]", failure); + logger.warn("[" + transformId + "] unable to load progress information for task.", failure); super.onStart(now, listener); } )); @@ -775,14 +776,14 @@ public CheckpointProvider getCheckpointProvider() { @Override public synchronized boolean maybeTriggerAsyncJob(long now) { if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) { - logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getJobId()); + logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getJobId()); return false; } // ignore trigger if indexer is running, prevents log spam in A2P indexer IndexerState indexerState = getState(); if (IndexerState.INDEXING.equals(indexerState) || IndexerState.STOPPING.equals(indexerState)) { - logger.debug("Indexer for transform [{}] has state [{}], ignoring trigger", getJobId(), indexerState); + logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getJobId(), indexerState); return false; } @@ -873,7 +874,7 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p indexerState = IndexerState.STOPPED; auditor.info(transformConfig.getId(), "Data frame finished indexing all data, initiating stop"); - logger.info("Data frame [{}] finished indexing all data, initiating stop", transformConfig.getId()); + logger.info("[{}] data frame transform finished indexing all data, initiating stop.", transformConfig.getId()); } final DataFrameTransformState state = new DataFrameTransformState( @@ -883,7 +884,7 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p transformTask.currentCheckpoint.get(), transformTask.stateReason.get(), getProgress()); - logger.debug("Updating persistent state of transform [{}] to [{}]", transformConfig.getId(), state.toString()); + logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString()); // Persist the current state and stats in the internal index. The interval of this method being // called is controlled by AsyncTwoPhaseIndexer#onBulkResponse which calls doSaveState every so @@ -899,7 +900,7 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p next.run(); }, statsExc -> { - logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); + logger.error("[" + transformConfig.getId() + "] updating stats of transform failed.", statsExc); auditor.warning(getJobId(), "Failure updating stats of transform: " + statsExc.getMessage()); // for auto stop shutdown the task @@ -923,7 +924,9 @@ protected void onFailure(Exception exc) { } handleFailure(exc); } catch (Exception e) { - logger.error("Data frame transform encountered an unexpected internal exception: " ,e); + logger.error( + "[" + transformTask.getTransformId() + "] data frame transform encountered an unexpected internal exception: ", + e); } } @@ -948,7 +951,6 @@ protected void onFinish(ActionListener listener) { if (progress != null && progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) { progress.incrementDocsProcessed(progress.getTotalDocs() - progress.getDocumentsProcessed()); } - logger.info("Last checkpoint for {} {}", getJobId(), Strings.toString(lastCheckpoint)); // If the last checkpoint is now greater than 1, that means that we have just processed the first // continuous checkpoint and should start recording the exponential averages if (lastCheckpoint != null && lastCheckpoint.getCheckpoint() > 1) { @@ -968,7 +970,7 @@ protected void onFinish(ActionListener listener) { "Finished indexing for data frame transform checkpoint [" + checkpoint + "]."); } logger.debug( - "Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]"); + "[{}] finished indexing for data frame transform checkpoint [{}].", getJobId(), checkpoint); auditBulkFailures = true; listener.onResponse(null); } catch (Exception e) { @@ -990,7 +992,10 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) { if (++logCount % logEvery != 0) { return false; } - int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint + 1)); + if (completedCheckpoint == 0) { + return true; + } + int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint)); logEvery = log10Checkpoint >= 3 ? 1_000 : (int)Math.pow(10.0, log10Checkpoint); logCount = 0; return true; @@ -999,13 +1004,13 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) { @Override protected void onStop() { auditor.info(transformConfig.getId(), "Data frame transform has stopped."); - logger.info("Data frame transform [{}] has stopped", transformConfig.getId()); + logger.info("[{}] data frame transform has stopped.", transformConfig.getId()); } @Override protected void onAbort() { auditor.info(transformConfig.getId(), "Received abort request, stopping data frame transform."); - logger.info("Data frame transform [" + transformConfig.getId() + "] received abort request, stopping indexer"); + logger.info("[{}] data frame transform received abort request. Stopping indexer.", transformConfig.getId()); transformTask.shutdown(); } @@ -1015,11 +1020,15 @@ protected void createCheckpoint(ActionListener lis checkpoint -> transformsConfigManager.putTransformCheckpoint(checkpoint, ActionListener.wrap( putCheckPointResponse -> listener.onResponse(checkpoint), - createCheckpointException -> - listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException)) + createCheckpointException -> { + logger.warn("[" + transformId + "] failed to create checkpoint.", createCheckpointException); + listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException)); + } )), - getCheckPointException -> - listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException)) + getCheckPointException -> { + logger.warn("[" + transformId + "] failed to retrieve checkpoint.", getCheckPointException); + listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException)); + } )); } @@ -1028,12 +1037,12 @@ protected void sourceHasChanged(ActionListener hasChangedListener) { checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap( hasChanged -> { - logger.trace("[{}] change detected [{}]", transformId, hasChanged); + logger.trace("[{}] change detected [{}].", transformId, hasChanged); hasChangedListener.onResponse(hasChanged); }, e -> { logger.warn( - "Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check.", + "[" + transformId + "] failed to detect changes for data frame transform. Skipping update till next check.", e); auditor.warning(transformId, "Failed to detect changes for data frame transform, skipping update till next check. Exception: " @@ -1049,7 +1058,7 @@ private boolean isIrrecoverableFailure(Exception e) { } synchronized void handleFailure(Exception e) { - logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", e); + logger.warn("[" + transformTask.getTransformId() + "] data frame transform encountered an exception: ", e); if (handleCircuitBreakingException(e)) { return; } @@ -1064,7 +1073,7 @@ synchronized void handleFailure(Exception e) { @Override protected void failIndexer(String failureMessage) { - logger.error("Data frame transform [" + getJobId() + "]: " + failureMessage); + logger.error("[{}] transform has failed; experienced: [{}].", getJobId(), failureMessage); auditor.error(transformTask.getTransformId(), failureMessage); transformTask.markAsFailed(failureMessage, ActionListener.wrap( r -> { diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/ClientDataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/ClientDataFrameIndexerTests.java index 2090e75ab45a5..4a23a57efccc5 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/ClientDataFrameIndexerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/ClientDataFrameIndexerTests.java @@ -74,30 +74,30 @@ public void testAudiOnFinishFrequency() { // Audit every checkpoint for the first 10 assertTrue(shouldAudit.get(0)); assertTrue(shouldAudit.get(1)); - assertTrue(shouldAudit.get(9)); + assertTrue(shouldAudit.get(10)); // Then audit every 10 while < 100 - assertFalse(shouldAudit.get(10)); assertFalse(shouldAudit.get(11)); - assertTrue(shouldAudit.get(19)); - assertTrue(shouldAudit.get(29)); - assertFalse(shouldAudit.get(30)); - assertTrue(shouldAudit.get(99)); + assertTrue(shouldAudit.get(20)); + assertFalse(shouldAudit.get(29)); + assertTrue(shouldAudit.get(30)); + assertFalse(shouldAudit.get(99)); // Then audit every 100 < 1000 - assertFalse(shouldAudit.get(100)); + assertTrue(shouldAudit.get(100)); assertFalse(shouldAudit.get(109)); assertFalse(shouldAudit.get(110)); - assertTrue(shouldAudit.get(199)); + assertFalse(shouldAudit.get(199)); // Then audit every 1000 for the rest of time - assertTrue(shouldAudit.get(1999)); + assertFalse(shouldAudit.get(1999)); assertFalse(shouldAudit.get(2199)); - assertTrue(shouldAudit.get(2999)); - assertTrue(shouldAudit.get(9999)); - assertTrue(shouldAudit.get(10_999)); - assertFalse(shouldAudit.get(11_000)); - assertTrue(shouldAudit.get(11_999)); + assertTrue(shouldAudit.get(3000)); + assertTrue(shouldAudit.get(10_000)); + assertFalse(shouldAudit.get(10_999)); + assertTrue(shouldAudit.get(11_000)); + assertFalse(shouldAudit.get(11_001)); + assertFalse(shouldAudit.get(11_999)); } } From cd3be50bf0282e94c4f90b85e37a7ffb78687596 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 22 Aug 2019 07:29:39 -0500 Subject: [PATCH 2/3] using parameterizedMessage instead of string concat --- .../transforms/DataFrameTransformTask.java | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 068e1c0314bc5..3e80769dc448d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; @@ -303,7 +304,7 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); }, exc -> { - logger.error("[" + transform.getId() + "] failed updating state to [" + state + "].", exc); + logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc); getIndexer().stop(); listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform [" + transform.getId() + "] to [" + state.getIndexerState() + "].", exc)); @@ -410,7 +411,9 @@ void persistStateToClusterState(DataFrameTransformState state, }, failure -> { auditor.warning(transform.getId(), "Failed to persist to state to cluster state: " + failure.getMessage()); - logger.error("[" + transform.getId() + "] failed to update cluster state for data frame transform.", failure); + logger.error(new ParameterizedMessage("[{}] failed to update cluster state for data frame transform.", + transform.getId()), + failure); listener.onFailure(failure); } )); @@ -454,7 +457,8 @@ synchronized void markAsFailed(String reason, ActionListener listener) { persistStateToClusterState(newState, ActionListener.wrap( r -> listener.onResponse(null), e -> { - logger.error("[" + getTransformId() + "] failed to set task state as failed to cluster state.", e); + logger.error(new ParameterizedMessage("[{}] failed to set task state as failed to cluster state.", getTransformId()), + e); listener.onFailure(e); } )); @@ -699,7 +703,9 @@ protected void onStart(long now, ActionListener listener) { }, failure -> { progress = null; - logger.warn("[" + transformId + "] unable to load progress information for task.", failure); + logger.warn(new ParameterizedMessage("[{}] unable to load progress information for task.", + transformId), + failure); super.onStart(now, listener); } )); @@ -900,7 +906,9 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p next.run(); }, statsExc -> { - logger.error("[" + transformConfig.getId() + "] updating stats of transform failed.", statsExc); + logger.error(new ParameterizedMessage("[{}] updating stats of transform failed.", + transformConfig.getId()), + statsExc); auditor.warning(getJobId(), "Failure updating stats of transform: " + statsExc.getMessage()); // for auto stop shutdown the task @@ -925,7 +933,7 @@ protected void onFailure(Exception exc) { handleFailure(exc); } catch (Exception e) { logger.error( - "[" + transformTask.getTransformId() + "] data frame transform encountered an unexpected internal exception: ", + new ParameterizedMessage("[{} data frame transform encountered an unexpected internal exception: ", transformId), e); } } @@ -1021,12 +1029,14 @@ protected void createCheckpoint(ActionListener lis ActionListener.wrap( putCheckPointResponse -> listener.onResponse(checkpoint), createCheckpointException -> { - logger.warn("[" + transformId + "] failed to create checkpoint.", createCheckpointException); + logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", transformId), + createCheckpointException); listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException)); } )), getCheckPointException -> { - logger.warn("[" + transformId + "] failed to retrieve checkpoint.", getCheckPointException); + logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", transformId), + getCheckPointException); listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException)); } )); @@ -1042,7 +1052,9 @@ protected void sourceHasChanged(ActionListener hasChangedListener) { }, e -> { logger.warn( - "[" + transformId + "] failed to detect changes for data frame transform. Skipping update till next check.", + new ParameterizedMessage( + "[{}] failed to detect changes for data frame transform. Skipping update till next check.", + transformId), e); auditor.warning(transformId, "Failed to detect changes for data frame transform, skipping update till next check. Exception: " @@ -1058,7 +1070,9 @@ private boolean isIrrecoverableFailure(Exception e) { } synchronized void handleFailure(Exception e) { - logger.warn("[" + transformTask.getTransformId() + "] data frame transform encountered an exception: ", e); + logger.warn(new ParameterizedMessage("[{} data frame transform encountered an exception: ", + transformTask.getTransformId()), + e); if (handleCircuitBreakingException(e)) { return; } From f7080afc39b505fd2369e35cf51ddf46edb8c3ba Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 22 Aug 2019 08:12:54 -0500 Subject: [PATCH 3/3] fixing bracket closure --- .../xpack/dataframe/transforms/DataFrameTransformTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 3e80769dc448d..efec474e3b702 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -933,7 +933,7 @@ protected void onFailure(Exception exc) { handleFailure(exc); } catch (Exception e) { logger.error( - new ParameterizedMessage("[{} data frame transform encountered an unexpected internal exception: ", transformId), + new ParameterizedMessage("[{}] data frame transform encountered an unexpected internal exception: ", transformId), e); } } @@ -1070,7 +1070,7 @@ private boolean isIrrecoverableFailure(Exception e) { } synchronized void handleFailure(Exception e) { - logger.warn(new ParameterizedMessage("[{} data frame transform encountered an exception: ", + logger.warn(new ParameterizedMessage("[{}] data frame transform encountered an exception: ", transformTask.getTransformId()), e); if (handleCircuitBreakingException(e)) {