Skip to content

Commit 89d65cc

Browse files
committed
[hotfix] Add debug logging to the states of the AdaptiveScheduler
This commit adds debug log statements to the states of the AdaptiveScheduler to log whenever we ignore a global failure.
1 parent bebe10f commit 89d65cc

File tree

5 files changed

+24
-7
lines changed

5 files changed

+24
-7
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,14 +1034,24 @@ private static int getCumulativeParallelism(VertexParallelism potentialNewParall
10341034

10351035
@Override
10361036
public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) {
1037+
1038+
@Nullable
1039+
final Throwable optionalFailure =
1040+
archivedExecutionGraph.getFailureInfo() != null
1041+
? archivedExecutionGraph.getFailureInfo().getException()
1042+
: null;
1043+
LOG.info(
1044+
"Job {} reached terminal state {}.",
1045+
archivedExecutionGraph.getJobID(),
1046+
archivedExecutionGraph.getState(),
1047+
optionalFailure);
1048+
10371049
if (jobStatusListener != null) {
10381050
jobStatusListener.jobStatusChanges(
10391051
jobInformation.getJobID(),
10401052
archivedExecutionGraph.getState(),
10411053
archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()),
1042-
archivedExecutionGraph.getFailureInfo() != null
1043-
? archivedExecutionGraph.getFailureInfo().getException()
1044-
: null);
1054+
optionalFailure);
10451055
}
10461056

10471057
jobTerminationFuture.complete(archivedExecutionGraph.getState());

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public void cancel() {
5656

5757
@Override
5858
public void handleGlobalFailure(Throwable cause) {
59-
// ignore global failures
59+
getLogger()
60+
.debug("Ignored global failure because we are already canceling the job.", cause);
6061
}
6162

6263
@Override

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void cancel() {
5858

5959
@Override
6060
public void handleGlobalFailure(Throwable cause) {
61-
// nothing to do since we are already failing
61+
getLogger().debug("Ignored global failure because we are already failing the job.", cause);
6262
}
6363

6464
@Override

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,12 @@ public ArchivedExecutionGraph getJob() {
5454
}
5555

5656
@Override
57-
public void handleGlobalFailure(Throwable cause) {}
57+
public void handleGlobalFailure(Throwable cause) {
58+
logger.debug(
59+
"Ignore global failure because we are in state {}.",
60+
getClass().getSimpleName(),
61+
cause);
62+
}
5863

5964
@Override
6065
public Logger getLogger() {

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ public void cancel() {
7777

7878
@Override
7979
public void handleGlobalFailure(Throwable cause) {
80-
// don't do anything
80+
getLogger()
81+
.debug("Ignored global failure because we are already restarting the job.", cause);
8182
}
8283

8384
@Override

0 commit comments

Comments
 (0)