Skip to content

Commit f5324b4

Browse files
author
David Roberts
committed
[ML-DataFrame] Combine task_state and indexer_state in _stats
This commit replaces task_state and indexer_state in the data frame _stats output with a single top level state that combines the two. It is defined as: - failed if what's currently reported as task_state is failed - stopped if there is no persistent task - Otherwise what's currently reported as indexer_state Closes #45201
1 parent 5508aba commit f5324b4

File tree

30 files changed

+292
-293
lines changed

30 files changed

+292
-293
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStats.java

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919

2020
package org.elasticsearch.client.dataframe.transforms;
2121

22-
import org.elasticsearch.client.core.IndexerState;
2322
import org.elasticsearch.common.ParseField;
2423
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
25-
import org.elasticsearch.common.xcontent.ObjectParser;
2624
import org.elasticsearch.common.xcontent.XContentParser;
2725

2826
import java.io.IOException;
@@ -33,16 +31,14 @@
3331
public class DataFrameTransformCheckpointStats {
3432

3533
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
36-
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
3734
public static final ParseField POSITION = new ParseField("position");
3835
public static final ParseField CHECKPOINT_PROGRESS = new ParseField("checkpoint_progress");
3936
public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis");
4037
public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis");
4138

42-
public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, null, 0L, 0L);
39+
public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, 0L, 0L);
4340

4441
private final long checkpoint;
45-
private final IndexerState indexerState;
4642
private final DataFrameIndexerPosition position;
4743
private final DataFrameTransformProgress checkpointProgress;
4844
private final long timestampMillis;
@@ -51,19 +47,16 @@ public class DataFrameTransformCheckpointStats {
5147
public static final ConstructingObjectParser<DataFrameTransformCheckpointStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
5248
"data_frame_transform_checkpoint_stats", true, args -> {
5349
long checkpoint = args[0] == null ? 0L : (Long) args[0];
54-
IndexerState indexerState = (IndexerState) args[1];
55-
DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[2];
56-
DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[3];
57-
long timestamp = args[4] == null ? 0L : (Long) args[4];
58-
long timeUpperBound = args[5] == null ? 0L : (Long) args[5];
50+
DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[1];
51+
DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[2];
52+
long timestamp = args[3] == null ? 0L : (Long) args[3];
53+
long timeUpperBound = args[4] == null ? 0L : (Long) args[4];
5954

60-
return new DataFrameTransformCheckpointStats(checkpoint, indexerState, position, checkpointProgress, timestamp, timeUpperBound);
55+
return new DataFrameTransformCheckpointStats(checkpoint, position, checkpointProgress, timestamp, timeUpperBound);
6156
});
6257

6358
static {
6459
LENIENT_PARSER.declareLong(optionalConstructorArg(), CHECKPOINT);
65-
LENIENT_PARSER.declareField(optionalConstructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE,
66-
ObjectParser.ValueType.STRING);
6760
LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameIndexerPosition.PARSER, POSITION);
6861
LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameTransformProgress.PARSER, CHECKPOINT_PROGRESS);
6962
LENIENT_PARSER.declareLong(optionalConstructorArg(), TIMESTAMP_MILLIS);
@@ -74,11 +67,10 @@ public static DataFrameTransformCheckpointStats fromXContent(XContentParser pars
7467
return LENIENT_PARSER.parse(parser, null);
7568
}
7669

77-
public DataFrameTransformCheckpointStats(final long checkpoint, final IndexerState indexerState,
78-
final DataFrameIndexerPosition position, final DataFrameTransformProgress checkpointProgress,
79-
final long timestampMillis, final long timeUpperBoundMillis) {
70+
public DataFrameTransformCheckpointStats(final long checkpoint, final DataFrameIndexerPosition position,
71+
final DataFrameTransformProgress checkpointProgress, final long timestampMillis,
72+
final long timeUpperBoundMillis) {
8073
this.checkpoint = checkpoint;
81-
this.indexerState = indexerState;
8274
this.position = position;
8375
this.checkpointProgress = checkpointProgress;
8476
this.timestampMillis = timestampMillis;
@@ -89,10 +81,6 @@ public long getCheckpoint() {
8981
return checkpoint;
9082
}
9183

92-
public IndexerState getIndexerState() {
93-
return indexerState;
94-
}
95-
9684
public DataFrameIndexerPosition getPosition() {
9785
return position;
9886
}
@@ -111,7 +99,7 @@ public long getTimeUpperBoundMillis() {
11199

112100
@Override
113101
public int hashCode() {
114-
return Objects.hash(checkpoint, indexerState, position, checkpointProgress, timestampMillis, timeUpperBoundMillis);
102+
return Objects.hash(checkpoint, position, checkpointProgress, timestampMillis, timeUpperBoundMillis);
115103
}
116104

117105
@Override
@@ -127,7 +115,6 @@ public boolean equals(Object other) {
127115
DataFrameTransformCheckpointStats that = (DataFrameTransformCheckpointStats) other;
128116

129117
return this.checkpoint == that.checkpoint
130-
&& Objects.equals(this.indexerState, that.indexerState)
131118
&& Objects.equals(this.position, that.position)
132119
&& Objects.equals(this.checkpointProgress, that.checkpointProgress)
133120
&& this.timestampMillis == that.timestampMillis

client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStats.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.xcontent.XContentParser;
2626

2727
import java.io.IOException;
28+
import java.util.Locale;
2829
import java.util.Objects;
2930

3031
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
@@ -33,20 +34,20 @@
3334
public class DataFrameTransformStats {
3435

3536
public static final ParseField ID = new ParseField("id");
36-
public static final ParseField TASK_STATE_FIELD = new ParseField("task_state");
37+
public static final ParseField STATE_FIELD = new ParseField("state");
3738
public static final ParseField REASON_FIELD = new ParseField("reason");
3839
public static final ParseField NODE_FIELD = new ParseField("node");
3940
public static final ParseField STATS_FIELD = new ParseField("stats");
4041
public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing");
4142

4243
public static final ConstructingObjectParser<DataFrameTransformStats, Void> PARSER = new ConstructingObjectParser<>(
43-
"data_frame_transform_state_and_stats_info", true,
44-
a -> new DataFrameTransformStats((String) a[0], (DataFrameTransformTaskState) a[1], (String) a[2],
45-
(NodeAttributes) a[3], (DataFrameIndexerTransformStats) a[4], (DataFrameTransformCheckpointingInfo) a[5]));
44+
"data_frame_transform_state_and_stats_info", true,
45+
a -> new DataFrameTransformStats((String) a[0], (State) a[1], (String) a[2],
46+
(NodeAttributes) a[3], (DataFrameIndexerTransformStats) a[4], (DataFrameTransformCheckpointingInfo) a[5]));
4647

4748
static {
4849
PARSER.declareString(constructorArg(), ID);
49-
PARSER.declareField(optionalConstructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE_FIELD,
50+
PARSER.declareField(optionalConstructorArg(), p -> State.fromString(p.text()), STATE_FIELD,
5051
ObjectParser.ValueType.STRING);
5152
PARSER.declareString(optionalConstructorArg(), REASON_FIELD);
5253
PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE_FIELD, ObjectParser.ValueType.OBJECT);
@@ -61,16 +62,15 @@ public static DataFrameTransformStats fromXContent(XContentParser parser) throws
6162

6263
private final String id;
6364
private final String reason;
64-
private final DataFrameTransformTaskState taskState;
65+
private final State state;
6566
private final NodeAttributes node;
6667
private final DataFrameIndexerTransformStats indexerStats;
6768
private final DataFrameTransformCheckpointingInfo checkpointingInfo;
6869

69-
public DataFrameTransformStats(String id, DataFrameTransformTaskState taskState, String reason, NodeAttributes node,
70-
DataFrameIndexerTransformStats stats,
70+
public DataFrameTransformStats(String id, State state, String reason, NodeAttributes node, DataFrameIndexerTransformStats stats,
7171
DataFrameTransformCheckpointingInfo checkpointingInfo) {
7272
this.id = id;
73-
this.taskState = taskState;
73+
this.state = state;
7474
this.reason = reason;
7575
this.node = node;
7676
this.indexerStats = stats;
@@ -81,8 +81,8 @@ public String getId() {
8181
return id;
8282
}
8383

84-
public DataFrameTransformTaskState getTaskState() {
85-
return taskState;
84+
public State getState() {
85+
return state;
8686
}
8787

8888
public String getReason() {
@@ -103,7 +103,7 @@ public DataFrameTransformCheckpointingInfo getCheckpointingInfo() {
103103

104104
@Override
105105
public int hashCode() {
106-
return Objects.hash(id, taskState, reason, node, indexerStats, checkpointingInfo);
106+
return Objects.hash(id, state, reason, node, indexerStats, checkpointingInfo);
107107
}
108108

109109
@Override
@@ -119,10 +119,23 @@ public boolean equals(Object other) {
119119
DataFrameTransformStats that = (DataFrameTransformStats) other;
120120

121121
return Objects.equals(this.id, that.id)
122-
&& Objects.equals(this.taskState, that.taskState)
122+
&& Objects.equals(this.state, that.state)
123123
&& Objects.equals(this.reason, that.reason)
124124
&& Objects.equals(this.node, that.node)
125125
&& Objects.equals(this.indexerStats, that.indexerStats)
126126
&& Objects.equals(this.checkpointingInfo, that.checkpointingInfo);
127127
}
128+
129+
public enum State {
130+
131+
STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED;
132+
133+
public static State fromString(String name) {
134+
return valueOf(name.trim().toUpperCase(Locale.ROOT));
135+
}
136+
137+
public String value() {
138+
return name().toLowerCase(Locale.ROOT);
139+
}
140+
}
128141
}

client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformTaskState.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats;
4242
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
4343
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats;
44-
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
4544
import org.elasticsearch.client.dataframe.transforms.DestConfig;
4645
import org.elasticsearch.client.dataframe.transforms.SourceConfig;
4746
import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig;
@@ -273,10 +272,11 @@ public void testStartStop() throws IOException {
273272
GetDataFrameTransformStatsResponse statsResponse = execute(new GetDataFrameTransformStatsRequest(id),
274273
client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
275274
assertThat(statsResponse.getTransformsStats(), hasSize(1));
276-
DataFrameTransformTaskState taskState = statsResponse.getTransformsStats().get(0).getTaskState();
275+
DataFrameTransformStats.State taskState = statsResponse.getTransformsStats().get(0).getState();
277276

278277
// Since we are non-continuous, the transform could auto-stop between being started earlier and us gathering the statistics
279-
assertThat(taskState, is(oneOf(DataFrameTransformTaskState.STARTED, DataFrameTransformTaskState.STOPPED)));
278+
assertThat(taskState, oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING,
279+
DataFrameTransformStats.State.STOPPING, DataFrameTransformStats.State.STOPPED));
280280

281281
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
282282
StopDataFrameTransformResponse stopResponse =
@@ -288,8 +288,8 @@ public void testStartStop() throws IOException {
288288
// Calling stop with wait_for_completion assures that we will be in the `STOPPED` state for the transform task
289289
statsResponse = execute(new GetDataFrameTransformStatsRequest(id),
290290
client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
291-
taskState = statsResponse.getTransformsStats().get(0).getTaskState();
292-
assertThat(taskState, is(DataFrameTransformTaskState.STOPPED));
291+
taskState = statsResponse.getTransformsStats().get(0).getState();
292+
assertThat(taskState, is(DataFrameTransformStats.State.STOPPED));
293293
}
294294

295295
@SuppressWarnings("unchecked")
@@ -369,7 +369,7 @@ public void testGetStats() throws Exception {
369369

370370
assertEquals(1, statsResponse.getTransformsStats().size());
371371
DataFrameTransformStats stats = statsResponse.getTransformsStats().get(0);
372-
assertEquals(DataFrameTransformTaskState.STOPPED, stats.getTaskState());
372+
assertEquals(DataFrameTransformStats.State.STOPPED, stats.getState());
373373

374374
DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
375375
assertEquals(zeroIndexerStats, stats.getIndexerStats());
@@ -384,8 +384,8 @@ public void testGetStats() throws Exception {
384384
client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
385385
DataFrameTransformStats stateAndStats = response.getTransformsStats().get(0);
386386
assertNotEquals(zeroIndexerStats, stateAndStats.getIndexerStats());
387-
assertThat(stateAndStats.getTaskState(),
388-
is(oneOf(DataFrameTransformTaskState.STARTED, DataFrameTransformTaskState.STOPPED)));
387+
assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING,
388+
DataFrameTransformStats.State.STOPPING, DataFrameTransformStats.State.STOPPED));
389389
assertThat(stateAndStats.getReason(), is(nullValue()));
390390
});
391391
}

client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.client.dataframe.transforms;
2121

22-
import org.elasticsearch.client.core.IndexerState;
2322
import org.elasticsearch.common.xcontent.XContentBuilder;
2423
import org.elasticsearch.test.ESTestCase;
2524

@@ -41,7 +40,6 @@ public void testFromXContent() throws IOException {
4140

4241
public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
4342
return new DataFrameTransformCheckpointStats(randomLongBetween(1, 1_000_000),
44-
randomBoolean() ? null : randomFrom(IndexerState.values()),
4543
randomBoolean() ? null : DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
4644
randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance(),
4745
randomLongBetween(1, 1_000_000), randomLongBetween(0, 1_000_000));
@@ -50,9 +48,6 @@ public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpoi
5048
public static void toXContent(DataFrameTransformCheckpointStats stats, XContentBuilder builder) throws IOException {
5149
builder.startObject();
5250
builder.field(DataFrameTransformCheckpointStats.CHECKPOINT.getPreferredName(), stats.getCheckpoint());
53-
if (stats.getIndexerState() != null) {
54-
builder.field(DataFrameTransformCheckpointStats.INDEXER_STATE.getPreferredName(), stats.getIndexerState().value());
55-
}
5651
if (stats.getPosition() != null) {
5752
builder.field(DataFrameTransformCheckpointStats.POSITION.getPreferredName());
5853
DataFrameIndexerPositionTests.toXContent(stats.getPosition(), builder);

client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStatsTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void testFromXContent() throws IOException {
4141

4242
public static DataFrameTransformStats randomInstance() {
4343
return new DataFrameTransformStats(randomAlphaOfLength(10),
44-
randomBoolean() ? null : randomFrom(DataFrameTransformTaskState.values()),
44+
randomBoolean() ? null : randomFrom(DataFrameTransformStats.State.values()),
4545
randomBoolean() ? null : randomAlphaOfLength(100),
4646
randomBoolean() ? null : NodeAttributesTests.createRandom(),
4747
DataFrameIndexerTransformStatsTests.randomStats(),
@@ -51,9 +51,9 @@ public static DataFrameTransformStats randomInstance() {
5151
public static void toXContent(DataFrameTransformStats stats, XContentBuilder builder) throws IOException {
5252
builder.startObject();
5353
builder.field(DataFrameTransformStats.ID.getPreferredName(), stats.getId());
54-
if (stats.getTaskState() != null) {
55-
builder.field(DataFrameTransformStats.TASK_STATE_FIELD.getPreferredName(),
56-
stats.getTaskState().value());
54+
if (stats.getState() != null) {
55+
builder.field(DataFrameTransformStats.STATE_FIELD.getPreferredName(),
56+
stats.getState().value());
5757
}
5858
if (stats.getReason() != null) {
5959
builder.field(DataFrameTransformStats.REASON_FIELD.getPreferredName(), stats.getReason());

client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointStatsTests.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.common.xcontent.XContentParser;
2323
import org.elasticsearch.client.AbstractHlrcXContentTestCase;
2424
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
25-
import org.elasticsearch.xpack.core.indexing.IndexerState;
2625

2726
import java.io.IOException;
2827
import java.util.function.Predicate;
@@ -34,7 +33,6 @@ public class DataFrameTransformCheckpointStatsTests extends AbstractHlrcXContent
3433
public static DataFrameTransformCheckpointStats fromHlrc(
3534
org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats instance) {
3635
return new DataFrameTransformCheckpointStats(instance.getCheckpoint(),
37-
(instance.getIndexerState() != null) ? IndexerState.fromString(instance.getIndexerState().value()) : null,
3836
DataFrameIndexerPositionTests.fromHlrc(instance.getPosition()),
3937
DataFrameTransformProgressTests.fromHlrc(instance.getCheckpointProgress()),
4038
instance.getTimestampMillis(),
@@ -55,7 +53,6 @@ public DataFrameTransformCheckpointStats convertHlrcToInternal(
5553

5654
public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
5755
return new DataFrameTransformCheckpointStats(randomLongBetween(1, 1_000_000),
58-
randomBoolean() ? null : randomFrom(IndexerState.values()),
5956
DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
6057
randomBoolean() ? null : DataFrameTransformProgressTests.randomDataFrameTransformProgress(),
6158
randomLongBetween(1, 1_000_000), randomLongBetween(0, 1_000_000));

0 commit comments

Comments
 (0)