From 6ff76cbcbe1d33cd371c3d078906d25d0dcf194c Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 3 Dec 2018 16:37:08 +0000 Subject: [PATCH 1/5] [FEATURE][ML] Parse results and join them in the data-frame copy index --- .../action/TransportRunAnalyticsAction.java | 3 +- .../ml/analytics/DataFrameDataExtractor.java | 22 +- .../DataFrameDataExtractorContext.java | 4 +- .../DataFrameDataExtractorFactory.java | 6 +- .../analytics/process/AnalyticsProcess.java | 14 + .../process/AnalyticsProcessManager.java | 52 ++- .../ml/analytics/process/AnalyticsResult.java | 73 +++ .../process/AnalyticsResultProcessor.java | 115 +++++ .../process/NativeAnalyticsProcess.java | 9 + .../autodetect/NativeAutodetectProcess.java | 19 +- .../NativeAutodetectProcessFactory.java | 9 +- .../ml/process/AbstractNativeProcess.java | 12 + .../ProcessResultsParser.java} | 38 +- .../process/AnalyticsResultTests.java | 39 ++ .../NativeAutodetectProcessTests.java | 13 +- .../output/AutodetectResultsParserTests.java | 422 ------------------ .../ml/process/ProcessResultsParserTests.java | 113 +++++ 17 files changed, 470 insertions(+), 493 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResult.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/{job/process/autodetect/output/AutodetectResultsParser.java => process/ProcessResultsParser.java} (68%) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultTests.java delete mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessResultsParserTests.java diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java index 7d7b194de8ae6..967d03b6aa65a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java @@ -178,8 +178,7 @@ private void runPipelineAnalytics(String index, ActionListener dataExtractorFactoryListener = ActionListener.wrap( dataExtractorFactory -> { - DataFrameDataExtractor dataExtractor = dataExtractorFactory.newExtractor(); - analyticsProcessManager.processData(jobId, dataExtractor); + analyticsProcessManager.runJob(jobId, dataExtractorFactory); listener.onResponse(new AcknowledgedResponse(true)); }, listener::onFailure diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java index b0d5a032665f8..3d47aeff1b8b5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractor.java @@ -98,7 +98,7 @@ private SearchRequestBuilder buildSearchRequest() { .setIndices(context.indices) .setSize(context.scrollSize) .setQuery(context.query) - .setFetchSource(false); + .setFetchSource(context.includeSource); for (ExtractedField docValueField : context.extractedFields.getDocValueFields()) { searchRequestBuilder.addDocValueField(docValueField.getName(), docValueField.getDocValueFormat()); @@ -149,7 +149,7 @@ private Row createRow(SearchHit hit) { break; } } - return new Row(extractedValues); + return new Row(extractedValues, hit); } private List continueScroll() throws IOException { @@ -196,10 +196,11 @@ public DataSummary 
collectDataSummary() { SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE) .setIndices(context.indices) .setSize(0) - .setQuery(context.query); + .setQuery(context.query) + .setTrackTotalHits(true); SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder); - return new DataSummary(searchResponse.getHits().getTotalHits(), context.extractedFields.getAllFields().size()); + return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size()); } public static class DataSummary { @@ -215,16 +216,27 @@ public DataSummary(long rows, int cols) { public static class Row { + private SearchHit hit; + @Nullable private String[] values; - private Row(String[] values) { + private Row(String[] values, SearchHit hit) { this.values = values; + this.hit = hit; } @Nullable public String[] getValues() { return values; } + + public SearchHit getHit() { + return hit; + } + + public boolean shouldSkip() { + return values == null; + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorContext.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorContext.java index 2acba64197c4b..d1b52bdac0351 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorContext.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorContext.java @@ -20,14 +20,16 @@ public class DataFrameDataExtractorContext { final QueryBuilder query; final int scrollSize; final Map headers; + final boolean includeSource; DataFrameDataExtractorContext(String jobId, ExtractedFields extractedFields, List indices, QueryBuilder query, int scrollSize, - Map headers) { + Map headers, boolean includeSource) { this.jobId = Objects.requireNonNull(jobId); this.extractedFields = Objects.requireNonNull(extractedFields); this.indices = 
indices.toArray(new String[indices.size()]); this.query = Objects.requireNonNull(query); this.scrollSize = scrollSize; this.headers = headers; + this.includeSource = includeSource; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactory.java index 35085d282c87f..57c2b44f769c4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/DataFrameDataExtractorFactory.java @@ -63,14 +63,16 @@ private DataFrameDataExtractorFactory(Client client, String index, ExtractedFiel this.extractedFields = Objects.requireNonNull(extractedFields); } - public DataFrameDataExtractor newExtractor() { + public DataFrameDataExtractor newExtractor(boolean includeSource) { DataFrameDataExtractorContext context = new DataFrameDataExtractorContext( "ml-analytics-" + index, extractedFields, Arrays.asList(index), QueryBuilders.matchAllQuery(), 1000, - Collections.emptyMap()); + Collections.emptyMap(), + includeSource + ); return new DataFrameDataExtractor(client, context); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcess.java index 932d144cf6973..dc07d688a67ae 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcess.java @@ -8,6 +8,7 @@ import org.elasticsearch.xpack.ml.process.NativeProcess; import java.io.IOException; +import java.util.Iterator; public interface AnalyticsProcess extends NativeProcess { @@ -17,4 +18,17 @@ public interface AnalyticsProcess extends NativeProcess { * @throws 
IOException If an error occurs writing to the process */ void writeEndOfDataMessage() throws IOException; + + /** + * @return stream of analytics results. + */ + Iterator readAnalyticsResults(); + + /** + * Read anything left in the stream before + * closing the stream otherwise if the process + * tries to write more after the close it gets + * a SIGPIPE + */ + void consumeAndCloseOutputStream(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java index 2671f49293863..bf079348c7e55 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsProcessManager.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.analytics.DataFrameAnalysis; import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractor; +import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractorFactory; import java.io.IOException; import java.util.List; @@ -41,28 +42,39 @@ public AnalyticsProcessManager(Client client, Environment environment, ThreadPoo this.processFactory = Objects.requireNonNull(analyticsProcessFactory); } - public void processData(String jobId, DataFrameDataExtractor dataExtractor) { + public void runJob(String jobId, DataFrameDataExtractorFactory dataExtractorFactory) { threadPool.generic().execute(() -> { - AnalyticsProcess process = createProcess(jobId, dataExtractor); - try { - writeHeaderRecord(dataExtractor, process); - writeDataRows(dataExtractor, process); - process.writeEndOfDataMessage(); - process.flushStream(); + DataFrameDataExtractor dataExtractor = dataExtractorFactory.newExtractor(false); + AnalyticsProcess process = createProcess(jobId, createProcessConfig(dataExtractor)); + 
ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); + AnalyticsResultProcessor resultProcessor = new AnalyticsResultProcessor(client, dataExtractorFactory.newExtractor(true)); + executorService.execute(() -> resultProcessor.process(process)); + executorService.execute(() -> processData(jobId, dataExtractor, process, resultProcessor)); + }); + } + + private void processData(String jobId, DataFrameDataExtractor dataExtractor, AnalyticsProcess process, + AnalyticsResultProcessor resultProcessor) { + try { + writeHeaderRecord(dataExtractor, process); + writeDataRows(dataExtractor, process); + process.writeEndOfDataMessage(); + process.flushStream(); - LOGGER.debug("[{}] Closing process", jobId); + LOGGER.info("[{}] Waiting for result processor to complete", jobId); + resultProcessor.awaitForCompletion(); + LOGGER.info("[{}] Result processor has completed", jobId); + } catch (IOException e) { + LOGGER.error(new ParameterizedMessage("[{}] Error writing data to the process", jobId), e); + } finally { + LOGGER.info("[{}] Closing process", jobId); + try { process.close(); LOGGER.info("[{}] Closed process", jobId); } catch (IOException e) { - LOGGER.error(new ParameterizedMessage("[{}] Error writing data to the process", jobId), e); - } finally { - try { - process.close(); - } catch (IOException e) { - LOGGER.error("[{}] Error closing data frame analyzer process", jobId); - } + LOGGER.error("[{}] Error closing data frame analyzer process", jobId); } - }); + } } private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProcess process) throws IOException { @@ -75,8 +87,8 @@ private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProces Optional> rows = dataExtractor.next(); if (rows.isPresent()) { for (DataFrameDataExtractor.Row row : rows.get()) { - String[] rowValues = row.getValues(); - if (rowValues != null) { + if (row.shouldSkip() == false) { + String[] rowValues = row.getValues(); 
System.arraycopy(rowValues, 0, record, 0, rowValues.length); process.writeRecord(record); } @@ -96,10 +108,10 @@ private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsPr process.writeRecord(headerRecord); } - private AnalyticsProcess createProcess(String jobId, DataFrameDataExtractor dataExtractor) { + private AnalyticsProcess createProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig) { // TODO We should rename the thread pool to reflect its more general use now, e.g. JOB_PROCESS_THREAD_POOL_NAME ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); - AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, createProcessConfig(dataExtractor), executorService); + AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, analyticsProcessConfig, executorService); if (process.isProcessAlive() == false) { throw ExceptionsHelper.serverError("Failed to start analytics process"); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResult.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResult.java new file mode 100644 index 0000000000000..1f9ef71da8fb1 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResult.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.analytics.process; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class AnalyticsResult implements ToXContentObject { + + public static final ParseField TYPE = new ParseField("analytics_result"); + public static final ParseField ID_HASH = new ParseField("id_hash"); + public static final ParseField RESULTS = new ParseField("results"); + + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(TYPE.getPreferredName(), + a -> new AnalyticsResult((String) a[0], (Map) a[1])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_HASH); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, context) -> p.map(), RESULTS); + } + + private final String idHash; + private final Map results; + + public AnalyticsResult(String idHash, Map results) { + this.idHash = Objects.requireNonNull(idHash); + this.results = Objects.requireNonNull(results); + } + + public String getIdHash() { + return idHash; + } + + public Map getResults() { + return results; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ID_HASH.getPreferredName(), idHash); + builder.field(RESULTS.getPreferredName(), results); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + + AnalyticsResult that = (AnalyticsResult) other; + return Objects.equals(idHash, that.idHash) && Objects.equals(results, that.results); + } + + @Override + public int hashCode() { + return 
Objects.hash(idHash, results); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java new file mode 100644 index 0000000000000..6f55c53ea7770 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.analytics.process; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractor; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class AnalyticsResultProcessor { + + private static final Logger LOGGER = LogManager.getLogger(AnalyticsResultProcessor.class); + + private final Client client; + private final DataFrameDataExtractor dataExtractor; + private List currentDataFrameRows; + private List currentResults; + private final CountDownLatch completionLatch = new CountDownLatch(1); + + public AnalyticsResultProcessor(Client client, DataFrameDataExtractor dataExtractor) { + 
this.client = Objects.requireNonNull(client); + this.dataExtractor = Objects.requireNonNull(dataExtractor); + } + + public void awaitForCompletion() { + try { + if (completionLatch.await(30, TimeUnit.MINUTES) == false) { + LOGGER.warn("Timeout waiting for results processor to complete"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.info("Interrupted waiting for results processor to complete"); + } + } + + public void process(AnalyticsProcess process) { + + try { + Iterator iterator = process.readAnalyticsResults(); + while (iterator.hasNext()) { + try { + AnalyticsResult result = iterator.next(); + if (dataExtractor.hasNext() == false) { + return; + } + if (currentDataFrameRows == null) { + Optional> nextBatch = dataExtractor.next(); + if (nextBatch.isPresent() == false) { + return; + } + currentDataFrameRows = nextBatch.get(); + currentResults = new ArrayList<>(currentDataFrameRows.size()); + } + currentResults.add(result); + if (currentResults.size() == currentDataFrameRows.size()) { + joinCurrentResults(); + currentDataFrameRows = null; + } + } catch (Exception e) { + LOGGER.warn("Error processing analytics result", e); + } + + } + } catch (Exception e) { + LOGGER.error("Error parsing analytics output", e); + } finally { + completionLatch.countDown(); + process.consumeAndCloseOutputStream(); + } + } + + private void joinCurrentResults() { + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < currentDataFrameRows.size(); i++) { + DataFrameDataExtractor.Row row = currentDataFrameRows.get(i); + if (row.shouldSkip()) { + continue; + } + AnalyticsResult result = currentResults.get(i); + SearchHit hit = row.getHit(); + Map source = new HashMap(hit.getSourceAsMap()); + source.putAll(result.getResults()); + IndexRequest indexRequest = new IndexRequest(hit.getIndex(), hit.getType(), hit.getId()); + indexRequest.source(source); + indexRequest.opType(DocWriteRequest.OpType.INDEX); + bulkRequest.add(indexRequest); + 
} + if (bulkRequest.numberOfActions() > 0) { + BulkResponse bulkResponse = client.execute(BulkAction.INSTANCE, bulkRequest).actionGet(); + if (bulkResponse.hasFailures()) { + LOGGER.error("Failures while writing data frame"); + // TODO Better error handling + } + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcess.java index b8a387fd6e2b2..5f0f58e8b7b8a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/NativeAnalyticsProcess.java @@ -6,17 +6,21 @@ package org.elasticsearch.xpack.ml.analytics.process; import org.elasticsearch.xpack.ml.process.AbstractNativeProcess; +import org.elasticsearch.xpack.ml.process.ProcessResultsParser; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; +import java.util.Iterator; import java.util.List; public class NativeAnalyticsProcess extends AbstractNativeProcess implements AnalyticsProcess { private static final String NAME = "analytics"; + private final ProcessResultsParser resultsParser = new ProcessResultsParser<>(AnalyticsResult.PARSER); + protected NativeAnalyticsProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List filesToDelete, Runnable onProcessCrash) { @@ -37,4 +41,9 @@ public void persistState() { public void writeEndOfDataMessage() throws IOException { new AnalyticsControlMessageWriter(recordWriter(), numberOfFields()).writeEndOfData(); } + + @Override + public Iterator readAnalyticsResults() { + return resultsParser.parseResults(processOutStream()); + } } diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index 69ed0d66c8606..4c7c4d553bcbc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -14,13 +14,13 @@ import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.process.AbstractNativeProcess; +import org.elasticsearch.xpack.ml.process.ProcessResultsParser; import java.io.IOException; import java.io.InputStream; @@ -38,11 +38,11 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec private static final String NAME = "autodetect"; - private final AutodetectResultsParser resultsParser; + private final ProcessResultsParser resultsParser; NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List filesToDelete, - AutodetectResultsParser resultsParser, Runnable onProcessCrash) { + ProcessResultsParser resultsParser, Runnable onProcessCrash) { 
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash); this.resultsParser = resultsParser; } @@ -117,17 +117,4 @@ public Iterator readAutodetectResults() { private AutodetectControlMsgWriter newMessageWriter() { return new AutodetectControlMsgWriter(recordWriter(), numberOfFields()); } - - @Override - public void consumeAndCloseOutputStream() { - try { - byte[] buff = new byte[512]; - while (processOutStream().read(buff) >= 0) { - // Do nothing - } - processOutStream().close(); - } catch (IOException e) { - throw new RuntimeException("Error closing result parser input stream", e); - } - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 3185ebc6f1c7d..68aaa8a81c4b6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -16,11 +16,12 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.process.NativeController; -import org.elasticsearch.xpack.ml.process.ProcessPipes; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; +import org.elasticsearch.xpack.ml.job.results.AutodetectResult; +import org.elasticsearch.xpack.ml.process.NativeController; +import org.elasticsearch.xpack.ml.process.ProcessPipes; +import 
org.elasticsearch.xpack.ml.process.ProcessResultsParser; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import java.io.IOException; @@ -68,7 +69,7 @@ public AutodetectProcess createAutodetectProcess(Job job, int numberOfFields = job.allInputFields().size() + (includeTokensField ? 1 : 0) + 1; AutodetectStateProcessor stateProcessor = new AutodetectStateProcessor(client, job.getId()); - AutodetectResultsParser resultsParser = new AutodetectResultsParser(); + ProcessResultsParser resultsParser = new ProcessResultsParser<>(AutodetectResult.PARSER); NativeAutodetectProcess autodetect = new NativeAutodetectProcess( job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index b84bfdd38e19a..8325d09b47050 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -262,4 +262,16 @@ protected LengthEncodedWriter recordWriter() { protected boolean isProcessKilled() { return processKilled; } + + public void consumeAndCloseOutputStream() { + try { + byte[] buff = new byte[512]; + while (processOutStream().read(buff) >= 0) { + // Do nothing + } + processOutStream().close(); + } catch (IOException e) { + throw new RuntimeException("Error closing result parser input stream", e); + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParser.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessResultsParser.java similarity index 68% rename from 
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParser.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessResultsParser.java index 2ec37a0f86e5d..f8d0c4f746c6a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParser.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessResultsParser.java @@ -3,31 +3,41 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process.autodetect.output; +package org.elasticsearch.xpack.ml.process; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; +import java.util.Objects; /** - * Parses the JSON output of the autodetect program. + * Parses the JSON output of a process. *

- * Expects an array of buckets so the first element will always be the + * Expects an array of objects so the first element will always be the * start array symbol and the data must be terminated with the end array symbol. */ -public class AutodetectResultsParser { - public Iterator parseResults(InputStream in) throws ElasticsearchParseException { +public class ProcessResultsParser { + + private static final Logger LOGGER = LogManager.getLogger(ProcessResultsParser.class); + + private final ConstructingObjectParser resultParser; + + public ProcessResultsParser(ConstructingObjectParser resultParser) { + this.resultParser = Objects.requireNonNull(resultParser); + } + + public Iterator parseResults(InputStream in) throws ElasticsearchParseException { try { XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, in); @@ -36,21 +46,19 @@ public Iterator parseResults(InputStream in) throws Elasticsea if (token != XContentParser.Token.START_ARRAY) { throw new ElasticsearchParseException("unexpected token [" + token + "]"); } - return new AutodetectResultIterator(in, parser); + return new ResultIterator(in, parser); } catch (IOException e) { throw new ElasticsearchParseException(e.getMessage(), e); } } - private static class AutodetectResultIterator implements Iterator { - - private static final Logger logger = LogManager.getLogger(AutodetectResultIterator.class); + private class ResultIterator implements Iterator { private final InputStream in; private final XContentParser parser; private XContentParser.Token token; - private AutodetectResultIterator(InputStream in, XContentParser parser) { + private ResultIterator(InputStream in, XContentParser parser) { this.in = in; this.parser = parser; token = parser.currentToken(); @@ -61,21 +69,21 @@ public boolean hasNext() { try { token = parser.nextToken(); } catch (IOException e) { - logger.debug("io error while parsing", e); + 
LOGGER.debug("io error while parsing", e); return false; } if (token == XContentParser.Token.END_ARRAY) { return false; } else if (token != XContentParser.Token.START_OBJECT) { - logger.error("Expecting Json Field name token after the Start Object token"); + LOGGER.error("Expecting Json Field name token after the Start Object token"); throw new ElasticsearchParseException("unexpected token [" + token + "]"); } return true; } @Override - public AutodetectResult next() { - return AutodetectResult.PARSER.apply(parser, null); + public T next() { + return resultParser.apply(parser, null); } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultTests.java new file mode 100644 index 0000000000000..6250a96cd3284 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultTests.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.analytics.process; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class AnalyticsResultTests extends AbstractXContentTestCase { + + @Override + protected AnalyticsResult createTestInstance() { + String idHash = randomAlphaOfLength(20); + Map results = new HashMap<>(); + int resultsSize = randomIntBetween(1, 10); + for (int i = 0; i < resultsSize; i++) { + String resultField = randomAlphaOfLength(20); + Object resultValue = randomBoolean() ? 
randomAlphaOfLength(20) : randomDouble(); + results.put(resultField, resultValue); + } + return new AnalyticsResult(idHash, results); + } + + @Override + protected AnalyticsResult doParseInstance(XContentParser parser) throws IOException { + return AnalyticsResult.PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 8542061c761a2..0b7a0273345d3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -7,12 +7,13 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter; +import org.elasticsearch.xpack.ml.job.results.AutodetectResult; +import org.elasticsearch.xpack.ml.process.ProcessResultsParser; import org.junit.Assert; import org.junit.Before; @@ -58,7 +59,7 @@ public void testProcessStartTime() throws Exception { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, mock(OutputStream.class), outputStream, mock(OutputStream.class), NUMBER_FIELDS, null, - new AutodetectResultsParser(), 
mock(Runnable.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Runnable.class))) { process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); ZonedDateTime startTime = process.getProcessStartTime(); @@ -80,7 +81,7 @@ public void testWriteRecord() throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(), mock(Runnable.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Runnable.class))) { process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); process.writeRecord(record); @@ -114,7 +115,7 @@ public void testFlush() throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(), mock(Runnable.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Runnable.class))) { process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); FlushJobParams params = FlushJobParams.builder().build(); @@ -147,7 +148,7 @@ public void testConsumeAndCloseOutputStream() throws IOException { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(), mock(Runnable.class))) { + new ProcessResultsParser(AutodetectResult.PARSER), mock(Runnable.class))) { process.consumeAndCloseOutputStream(); assertThat(processOutStream.available(), equalTo(0)); @@ -162,7 +163,7 @@ private void 
testWriteMessage(CheckedConsumer writeFunc ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new AutodetectResultsParser(), mock(Runnable.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Runnable.class))) { process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); writeFunction.accept(process); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java deleted file mode 100644 index 1118453154ed8..0000000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultsParserTests.java +++ /dev/null @@ -1,422 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ -package org.elasticsearch.xpack.ml.job.process.autodetect.output; - -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.common.xcontent.XContentParseException; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; -import org.elasticsearch.xpack.core.ml.job.results.Bucket; -import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer; -import org.elasticsearch.xpack.ml.job.results.AutodetectResult; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.stream.Collectors; - -/** - * Tests for parsing the JSON output of autodetect - */ -public class AutodetectResultsParserTests extends ESTestCase { - private static final double EPSILON = 0.000001; - - private static final String METRIC_OUTPUT_SAMPLE = "[{\"bucket\": {\"job_id\":\"foo\",\"timestamp\":1359450000000," - + "\"bucket_span\":22, \"records\":[]," - + "\"anomaly_score\":0,\"event_count\":806,\"bucket_influencers\":[" - + "{\"timestamp\":1359450000000,\"bucket_span\":22,\"job_id\":\"foo\",\"anomaly_score\":0," - + "\"probability\":0.0, \"influencer_field_name\":\"bucket_time\"," - + "\"initial_anomaly_score\":0.0}]}},{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normalizer 1.1, normalizer 2" + - ".1]\",\"timestamp\":1359450000000}}" - + ",{\"bucket\": {\"job_id\":\"foo\",\"timestamp\":1359453600000,\"bucket_span\":22,\"records\":" - + "[{\"timestamp\":1359453600000,\"bucket_span\":22,\"job_id\":\"foo\",\"probability\":0.0637541," - + "\"by_field_name\":\"airline\",\"by_field_value\":\"JZA\", \"typical\":[1020.08],\"actual\":[1042.14]," - + "\"field_name\":\"responsetime\",\"function\":\"max\",\"partition_field_name\":\"\",\"partition_field_value\":\"\"}," - + 
"{\"timestamp\":1359453600000,\"bucket_span\":22,\"job_id\":\"foo\",\"probability\":0.00748292," - + "\"by_field_name\":\"airline\",\"by_field_value\":\"AMX\", " - + "\"typical\":[20.2137],\"actual\":[22.8855],\"field_name\":\"responsetime\",\"function\":\"max\"," - + "\"partition_field_name\":\"\",\"partition_field_value\":\"\"},{\"timestamp\":1359453600000,\"bucket_span\":22," - + "\"job_id\":\"foo\",\"probability\":0.023494,\"by_field_name\":\"airline\"," - + "\"by_field_value\":\"DAL\", \"typical\":[382.177],\"actual\":[358.934],\"field_name\":\"responsetime\",\"function\":\"min\"," - + "\"partition_field_name\":\"\", \"partition_field_value\":\"\"},{\"timestamp\":1359453600000,\"bucket_span\":22," - + "\"job_id\":\"foo\"," - + "\"probability\":0.0473552,\"by_field_name\":\"airline\",\"by_field_value\":\"SWA\", \"typical\":[152.148]," - + "\"actual\":[96.6425],\"field_name\":\"responsetime\",\"function\":\"min\",\"partition_field_name\":\"\"," - + "\"partition_field_value\":\"\"}]," - + "\"initial_anomaly_score\":0.0140005, \"anomaly_score\":20.22688," - + "\"event_count\":820,\"bucket_influencers\":[{\"timestamp\":1359453600000,\"bucket_span\":22," - + "\"job_id\":\"foo\", \"raw_anomaly_score\":0.0140005, \"probability\":0.01,\"influencer_field_name\":\"bucket_time\"," - + "\"initial_anomaly_score\":20.22688,\"anomaly_score\":20.22688} ,{\"timestamp\":1359453600000,\"bucket_span\":22," - + "\"job_id\":\"foo\",\"raw_anomaly_score\":0.005, \"probability\":0.03," - + "\"influencer_field_name\":\"foo\",\"initial_anomaly_score\":10.5,\"anomaly_score\":10.5}]}},{\"quantiles\": " - + "{\"job_id\":\"foo\",\"timestamp\":1359453600000," - + "\"quantile_state\":\"[normalizer 1.2, normalizer 2.2]\"}} ,{\"flush\": {\"id\":\"testing1\"}} ," - + "{\"quantiles\": {\"job_id\":\"foo\",\"timestamp\":1359453600000,\"quantile_state\":\"[normalizer 1.3, normalizer 2.3]\"}} ]"; - - private static final String POPULATION_OUTPUT_SAMPLE = 
"[{\"timestamp\":1379590200,\"records\":[{\"probability\":1.38951e-08," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"mail.google.com\"," - + "\"function\":\"max\"," - + "\"causes\":[{\"probability\":1.38951e-08,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"mail.google.com\",\"function\":\"max\",\"typical\":[101534],\"actual\":[9.19027e+07]}]," - + "\"record_score\":100,\"anomaly_score\":44.7324},{\"probability\":3.86587e-07,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"armmf.adobe.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":3.86587e-07,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"armmf.adobe.com\",\"function\":\"max\",\"typical\":[101534],\"actual\":[3.20093e+07]}]," - + "\"record_score\":89.5834,\"anomaly_score\":44.7324},{\"probability\":0.00500083,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"0.docs.google.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.00500083,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"0.docs.google.com\",\"function\":\"max\",\"typical\":[101534],\"actual\":[6.61812e+06]}]," - + "\"record_score\":1.19856,\"anomaly_score\":44.7324},{\"probability\":0.0152333,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"emea.salesforce.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.0152333,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"emea.salesforce.com\",\"function\":\"max\",\"typical\":[101534],\"actual\":[5.36373e+06]}]," - + "\"record_score\":0.303996,\"anomaly_score\":44.7324}],\"raw_anomaly_score\":1.30397,\"anomaly_score\":44.7324," - + "\"event_count\":1235}" + ",{\"flush\":\"testing2\"}" - + 
",{\"timestamp\":1379590800,\"records\":[{\"probability\":1.9008e-08,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"mail.google.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":1.9008e-08,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"mail.google.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[1.1498e+08]}]," - + "\"record_score\":93.6213,\"anomaly_score\":1.19192},{\"probability\":1.01013e-06,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"armmf.adobe.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":1.01013e-06,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"armmf.adobe.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[3.25808e+07]}]," - + "\"record_score\":86.5825,\"anomaly_score\":1.19192},{\"probability\":0.000386185,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"0.docs.google.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.000386185,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"0.docs.google.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[3.22855e+06]}]," - + "\"record_score\":17.1179,\"anomaly_score\":1.19192},{\"probability\":0.00208033,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"docs.google.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.00208033,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"docs.google.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[1.43328e+06]}]," - + "\"record_score\":3.0692,\"anomaly_score\":1.19192},{\"probability\":0.00312988,\"field_name\":\"sum_cs_bytes_\"," - + 
"\"over_field_name\":\"cs_host\",\"over_field_value\":\"booking2.airasia.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.00312988,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"booking2.airasia.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[1.15764e+06]}]," - + "\"record_score\":1.99532,\"anomaly_score\":1.19192},{\"probability\":0.00379229,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.facebook.com\",\"function\":\"max\",\"causes\":[" - + "{\"probability\":0.00379229,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"www.facebook.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[1.0443e+06]}]," - + "\"record_score\":1.62352,\"anomaly_score\":1.19192},{\"probability\":0.00623576,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.airasia.com\",\"function\":\"max\",\"causes\":[" - + "{\"probability\":0.00623576,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"www.airasia.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[792699]}]," - + "\"record_score\":0.935134,\"anomaly_score\":1.19192},{\"probability\":0.00665308,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.google.com\",\"function\":\"max\",\"causes\":[" - + "{\"probability\":0.00665308,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"www.google.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[763985]}]," - + "\"record_score\":0.868119,\"anomaly_score\":1.19192},{\"probability\":0.00709315,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"0.drive.google.com\",\"function\":\"max\",\"causes\":[{" - + 
"\"probability\":0.00709315,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"0.drive.google.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[736442]}]," - + "\"record_score\":0.805994,\"anomaly_score\":1.19192},{\"probability\":0.00755789,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"resources2.news.com.au\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.00755789,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"resources2.news.com.au\",\"function\":\"max\",\"typical\":[31356],\"actual\":[709962]}]," - + "\"record_score\":0.748239,\"anomaly_score\":1.19192},{\"probability\":0.00834974,\"field_name\":" - + "\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.calypso.net.au\",\"function\":\"max\"," - + "\"causes\":[{\"probability\":0.00834974,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"www.calypso.net.au\",\"function\":\"max\",\"typical\":[31356],\"actual\":[669968]}]," - + "\"record_score\":0.664644,\"anomaly_score\":1.19192},{\"probability\":0.0107711,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"ad.yieldmanager.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.0107711,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"ad.yieldmanager.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[576067]}]," - + "\"record_score\":0.485277,\"anomaly_score\":1.19192},{\"probability\":0.0123367,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.google-analytics.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.0123367,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + 
"\"over_field_value\":\"www.google-analytics.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[530594]}]," - + "\"record_score\":0.406783,\"anomaly_score\":1.19192},{\"probability\":0.0125647,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"bs.serving-sys.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.0125647,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"bs.serving-sys.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[524690]}]," - + "\"record_score\":0.396986,\"anomaly_score\":1.19192},{\"probability\":0.0141652,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.google.com.au\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.0141652,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"www.google.com.au\",\"function\":\"max\",\"typical\":[31356],\"actual\":[487328]}]," - + "\"record_score\":0.337075,\"anomaly_score\":1.19192},{\"probability\":0.0141742,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"resources1.news.com.au\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.0141742,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"resources1.news.com.au\",\"function\":\"max\",\"typical\":[31356],\"actual\":[487136]}]," - + "\"record_score\":0.336776,\"anomaly_score\":1.19192},{\"probability\":0.0145263,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"b.mail.google.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.0145263,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"b.mail.google.com\",\"function\":\"max\",\"typical\":[31356],\"actual\":[479766]}]," - + 
"\"record_score\":0.325385,\"anomaly_score\":1.19192},{\"probability\":0.0151447,\"field_name\":\"sum_cs_bytes_\"," - + "\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.rei.com\",\"function\":\"max\",\"causes\":[{" - + "\"probability\":0.0151447,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.rei" + - ".com\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[467450]}],\"record_score\":0.306657,\"anomaly_score\":1" + - ".19192}," - + "{\"probability\":0.0164073,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"s3.amazonaws.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0164073," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"s3.amazonaws.com\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[444511]}],\"record_score\":0.272805,\"anomaly_score\":1" + - ".19192}," - + "{\"probability\":0.0201927,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"0-p-06-ash2.channel.facebook.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0201927," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"0-p-06-ash2.channel.facebook.com\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[389243]}],\"record_score\":0.196685,\"anomaly_score\":1" + - ".19192}," - + "{\"probability\":0.0218721,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"booking.airasia.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0218721," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"booking.airasia.com\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[369509]}],\"record_score\":0.171353," - + "\"anomaly_score\":1.19192},{\"probability\":0.0242411,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + 
"\"over_field_value\":\"www.yammer.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0242411," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.yammer.com\"," + - "\"function\":\"max\"," - + "\"typical\":[31356],\"actual\":[345295]}],\"record_score\":0.141585,\"anomaly_score\":1.19192}," - + "{\"probability\":0.0258232,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"safebrowsing-cache.google.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0258232," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"safebrowsing-cache.google.com\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[331051]}],\"record_score\":0.124748,\"anomaly_score\":1" + - ".19192}," - + "{\"probability\":0.0259695,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"fbcdn-profile-a.akamaihd.net\",\"function\":\"max\",\"causes\":[{\"probability\":0.0259695," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"fbcdn-profile-a.akamaihd.net\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[329801]}],\"record_score\":0.123294,\"anomaly_score\":1" + - ".19192}," - + "{\"probability\":0.0268874,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"www.oag.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0268874," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.oag.com\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[322200]}],\"record_score\":0.114537," - + "\"anomaly_score\":1.19192},{\"probability\":0.0279146,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"booking.qatarairways.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0279146," - + 
"\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"booking.qatarairways.com\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[314153]}],\"record_score\":0.105419,\"anomaly_score\":1" + - ".19192}," - + "{\"probability\":0.0309351,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"resources3.news.com.au\",\"function\":\"max\",\"causes\":[{\"probability\":0.0309351," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"resources3.news.com.au\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[292918]}],\"record_score\":0.0821156,\"anomaly_score\":1" + - ".19192}" - + ",{\"probability\":0.0335204,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"resources0.news.com.au\",\"function\":\"max\",\"causes\":[{\"probability\":0.0335204," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"resources0.news.com.au\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[277136]}],\"record_score\":0.0655063,\"anomaly_score\":1" + - ".19192}" - + ",{\"probability\":0.0354927,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"www.southwest.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0354927," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.southwest.com\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[266310]}],\"record_score\":0.0544615," - + "\"anomaly_score\":1.19192},{\"probability\":0.0392043,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"syndication.twimg.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0392043," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"syndication.twimg.com\"," - + 
"\"function\":\"max\",\"typical\":[31356],\"actual\":[248276]}],\"record_score\":0.0366913,\"anomaly_score\":1" + - ".19192}" - + ",{\"probability\":0.0400853,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"" - + ",\"over_field_value\":\"mts0.google.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0400853," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"mts0.google.com\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[244381]}],\"record_score\":0.0329562," - + "\"anomaly_score\":1.19192},{\"probability\":0.0407335,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"www.onthegotours.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0407335," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"www.onthegotours.com\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[241600]}],\"record_score\":0.0303116," - + "\"anomaly_score\":1.19192},{\"probability\":0.0470889,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"chatenabled.mail.google.com\",\"function\":\"max\",\"causes\":[{\"probability\":0.0470889," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"chatenabled.mail.google.com\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[217573]}],\"record_score\":0.00823738," - + "\"anomaly_score\":1.19192},{\"probability\":0.0491243,\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\"," - + "\"over_field_value\":\"googleads.g.doubleclick.net\",\"function\":\"max\",\"causes\":[{\"probability\":0.0491243," - + "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"googleads.g.doubleclick.net\"," - + "\"function\":\"max\",\"typical\":[31356],\"actual\":[210926]}],\"record_score\":0.00237509," - + 
"\"anomaly_score\":1.19192}],\"raw_anomaly_score\":1.26918,\"anomaly_score\":1.19192," - + "\"event_count\":1159}" + "]"; - - public void testParser() throws IOException { - try (InputStream inputStream = new ByteArrayInputStream(METRIC_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8))) { - AutodetectResultsParser parser = new AutodetectResultsParser(); - List results = new ArrayList<>(); - parser.parseResults(inputStream).forEachRemaining(results::add); - List buckets = results.stream().map(AutodetectResult::getBucket) - .filter(b -> b != null) - .collect(Collectors.toList()); - - assertEquals(2, buckets.size()); - assertEquals(new Date(1359450000000L), buckets.get(0).getTimestamp()); - - assertEquals(buckets.get(0).getEventCount(), 806); - - List bucketInfluencers = buckets.get(0).getBucketInfluencers(); - assertEquals(1, bucketInfluencers.size()); - assertEquals(0.0, bucketInfluencers.get(0).getRawAnomalyScore(), EPSILON); - assertEquals(0.0, bucketInfluencers.get(0).getAnomalyScore(), EPSILON); - assertEquals(0.0, bucketInfluencers.get(0).getProbability(), EPSILON); - assertEquals("bucket_time", bucketInfluencers.get(0).getInfluencerFieldName()); - - assertEquals(new Date(1359453600000L), buckets.get(1).getTimestamp()); - - assertEquals(buckets.get(1).getEventCount(), 820); - bucketInfluencers = buckets.get(1).getBucketInfluencers(); - assertEquals(2, bucketInfluencers.size()); - assertEquals(0.0140005, bucketInfluencers.get(0).getRawAnomalyScore(), EPSILON); - assertEquals(20.22688, bucketInfluencers.get(0).getAnomalyScore(), EPSILON); - assertEquals(0.01, bucketInfluencers.get(0).getProbability(), EPSILON); - assertEquals("bucket_time", bucketInfluencers.get(0).getInfluencerFieldName()); - assertEquals(0.005, bucketInfluencers.get(1).getRawAnomalyScore(), EPSILON); - assertEquals(10.5, bucketInfluencers.get(1).getAnomalyScore(), EPSILON); - assertEquals(0.03, bucketInfluencers.get(1).getProbability(), EPSILON); - assertEquals("foo", 
bucketInfluencers.get(1).getInfluencerFieldName()); - - Bucket secondBucket = buckets.get(1); - - assertEquals(0.0637541, secondBucket.getRecords().get(0).getProbability(), EPSILON); - assertEquals("airline", secondBucket.getRecords().get(0).getByFieldName()); - assertEquals("JZA", secondBucket.getRecords().get(0).getByFieldValue()); - assertEquals(1020.08, secondBucket.getRecords().get(0).getTypical().get(0), EPSILON); - assertEquals(1042.14, secondBucket.getRecords().get(0).getActual().get(0), EPSILON); - assertEquals("responsetime", secondBucket.getRecords().get(0).getFieldName()); - assertEquals("max", secondBucket.getRecords().get(0).getFunction()); - assertEquals("", secondBucket.getRecords().get(0).getPartitionFieldName()); - assertEquals("", secondBucket.getRecords().get(0).getPartitionFieldValue()); - - assertEquals(0.00748292, secondBucket.getRecords().get(1).getProbability(), EPSILON); - assertEquals("airline", secondBucket.getRecords().get(1).getByFieldName()); - assertEquals("AMX", secondBucket.getRecords().get(1).getByFieldValue()); - assertEquals(20.2137, secondBucket.getRecords().get(1).getTypical().get(0), EPSILON); - assertEquals(22.8855, secondBucket.getRecords().get(1).getActual().get(0), EPSILON); - assertEquals("responsetime", secondBucket.getRecords().get(1).getFieldName()); - assertEquals("max", secondBucket.getRecords().get(1).getFunction()); - assertEquals("", secondBucket.getRecords().get(1).getPartitionFieldName()); - assertEquals("", secondBucket.getRecords().get(1).getPartitionFieldValue()); - - assertEquals(0.023494, secondBucket.getRecords().get(2).getProbability(), EPSILON); - assertEquals("airline", secondBucket.getRecords().get(2).getByFieldName()); - assertEquals("DAL", secondBucket.getRecords().get(2).getByFieldValue()); - assertEquals(382.177, secondBucket.getRecords().get(2).getTypical().get(0), EPSILON); - assertEquals(358.934, secondBucket.getRecords().get(2).getActual().get(0), EPSILON); - assertEquals("responsetime", 
secondBucket.getRecords().get(2).getFieldName()); - assertEquals("min", secondBucket.getRecords().get(2).getFunction()); - assertEquals("", secondBucket.getRecords().get(2).getPartitionFieldName()); - assertEquals("", secondBucket.getRecords().get(2).getPartitionFieldValue()); - - assertEquals(0.0473552, secondBucket.getRecords().get(3).getProbability(), EPSILON); - assertEquals("airline", secondBucket.getRecords().get(3).getByFieldName()); - assertEquals("SWA", secondBucket.getRecords().get(3).getByFieldValue()); - assertEquals(152.148, secondBucket.getRecords().get(3).getTypical().get(0), EPSILON); - assertEquals(96.6425, secondBucket.getRecords().get(3).getActual().get(0), EPSILON); - assertEquals("responsetime", secondBucket.getRecords().get(3).getFieldName()); - assertEquals("min", secondBucket.getRecords().get(3).getFunction()); - assertEquals("", secondBucket.getRecords().get(3).getPartitionFieldName()); - assertEquals("", secondBucket.getRecords().get(3).getPartitionFieldValue()); - - List quantiles = results.stream().map(AutodetectResult::getQuantiles) - .filter(q -> q != null) - .collect(Collectors.toList()); - assertEquals(3, quantiles.size()); - assertEquals("foo", quantiles.get(0).getJobId()); - assertEquals(new Date(1359450000000L), quantiles.get(0).getTimestamp()); - assertEquals("[normalizer 1.1, normalizer 2.1]", quantiles.get(0).getQuantileState()); - assertEquals("foo", quantiles.get(1).getJobId()); - assertEquals(new Date(1359453600000L), quantiles.get(1).getTimestamp()); - assertEquals("[normalizer 1.2, normalizer 2.2]", quantiles.get(1).getQuantileState()); - assertEquals("foo", quantiles.get(2).getJobId()); - assertEquals(new Date(1359453600000L), quantiles.get(2).getTimestamp()); - assertEquals("[normalizer 1.3, normalizer 2.3]", quantiles.get(2).getQuantileState()); - } - } - - @AwaitsFix(bugUrl = "rewrite this test so it doesn't use ~200 lines of json") - public void testPopulationParser() throws IOException { - try (InputStream 
inputStream = new ByteArrayInputStream(POPULATION_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8))) { - AutodetectResultsParser parser = new AutodetectResultsParser(); - List results = new ArrayList<>(); - parser.parseResults(inputStream).forEachRemaining(results::add); - List buckets = results.stream().map(AutodetectResult::getBucket) - .filter(b -> b != null) - .collect(Collectors.toList()); - - assertEquals(2, buckets.size()); - assertEquals(new Date(1379590200000L), buckets.get(0).getTimestamp()); - assertEquals(buckets.get(0).getEventCount(), 1235); - - Bucket firstBucket = buckets.get(0); - assertEquals(1.38951e-08, firstBucket.getRecords().get(0).getProbability(), EPSILON); - assertEquals("sum_cs_bytes_", firstBucket.getRecords().get(0).getFieldName()); - assertEquals("max", firstBucket.getRecords().get(0).getFunction()); - assertEquals("cs_host", firstBucket.getRecords().get(0).getOverFieldName()); - assertEquals("mail.google.com", firstBucket.getRecords().get(0).getOverFieldValue()); - assertNotNull(firstBucket.getRecords().get(0).getCauses()); - - assertEquals(new Date(1379590800000L), buckets.get(1).getTimestamp()); - assertEquals(buckets.get(1).getEventCount(), 1159); - } - } - - public void testParse_GivenEmptyArray() throws ElasticsearchParseException, IOException { - String json = "[]"; - try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) { - AutodetectResultsParser parser = new AutodetectResultsParser(); - assertFalse(parser.parseResults(inputStream).hasNext()); - } - } - - public void testParse_GivenModelSizeStats() throws ElasticsearchParseException, IOException { - String json = "[{\"model_size_stats\": {\"job_id\": \"foo\", \"model_bytes\":300}}]"; - try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) { - - AutodetectResultsParser parser = new AutodetectResultsParser(); - List results = new ArrayList<>(); - 
parser.parseResults(inputStream).forEachRemaining(results::add); - - assertEquals(1, results.size()); - assertEquals(300, results.get(0).getModelSizeStats().getModelBytes()); - } - } - - public void testParse_GivenCategoryDefinition() throws IOException { - String json = "[{\"category_definition\": {\"job_id\":\"foo\", \"category_id\":18}}]"; - try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) { - AutodetectResultsParser parser = new AutodetectResultsParser(); - List results = new ArrayList<>(); - parser.parseResults(inputStream).forEachRemaining(results::add); - - - assertEquals(1, results.size()); - assertEquals(18, results.get(0).getCategoryDefinition().getCategoryId()); - } - } - - public void testParse_GivenUnknownObject() throws ElasticsearchParseException, IOException { - String json = "[{\"unknown\":{\"id\": 18}}]"; - try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) { - AutodetectResultsParser parser = new AutodetectResultsParser(); - XContentParseException e = expectThrows(XContentParseException.class, - () -> parser.parseResults(inputStream).forEachRemaining(a -> { - })); - assertEquals("[1:3] [autodetect_result] unknown field [unknown], parser not found", e.getMessage()); - } - } - - public void testParse_GivenArrayContainsAnotherArray() throws ElasticsearchParseException, IOException { - String json = "[[]]"; - try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) { - AutodetectResultsParser parser = new AutodetectResultsParser(); - ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, - () -> parser.parseResults(inputStream).forEachRemaining(a -> { - })); - assertEquals("unexpected token [START_ARRAY]", e.getMessage()); - } - } - - /** - * Ensure that we do not accept NaN values - */ - public void testParsingExceptionNaN() { - String json = "[{\"bucket\": 
{\"job_id\":\"foo\",\"timestamp\":1359453600000,\"bucket_span\":10,\"records\":" - + "[{\"timestamp\":1359453600000,\"bucket_span\":10,\"job_id\":\"foo\",\"probability\":NaN," - + "\"by_field_name\":\"airline\",\"by_field_value\":\"JZA\", \"typical\":[1020.08],\"actual\":[0]," - + "\"field_name\":\"responsetime\",\"function\":\"max\",\"partition_field_name\":\"\",\"partition_field_value\":\"\"}]}}]"; - InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); - AutodetectResultsParser parser = new AutodetectResultsParser(); - - expectThrows(XContentParseException.class, - () -> parser.parseResults(inputStream).forEachRemaining(a -> {})); - } -} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessResultsParserTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessResultsParserTests.java new file mode 100644 index 0000000000000..32ab15a27019f --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessResultsParserTests.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */
+package org.elasticsearch.xpack.ml.process;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParseException;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static org.hamcrest.Matchers.contains;
+
+/** Tests {@link ProcessResultsParser} using the minimal two-field result type declared at the bottom of this class. */
+public class ProcessResultsParserTests extends ESTestCase {
+
+    public void testParse_GivenEmptyArray() throws IOException {
+        String json = "[]";
+        try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
+            ProcessResultsParser<TestResult> parser = new ProcessResultsParser<>(TestResult.PARSER);
+            assertFalse(parser.parseResults(inputStream).hasNext());
+        }
+    }
+
+    public void testParse_GivenUnknownObject() throws IOException {
+        String json = "[{\"unknown\":{\"id\": 18}}]"; // "unknown" is not a field declared on TestResult.PARSER
+        try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
+            ProcessResultsParser<TestResult> parser = new ProcessResultsParser<>(TestResult.PARSER);
+            XContentParseException e = expectThrows(XContentParseException.class,
+                () -> parser.parseResults(inputStream).forEachRemaining(a -> {
+                }));
+            assertEquals("[1:3] [test_result] unknown field [unknown], parser not found", e.getMessage());
+        }
+    }
+
+    public void testParse_GivenArrayContainsAnotherArray() throws IOException {
+        String json = "[[]]"; // the element is an array where a result object is expected
+        try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
+            ProcessResultsParser<TestResult> parser = new ProcessResultsParser<>(TestResult.PARSER);
+            ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
+                () -> parser.parseResults(inputStream).forEachRemaining(a -> {
+                }));
+            assertEquals("unexpected token [START_ARRAY]", e.getMessage());
+        }
+    }
+
+    public void testParseResults() throws IOException {
+        String input = "[{\"field_1\": \"a\", \"field_2\": 1.0}, {\"field_1\": \"b\", \"field_2\": 2.0}," +
+            " {\"field_1\": \"c\", \"field_2\": 3.0}]";
+        try (InputStream inputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))) {
+
+            ProcessResultsParser<TestResult> parser = new ProcessResultsParser<>(TestResult.PARSER);
+            Iterator<TestResult> testResultIterator = parser.parseResults(inputStream);
+
+            List<TestResult> parsedResults = new ArrayList<>();
+            while (testResultIterator.hasNext()) {
+                parsedResults.add(testResultIterator.next());
+            }
+
+            assertThat(parsedResults, contains(new TestResult("a", 1.0), new TestResult("b", 2.0), new TestResult("c", 3.0)));
+        }
+    }
+
+    private static class TestResult { // minimal result type: one string field and one double field
+
+        private static final ParseField FIELD_1 = new ParseField("field_1");
+        private static final ParseField FIELD_2 = new ParseField("field_2");
+
+        private static final ConstructingObjectParser<TestResult, Void> PARSER = new ConstructingObjectParser<>("test_result",
+            a -> new TestResult((String) a[0], (Double) a[1]));
+
+        static {
+            PARSER.declareString(ConstructingObjectParser.constructorArg(), FIELD_1);
+            PARSER.declareDouble(ConstructingObjectParser.constructorArg(), FIELD_2);
+        }
+
+        private final String field1;
+        private final double field2;
+
+        private TestResult(String field1, double field2) {
+            this.field1 = field1;
+            this.field2 = field2;
+        }
+
+        @Override
+        public boolean equals(Object other) { // value equality; testParseResults compares parsed results to expected instances
+            if (this == other) {
+                return true;
+            }
+            if (other == null || getClass() != other.getClass()) {
+                return false;
+            }
+            TestResult that = (TestResult) other;
+            return Objects.equals(field1, that.field1) && Objects.equals(field2, that.field2);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(field1, field2);
+        }
+    }
+}
From 51cc25225e3ed91bfeb9c83d003fa149b3e1a3d0 Mon Sep 17 00:00:00 2001 From:
Dimitris Athanasiou Date: Fri, 7 Dec 2018 18:10:20 +0000 Subject: [PATCH 2/5] Remove unused import --- .../xpack/ml/action/TransportRunAnalyticsAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java index 967d03b6aa65a..f8f1226492941 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRunAnalyticsAction.java @@ -38,7 +38,6 @@ import org.elasticsearch.xpack.core.ml.action.RunAnalyticsAction; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractor; import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractorFactory; import org.elasticsearch.xpack.ml.analytics.DataFrameFields; import org.elasticsearch.xpack.ml.analytics.process.AnalyticsProcessManager; From 9ebedd9ef06354689f9cd493cfc00a8c09a77f48 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 10 Dec 2018 14:41:40 +0000 Subject: [PATCH 3/5] Also adjust end of data control message to incoming c++ change --- .../ml/analytics/process/AnalyticsControlMessageWriter.java | 2 +- .../analytics/process/AnalyticsControlMessageWriterTests.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsControlMessageWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsControlMessageWriter.java index ff51ef1122c8d..0500b51f85b2a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsControlMessageWriter.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsControlMessageWriter.java @@ -18,7 +18,7 @@ public class AnalyticsControlMessageWriter extends AbstractControlMsgWriter { * but in the context of the java side it is more descriptive to call this the * end of data message. */ - private static final String END_OF_DATA_MESSAGE_CODE = "r"; + private static final String END_OF_DATA_MESSAGE_CODE = "$"; /** * Construct the control message writer with a LengthEncodedWriter diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsControlMessageWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsControlMessageWriterTests.java index 1adf91fa884c8..3845b9df26b4c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsControlMessageWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsControlMessageWriterTests.java @@ -35,7 +35,7 @@ public void testWriteEndOfData() throws IOException { InOrder inOrder = inOrder(lengthEncodedWriter); inOrder.verify(lengthEncodedWriter).writeNumFields(4); inOrder.verify(lengthEncodedWriter, times(3)).writeField(""); - inOrder.verify(lengthEncodedWriter).writeField("r"); + inOrder.verify(lengthEncodedWriter).writeField("$"); StringBuilder spaces = new StringBuilder(); IntStream.rangeClosed(1, 8192).forEach(i -> spaces.append(' ')); From 3cabd4f7355bc7b3074380cfdd195be9591f7a79 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 10 Dec 2018 18:09:24 +0000 Subject: [PATCH 4/5] Add basic unit test class for result processor --- .../AnalyticsResultProcessorTests.java | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java diff --git 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java
new file mode 100644
index 0000000000000..f00b4db7d9644
--- /dev/null
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.analytics.process;
+
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkAction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.text.Text;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractor;
+import org.junit.Before;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+/** Tests {@link AnalyticsResultProcessor} with the client, the analytics process and the data extractor mocked. */
+public class AnalyticsResultProcessorTests extends ESTestCase {
+
+    private Client client;
+    private AnalyticsProcess process;
+    private DataFrameDataExtractor dataExtractor;
+    private ArgumentCaptor<BulkRequest> bulkRequestCaptor = ArgumentCaptor.forClass(BulkRequest.class);
+
+    @Before
+    public void setUpMocks() {
+        client = mock(Client.class);
+        process = mock(AnalyticsProcess.class);
+        dataExtractor = mock(DataFrameDataExtractor.class);
+    }
+
+    public void testProcess_GivenNoResults() {
+        givenProcessResults(Collections.emptyList());
+        AnalyticsResultProcessor resultProcessor = createResultProcessor();
+
+        resultProcessor.process(process);
+        resultProcessor.awaitForCompletion();
+
+        verifyNoMoreInteractions(client); // no results were supplied, so the client must not have been called
+    }
+
+    public void testProcess_GivenSingleRowAndResult() throws IOException {
+        givenClientHasNoFailures();
+
+        String dataDoc = "{\"f_1\": \"foo\", \"f_2\": 42.0}";
+        String[] dataValues = {"42.0"};
+        DataFrameDataExtractor.Row row = newRow(newHit("1", dataDoc), dataValues);
+        givenSingleDataFrameBatch(Arrays.asList(row));
+
+        Map<String, Object> resultFields = new HashMap<>();
+        resultFields.put("a", "1");
+        resultFields.put("b", "2");
+        AnalyticsResult result = new AnalyticsResult("some_hash", resultFields);
+        givenProcessResults(Arrays.asList(result));
+
+        AnalyticsResultProcessor resultProcessor = createResultProcessor();
+
+        resultProcessor.process(process);
+        resultProcessor.awaitForCompletion();
+
+        List<BulkRequest> capturedBulkRequests = bulkRequestCaptor.getAllValues();
+        assertThat(capturedBulkRequests.size(), equalTo(1));
+        BulkRequest capturedBulkRequest = capturedBulkRequests.get(0);
+        assertThat(capturedBulkRequest.numberOfActions(), equalTo(1));
+        IndexRequest indexRequest = (IndexRequest) capturedBulkRequest.requests().get(0);
+        Map<String, Object> indexedDocSource = indexRequest.sourceAsMap();
+        assertThat(indexedDocSource.size(), equalTo(4)); // f_1 and f_2 from the source doc plus a and b from the result
+        assertThat(indexedDocSource.get("f_1"), equalTo("foo"));
+        assertThat(indexedDocSource.get("f_2"), equalTo(42.0));
+        assertThat(indexedDocSource.get("a"), equalTo("1"));
+        assertThat(indexedDocSource.get("b"), equalTo("2"));
+    }
+
+    private void givenProcessResults(List<AnalyticsResult> results) {
+        when(process.readAnalyticsResults()).thenReturn(results.iterator());
+    }
+
+    private void givenSingleDataFrameBatch(List<DataFrameDataExtractor.Row> batch) throws IOException {
+        when(dataExtractor.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false);
+        when(dataExtractor.next()).thenReturn(Optional.of(batch)).thenReturn(Optional.empty()); // the batch, then nothing
+    }
+
+    private static SearchHit newHit(String id, String json) {
+        SearchHit hit = new SearchHit(42, id, new Text("doc"), Collections.emptyMap());
+        hit.sourceRef(new BytesArray(json));
+        return hit;
+    }
+
+    private static DataFrameDataExtractor.Row newRow(SearchHit hit, String[] values) {
+        DataFrameDataExtractor.Row row = mock(DataFrameDataExtractor.Row.class); // only getHit() and getValues() are stubbed
+        when(row.getHit()).thenReturn(hit);
+        when(row.getValues()).thenReturn(values);
+        return row;
+    }
+
+    private void givenClientHasNoFailures() {
+        ActionFuture<BulkResponse> responseFuture = mock(ActionFuture.class);
+        when(responseFuture.actionGet()).thenReturn(new BulkResponse(new BulkItemResponse[0], 0));
+        when(client.execute(same(BulkAction.INSTANCE), bulkRequestCaptor.capture())).thenReturn(responseFuture);
+    }
+
+    private AnalyticsResultProcessor createResultProcessor() {
+        return new AnalyticsResultProcessor(client, dataExtractor);
+    }
+}
From 57649141e587368344a19e98bd6852f1ce666a96 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 11 Dec 2018 11:43:43 +0000 Subject: [PATCH 5/5] Lowercase logger and use of LinkedHashMap --- .../ml/analytics/process/AnalyticsResultProcessor.java | 4 ++-- .../xpack/ml/process/ProcessResultsParser.java | 6 +++--- .../ml/analytics/process/AnalyticsResultProcessorTests.java | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java index 6f55c53ea7770..bdb1526b1b78a 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessor.java @@ -17,8 +17,8 @@ import org.elasticsearch.xpack.ml.analytics.DataFrameDataExtractor; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -97,7 +97,7 @@ private void joinCurrentResults() { } AnalyticsResult result = currentResults.get(i); SearchHit hit = row.getHit(); - Map source = new HashMap(hit.getSourceAsMap()); + Map source = new LinkedHashMap(hit.getSourceAsMap()); source.putAll(result.getResults()); IndexRequest indexRequest = new IndexRequest(hit.getIndex(), hit.getType(), hit.getId()); indexRequest.source(source); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessResultsParser.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessResultsParser.java index f8d0c4f746c6a..609c45659dd6c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessResultsParser.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessResultsParser.java @@ -29,7 +29,7 @@ */ public class ProcessResultsParser { - private static final Logger LOGGER = LogManager.getLogger(ProcessResultsParser.class); + private static final Logger logger = LogManager.getLogger(ProcessResultsParser.class); private final ConstructingObjectParser resultParser; @@ -69,13 +69,13 @@ public boolean hasNext() { try { token = parser.nextToken(); } catch (IOException e) { - LOGGER.debug("io error while parsing", e); + logger.debug("io error while parsing", e); return false; } if (token == XContentParser.Token.END_ARRAY) { return false; } else if (token != XContentParser.Token.START_OBJECT) { - LOGGER.error("Expecting Json Field name token after 
the Start Object token"); + logger.error("Expecting Json Field name token after the Start Object token"); throw new ElasticsearchParseException("unexpected token [" + token + "]"); } return true; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java index f00b4db7d9644..3d71b4d430e2b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/analytics/process/AnalyticsResultProcessorTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.analytics.process; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest;