Skip to content

Commit 7c503ce

Browse files
authored
[ML Data Frame] Persist and restore checkpoint and position (#41942)
Persist and restore Data frame's current checkpoint and position
1 parent 8907dc9 commit 7c503ce

File tree

17 files changed

+288
-210
lines changed

17 files changed

+288
-210
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerTransformStats.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,6 @@ public void writeTo(StreamOutput out) throws IOException {
109109
out.writeString(transformId);
110110
}
111111

112-
/**
113-
* Get the persisted stats document name from the Data Frame Transformer Id.
114-
*
115-
* @return The id of document the where the transform stats are persisted
116-
*/
117-
public static String documentId(String transformId) {
118-
return NAME + "-" + transformId;
119-
}
120-
121112
@Nullable
122113
public String getTransformId() {
123114
return transformId;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgress.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323

2424
public class DataFrameTransformProgress implements Writeable, ToXContentObject {
2525

26-
private static final ParseField TOTAL_DOCS = new ParseField("total_docs");
27-
private static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
28-
private static final String PERCENT_COMPLETE = "percent_complete";
26+
public static final ParseField TOTAL_DOCS = new ParseField("total_docs");
27+
public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
28+
public static final String PERCENT_COMPLETE = "percent_complete";
2929

3030
public static final ConstructingObjectParser<DataFrameTransformProgress, Void> PARSER = new ConstructingObjectParser<>(
3131
"data_frame_transform_progress",

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
4242
@Nullable
4343
private final String reason;
4444

45-
private static final ParseField TASK_STATE = new ParseField("task_state");
46-
private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
47-
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
48-
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
49-
private static final ParseField REASON = new ParseField("reason");
50-
private static final ParseField PROGRESS = new ParseField("progress");
45+
public static final ParseField TASK_STATE = new ParseField("task_state");
46+
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
47+
public static final ParseField CURRENT_POSITION = new ParseField("current_position");
48+
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
49+
public static final ParseField REASON = new ParseField("reason");
50+
public static final ParseField PROGRESS = new ParseField("progress");
5151

5252
@SuppressWarnings("unchecked")
5353
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.elasticsearch.xpack.core.dataframe.transforms;
88

9+
import org.elasticsearch.common.Nullable;
910
import org.elasticsearch.common.ParseField;
1011
import org.elasticsearch.common.Strings;
1112
import org.elasticsearch.common.io.stream.StreamInput;
@@ -14,6 +15,7 @@
1415
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
1516
import org.elasticsearch.common.xcontent.ToXContentObject;
1617
import org.elasticsearch.common.xcontent.XContentBuilder;
18+
import org.elasticsearch.common.xcontent.XContentParser;
1719
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
1820
import org.elasticsearch.xpack.core.indexing.IndexerState;
1921

@@ -22,7 +24,7 @@
2224

2325
public class DataFrameTransformStateAndStats implements Writeable, ToXContentObject {
2426

25-
private static final String NAME = "data_frame_transform_state_and_stats";
27+
public static final String NAME = "data_frame_transform_state_and_stats";
2628
public static final ParseField STATE_FIELD = new ParseField("state");
2729
public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing");
2830

@@ -47,6 +49,10 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
4749
(p, c) -> DataFrameTransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
4850
}
4951

52+
public static DataFrameTransformStateAndStats fromXContent(XContentParser parser) throws IOException {
53+
return PARSER.parse(parser, null);
54+
}
55+
5056
public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
5157
return initialStateAndStats(id, new DataFrameIndexerTransformStats(id));
5258
}
@@ -58,6 +64,15 @@ public static DataFrameTransformStateAndStats initialStateAndStats(String id, Da
5864
DataFrameTransformCheckpointingInfo.EMPTY);
5965
}
6066

67+
/**
68+
* Get the persisted state and stats document name from the Data Frame Transform Id.
69+
*
70+
* @return The id of document the where the transform stats are persisted
71+
*/
72+
public static String documentId(String transformId) {
73+
return NAME + "-" + transformId;
74+
}
75+
6176
public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats,
6277
DataFrameTransformCheckpointingInfo checkpointingInfo) {
6378
this.id = Objects.requireNonNull(id);
@@ -73,13 +88,21 @@ public DataFrameTransformStateAndStats(StreamInput in) throws IOException {
7388
this.checkpointingInfo = new DataFrameTransformCheckpointingInfo(in);
7489
}
7590

91+
@Nullable
92+
public String getTransformId() {
93+
return transformStats.getTransformId();
94+
}
95+
7696
@Override
7797
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
7898
builder.startObject();
7999
builder.field(DataFrameField.ID.getPreferredName(), id);
80100
builder.field(STATE_FIELD.getPreferredName(), transformState, params);
81101
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), transformStats, params);
82102
builder.field(CHECKPOINTING_INFO_FIELD.getPreferredName(), checkpointingInfo, params);
103+
if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) {
104+
builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME);
105+
}
83106
builder.endObject();
84107
return builder;
85108
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import java.util.List;
1818
import java.util.concurrent.Executor;
19+
import java.util.concurrent.atomic.AtomicBoolean;
1920
import java.util.concurrent.atomic.AtomicReference;
2021

2122
/**
@@ -94,16 +95,21 @@ public synchronized IndexerState start() {
9495
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
9596
*/
9697
public synchronized IndexerState stop() {
98+
AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false);
9799
IndexerState currentState = state.updateAndGet(previousState -> {
98100
if (previousState == IndexerState.INDEXING) {
99101
return IndexerState.STOPPING;
100102
} else if (previousState == IndexerState.STARTED) {
101-
onStop();
103+
wasStartedAndSetStopped.set(true);
102104
return IndexerState.STOPPED;
103105
} else {
104106
return previousState;
105107
}
106108
});
109+
110+
if (wasStartedAndSetStopped.get()) {
111+
onStop();
112+
}
107113
return currentState;
108114
}
109115

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ public static void removeIndices() throws Exception {
292292
wipeIndices();
293293
}
294294

295-
public void wipeDataFrameTransforms() throws IOException, InterruptedException {
295+
public void wipeDataFrameTransforms() throws IOException {
296296
List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
297297
for (Map<String, Object> transformConfig : transformConfigs) {
298298
String transformId = (String) transformConfig.get("id");

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.elasticsearch.client.Response;
1111
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1212
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
13-
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
13+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
1414
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
1515
import org.junit.Before;
1616

@@ -72,7 +72,7 @@ public void testUsage() throws Exception {
7272
Request statsExistsRequest = new Request("GET",
7373
DataFrameInternalIndex.INDEX_NAME+"/_search?q=" +
7474
INDEX_DOC_TYPE.getPreferredName() + ":" +
75-
DataFrameIndexerTransformStats.NAME);
75+
DataFrameTransformStateAndStats.NAME);
7676
// Verify that we have our two stats documents
7777
assertBusy(() -> {
7878
Map<String, Object> hasStatsMap = entityAsMap(client().performRequest(statsExistsRequest));
@@ -100,7 +100,6 @@ public void testUsage() throws Exception {
100100
expectedStats.merge(statName, statistic, Integer::sum);
101101
}
102102

103-
104103
usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
105104

106105
usageAsMap = entityAsMap(usageResponse);
@@ -109,7 +108,8 @@ public void testUsage() throws Exception {
109108
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.started", usageAsMap));
110109
assertEquals(2, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
111110
for(String statName : PROVIDED_STATS) {
112-
assertEquals(expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats."+statName, usageAsMap));
111+
assertEquals("Incorrect stat " + statName,
112+
expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats." + statName, usageAsMap));
113113
}
114114
}
115115
}

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
3636
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
3737
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
38+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
3839
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
3940
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
4041

@@ -176,6 +177,7 @@ static DataFrameIndexerTransformStats parseSearchAggs(SearchResponse searchRespo
176177

177178
for(String statName : PROVIDED_STATS) {
178179
Aggregation agg = searchResponse.getAggregations().get(statName);
180+
179181
if (agg instanceof NumericMetricsAggregation.SingleValue) {
180182
statisticsList.add((long)((NumericMetricsAggregation.SingleValue)agg).value());
181183
} else {
@@ -197,14 +199,15 @@ static DataFrameIndexerTransformStats parseSearchAggs(SearchResponse searchRespo
197199
static void getStatisticSummations(Client client, ActionListener<DataFrameIndexerTransformStats> statsListener) {
198200
QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
199201
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(),
200-
DataFrameIndexerTransformStats.NAME)));
202+
DataFrameTransformStateAndStats.NAME)));
201203

202204
SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
203205
.setSize(0)
204206
.setQuery(queryBuilder);
205207

208+
final String path = DataFrameField.STATS_FIELD.getPreferredName() + ".";
206209
for(String statName : PROVIDED_STATS) {
207-
requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(statName));
210+
requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(path + statName));
208211
}
209212

210213
ActionListener<SearchResponse> getStatisticSummationsListener = ActionListener.wrap(
@@ -213,6 +216,7 @@ static void getStatisticSummations(Client client, ActionListener<DataFrameIndexe
213216
logger.error("statistics summations search returned shard failures: {}",
214217
Arrays.toString(searchResponse.getShardFailures()));
215218
}
219+
216220
statsListener.onResponse(parseSearchAggs(searchResponse));
217221
},
218222
failure -> {

0 commit comments

Comments
 (0)