Skip to content

Commit 857e618

Browse files
committed
Fix delete_expired_data/nightly maintenance when many model snapshots need deleting (elastic#57041)
The queries performed by the expired data removers pull back entire documents when only a few fields are required. For ModelSnapshots in particular this is a problem as they contain quantiles which may be 100s of KB and the search size is set to 10,000. This change makes the search more efficient by only requesting the fields needed to work out which expired data should be deleted.
1 parent 0cfddb0 commit 857e618

File tree

9 files changed

+247
-77
lines changed

9 files changed

+247
-77
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,36 @@ public static Instant parseTimeFieldToInstant(XContentParser parser, String fiel
4646
"unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
4747
}
4848

49+
/**
50+
* Safely parses a string epoch representation to a Long
51+
*
52+
* Commonly this function is used for parsing Date fields from doc values
53+
* requested with the format "epoch_millis".
54+
*
55+
* Since nanosecond support was added epoch_millis timestamps may have a fractional component.
56+
* We discard this, taking just whole milliseconds. Arguably it would be better to retain the
57+
* precision here and let the downstream component decide whether it wants the accuracy, but
58+
* that makes it hard to pass around the value as a number. The double type doesn't have
59+
* enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would
60+
* work, but that isn't supported by the JSON parser if the number gets round-tripped through
61+
* JSON. So String is really the only format that could be used, but the consumers of time
62+
* are expecting a number.
63+
*
64+
* @param epoch The epoch value as a string. This may contain a fractional component.
65+
* @return The epoch value.
66+
*/
67+
public static long parseToEpochMs(String epoch) {
68+
int dotPos = epoch.indexOf('.');
69+
if (dotPos == -1) {
70+
return Long.parseLong(epoch);
71+
} else if (dotPos > 0) {
72+
return Long.parseLong(epoch.substring(0, dotPos));
73+
} else {
74+
// The first character is '.' so round down to 0
75+
return 0L;
76+
}
77+
}
78+
4979
/**
5080
* First tries to parse the date first as a Long and convert that to an
5181
* epoch time. If the long number has more than 10 digits it is considered a

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ public void testDateStringToEpoch() {
7272
assertEquals(1477058573500L, TimeUtils.dateStringToEpoch("1477058573500"));
7373
}
7474

75+
public void testParseToEpochMs() {
76+
assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000"));
77+
assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000.005"));
78+
assertEquals(0L, TimeUtils.parseToEpochMs(".005"));
79+
}
80+
7581
public void testCheckMultiple_GivenMultiples() {
7682
TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.SECONDS, new ParseField("foo"));
7783
TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.MINUTES, new ParseField("foo"));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.common.util.set.Sets;
99
import org.elasticsearch.search.SearchHit;
10+
import org.elasticsearch.xpack.core.common.time.TimeUtils;
1011

1112
import java.util.Collections;
1213
import java.util.Objects;
@@ -44,23 +45,7 @@ public Object[] value(SearchHit hit) {
4445
return value;
4546
}
4647
if (value[0] instanceof String) { // doc_value field with the epoch_millis format
47-
// Since nanosecond support was added epoch_millis timestamps may have a fractional component.
48-
// We discard this, taking just whole milliseconds. Arguably it would be better to retain the
49-
// precision here and let the downstream component decide whether it wants the accuracy, but
50-
// that makes it hard to pass around the value as a number. The double type doesn't have
51-
// enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would
52-
// work, but that isn't supported by the JSON parser if the number gets round-tripped through
53-
// JSON. So String is really the only format that could be used, but the ML consumers of time
54-
// are expecting a number.
55-
String strVal0 = (String) value[0];
56-
int dotPos = strVal0.indexOf('.');
57-
if (dotPos == -1) {
58-
value[0] = Long.parseLong(strVal0);
59-
} else if (dotPos > 0) {
60-
value[0] = Long.parseLong(strVal0.substring(0, dotPos));
61-
} else {
62-
value[0] = 0L;
63-
}
48+
value[0] = TimeUtils.parseToEpochMs((String)value[0]);
6449
} else if (value[0] instanceof Long == false) { // pre-6.0 field
6550
throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass());
6651
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,6 @@
1414
import org.elasticsearch.action.search.SearchResponse;
1515
import org.elasticsearch.action.support.ThreadedActionListener;
1616
import org.elasticsearch.client.OriginSettingClient;
17-
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
18-
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
19-
import org.elasticsearch.common.xcontent.XContentFactory;
20-
import org.elasticsearch.common.xcontent.XContentParser;
21-
import org.elasticsearch.common.xcontent.XContentType;
2217
import org.elasticsearch.index.query.BoolQueryBuilder;
2318
import org.elasticsearch.index.query.QueryBuilder;
2419
import org.elasticsearch.index.query.QueryBuilders;
@@ -30,6 +25,7 @@
3025
import org.elasticsearch.search.SearchHits;
3126
import org.elasticsearch.search.builder.SearchSourceBuilder;
3227
import org.elasticsearch.threadpool.ThreadPool;
28+
import org.elasticsearch.xpack.core.common.time.TimeUtils;
3329
import org.elasticsearch.xpack.core.ml.job.config.Job;
3430
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
3531
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
@@ -38,8 +34,6 @@
3834
import org.elasticsearch.xpack.core.ml.job.results.Result;
3935
import org.elasticsearch.xpack.ml.MachineLearning;
4036

41-
import java.io.IOException;
42-
import java.io.InputStream;
4337
import java.time.Clock;
4438
import java.time.Instant;
4539
import java.util.ArrayList;
@@ -85,6 +79,11 @@ public void remove(float requestsPerSec, ActionListener<Boolean> listener, Suppl
8579
.filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName())));
8680
source.size(MAX_FORECASTS);
8781
source.trackTotalHits(true);
82+
source.fetchSource(false);
83+
source.docValueField(Job.ID.getPreferredName(), null);
84+
source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null);
85+
source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis");
86+
8887

8988
// _doc is the most efficient sort order and will also disable scoring
9089
source.sort(ElasticsearchMappings.ES_DOC);
@@ -101,11 +100,9 @@ private void deleteForecasts(
101100
ActionListener<Boolean> listener,
102101
Supplier<Boolean> isTimedOutSupplier
103102
) {
104-
List<ForecastRequestStats> forecastsToDelete;
105-
try {
106-
forecastsToDelete = findForecastsToDelete(searchResponse);
107-
} catch (IOException e) {
108-
listener.onFailure(e);
103+
List<JobForecastId> forecastsToDelete = findForecastsToDelete(searchResponse);
104+
if (forecastsToDelete.isEmpty()) {
105+
listener.onResponse(true);
109106
return;
110107
}
111108

@@ -117,7 +114,7 @@ private void deleteForecasts(
117114
DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete)
118115
.setRequestsPerSecond(requestsPerSec)
119116
.setAbortOnVersionConflict(false);
120-
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
117+
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
121118
@Override
122119
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
123120
try {
@@ -138,39 +135,51 @@ public void onFailure(Exception e) {
138135
});
139136
}
140137

141-
private List<ForecastRequestStats> findForecastsToDelete(SearchResponse searchResponse) throws IOException {
142-
List<ForecastRequestStats> forecastsToDelete = new ArrayList<>();
138+
private List<JobForecastId> findForecastsToDelete(SearchResponse searchResponse) {
139+
List<JobForecastId> forecastsToDelete = new ArrayList<>();
143140

144141
SearchHits hits = searchResponse.getHits();
145142
if (hits.getTotalHits().value > MAX_FORECASTS) {
146143
LOGGER.info("More than [{}] forecasts were found. This run will only delete [{}] of them", MAX_FORECASTS, MAX_FORECASTS);
147144
}
148145

149146
for (SearchHit hit : hits.getHits()) {
150-
try (InputStream stream = hit.getSourceRef().streamInput();
151-
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
152-
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
153-
ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null);
154-
if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) {
155-
forecastsToDelete.add(forecastRequestStats);
147+
String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName());
148+
if (expiryTime == null) {
149+
LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(),
150+
ForecastRequestStats.EXPIRY_TIME.getPreferredName());
151+
continue;
152+
}
153+
long expiryMs = TimeUtils.parseToEpochMs(expiryTime);
154+
if (expiryMs < cutoffEpochMs) {
155+
JobForecastId idPair = new JobForecastId(
156+
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
157+
stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName()));
158+
159+
if (idPair.hasNullValue() == false) {
160+
forecastsToDelete.add(idPair);
156161
}
162+
157163
}
164+
158165
}
159166
return forecastsToDelete;
160167
}
161168

162-
private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forecastsToDelete) {
169+
private DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids) {
163170
DeleteByQueryRequest request = new DeleteByQueryRequest();
164171
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
165172

166173
request.indices(RESULTS_INDEX_PATTERN);
167174
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
168175
boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
169176
ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE));
170-
for (ForecastRequestStats forecastToDelete : forecastsToDelete) {
171-
boolQuery.should(QueryBuilders.boolQuery()
172-
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId()))
173-
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId())));
177+
for (JobForecastId jobForecastId : ids) {
178+
if (jobForecastId.hasNullValue() == false) {
179+
boolQuery.should(QueryBuilders.boolQuery()
180+
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId))
181+
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId)));
182+
}
174183
}
175184
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
176185
request.setQuery(query);
@@ -180,4 +189,18 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec
180189

181190
return request;
182191
}
192+
193+
private static class JobForecastId {
194+
private final String jobId;
195+
private final String forecastId;
196+
197+
private JobForecastId(String jobId, String forecastId) {
198+
this.jobId = jobId;
199+
this.forecastId = forecastId;
200+
}
201+
202+
boolean hasNullValue() {
203+
return jobId == null || forecastId == null;
204+
}
205+
}
183206
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.search.sort.SortBuilder;
2525
import org.elasticsearch.search.sort.SortOrder;
2626
import org.elasticsearch.threadpool.ThreadPool;
27+
import org.elasticsearch.xpack.core.common.time.TimeUtils;
2728
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
2829
import org.elasticsearch.xpack.core.ml.job.config.Job;
2930
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -104,13 +105,16 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDe
104105
private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener) {
105106
SortBuilder<?> sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
106107
QueryBuilder snapshotQuery = QueryBuilders.boolQuery()
107-
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()));
108+
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()))
109+
.filter(QueryBuilders.existsQuery(ModelSnapshot.TIMESTAMP.getPreferredName()));
108110

109111
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
110112
searchSourceBuilder.sort(sortBuilder);
111113
searchSourceBuilder.query(snapshotQuery);
112114
searchSourceBuilder.size(1);
113115
searchSourceBuilder.trackTotalHits(false);
116+
searchSourceBuilder.fetchSource(false);
117+
searchSourceBuilder.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis");
114118

115119
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
116120
SearchRequest searchRequest = new SearchRequest(indexName);
@@ -124,8 +128,14 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener
124128
// no snapshots found
125129
listener.onResponse(null);
126130
} else {
127-
ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
128-
listener.onResponse(snapshot.getTimestamp().getTime());
131+
String timestamp = stringFieldValueOrNull(hits[0], ModelSnapshot.TIMESTAMP.getPreferredName());
132+
if (timestamp == null) {
133+
LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hits[0].getId());
134+
listener.onResponse(null);
135+
} else {
136+
long timestampMs = TimeUtils.parseToEpochMs(timestamp);
137+
listener.onResponse(timestampMs);
138+
}
129139
}
130140
},
131141
listener::onFailure)
@@ -159,8 +169,15 @@ protected void removeDataBefore(
159169
.mustNot(activeSnapshotFilter)
160170
.mustNot(retainFilter);
161171

162-
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE)
163-
.sort(ModelSnapshot.TIMESTAMP.getPreferredName()));
172+
SearchSourceBuilder source = new SearchSourceBuilder();
173+
source.query(query);
174+
source.size(MODEL_SNAPSHOT_SEARCH_SIZE);
175+
source.sort(ModelSnapshot.TIMESTAMP.getPreferredName());
176+
source.fetchSource(false);
177+
source.docValueField(Job.ID.getPreferredName(), null);
178+
source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null);
179+
source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis");
180+
searchRequest.source(source);
164181

165182
long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null)
166183
? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis();
@@ -175,19 +192,29 @@ private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, lo
175192
public void onResponse(SearchResponse searchResponse) {
176193
long nextToKeepMs = deleteAllBeforeMs;
177194
try {
178-
List<ModelSnapshot> modelSnapshots = new ArrayList<>();
195+
List<JobSnapshotId> snapshotIds = new ArrayList<>();
179196
for (SearchHit hit : searchResponse.getHits()) {
180-
ModelSnapshot modelSnapshot = ModelSnapshot.fromJson(hit.getSourceRef());
181-
long timestampMs = modelSnapshot.getTimestamp().getTime();
197+
String timestamp = stringFieldValueOrNull(hit, ModelSnapshot.TIMESTAMP.getPreferredName());
198+
if (timestamp == null) {
199+
LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hit.getId());
200+
continue;
201+
}
202+
long timestampMs = TimeUtils.parseToEpochMs(timestamp);
182203
if (timestampMs >= nextToKeepMs) {
183204
do {
184205
nextToKeepMs += MS_IN_ONE_DAY;
185206
} while (timestampMs >= nextToKeepMs);
186207
continue;
187208
}
188-
modelSnapshots.add(modelSnapshot);
209+
JobSnapshotId idPair = new JobSnapshotId(
210+
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
211+
stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName()));
212+
213+
if (idPair.hasNullValue() == false) {
214+
snapshotIds.add(idPair);
215+
}
189216
}
190-
deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener);
217+
deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener);
191218
} catch (Exception e) {
192219
onFailure(e);
193220
}
@@ -200,15 +227,15 @@ public void onFailure(Exception e) {
200227
};
201228
}
202229

203-
private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, ActionListener<Boolean> listener) {
230+
private void deleteModelSnapshots(Iterator<JobSnapshotId> modelSnapshotIterator, ActionListener<Boolean> listener) {
204231
if (modelSnapshotIterator.hasNext() == false) {
205232
listener.onResponse(true);
206233
return;
207234
}
208-
ModelSnapshot modelSnapshot = modelSnapshotIterator.next();
209-
DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request(
210-
modelSnapshot.getJobId(), modelSnapshot.getSnapshotId());
211-
client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<AcknowledgedResponse>() {
235+
JobSnapshotId idPair = modelSnapshotIterator.next();
236+
DeleteModelSnapshotAction.Request deleteSnapshotRequest =
237+
new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId);
238+
client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<>() {
212239
@Override
213240
public void onResponse(AcknowledgedResponse response) {
214241
try {
@@ -220,9 +247,23 @@ public void onResponse(AcknowledgedResponse response) {
220247

221248
@Override
222249
public void onFailure(Exception e) {
223-
listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot ["
224-
+ modelSnapshot.getSnapshotId() + "]", e));
250+
listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot ["
251+
+ idPair.snapshotId + "]", e));
225252
}
226253
});
227254
}
255+
256+
static class JobSnapshotId {
257+
private final String jobId;
258+
private final String snapshotId;
259+
260+
JobSnapshotId(String jobId, String snapshotId) {
261+
this.jobId = jobId;
262+
this.snapshotId = snapshotId;
263+
}
264+
265+
boolean hasNullValue() {
266+
return jobId == null || snapshotId == null;
267+
}
268+
}
228269
}

0 commit comments

Comments
 (0)