Skip to content

Commit 9782aaa

Browse files
authored
ML: Add reason field in JobTaskState (#38029)
* ML: adding reason to job failure status * marking reason as nullable * Update AutodetectProcessManager.java
1 parent ed460c2 commit 9782aaa

File tree

14 files changed

+89
-53
lines changed

14 files changed

+89
-53
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package org.elasticsearch.xpack.core.ml.job.config;
77

8+
import org.elasticsearch.Version;
9+
import org.elasticsearch.common.Nullable;
810
import org.elasticsearch.common.ParseField;
911
import org.elasticsearch.common.io.stream.StreamInput;
1012
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -20,16 +22,19 @@
2022
import java.util.Objects;
2123

2224
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
25+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
2326

2427
public class JobTaskState implements PersistentTaskState {
2528

2629
public static final String NAME = MlTasks.JOB_TASK_NAME;
2730

2831
private static ParseField STATE = new ParseField("state");
2932
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");
33+
private static ParseField REASON = new ParseField("reason");
3034

3135
private static final ConstructingObjectParser<JobTaskState, Void> PARSER =
32-
new ConstructingObjectParser<>(NAME, true, args -> new JobTaskState((JobState) args[0], (Long) args[1]));
36+
new ConstructingObjectParser<>(NAME, true,
37+
args -> new JobTaskState((JobState) args[0], (Long) args[1], (String) args[2]));
3338

3439
static {
3540
PARSER.declareField(constructorArg(), p -> {
@@ -39,6 +44,7 @@ public class JobTaskState implements PersistentTaskState {
3944
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
4045
}, STATE, ObjectParser.ValueType.STRING);
4146
PARSER.declareLong(constructorArg(), ALLOCATION_ID);
47+
PARSER.declareString(optionalConstructorArg(), REASON);
4248
}
4349

4450
public static JobTaskState fromXContent(XContentParser parser) {
@@ -51,21 +57,33 @@ public static JobTaskState fromXContent(XContentParser parser) {
5157

5258
private final JobState state;
5359
private final long allocationId;
60+
private final String reason;
5461

55-
public JobTaskState(JobState state, long allocationId) {
62+
public JobTaskState(JobState state, long allocationId, @Nullable String reason) {
5663
this.state = Objects.requireNonNull(state);
5764
this.allocationId = allocationId;
65+
this.reason = reason;
5866
}
5967

6068
public JobTaskState(StreamInput in) throws IOException {
6169
state = JobState.fromStream(in);
6270
allocationId = in.readLong();
71+
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
72+
reason = in.readOptionalString();
73+
} else {
74+
reason = null;
75+
}
6376
}
6477

6578
public JobState getState() {
6679
return state;
6780
}
6881

82+
@Nullable
83+
public String getReason() {
84+
return reason;
85+
}
86+
6987
/**
7088
* The job state stores the allocation ID at the time it was last set.
7189
* This method compares the allocation ID in the state with the allocation
@@ -90,6 +108,9 @@ public String getWriteableName() {
90108
public void writeTo(StreamOutput out) throws IOException {
91109
state.writeTo(out);
92110
out.writeLong(allocationId);
111+
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
112+
out.writeOptionalString(reason);
113+
}
93114
}
94115

95116
@Override
@@ -102,6 +123,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
102123
builder.startObject();
103124
builder.field(STATE.getPreferredName(), state.value());
104125
builder.field(ALLOCATION_ID.getPreferredName(), allocationId);
126+
if (reason != null) {
127+
builder.field(REASON.getPreferredName(), reason);
128+
}
105129
builder.endObject();
106130
return builder;
107131
}
@@ -112,11 +136,12 @@ public boolean equals(Object o) {
112136
if (o == null || getClass() != o.getClass()) return false;
113137
JobTaskState that = (JobTaskState) o;
114138
return state == that.state &&
115-
Objects.equals(allocationId, that.allocationId);
139+
Objects.equals(allocationId, that.allocationId) &&
140+
Objects.equals(reason, that.reason);
116141
}
117142

118143
@Override
119144
public int hashCode() {
120-
return Objects.hash(state, allocationId);
145+
return Objects.hash(state, allocationId, reason);
121146
}
122147
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public void testGetJobState() {
3333
new PersistentTasksCustomMetaData.Assignment("bar", "test assignment"));
3434
assertEquals(JobState.OPENING, MlTasks.getJobState("foo", tasksBuilder.build()));
3535

36-
tasksBuilder.updateTaskState(MlTasks.jobTaskId("foo"), new JobTaskState(JobState.OPENED, tasksBuilder.getLastAllocationId()));
36+
tasksBuilder.updateTaskState(MlTasks.jobTaskId("foo"), new JobTaskState(JobState.OPENED, tasksBuilder.getLastAllocationId(), null));
3737
assertEquals(JobState.OPENED, MlTasks.getJobState("foo", tasksBuilder.build()));
3838
}
3939

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ static TransportCloseJobAction.WaitForCloseRequest buildWaitForCloseRequest(List
266266
@Override
267267
protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAction.JobTask jobTask,
268268
ActionListener<CloseJobAction.Response> listener) {
269-
JobTaskState taskState = new JobTaskState(JobState.CLOSING, jobTask.getAllocationId());
269+
JobTaskState taskState = new JobTaskState(JobState.CLOSING, jobTask.getAllocationId(), "close job (api)");
270270
jobTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> {
271271
// we need to fork because we are now on a network threadpool and closeJob method may take a while to complete:
272272
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
1010

1111
import java.util.concurrent.ExecutorService;
12+
import java.util.function.Consumer;
1213

1314
/**
1415
* Factory interface for creating implementations of {@link AutodetectProcess}
@@ -28,5 +29,5 @@ public interface AutodetectProcessFactory {
2829
AutodetectProcess createAutodetectProcess(Job job,
2930
AutodetectParams autodetectParams,
3031
ExecutorService executorService,
31-
Runnable onProcessCrash);
32+
Consumer<String> onProcessCrash);
3233
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -475,14 +475,14 @@ protected void doRun() {
475475
.kill();
476476
processByAllocation.remove(jobTask.getAllocationId());
477477
} finally {
478-
setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1, true));
478+
setJobState(jobTask, JobState.FAILED, e1.getMessage(), e2 -> closeHandler.accept(e1, true));
479479
}
480480
}
481481
}
482482
});
483483
}, e1 -> {
484484
logger.warn("Failed to gather information required to open job [" + jobId + "]", e1);
485-
setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1, true));
485+
setJobState(jobTask, JobState.FAILED, e1.getMessage(), e2 -> closeHandler.accept(e1, true));
486486
});
487487
},
488488
e -> closeHandler.accept(e, true)
@@ -601,16 +601,16 @@ private void notifyLoadingSnapshot(String jobId, AutodetectParams autodetectPara
601601
auditor.info(jobId, msg);
602602
}
603603

604-
private Runnable onProcessCrash(JobTask jobTask) {
605-
return () -> {
604+
private Consumer<String> onProcessCrash(JobTask jobTask) {
605+
return (reason) -> {
606606
ProcessContext processContext = processByAllocation.remove(jobTask.getAllocationId());
607607
if (processContext != null) {
608608
AutodetectCommunicator communicator = processContext.getAutodetectCommunicator();
609609
if (communicator != null) {
610610
communicator.destroyCategorizationAnalyzer();
611611
}
612612
}
613-
setJobState(jobTask, JobState.FAILED);
613+
setJobState(jobTask, JobState.FAILED, reason);
614614
try {
615615
removeTmpStorage(jobTask.getJobId());
616616
} catch (IOException e) {
@@ -666,7 +666,7 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) {
666666
throw e;
667667
}
668668
logger.warn("[" + jobId + "] Exception closing autodetect process", e);
669-
setJobState(jobTask, JobState.FAILED);
669+
setJobState(jobTask, JobState.FAILED, e.getMessage());
670670
throw ExceptionsHelper.serverError("Exception closing autodetect process", e);
671671
} finally {
672672
// to ensure the contract that multiple simultaneous close calls for the same job wait until
@@ -720,8 +720,8 @@ public Optional<Duration> jobOpenTime(JobTask jobTask) {
720720
return Optional.of(Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now()));
721721
}
722722

723-
void setJobState(JobTask jobTask, JobState state) {
724-
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId());
723+
void setJobState(JobTask jobTask, JobState state, String reason) {
724+
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason);
725725
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<PersistentTask<?>>() {
726726
@Override
727727
public void onResponse(PersistentTask<?> persistentTask) {
@@ -735,27 +735,31 @@ public void onFailure(Exception e) {
735735
});
736736
}
737737

738-
void setJobState(JobTask jobTask, JobState state, CheckedConsumer<Exception, IOException> handler) {
739-
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId());
738+
void setJobState(JobTask jobTask, JobState state) {
739+
setJobState(jobTask, state, null);
740+
}
741+
742+
void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer<Exception, IOException> handler) {
743+
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason);
740744
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<PersistentTask<?>>() {
741-
@Override
742-
public void onResponse(PersistentTask<?> persistentTask) {
743-
try {
744-
handler.accept(null);
745-
} catch (IOException e1) {
746-
logger.warn("Error while delegating response", e1);
747-
}
748-
}
745+
@Override
746+
public void onResponse(PersistentTask<?> persistentTask) {
747+
try {
748+
handler.accept(null);
749+
} catch (IOException e1) {
750+
logger.warn("Error while delegating response", e1);
751+
}
752+
}
749753

750-
@Override
751-
public void onFailure(Exception e) {
752-
try {
753-
handler.accept(e);
754-
} catch (IOException e1) {
755-
logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1);
756-
}
757-
}
758-
});
754+
@Override
755+
public void onFailure(Exception e) {
756+
try {
757+
handler.accept(e);
758+
} catch (IOException e1) {
759+
logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1);
760+
}
761+
}
762+
});
759763
}
760764

761765
public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(JobTask jobTask) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.nio.file.Path;
2929
import java.util.Iterator;
3030
import java.util.List;
31+
import java.util.function.Consumer;
3132

3233
/**
3334
* Autodetect process using native code.
@@ -42,7 +43,7 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
4243

4344
NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
4445
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
45-
AutodetectResultsParser resultsParser, Runnable onProcessCrash) {
46+
AutodetectResultsParser resultsParser, Consumer<String> onProcessCrash) {
4647
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash);
4748
this.resultsParser = resultsParser;
4849
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.List;
3131
import java.util.Objects;
3232
import java.util.concurrent.ExecutorService;
33+
import java.util.function.Consumer;
3334

3435
public class NativeAutodetectProcessFactory implements AutodetectProcessFactory {
3536

@@ -56,7 +57,7 @@ public NativeAutodetectProcessFactory(Environment env, Settings settings, Native
5657
public AutodetectProcess createAutodetectProcess(Job job,
5758
AutodetectParams params,
5859
ExecutorService executorService,
59-
Runnable onProcessCrash) {
60+
Consumer<String> onProcessCrash) {
6061
List<Path> filesToDelete = new ArrayList<>();
6162
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AutodetectBuilder.AUTODETECT, job.getId(),
6263
true, false, true, true, params.modelSnapshot() != null,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize
2020
private static final String NAME = "normalizer";
2121

2222
NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream) {
23-
super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), () -> {});
23+
super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {});
2424
}
2525

2626
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@
2323
import java.time.Duration;
2424
import java.time.ZonedDateTime;
2525
import java.util.List;
26+
import java.util.Locale;
2627
import java.util.Objects;
2728
import java.util.concurrent.ExecutionException;
2829
import java.util.concurrent.ExecutorService;
2930
import java.util.concurrent.Future;
3031
import java.util.concurrent.TimeUnit;
3132
import java.util.concurrent.TimeoutException;
33+
import java.util.function.Consumer;
3234

3335
/**
3436
* Abstract class for implementing a native process.
@@ -48,7 +50,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
4850
private final ZonedDateTime startTime;
4951
private final int numberOfFields;
5052
private final List<Path> filesToDelete;
51-
private final Runnable onProcessCrash;
53+
private final Consumer<String> onProcessCrash;
5254
private volatile Future<?> logTailFuture;
5355
private volatile Future<?> stateProcessorFuture;
5456
private volatile boolean processCloseInitiated;
@@ -57,7 +59,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
5759

5860
protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
5961
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
60-
Runnable onProcessCrash) {
62+
Consumer<String> onProcessCrash) {
6163
this.jobId = jobId;
6264
cppLogHandler = new CppLogMessageHandler(jobId, logStream);
6365
this.processInStream = new BufferedOutputStream(processInStream);
@@ -90,8 +92,9 @@ public void start(ExecutorService executorService) {
9092
// by a user or other process (e.g. the Linux OOM killer)
9193

9294
String errors = cppLogHandler.getErrors();
93-
LOGGER.error("[{}] {} process stopped unexpectedly: {}", jobId, getName(), errors);
94-
onProcessCrash.run();
95+
String fullError = String.format(Locale.ROOT, "[%s] %s process stopped unexpectedly: %s", jobId, getName(), errors);
96+
LOGGER.error(fullError);
97+
onProcessCrash.accept(fullError);
9598
}
9699
}
97100
});

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,7 @@ public static void addJobTask(String jobId, String nodeId, JobState jobState, Pe
552552
new Assignment(nodeId, "test assignment"));
553553
if (jobState != null) {
554554
builder.updateTaskState(MlTasks.jobTaskId(jobId),
555-
new JobTaskState(jobState, builder.getLastAllocationId() - (isStale ? 1 : 0)));
555+
new JobTaskState(jobState, builder.getLastAllocationId() - (isStale ? 1 : 0), null));
556556
}
557557
}
558558

0 commit comments

Comments
 (0)