diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java index 3d47aeff1b8b5..ee03e7660ef87 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; @@ -238,5 +239,9 @@ public SearchHit getHit() { public boolean shouldSkip() { return values == null; } + + public int getChecksum() { + return Arrays.hashCode(values); + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactory.java index 57c2b44f769c4..c3622b49dab82 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactory.java @@ -109,7 +109,10 @@ static ExtractedFields detectExtractedFields(FieldCapabilitiesResponse fieldCapa Set fields = fieldCapabilitiesResponse.get().keySet(); fields.removeAll(IGNORE_FIELDS); removeFieldsWithIncompatibleTypes(fields, fieldCapabilitiesResponse); - ExtractedFields extractedFields = ExtractedFields.build(new ArrayList<>(fields), Collections.emptySet(), fieldCapabilitiesResponse) + List sortedFields = new ArrayList<>(fields); + // We sort the fields to ensure the checksum for each document is deterministic + Collections.sort(sortedFields); + ExtractedFields extractedFields = ExtractedFields.build(sortedFields, Collections.emptySet(), fieldCapabilitiesResponse) .filterFields(ExtractedField.ExtractionMethod.DOC_VALUE); if (extractedFields.getAllFields().isEmpty()) { throw ExceptionsHelper.badRequestException("No compatible fields could be detected"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java index bf079348c7e55..2b55e8c648d1d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java @@ -78,8 +78,8 @@ private void processData(String jobId, DataFrameDataExtractor dataExtractor, Ana } private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException { - // The extra field is the control field (should be an empty string) - String[] record = new String[dataExtractor.getFieldNames().size() + 1]; + // The extra fields are for the doc hash and the control field (should be an empty string) + String[] record = new String[dataExtractor.getFieldNames().size() + 2]; // The value of the control field should be an empty string for data frame rows record[record.length - 1] = ""; @@ -90,6 +90,7 @@ private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProces if (row.shouldSkip() == false) { String[] rowValues = row.getValues(); System.arraycopy(rowValues, 0, record, 0, rowValues.length); + record[record.length - 2] = String.valueOf(row.getChecksum()); process.writeRecord(record); } } @@ -99,11 +100,16 @@ private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProces private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException { List fieldNames = dataExtractor.getFieldNames(); - String[] headerRecord = new String[fieldNames.size() + 1]; + + // We add 2 extra fields, both named dot: + // - the document hash + // - the control message + String[] headerRecord = new String[fieldNames.size() + 2]; for (int i = 0; i < fieldNames.size(); i++) { headerRecord[i] = fieldNames.get(i); } - // The field name of the control field is dot + + headerRecord[headerRecord.length - 2] = "."; headerRecord[headerRecord.length - 1] = "."; process.writeRecord(headerRecord); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResult.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResult.java index 1f9ef71da8fb1..3b34537a56e81 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResult.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResult.java @@ -17,27 +17,27 @@ public class AnalyticsResult implements ToXContentObject { public static final ParseField TYPE = new ParseField("analytics_result"); - public static final ParseField ID_HASH = new ParseField("id_hash"); + public static final ParseField CHECKSUM = new ParseField("checksum"); public static final ParseField RESULTS = new ParseField("results"); static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(TYPE.getPreferredName(), - a -> new AnalyticsResult((String) a[0], (Map) a[1])); + a -> new AnalyticsResult((Integer) a[0], (Map) a[1])); static { - PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_HASH); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), CHECKSUM); PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, context) -> p.map(), RESULTS); } - private final String idHash; + private final int checksum; private final Map results; - public AnalyticsResult(String idHash, Map results) { - this.idHash = Objects.requireNonNull(idHash); + public AnalyticsResult(int checksum, Map results) { + this.checksum = Objects.requireNonNull(checksum); this.results = Objects.requireNonNull(results); } - public String getIdHash() { - return idHash; + public int getChecksum() { + return checksum; } public Map getResults() { @@ -47,7 +47,7 @@ public Map getResults() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(ID_HASH.getPreferredName(), idHash); + builder.field(CHECKSUM.getPreferredName(), checksum); builder.field(RESULTS.getPreferredName(), results); builder.endObject(); return builder; @@ -63,11 +63,11 @@ public boolean equals(Object other) { } AnalyticsResult that = (AnalyticsResult) other; - return Objects.equals(idHash, that.idHash) && Objects.equals(results, that.results); + return checksum == that.checksum && Objects.equals(results, that.results); } @Override public int hashCode() { - return Objects.hash(idHash, results); + return Objects.hash(checksum, results); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java index bdb1526b1b78a..0dbbf1b8b22d7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java @@ -96,6 +96,8 @@ private void joinCurrentResults() { continue; } AnalyticsResult result = currentResults.get(i); + checkChecksumsMatch(row, result); + SearchHit hit = row.getHit(); Map source = new LinkedHashMap(hit.getSourceAsMap()); source.putAll(result.getResults()); @@ -112,4 +114,14 @@ private void joinCurrentResults() { } } } + + private void checkChecksumsMatch(DataFrameDataExtractor.Row row, AnalyticsResult result) { + if (row.getChecksum() != result.getChecksum()) { + String msg = "Detected checksum mismatch for document with id [" + row.getHit().getId() + "]; "; + msg += "expected [" + row.getChecksum() + "] but result had [" + result.getChecksum() + "]; "; + msg += "this implies the data frame index [" + row.getHit().getIndex() + "] was modified while the analysis was running. "; + msg += "We rely on this index being immutable during a running analysis and so the results will be unreliable."; + throw new IllegalStateException(msg); + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcessFactory.java index 3943e7fd7924f..0b16bdb715bb3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcessFactory.java @@ -45,8 +45,8 @@ public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessCon ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId, true, false, true, true, false, false); - // The extra 1 is the control field - int numberOfFields = analyticsProcessConfig.cols() + 1; + // The extra 2 are for the checksum and the control field + int numberOfFields = analyticsProcessConfig.cols() + 2; createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactoryTests.java index 1a43b2893baef..efbc19563cc4d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactoryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactoryTests.java @@ -12,6 +12,8 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -89,6 +91,28 @@ public void testDetectExtractedFields_GivenIgnoredField() { assertThat(e.getMessage(), equalTo("No compatible fields could be detected")); } + public void testDetectExtractedFields_ShouldSortFieldsAlphabetically() { + int fieldCount = randomIntBetween(10, 20); + List fields = new ArrayList<>(); + for (int i = 0; i < fieldCount; i++) { + fields.add(randomAlphaOfLength(20)); + } + List sortedFields = new ArrayList<>(fields); + Collections.sort(sortedFields); + + MockFieldCapsResponseBuilder mockFieldCapsResponseBuilder = new MockFieldCapsResponseBuilder(); + for (String field : fields) { + mockFieldCapsResponseBuilder.addAggregatableField(field, "float"); + } + FieldCapabilitiesResponse fieldCapabilities = mockFieldCapsResponseBuilder.build(); + + ExtractedFields extractedFields = DataFrameDataExtractorFactory.detectExtractedFields(fieldCapabilities); + + List extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName) + .collect(Collectors.toList()); + assertThat(extractedFieldNames, equalTo(sortedFields)); + } + private static class MockFieldCapsResponseBuilder { private final Map> fieldCaps = new HashMap<>(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java index 3d71b4d430e2b..e4de63908a5f9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java @@ -63,13 +63,13 @@ public void testProcess_GivenSingleRowAndResult() throws IOException { String dataDoc = "{\"f_1\": \"foo\", \"f_2\": 42.0}"; String[] dataValues = {"42.0"}; - DataFrameDataExtractor.Row row = newRow(newHit("1", dataDoc), dataValues); + DataFrameDataExtractor.Row row = newRow(newHit(dataDoc), dataValues, 1); givenSingleDataFrameBatch(Arrays.asList(row)); Map resultFields = new HashMap<>(); resultFields.put("a", "1"); resultFields.put("b", "2"); - AnalyticsResult result = new AnalyticsResult("some_hash", resultFields); + AnalyticsResult result = new AnalyticsResult(1, resultFields); givenProcessResults(Arrays.asList(result)); AnalyticsResultProcessor resultProcessor = createResultProcessor(); @@ -90,6 +90,28 @@ public void testProcess_GivenSingleRowAndResult() throws IOException { assertThat(indexedDocSource.get("b"), equalTo("2")); } + public void testProcess_GivenSingleRowAndResultWithMismatchingIdHash() throws IOException { + givenClientHasNoFailures(); + + String dataDoc = "{\"f_1\": \"foo\", \"f_2\": 42.0}"; + String[] dataValues = {"42.0"}; + DataFrameDataExtractor.Row row = newRow(newHit(dataDoc), dataValues, 1); + givenSingleDataFrameBatch(Arrays.asList(row)); + + Map resultFields = new HashMap<>(); + resultFields.put("a", "1"); + resultFields.put("b", "2"); + AnalyticsResult result = new AnalyticsResult(2, resultFields); + givenProcessResults(Arrays.asList(result)); + + AnalyticsResultProcessor resultProcessor = createResultProcessor(); + + resultProcessor.process(process); + resultProcessor.awaitForCompletion(); + + verifyNoMoreInteractions(client); + } + private void givenProcessResults(List results) { when(process.readAnalyticsResults()).thenReturn(results.iterator()); } @@ -99,16 +121,17 @@ private void givenSingleDataFrameBatch(List batch) t when(dataExtractor.next()).thenReturn(Optional.of(batch)).thenReturn(Optional.empty()); } - private static SearchHit newHit(String id, String json) { - SearchHit hit = new SearchHit(42, id, new Text("doc"), Collections.emptyMap()); + private static SearchHit newHit(String json) { + SearchHit hit = new SearchHit(randomInt(), randomAlphaOfLength(10), new Text("doc"), Collections.emptyMap()); hit.sourceRef(new BytesArray(json)); return hit; } - private static DataFrameDataExtractor.Row newRow(SearchHit hit, String[] values) { + private static DataFrameDataExtractor.Row newRow(SearchHit hit, String[] values, int checksum) { DataFrameDataExtractor.Row row = mock(DataFrameDataExtractor.Row.class); when(row.getHit()).thenReturn(hit); when(row.getValues()).thenReturn(values); + when(row.getChecksum()).thenReturn(checksum); return row; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultTests.java index 6250a96cd3284..c243e4c871d78 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultTests.java @@ -16,7 +16,7 @@ public class AnalyticsResultTests extends AbstractXContentTestCase results = new HashMap<>(); int resultsSize = randomIntBetween(1, 10); for (int i = 0; i < resultsSize; i++) { @@ -24,7 +24,7 @@ protected AnalyticsResult createTestInstance() { Object resultValue = randomBoolean() ? randomAlphaOfLength(20) : randomDouble(); results.put(resultField, resultValue); } - return new AnalyticsResult(idHash, results); + return new AnalyticsResult(checksum, results); } @Override