Skip to content

Commit a69f90a

Browse files
committed
Don't get source
1 parent e5becf8 commit a69f90a

File tree

9 files changed

+220
-73
lines changed

9 files changed

+220
-73
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,19 @@ public static Instant parseTimeFieldToInstant(XContentParser parser, String fiel
6161
* JSON. So String is really the only format that could be used, but the consumers of time
6262
* are expecting a number.
6363
*
64-
* @param epoch The
64+
* @param epoch The epoch value as a string. This may contain a fractional component.
6565
* @return The epoch value.
6666
*/
67-
public static Long parseToEpochMs(String epoch) {
67+
public static long parseToEpochMs(String epoch) {
6868
int dotPos = epoch.indexOf('.');
69-
long epochMs = 0L;
7069
if (dotPos == -1) {
71-
epochMs = Long.parseLong(epoch);
70+
return Long.parseLong(epoch);
7271
} else if (dotPos > 0) {
73-
epochMs = Long.parseLong(epoch.substring(0, dotPos));
72+
return Long.parseLong(epoch.substring(0, dotPos));
73+
} else {
74+
// The first character is '.' so round down to 0
75+
return 0L;
7476
}
75-
// else the first character is '.' so round down to 0
76-
77-
return epochMs;
7877
}
7978

8079
/**

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ public void testDateStringToEpoch() {
7373
}
7474

7575
public void testParseToEpochMs() {
76-
assertEquals(Long.valueOf(1462096800000L), TimeUtils.parseToEpochMs("1462096800000"));
77-
assertEquals(Long.valueOf(1462096800000L), TimeUtils.parseToEpochMs("1462096800000.005"));
78-
assertEquals(Long.valueOf(0L), TimeUtils.parseToEpochMs(".005"));
76+
assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000"));
77+
assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000.005"));
78+
assertEquals(0L, TimeUtils.parseToEpochMs(".005"));
7979
}
8080

8181
public void testCheckMultiple_GivenMultiples() {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public Object[] value(SearchHit hit) {
4545
return value;
4646
}
4747
if (value[0] instanceof String) { // doc_value field with the epoch_millis format
48-
value[0] = TimeUtils.parseToEpochMs((String)value[0]);
48+
value[0] = TimeUtils.parseToEpochMs((String)value[0]);
4949
} else if (value[0] instanceof Long == false) { // pre-6.0 field
5050
throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass());
5151
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java

Lines changed: 49 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,6 @@
1414
import org.elasticsearch.action.search.SearchResponse;
1515
import org.elasticsearch.action.support.ThreadedActionListener;
1616
import org.elasticsearch.client.OriginSettingClient;
17-
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
18-
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
19-
import org.elasticsearch.common.xcontent.XContentFactory;
20-
import org.elasticsearch.common.xcontent.XContentParser;
21-
import org.elasticsearch.common.xcontent.XContentType;
2217
import org.elasticsearch.index.query.BoolQueryBuilder;
2318
import org.elasticsearch.index.query.QueryBuilder;
2419
import org.elasticsearch.index.query.QueryBuilders;
@@ -30,6 +25,7 @@
3025
import org.elasticsearch.search.SearchHits;
3126
import org.elasticsearch.search.builder.SearchSourceBuilder;
3227
import org.elasticsearch.threadpool.ThreadPool;
28+
import org.elasticsearch.xpack.core.common.time.TimeUtils;
3329
import org.elasticsearch.xpack.core.ml.job.config.Job;
3430
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
3531
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
@@ -38,8 +34,6 @@
3834
import org.elasticsearch.xpack.core.ml.job.results.Result;
3935
import org.elasticsearch.xpack.ml.MachineLearning;
4036

41-
import java.io.IOException;
42-
import java.io.InputStream;
4337
import java.time.Clock;
4438
import java.time.Instant;
4539
import java.util.ArrayList;
@@ -84,7 +78,11 @@ public void remove(float requestsPerSec, ActionListener<Boolean> listener, Suppl
8478
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE))
8579
.filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName())));
8680
source.size(MAX_FORECASTS);
87-
source.trackTotalHits(true);
81+
source.fetchSource(false);
82+
source.docValueField(Job.ID.getPreferredName(), null);
83+
source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null);
84+
source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis");
85+
8886

8987
// _doc is the most efficient sort order and will also disable scoring
9088
source.sort(ElasticsearchMappings.ES_DOC);
@@ -101,11 +99,9 @@ private void deleteForecasts(
10199
ActionListener<Boolean> listener,
102100
Supplier<Boolean> isTimedOutSupplier
103101
) {
104-
List<ForecastRequestStats> forecastsToDelete;
105-
try {
106-
forecastsToDelete = findForecastsToDelete(searchResponse);
107-
} catch (IOException e) {
108-
listener.onFailure(e);
102+
List<JobForecastId> forecastsToDelete = findForecastsToDelete(searchResponse);
103+
if (forecastsToDelete.isEmpty()) {
104+
listener.onResponse(true);
109105
return;
110106
}
111107

@@ -117,7 +113,7 @@ private void deleteForecasts(
117113
DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete)
118114
.setRequestsPerSecond(requestsPerSec)
119115
.setAbortOnVersionConflict(false);
120-
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
116+
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
121117
@Override
122118
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
123119
try {
@@ -138,39 +134,51 @@ public void onFailure(Exception e) {
138134
});
139135
}
140136

141-
private List<ForecastRequestStats> findForecastsToDelete(SearchResponse searchResponse) throws IOException {
142-
List<ForecastRequestStats> forecastsToDelete = new ArrayList<>();
137+
private List<JobForecastId> findForecastsToDelete(SearchResponse searchResponse) {
138+
List<JobForecastId> forecastsToDelete = new ArrayList<>();
143139

144140
SearchHits hits = searchResponse.getHits();
145141
if (hits.getTotalHits().value > MAX_FORECASTS) {
146142
LOGGER.info("More than [{}] forecasts were found. This run will only delete [{}] of them", MAX_FORECASTS, MAX_FORECASTS);
147143
}
148144

149145
for (SearchHit hit : hits.getHits()) {
150-
try (InputStream stream = hit.getSourceRef().streamInput();
151-
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
152-
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
153-
ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null);
154-
if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) {
155-
forecastsToDelete.add(forecastRequestStats);
146+
String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName());
147+
if (expiryTime == null) {
148+
LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(),
149+
ForecastRequestStats.EXPIRY_TIME.getPreferredName());
150+
continue;
151+
}
152+
long expiryMs = TimeUtils.parseToEpochMs(expiryTime);
153+
if (expiryMs < cutoffEpochMs) {
154+
JobForecastId idPair = new JobForecastId(
155+
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
156+
stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName()));
157+
158+
if (idPair.hasNullValue() == false) {
159+
forecastsToDelete.add(idPair);
156160
}
161+
157162
}
163+
158164
}
159165
return forecastsToDelete;
160166
}
161167

162-
private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forecastsToDelete) {
168+
private DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids) {
163169
DeleteByQueryRequest request = new DeleteByQueryRequest();
164170
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
165171

166172
request.indices(RESULTS_INDEX_PATTERN);
167173
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
168174
boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
169175
ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE));
170-
for (ForecastRequestStats forecastToDelete : forecastsToDelete) {
171-
boolQuery.should(QueryBuilders.boolQuery()
172-
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId()))
173-
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId())));
176+
for (JobForecastId jobForecastId : ids) {
177+
if (jobForecastId.hasNullValue() == false) {
178+
boolQuery.should(QueryBuilders.boolQuery()
179+
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId))
180+
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId)));
181+
}
174182
}
175183
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
176184
request.setQuery(query);
@@ -180,4 +188,18 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec
180188

181189
return request;
182190
}
191+
192+
private static class JobForecastId {
193+
private final String jobId;
194+
private final String forecastId;
195+
196+
private JobForecastId(String jobId, String forecastId) {
197+
this.jobId = jobId;
198+
this.forecastId = forecastId;
199+
}
200+
201+
boolean hasNullValue() {
202+
return jobId == null || forecastId == null;
203+
}
204+
}
183205
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.search.sort.SortBuilder;
2525
import org.elasticsearch.search.sort.SortOrder;
2626
import org.elasticsearch.threadpool.ThreadPool;
27+
import org.elasticsearch.xpack.core.common.time.TimeUtils;
2728
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
2829
import org.elasticsearch.xpack.core.ml.job.config.Job;
2930
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -104,13 +105,16 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<CutoffDe
104105
private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener) {
105106
SortBuilder<?> sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
106107
QueryBuilder snapshotQuery = QueryBuilders.boolQuery()
107-
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()));
108+
.filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()))
109+
.filter(QueryBuilders.existsQuery(ModelSnapshot.TIMESTAMP.getPreferredName()));
108110

109111
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
110112
searchSourceBuilder.sort(sortBuilder);
111113
searchSourceBuilder.query(snapshotQuery);
112114
searchSourceBuilder.size(1);
113115
searchSourceBuilder.trackTotalHits(false);
116+
searchSourceBuilder.fetchSource(false);
117+
searchSourceBuilder.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis");
114118

115119
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
116120
SearchRequest searchRequest = new SearchRequest(indexName);
@@ -124,8 +128,14 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener
124128
// no snapshots found
125129
listener.onResponse(null);
126130
} else {
127-
ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
128-
listener.onResponse(snapshot.getTimestamp().getTime());
131+
String timestamp = stringFieldValueOrNull(hits[0], ModelSnapshot.TIMESTAMP.getPreferredName());
132+
if (timestamp == null) {
133+
LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hits[0].getId());
134+
listener.onResponse(null);
135+
} else {
136+
long timestampMs = TimeUtils.parseToEpochMs(timestamp);
137+
listener.onResponse(timestampMs);
138+
}
129139
}
130140
},
131141
listener::onFailure)
@@ -159,8 +169,15 @@ protected void removeDataBefore(
159169
.mustNot(activeSnapshotFilter)
160170
.mustNot(retainFilter);
161171

162-
searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE)
163-
.sort(ModelSnapshot.TIMESTAMP.getPreferredName()));
172+
SearchSourceBuilder source = new SearchSourceBuilder();
173+
source.query(query);
174+
source.size(MODEL_SNAPSHOT_SEARCH_SIZE);
175+
source.sort(ModelSnapshot.TIMESTAMP.getPreferredName());
176+
source.fetchSource(false);
177+
source.docValueField(Job.ID.getPreferredName(), null);
178+
source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null);
179+
source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis");
180+
searchRequest.source(source);
164181

165182
long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null)
166183
? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis();
@@ -175,19 +192,29 @@ private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, lo
175192
public void onResponse(SearchResponse searchResponse) {
176193
long nextToKeepMs = deleteAllBeforeMs;
177194
try {
178-
List<ModelSnapshot> modelSnapshots = new ArrayList<>();
195+
List<JobSnapshotId> snapshotIds = new ArrayList<>();
179196
for (SearchHit hit : searchResponse.getHits()) {
180-
ModelSnapshot modelSnapshot = ModelSnapshot.fromJson(hit.getSourceRef());
181-
long timestampMs = modelSnapshot.getTimestamp().getTime();
197+
String timestamp = stringFieldValueOrNull(hit, ModelSnapshot.TIMESTAMP.getPreferredName());
198+
if (timestamp == null) {
199+
LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hit.getId());
200+
continue;
201+
}
202+
long timestampMs = TimeUtils.parseToEpochMs(timestamp);
182203
if (timestampMs >= nextToKeepMs) {
183204
do {
184205
nextToKeepMs += MS_IN_ONE_DAY;
185206
} while (timestampMs >= nextToKeepMs);
186207
continue;
187208
}
188-
modelSnapshots.add(modelSnapshot);
209+
JobSnapshotId idPair = new JobSnapshotId(
210+
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
211+
stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName()));
212+
213+
if (idPair.hasNullValue() == false) {
214+
snapshotIds.add(idPair);
215+
}
189216
}
190-
deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener);
217+
deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener);
191218
} catch (Exception e) {
192219
onFailure(e);
193220
}
@@ -200,15 +227,15 @@ public void onFailure(Exception e) {
200227
};
201228
}
202229

203-
private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, ActionListener<Boolean> listener) {
230+
private void deleteModelSnapshots(Iterator<JobSnapshotId> modelSnapshotIterator, ActionListener<Boolean> listener) {
204231
if (modelSnapshotIterator.hasNext() == false) {
205232
listener.onResponse(true);
206233
return;
207234
}
208-
ModelSnapshot modelSnapshot = modelSnapshotIterator.next();
209-
DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request(
210-
modelSnapshot.getJobId(), modelSnapshot.getSnapshotId());
211-
client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<AcknowledgedResponse>() {
235+
JobSnapshotId idPair = modelSnapshotIterator.next();
236+
DeleteModelSnapshotAction.Request deleteSnapshotRequest =
237+
new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId);
238+
client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<>() {
212239
@Override
213240
public void onResponse(AcknowledgedResponse response) {
214241
try {
@@ -220,9 +247,23 @@ public void onResponse(AcknowledgedResponse response) {
220247

221248
@Override
222249
public void onFailure(Exception e) {
223-
listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot ["
224-
+ modelSnapshot.getSnapshotId() + "]", e));
250+
listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot ["
251+
+ idPair.snapshotId + "]", e));
225252
}
226253
});
227254
}
255+
256+
static class JobSnapshotId {
257+
private final String jobId;
258+
private final String snapshotId;
259+
260+
JobSnapshotId(String jobId, String snapshotId) {
261+
this.jobId = jobId;
262+
this.snapshotId = snapshotId;
263+
}
264+
265+
boolean hasNullValue() {
266+
return jobId == null || snapshotId == null;
267+
}
268+
}
228269
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,29 @@
66
package org.elasticsearch.xpack.ml.job.retention;
77

88
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.common.document.DocumentField;
10+
import org.elasticsearch.search.SearchHit;
911

1012
import java.util.function.Supplier;
1113

1214
public interface MlDataRemover {
1315
void remove(float requestsPerSecond, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);
16+
17+
/**
18+
* Extract {@code fieldName} from {@code hit} and if it is a string
19+
* return the string else {@code null}.
20+
* @param hit The search hit
21+
* @param fieldName Field to find
22+
* @return value iff the docfield is present and it is a string. Otherwise {@code null}
23+
*/
24+
default String stringFieldValueOrNull(SearchHit hit, String fieldName) {
25+
DocumentField docField = hit.field(fieldName);
26+
if (docField != null) {
27+
Object value = docField.getValue();
28+
if (value instanceof String) {
29+
return (String)value;
30+
}
31+
}
32+
return null;
33+
}
1434
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,14 @@ static SearchResponse createSearchResponse(List<? extends ToXContent> toXContent
9393
return createSearchResponse(toXContents, toXContents.size());
9494
}
9595

96+
static SearchResponse createSearchResponseFromHits(List<SearchHit> hits) {
97+
SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[] {}),
98+
new TotalHits(hits.size(), TotalHits.Relation.EQUAL_TO), 1.0f);
99+
SearchResponse searchResponse = mock(SearchResponse.class);
100+
when(searchResponse.getHits()).thenReturn(searchHits);
101+
return searchResponse;
102+
}
103+
96104
@SuppressWarnings("unchecked")
97105
static void givenJobs(Client client, List<Job> jobs) throws IOException {
98106
SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);

0 commit comments

Comments
 (0)