Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
Expand Down Expand Up @@ -238,5 +239,9 @@ public SearchHit getHit() {
// True when no values could be extracted for this row; such rows are
// omitted from the data frame sent to the analytics process.
public boolean shouldSkip() {
return values == null;
}

// Deterministic checksum over the row's extracted values. It is written
// alongside the row and later compared against the checksum the process
// reports back, to detect that the source document changed mid-analysis.
public int getChecksum() {
return Arrays.hashCode(values);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ static ExtractedFields detectExtractedFields(FieldCapabilitiesResponse fieldCapa
Set<String> fields = fieldCapabilitiesResponse.get().keySet();
fields.removeAll(IGNORE_FIELDS);
removeFieldsWithIncompatibleTypes(fields, fieldCapabilitiesResponse);
ExtractedFields extractedFields = ExtractedFields.build(new ArrayList<>(fields), Collections.emptySet(), fieldCapabilitiesResponse)
List<String> sortedFields = new ArrayList<>(fields);
// We sort the fields to ensure the checksum for each document is deterministic
Collections.sort(sortedFields);
ExtractedFields extractedFields = ExtractedFields.build(sortedFields, Collections.emptySet(), fieldCapabilitiesResponse)
.filterFields(ExtractedField.ExtractionMethod.DOC_VALUE);
if (extractedFields.getAllFields().isEmpty()) {
throw ExceptionsHelper.badRequestException("No compatible fields could be detected");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ private void processData(String jobId, DataFrameDataExtractor dataExtractor, Ana
}

private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException {
// The extra field is the control field (should be an empty string)
String[] record = new String[dataExtractor.getFieldNames().size() + 1];
// The extra fields are for the doc hash and the control field (should be an empty string)
String[] record = new String[dataExtractor.getFieldNames().size() + 2];
// The value of the control field should be an empty string for data frame rows
record[record.length - 1] = "";

Expand All @@ -90,6 +90,7 @@ private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProces
if (row.shouldSkip() == false) {
String[] rowValues = row.getValues();
System.arraycopy(rowValues, 0, record, 0, rowValues.length);
record[record.length - 2] = String.valueOf(row.getChecksum());
process.writeRecord(record);
}
}
Expand All @@ -99,11 +100,16 @@ private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProces

/**
 * Writes the header record to the analytics process: the extracted field
 * names followed by two extra columns, both named dot — the document
 * checksum and the control field.
 *
 * Note: this span of the diff view contained leftover pre-change lines
 * (a duplicate {@code headerRecord} declaration sized {@code + 1}); this is
 * the coherent post-change method.
 */
private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException {
    List<String> fieldNames = dataExtractor.getFieldNames();

    // We add 2 extra fields, both named dot:
    //   - the document hash
    //   - the control message
    String[] headerRecord = new String[fieldNames.size() + 2];
    for (int i = 0; i < fieldNames.size(); i++) {
        headerRecord[i] = fieldNames.get(i);
    }

    headerRecord[headerRecord.length - 2] = ".";
    headerRecord[headerRecord.length - 1] = ".";
    process.writeRecord(headerRecord);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@
public class AnalyticsResult implements ToXContentObject {

public static final ParseField TYPE = new ParseField("analytics_result");
public static final ParseField ID_HASH = new ParseField("id_hash");
public static final ParseField CHECKSUM = new ParseField("checksum");
public static final ParseField RESULTS = new ParseField("results");

static final ConstructingObjectParser<AnalyticsResult, Void> PARSER = new ConstructingObjectParser<>(TYPE.getPreferredName(),
a -> new AnalyticsResult((String) a[0], (Map<String, Object>) a[1]));
a -> new AnalyticsResult((Integer) a[0], (Map<String, Object>) a[1]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_HASH);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), CHECKSUM);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, context) -> p.map(), RESULTS);
}

private final String idHash;
private final int checksum;
private final Map<String, Object> results;

public AnalyticsResult(String idHash, Map<String, Object> results) {
this.idHash = Objects.requireNonNull(idHash);
/**
 * @param checksum deterministic checksum of the source document's extracted values
 * @param results  the analysis results for that document; must not be null
 */
public AnalyticsResult(int checksum, Map<String, Object> results) {
// requireNonNull on a primitive int is meaningless: it can never be null and
// only forces a pointless box/unbox, so assign directly.
this.checksum = checksum;
this.results = Objects.requireNonNull(results);
}

public String getIdHash() {
return idHash;
// The checksum of the document this result belongs to, as reported by the
// analytics process; compared against the extractor-side checksum when joining.
public int getChecksum() {
return checksum;
}

public Map<String, Object> getResults() {
Expand All @@ -47,7 +47,7 @@ public Map<String, Object> getResults() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ID_HASH.getPreferredName(), idHash);
builder.field(CHECKSUM.getPreferredName(), checksum);
builder.field(RESULTS.getPreferredName(), results);
builder.endObject();
return builder;
Expand All @@ -63,11 +63,11 @@ public boolean equals(Object other) {
}

AnalyticsResult that = (AnalyticsResult) other;
return Objects.equals(idHash, that.idHash) && Objects.equals(results, that.results);
return checksum == that.checksum && Objects.equals(results, that.results);
}

@Override
public int hashCode() {
return Objects.hash(idHash, results);
return Objects.hash(checksum, results);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ private void joinCurrentResults() {
continue;
}
AnalyticsResult result = currentResults.get(i);
checkChecksumsMatch(row, result);

SearchHit hit = row.getHit();
Map<String, Object> source = new LinkedHashMap(hit.getSourceAsMap());
source.putAll(result.getResults());
Expand All @@ -112,4 +114,14 @@ private void joinCurrentResults() {
}
}
}

/**
 * Verifies the checksum the process reported for a result matches the checksum
 * computed when the row was extracted. A mismatch means the data frame index
 * changed while the analysis was running, so the results cannot be trusted.
 *
 * @throws IllegalStateException on mismatch
 */
private void checkChecksumsMatch(DataFrameDataExtractor.Row row, AnalyticsResult result) {
    int expected = row.getChecksum();
    int actual = result.getChecksum();
    if (expected == actual) {
        return;
    }
    StringBuilder msg = new StringBuilder("Detected checksum mismatch for document with id [")
        .append(row.getHit().getId())
        .append("]; ")
        .append("expected [").append(expected).append("] but result had [").append(actual).append("]; ")
        .append("this implies the data frame index [").append(row.getHit().getIndex())
        .append("] was modified while the analysis was running. ")
        .append("We rely on this index being immutable during a running analysis and so the results will be unreliable.");
    throw new IllegalStateException(msg.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessCon
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId,
true, false, true, true, false, false);

// The extra 1 is the control field
int numberOfFields = analyticsProcessConfig.cols() + 1;
// The extra 2 are for the checksum and the control field
int numberOfFields = analyticsProcessConfig.cols() + 2;

createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -89,6 +91,28 @@ public void testDetectExtractedFields_GivenIgnoredField() {
assertThat(e.getMessage(), equalTo("No compatible fields could be detected"));
}

public void testDetectExtractedFields_ShouldSortFieldsAlphabetically() {
    // Generate a random batch of field names and remember their sorted order.
    List<String> fields = new ArrayList<>();
    int fieldCount = randomIntBetween(10, 20);
    for (int i = 0; i < fieldCount; i++) {
        fields.add(randomAlphaOfLength(20));
    }
    List<String> expectedOrder = new ArrayList<>(fields);
    Collections.sort(expectedOrder);

    // Register each field as an aggregatable float in the mocked caps response.
    MockFieldCapsResponseBuilder responseBuilder = new MockFieldCapsResponseBuilder();
    for (String field : fields) {
        responseBuilder.addAggregatableField(field, "float");
    }
    FieldCapabilitiesResponse fieldCapabilities = responseBuilder.build();

    ExtractedFields extractedFields = DataFrameDataExtractorFactory.detectExtractedFields(fieldCapabilities);

    // Detection must return the fields alphabetically sorted.
    List<String> actualOrder = extractedFields.getAllFields().stream().map(ExtractedField::getName)
        .collect(Collectors.toList());
    assertThat(actualOrder, equalTo(expectedOrder));
}

private static class MockFieldCapsResponseBuilder {

private final Map<String, Map<String, FieldCapabilities>> fieldCaps = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ public void testProcess_GivenSingleRowAndResult() throws IOException {

String dataDoc = "{\"f_1\": \"foo\", \"f_2\": 42.0}";
String[] dataValues = {"42.0"};
DataFrameDataExtractor.Row row = newRow(newHit("1", dataDoc), dataValues);
DataFrameDataExtractor.Row row = newRow(newHit(dataDoc), dataValues, 1);
givenSingleDataFrameBatch(Arrays.asList(row));

Map<String, Object> resultFields = new HashMap<>();
resultFields.put("a", "1");
resultFields.put("b", "2");
AnalyticsResult result = new AnalyticsResult("some_hash", resultFields);
AnalyticsResult result = new AnalyticsResult(1, resultFields);
givenProcessResults(Arrays.asList(result));

AnalyticsResultProcessor resultProcessor = createResultProcessor();
Expand All @@ -90,6 +90,28 @@ public void testProcess_GivenSingleRowAndResult() throws IOException {
assertThat(indexedDocSource.get("b"), equalTo("2"));
}

public void testProcess_GivenSingleRowAndResultWithMismatchingIdHash() throws IOException {
    givenClientHasNoFailures();

    // A single data frame row whose extractor-side checksum is 1.
    String dataDoc = "{\"f_1\": \"foo\", \"f_2\": 42.0}";
    String[] dataValues = {"42.0"};
    givenSingleDataFrameBatch(Arrays.asList(newRow(newHit(dataDoc), dataValues, 1)));

    // The process reports checksum 2 for the same row, so the join must fail
    // and nothing should be indexed.
    Map<String, Object> resultFields = new HashMap<>();
    resultFields.put("a", "1");
    resultFields.put("b", "2");
    givenProcessResults(Arrays.asList(new AnalyticsResult(2, resultFields)));

    AnalyticsResultProcessor resultProcessor = createResultProcessor();
    resultProcessor.process(process);
    resultProcessor.awaitForCompletion();

    verifyNoMoreInteractions(client);
}

// Stubs the mocked process to emit the given results; the single iterator is
// returned on every call, so the results are consumed exactly once, in order.
private void givenProcessResults(List<AnalyticsResult> results) {
when(process.readAnalyticsResults()).thenReturn(results.iterator());
}
Expand All @@ -99,16 +121,17 @@ private void givenSingleDataFrameBatch(List<DataFrameDataExtractor.Row> batch) t
when(dataExtractor.next()).thenReturn(Optional.of(batch)).thenReturn(Optional.empty());
}

private static SearchHit newHit(String id, String json) {
SearchHit hit = new SearchHit(42, id, new Text("doc"), Collections.emptyMap());
private static SearchHit newHit(String json) {
SearchHit hit = new SearchHit(randomInt(), randomAlphaOfLength(10), new Text("doc"), Collections.emptyMap());
hit.sourceRef(new BytesArray(json));
return hit;
}

private static DataFrameDataExtractor.Row newRow(SearchHit hit, String[] values) {
private static DataFrameDataExtractor.Row newRow(SearchHit hit, String[] values, int checksum) {
DataFrameDataExtractor.Row row = mock(DataFrameDataExtractor.Row.class);
when(row.getHit()).thenReturn(hit);
when(row.getValues()).thenReturn(values);
when(row.getChecksum()).thenReturn(checksum);
return row;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ public class AnalyticsResultTests extends AbstractXContentTestCase<AnalyticsResu

// Builds a random AnalyticsResult: a random checksum plus 1-10 random result
// fields whose values are randomly strings or doubles.
// Note: this diff span interleaved the old idHash-based lines with the new
// checksum-based ones; this is the coherent post-change method.
@Override
protected AnalyticsResult createTestInstance() {
    int checksum = randomInt();
    Map<String, Object> results = new HashMap<>();
    int resultsSize = randomIntBetween(1, 10);
    for (int i = 0; i < resultsSize; i++) {
        String resultField = randomAlphaOfLength(20);
        Object resultValue = randomBoolean() ? randomAlphaOfLength(20) : randomDouble();
        results.put(resultField, resultValue);
    }
    return new AnalyticsResult(checksum, results);
}

@Override
Expand Down