Merged
Changes from all commits
@@ -16,10 +16,10 @@
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.analyses.DataFrameAnalysesUtils;
import org.elasticsearch.xpack.ml.dataframe.analyses.DataFrameAnalysis;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;

import java.io.IOException;
import java.util.List;
@@ -51,7 +51,9 @@ public void runJob(DataFrameAnalyticsConfig config, DataFrameDataExtractorFactor
DataFrameDataExtractor dataExtractor = dataExtractorFactory.newExtractor(false);
AnalyticsProcess process = createProcess(config.getId(), createProcessConfig(config, dataExtractor));
ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
AnalyticsResultProcessor resultProcessor = new AnalyticsResultProcessor(client, dataExtractorFactory.newExtractor(true));
DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
dataExtractorFactory.newExtractor(true));
AnalyticsResultProcessor resultProcessor = new AnalyticsResultProcessor(dataFrameRowsJoiner);
executorService.execute(() -> resultProcessor.process(process));
executorService.execute(() -> processData(config.getId(), dataExtractor, process, resultProcessor, finishHandler));
});
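The refactor above extracts row joining from AnalyticsResultProcessor into a dedicated DataFrameRowsJoiner, so the result processor no longer needs a Client or its own extractor. A hedged reading of the new wiring, using only names from this diff (the comments are interpretation, not part of the change):

    // The joiner receives its own extractor (newExtractor(true)), independent of the
    // one feeding rows to the native process, so it can re-read the source rows while joining.
    DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
            dataExtractorFactory.newExtractor(true));
    // The processor now depends only on the joiner, which narrows its responsibility
    // to draining process output and makes it easier to test in isolation.
    AnalyticsResultProcessor resultProcessor = new AnalyticsResultProcessor(dataFrameRowsJoiner);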
@@ -9,46 +9,38 @@
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

public class AnalyticsResult implements ToXContentObject {

public static final ParseField TYPE = new ParseField("analytics_result");
public static final ParseField CHECKSUM = new ParseField("checksum");
public static final ParseField RESULTS = new ParseField("results");

static final ConstructingObjectParser<AnalyticsResult, Void> PARSER = new ConstructingObjectParser<>(TYPE.getPreferredName(),
a -> new AnalyticsResult((Integer) a[0], (Map<String, Object>) a[1]));
a -> new AnalyticsResult((RowResults) a[0]));

static {
PARSER.declareInt(ConstructingObjectParser.constructorArg(), CHECKSUM);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, context) -> p.map(), RESULTS);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), RowResults.PARSER, RowResults.TYPE);
}

private final int checksum;
private final Map<String, Object> results;
private final RowResults rowResults;

public AnalyticsResult(int checksum, Map<String, Object> results) {
this.checksum = Objects.requireNonNull(checksum);
this.results = Objects.requireNonNull(results);
public AnalyticsResult(RowResults rowResults) {
this.rowResults = rowResults;
}

public int getChecksum() {
return checksum;
}

public Map<String, Object> getResults() {
return results;
public RowResults getRowResults() {
return rowResults;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(CHECKSUM.getPreferredName(), checksum);
builder.field(RESULTS.getPreferredName(), results);
if (rowResults != null) {
builder.field(RowResults.TYPE.getPreferredName(), rowResults);
}
builder.endObject();
return builder;
}
@@ -63,11 +55,11 @@ public boolean equals(Object other) {
}

AnalyticsResult that = (AnalyticsResult) other;
return checksum == that.checksum && Objects.equals(results, that.results);
return Objects.equals(rowResults, that.rowResults);
}

@Override
public int hashCode() {
return Objects.hash(checksum, results);
return Objects.hash(rowResults);
}
}
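With the checksum and results map folded into an optional RowResults, the serialized shape of AnalyticsResult changes. A minimal round-trip sketch, assuming a RowResults(int checksum, Map<String, Object> results) constructor and a "row_results" field name; neither is shown directly in this diff:

    import java.util.Collections;
    import org.elasticsearch.common.xcontent.ToXContent;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;

    // Hypothetical usage; the constructor signature and JSON layout are assumptions.
    AnalyticsResult withRows = new AnalyticsResult(
            new RowResults(42, Collections.singletonMap("outlier_score", 0.7)));
    XContentBuilder builder = XContentFactory.jsonBuilder();
    withRows.toXContent(builder, ToXContent.EMPTY_PARAMS);
    // => roughly {"row_results":{"checksum":42,"results":{"outlier_score":0.7}}}

    // Since the parser argument is optional, a result without a row payload is legal
    // and serializes as an empty object:
    AnalyticsResult empty = new AnalyticsResult(null);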
@@ -7,38 +7,22 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AnalyticsResultProcessor {

private static final Logger LOGGER = LogManager.getLogger(AnalyticsResultProcessor.class);

private final Client client;
private final DataFrameDataExtractor dataExtractor;
private List<DataFrameDataExtractor.Row> currentDataFrameRows;
private List<AnalyticsResult> currentResults;
private final DataFrameRowsJoiner dataFrameRowsJoiner;
private final CountDownLatch completionLatch = new CountDownLatch(1);

public AnalyticsResultProcessor(Client client, DataFrameDataExtractor dataExtractor) {
this.client = Objects.requireNonNull(client);
this.dataExtractor = Objects.requireNonNull(dataExtractor);
public AnalyticsResultProcessor(DataFrameRowsJoiner dataFrameRowsJoiner) {
this.dataFrameRowsJoiner = Objects.requireNonNull(dataFrameRowsJoiner);
}

public void awaitForCompletion() {
@@ -57,28 +41,8 @@ public void process(AnalyticsProcess process) {
try {
Iterator<AnalyticsResult> iterator = process.readAnalyticsResults();
while (iterator.hasNext()) {
try {
AnalyticsResult result = iterator.next();
if (dataExtractor.hasNext() == false) {
return;
}
if (currentDataFrameRows == null) {
Optional<List<DataFrameDataExtractor.Row>> nextBatch = dataExtractor.next();
if (nextBatch.isPresent() == false) {
return;
}
currentDataFrameRows = nextBatch.get();
currentResults = new ArrayList<>(currentDataFrameRows.size());
}
currentResults.add(result);
if (currentResults.size() == currentDataFrameRows.size()) {
joinCurrentResults();
currentDataFrameRows = null;
}
} catch (Exception e) {
LOGGER.warn("Error processing data frame analytics result", e);
}

AnalyticsResult result = iterator.next();
processResult(result);
}
} catch (Exception e) {
LOGGER.error("Error parsing data frame analytics output", e);
@@ -88,40 +52,10 @@
}
}

private void joinCurrentResults() {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < currentDataFrameRows.size(); i++) {
DataFrameDataExtractor.Row row = currentDataFrameRows.get(i);
if (row.shouldSkip()) {
continue;
}
AnalyticsResult result = currentResults.get(i);
checkChecksumsMatch(row, result);

SearchHit hit = row.getHit();
Map<String, Object> source = new LinkedHashMap(hit.getSourceAsMap());
source.putAll(result.getResults());
IndexRequest indexRequest = new IndexRequest(hit.getIndex(), hit.getType(), hit.getId());
indexRequest.source(source);
indexRequest.opType(DocWriteRequest.OpType.INDEX);
bulkRequest.add(indexRequest);
}
if (bulkRequest.numberOfActions() > 0) {
BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet();
if (bulkResponse.hasFailures()) {
LOGGER.error("Failures while writing data frame");
// TODO Better error handling
}
}
}

private void checkChecksumsMatch(DataFrameDataExtractor.Row row, AnalyticsResult result) {
if (row.getChecksum() != result.getChecksum()) {
String msg = "Detected checksum mismatch for document with id [" + row.getHit().getId() + "]; ";
msg += "expected [" + row.getChecksum() + "] but result had [" + result.getChecksum() + "]; ";
msg += "this implies the data frame index [" + row.getHit().getIndex() + "] was modified while the analysis was running. ";
msg += "We rely on this index being immutable during a running analysis and so the results will be unreliable.";
throw new IllegalStateException(msg);
private void processResult(AnalyticsResult result) {
RowResults rowResults = result.getRowResults();
if (rowResults != null) {
dataFrameRowsJoiner.processRowResults(rowResults);
}
}
}
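Note the error-handling shift: the old per-result try/catch that logged a warning and carried on is gone, so a malformed result now exits the loop via the outer catch, while join failures are absorbed inside the joiner (see the failed flag in DataFrameRowsJoiner below) so the remaining process output can still be drained. A self-contained toy model of that fail-once, keep-draining pattern, with plain strings standing in for results (nothing here is the PR's API):

    import java.util.List;

    class FailOnceSketch {
        public static void main(String[] args) {
            boolean failed = false;
            for (String result : List.of("r1", "bad", "r3")) {
                if (failed) {
                    continue;                 // drop the result, but keep consuming output
                }
                try {
                    if ("bad".equals(result)) {
                        throw new IllegalStateException("join failed");
                    }
                    System.out.println("joined " + result);
                } catch (Exception e) {
                    failed = true;            // latch: no further joins are attempted
                }
            }
        }
    }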
@@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.dataframe.process;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

class DataFrameRowsJoiner {

private static final Logger LOGGER = LogManager.getLogger(DataFrameRowsJoiner.class);

private final String analyticsId;
private final Client client;
private final DataFrameDataExtractor dataExtractor;
private List<DataFrameDataExtractor.Row> currentDataFrameRows;
private List<RowResults> currentResults;
private boolean failed;

DataFrameRowsJoiner(String analyticsId, Client client, DataFrameDataExtractor dataExtractor) {
this.analyticsId = Objects.requireNonNull(analyticsId);
this.client = Objects.requireNonNull(client);
this.dataExtractor = Objects.requireNonNull(dataExtractor);
}

void processRowResults(RowResults rowResults) {
if (failed) {
// If we are in a failed state, we drop the results but let the processor
// keep parsing the output
return;
}

try {
addResultAndJoinIfEndOfBatch(rowResults);
} catch (Exception e) {
LOGGER.error(new ParameterizedMessage("[{}] Failed to join results", analyticsId), e);
failed = true;
}
}

private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
if (currentDataFrameRows == null) {
Optional<List<DataFrameDataExtractor.Row>> nextBatch = getNextBatch();
if (nextBatch.isPresent() == false) {
return;
}
currentDataFrameRows = nextBatch.get();
currentResults = new ArrayList<>(currentDataFrameRows.size());
}
currentResults.add(rowResults);
if (currentResults.size() == currentDataFrameRows.size()) {
joinCurrentResults();
currentDataFrameRows = null;
}
}

private Optional<List<DataFrameDataExtractor.Row>> getNextBatch() {
try {
return dataExtractor.next();
} catch (IOException e) {
// TODO Implement recovery strategy or better error reporting
LOGGER.error("Error reading next batch of data frame rows", e);
return Optional.empty();
}
}

private void joinCurrentResults() {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < currentDataFrameRows.size(); i++) {
DataFrameDataExtractor.Row row = currentDataFrameRows.get(i);
if (row.shouldSkip()) {
continue;
}
RowResults result = currentResults.get(i);
checkChecksumsMatch(row, result);

SearchHit hit = row.getHit();
Map<String, Object> source = new LinkedHashMap<>(hit.getSourceAsMap());
source.putAll(result.getResults());
IndexRequest indexRequest = new IndexRequest(hit.getIndex());
indexRequest.id(hit.getId());
indexRequest.source(source);
indexRequest.opType(DocWriteRequest.OpType.INDEX);
bulkRequest.add(indexRequest);
}
if (bulkRequest.numberOfActions() > 0) {
BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet();
if (bulkResponse.hasFailures()) {
LOGGER.error("Failures while writing data frame");
// TODO Better error handling
}
}
}

private void checkChecksumsMatch(DataFrameDataExtractor.Row row, RowResults result) {
if (row.getChecksum() != result.getChecksum()) {
String msg = "Detected checksum mismatch for document with id [" + row.getHit().getId() + "]; ";
msg += "expected [" + row.getChecksum() + "] but result had [" + result.getChecksum() + "]; ";
msg += "this implies the data frame index [" + row.getHit().getIndex() + "] was modified while the analysis was running. ";
msg += "We rely on this index being immutable during a running analysis and so the results will be unreliable.";
throw new RuntimeException(msg);
// TODO Communicate this error to the user as effectively the analytics have failed (e.g. FAILED state, audit error, etc.)
}
}
}
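The joiner relies on an invariant this diff does not state explicitly: the native process is assumed to emit exactly one RowResults per extracted row, in extraction order, so results can be matched to rows positionally, batch by batch. A self-contained toy model of that end-of-batch bookkeeping (plain strings stand in for rows and results; nothing here is the PR's API):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    class JoinSketch {
        public static void main(String[] args) {
            // Two extractor batches and a stream of results, one per row, in order.
            Iterator<List<String>> extractor =
                    List.of(List.of("row1", "row2"), List.of("row3")).iterator();
            List<String> currentRows = null;
            List<String> currentResults = null;
            for (String result : List.of("res1", "res2", "res3")) {
                if (currentRows == null) {
                    currentRows = extractor.next();                  // lazily fetch the next batch
                    currentResults = new ArrayList<>(currentRows.size());
                }
                currentResults.add(result);
                if (currentResults.size() == currentRows.size()) {   // end of batch: join and reset
                    System.out.println("join " + currentRows + " with " + currentResults);
                    currentRows = null;
                }
            }
        }
    }
    // Prints:
    // join [row1, row2] with [res1, res2]
    // join [row3] with [res3]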