diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java index 01667f8a48160..aee26a018304e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java @@ -46,6 +46,36 @@ public static Instant parseTimeFieldToInstant(XContentParser parser, String fiel "unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]"); } + /** + * Safely parses a string epoch representation to a Long + * + * Commonly this function is used for parsing Date fields from doc values + * requested with the format "epoch_millis". + * + * Since nanosecond support was added epoch_millis timestamps may have a fractional component. + * We discard this, taking just whole milliseconds. Arguably it would be better to retain the + * precision here and let the downstream component decide whether it wants the accuracy, but + * that makes it hard to pass around the value as a number. The double type doesn't have + * enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would + * work, but that isn't supported by the JSON parser if the number gets round-tripped through + * JSON. So String is really the only format that could be used, but the consumers of time + * are expecting a number. + * + * @param epoch The epoch value as a string. This may contain a fractional component. + * @return The epoch value. + */ + public static long parseToEpochMs(String epoch) { + int dotPos = epoch.indexOf('.'); + if (dotPos == -1) { + return Long.parseLong(epoch); + } else if (dotPos > 0) { + return Long.parseLong(epoch.substring(0, dotPos)); + } else { + // The first character is '.' so round down to 0 + return 0L; + } + } + /** * First tries to parse the date first as a Long and convert that to an * epoch time. If the long number has more than 10 digits it is considered a diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java index e122202b5fa6c..0dcb245c78006 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/time/TimeUtilsTests.java @@ -72,6 +72,12 @@ public void testDateStringToEpoch() { assertEquals(1477058573500L, TimeUtils.dateStringToEpoch("1477058573500")); } + public void testParseToEpochMs() { + assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000")); + assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000.005")); + assertEquals(0L, TimeUtils.parseToEpochMs(".005")); + } + public void testCheckMultiple_GivenMultiples() { TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.SECONDS, new ParseField("foo")); TimeUtils.checkMultiple(TimeValue.timeValueHours(1), TimeUnit.MINUTES, new ParseField("foo")); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java index 24412fe6eb77c..9436dddde78db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/extractor/TimeField.java @@ -7,6 +7,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import java.util.Collections; import java.util.Objects; @@ -44,23 +45,7 @@ public Object[] value(SearchHit hit) { return value; } if (value[0] instanceof String) { // doc_value field with the epoch_millis format - // Since nanosecond support was added epoch_millis timestamps may have a fractional component. - // We discard this, taking just whole milliseconds. Arguably it would be better to retain the - // precision here and let the downstream component decide whether it wants the accuracy, but - // that makes it hard to pass around the value as a number. The double type doesn't have - // enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would - // work, but that isn't supported by the JSON parser if the number gets round-tripped through - // JSON. So String is really the only format that could be used, but the ML consumers of time - // are expecting a number. - String strVal0 = (String) value[0]; - int dotPos = strVal0.indexOf('.'); - if (dotPos == -1) { - value[0] = Long.parseLong(strVal0); - } else if (dotPos > 0) { - value[0] = Long.parseLong(strVal0.substring(0, dotPos)); - } else { - value[0] = 0L; - } + value[0] = TimeUtils.parseToEpochMs((String)value[0]); } else if (value[0] instanceof Long == false) { // pre-6.0 field throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index f4f85dc704360..dbe2dc4dea7ab 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -14,11 +14,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.OriginSettingClient; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -30,6 +25,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -38,8 +34,6 @@ import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.MachineLearning; -import java.io.IOException; -import java.io.InputStream; import java.time.Clock; import java.time.Instant; import java.util.ArrayList; @@ -85,6 +79,11 @@ public void remove(float requestsPerSec, ActionListener listener, Suppl .filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName()))); source.size(MAX_FORECASTS); source.trackTotalHits(true); + source.fetchSource(false); + source.docValueField(Job.ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis"); + // _doc is the most efficient sort order and will also disable scoring source.sort(ElasticsearchMappings.ES_DOC); @@ -101,11 +100,9 @@ private void deleteForecasts( ActionListener listener, Supplier isTimedOutSupplier ) { - List forecastsToDelete; - try { - forecastsToDelete = findForecastsToDelete(searchResponse); - } catch (IOException e) { - listener.onFailure(e); + List forecastsToDelete = findForecastsToDelete(searchResponse); + if (forecastsToDelete.isEmpty()) { + listener.onResponse(true); return; } @@ -138,8 +135,8 @@ public void onFailure(Exception e) { }); } - private List findForecastsToDelete(SearchResponse searchResponse) throws IOException { - List forecastsToDelete = new ArrayList<>(); + private List findForecastsToDelete(SearchResponse searchResponse) { + List forecastsToDelete = new ArrayList<>(); SearchHits hits = searchResponse.getHits(); if (hits.getTotalHits().value > MAX_FORECASTS) { @@ -147,19 +144,29 @@ private List findForecastsToDelete(SearchResponse searchRe } for (SearchHit hit : hits.getHits()) { - try (InputStream stream = hit.getSourceRef().streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( - NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { - ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null); - if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) { - forecastsToDelete.add(forecastRequestStats); + String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + if (expiryTime == null) { + LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(), + ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + continue; + } + long expiryMs = TimeUtils.parseToEpochMs(expiryTime); + if (expiryMs < cutoffEpochMs) { + JobForecastId idPair = new JobForecastId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + forecastsToDelete.add(idPair); } + } + } return forecastsToDelete; } - private DeleteByQueryRequest buildDeleteByQuery(List forecastsToDelete) { + private DeleteByQueryRequest buildDeleteByQuery(List ids) { DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); @@ -167,10 +174,12 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1); boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE)); - for (ForecastRequestStats forecastToDelete : forecastsToDelete) { - boolQuery.should(QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId())) - .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId()))); + for (JobForecastId jobForecastId : ids) { + if (jobForecastId.hasNullValue() == false) { + boolQuery.should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId)) + .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId))); + } } QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery); request.setQuery(query); @@ -180,4 +189,18 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec return request; } + + private static class JobForecastId { + private final String jobId; + private final String forecastId; + + private JobForecastId(String jobId, String forecastId) { + this.jobId = jobId; + this.forecastId = forecastId; + } + + boolean hasNullValue() { + return jobId == null || forecastId == null; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 241527325dd82..de7206d42606f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -24,6 +24,7 @@ import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -104,13 +105,16 @@ void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener listener) { SortBuilder sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC); QueryBuilder snapshotQuery = QueryBuilders.boolQuery() - .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())); + .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName())) + .filter(QueryBuilders.existsQuery(ModelSnapshot.TIMESTAMP.getPreferredName())); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.sort(sortBuilder); searchSourceBuilder.query(snapshotQuery); searchSourceBuilder.size(1); searchSourceBuilder.trackTotalHits(false); + searchSourceBuilder.fetchSource(false); + searchSourceBuilder.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis"); String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); SearchRequest searchRequest = new SearchRequest(indexName); @@ -124,8 +128,14 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener listener // no snapshots found listener.onResponse(null); } else { - ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef()); - listener.onResponse(snapshot.getTimestamp().getTime()); + String timestamp = stringFieldValueOrNull(hits[0], ModelSnapshot.TIMESTAMP.getPreferredName()); + if (timestamp == null) { + LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hits[0].getId()); + listener.onResponse(null); + } else { + long timestampMs = TimeUtils.parseToEpochMs(timestamp); + listener.onResponse(timestampMs); + } } }, listener::onFailure) @@ -159,8 +169,15 @@ protected void removeDataBefore( .mustNot(activeSnapshotFilter) .mustNot(retainFilter); - searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE) - .sort(ModelSnapshot.TIMESTAMP.getPreferredName())); + SearchSourceBuilder source = new SearchSourceBuilder(); + source.query(query); + source.size(MODEL_SNAPSHOT_SEARCH_SIZE); + source.sort(ModelSnapshot.TIMESTAMP.getPreferredName()); + source.fetchSource(false); + source.docValueField(Job.ID.getPreferredName(), null); + source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null); + source.docValueField(ModelSnapshot.TIMESTAMP.getPreferredName(), "epoch_millis"); + searchRequest.source(source); long deleteAllBeforeMs = (job.getModelSnapshotRetentionDays() == null) ? 0 : latestTimeMs - TimeValue.timeValueDays(job.getModelSnapshotRetentionDays()).getMillis(); @@ -175,19 +192,29 @@ private ActionListener expiredSnapshotsListener(String jobId, lo public void onResponse(SearchResponse searchResponse) { long nextToKeepMs = deleteAllBeforeMs; try { - List modelSnapshots = new ArrayList<>(); + List snapshotIds = new ArrayList<>(); for (SearchHit hit : searchResponse.getHits()) { - ModelSnapshot modelSnapshot = ModelSnapshot.fromJson(hit.getSourceRef()); - long timestampMs = modelSnapshot.getTimestamp().getTime(); + String timestamp = stringFieldValueOrNull(hit, ModelSnapshot.TIMESTAMP.getPreferredName()); + if (timestamp == null) { + LOGGER.warn("Model snapshot document [{}] has a null timestamp field", hit.getId()); + continue; + } + long timestampMs = TimeUtils.parseToEpochMs(timestamp); if (timestampMs >= nextToKeepMs) { do { nextToKeepMs += MS_IN_ONE_DAY; } while (timestampMs >= nextToKeepMs); continue; } - modelSnapshots.add(modelSnapshot); + JobSnapshotId idPair = new JobSnapshotId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + snapshotIds.add(idPair); + } } - deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener); + deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener); } catch (Exception e) { onFailure(e); } @@ -200,14 +227,14 @@ public void onFailure(Exception e) { }; } - private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { + private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { if (modelSnapshotIterator.hasNext() == false) { listener.onResponse(true); return; } - ModelSnapshot modelSnapshot = modelSnapshotIterator.next(); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request( - modelSnapshot.getJobId(), modelSnapshot.getSnapshotId()); + JobSnapshotId idPair = modelSnapshotIterator.next(); + DeleteModelSnapshotAction.Request deleteSnapshotRequest = + new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId); client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener() { @Override public void onResponse(AcknowledgedResponse response) { @@ -220,9 +247,23 @@ public void onResponse(AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot [" - + modelSnapshot.getSnapshotId() + "]", e)); + listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot [" + + idPair.snapshotId + "]", e)); } }); } + + static class JobSnapshotId { + private final String jobId; + private final String snapshotId; + + JobSnapshotId(String jobId, String snapshotId) { + this.jobId = jobId; + this.snapshotId = snapshotId; + } + + boolean hasNullValue() { + return jobId == null || snapshotId == null; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 34a5335da8c76..1468514569938 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -6,9 +6,29 @@ package org.elasticsearch.xpack.ml.job.retention; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.search.SearchHit; import java.util.function.Supplier; public interface MlDataRemover { void remove(float requestsPerSecond, ActionListener listener, Supplier isTimedOutSupplier); + + /** + * Extract {@code fieldName} from {@code hit} and if it is a string + * return the string else {@code null}. + * @param hit The search hit + * @param fieldName Field to find + * @return value iff the docfield is present and it is a string. Otherwise {@code null} + */ + default String stringFieldValueOrNull(SearchHit hit, String fieldName) { + DocumentField docField = hit.field(fieldName); + if (docField != null) { + Object value = docField.getValue(); + if (value instanceof String) { + return (String)value; + } + } + return null; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 432d8ffa3e41e..a2e573ac855c2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -93,6 +93,14 @@ static SearchResponse createSearchResponse(List toXContent return createSearchResponse(toXContents, toXContents.size()); } + static SearchResponse createSearchResponseFromHits(List hits) { + SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[] {}), + new TotalHits(hits.size(), TotalHits.Relation.EQUAL_TO), 1.0f); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getHits()).thenReturn(searchHits); + return searchResponse; + } + @SuppressWarnings("unchecked") static void givenJobs(Client client, List jobs) throws IOException { SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 9ee4cb177aafd..7a644a5fe6503 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -13,15 +13,19 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -91,17 +95,20 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException Date now = new Date(); Date oneDayAgo = new Date(now.getTime() - TimeValue.timeValueDays(1).getMillis()); - ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "fresh-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); - ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted))); + SearchHit snapshotToBeDeleted = createModelSnapshotQueryHit("job-1", "old-snapshot", eightDaysAndOneMsAgo); - ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAndOneMsAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1))); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); + + searchResponses.add( + AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshotToBeDeleted))); + + SearchHit snapshot2_1 = createModelSnapshotQueryHit("job-1", "snapshots-1_1", eightDaysAndOneMsAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1))); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.emptyList())); givenClientRequestsSucceed(searchResponses); createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); @@ -178,18 +185,18 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio Date now = new Date(); Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); - ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1", oneDayAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); // It needs to be strictly more than 7 days before the most recent snapshot, hence the extra millisecond Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); - List snapshots1JobSnapshots = Arrays.asList( + List snapshots1JobSnapshots = Arrays.asList( snapshot1_1, - createModelSnapshot("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo)); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2", eightDaysAndOneMsAgo)); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(snapshots1JobSnapshots)); - ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2))); + SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1", eightDaysAndOneMsAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2))); givenClientDeleteModelSnapshotRequestsFail(searchResponses); createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); @@ -208,12 +215,12 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio } @SuppressWarnings("unchecked") - public void testCalcCutoffEpochMs() throws IOException { + public void testCalcCutoffEpochMs() { List searchResponses = new ArrayList<>(); Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis()); - ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "newest-snapshot", oneDayAgo); - searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1))); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("job-1", "newest-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1_1))); givenClientRequests(searchResponses, true, true); @@ -226,6 +233,17 @@ public void testCalcCutoffEpochMs() throws IOException { verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(oneDayAgo.getTime(), expectedCutoffTime))); } + public void testJobSnapshotId() { + ExpiredModelSnapshotsRemover.JobSnapshotId id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b"); + assertFalse(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b"); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null); + assertTrue(id.hasNullValue()); + } + private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executor = mock(ExecutorService.class); @@ -245,6 +263,15 @@ private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build(); } + private static SearchHit createModelSnapshotQueryHit(String jobId, String snapshotId, Date date) { + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField(Job.ID.getPreferredName(), Collections.singletonList(jobId)); + hitBuilder.addField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), Collections.singletonList(snapshotId)); + String dateAsString = Long.valueOf(date.getTime()).toString(); + hitBuilder.addField(ModelSnapshot.TIMESTAMP.getPreferredName(), Collections.singletonList(dateAsString)); + return hitBuilder.build(); + } + private void givenClientRequestsSucceed(List searchResponses) { givenClientRequests(searchResponses, true, true); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java new file mode 100644 index 0000000000000..465109e6934ec --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; + +import java.util.Collections; +import java.util.Date; + +public class MlDataRemoverTests extends ESTestCase { + public void testStringOrNull() { + MlDataRemover remover = (requestsPerSecond, listener, isTimedOutSupplier) -> { }; + + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "missing")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("not_a_string", Collections.singletonList(new Date())); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "not_a_string")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("string_field", Collections.singletonList("actual_string_value")); + assertEquals("actual_string_value", remover.stringFieldValueOrNull(hitBuilder.build(), "string_field")); + } +}