diff --git a/server/src/main/java/org/apache/lucene/analysis/miscellaneous/DuplicateByteSequenceSpotter.java b/server/src/main/java/org/apache/lucene/analysis/miscellaneous/DuplicateByteSequenceSpotter.java index 7a58eaa1375f1..25ceb67211abd 100644 --- a/server/src/main/java/org/apache/lucene/analysis/miscellaneous/DuplicateByteSequenceSpotter.java +++ b/server/src/main/java/org/apache/lucene/analysis/miscellaneous/DuplicateByteSequenceSpotter.java @@ -25,20 +25,18 @@ * A Trie structure for analysing byte streams for duplicate sequences. Bytes * from a stream are added one at a time using the addByte method and the number * of times it has been seen as part of a sequence is returned. - * + * <p>
* The minimum required length for a duplicate sequence detected is 6 bytes. - * + * <p>
* The design goals are to maximize speed of lookup while minimizing the space * required to do so. This has led to a hybrid solution for representing the * bytes that make up a sequence in the trie. - * + * <p>
* If we have 6 bytes in sequence e.g. abcdef then they are represented as * object nodes in the tree as follows: * <p>
* (a)-(b)-(c)-(def as an int) * <p>
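* For instance, assuming a big-endian layout in the low 24 bits of the int (an illustrative packing; the exact bit layout is an implementation detail of this class): def = (d << 16) | (e << 8) | f * <p>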
- * - * * {@link RootTreeNode} objects are used for the first two levels of the tree * (representing bytes a and b in the example sequence). The combinations of * objects at these 2 levels are few so internally these objects allocate an @@ -61,11 +59,9 @@ * reached *

  • halting any growth of the tree * - * - * Tests on real-world text show that the size of the tree is a multiple of the * input text size, with that multiplier falling from roughly 10 to 5 as the * content grows from 10 to 100 megabytes. - * */ public class DuplicateByteSequenceSpotter { public static final int TREE_DEPTH = 6; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index 05dbdfdb965f0..b467d6cefd375 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -47,22 +47,23 @@ import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.LongConsumer; import java.util.function.Supplier; /** * An aggregator of string values that hashes the strings on the fly rather * than up front like the {@link GlobalOrdinalsStringTermsAggregator}. */ public class MapStringTermsAggregator extends AbstractStringTermsAggregator { + private final CollectorSource collectorSource; private final ResultStrategy<?, ?> resultStrategy; - private final ValuesSource valuesSource; private final BytesKeyedBucketOrds bucketOrds; private final IncludeExclude.StringFilter includeExclude; public MapStringTermsAggregator( String name, AggregatorFactories factories, + CollectorSource collectorSource, Function<MapStringTermsAggregator, ResultStrategy<?, ?>> resultStrategy, - ValuesSource valuesSource, BucketOrder order, DocValueFormat format, BucketCountThresholds bucketCountThresholds, @@ -75,56 +76,39 @@ public MapStringTermsAggregator( Map<String, Object> metadata ) throws IOException { super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata); + this.collectorSource = collectorSource; this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
- this.valuesSource = valuesSource; this.includeExclude = includeExclude; bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket); } @Override public ScoreMode scoreMode() { - if (valuesSource != null && valuesSource.needsScores()) { + if (collectorSource.needsScores()) { return ScoreMode.COMPLETE; } return super.scoreMode(); } @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { - SortedBinaryDocValues values = valuesSource.bytesValues(ctx); - return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, values) { - final BytesRefBuilder previous = new BytesRefBuilder(); - - @Override - public void collect(int doc, long owningBucketOrd) throws IOException { - if (false == values.advanceExact(doc)) { - return; - } - int valuesCount = values.docValueCount(); - - // SortedBinaryDocValues don't guarantee uniqueness so we - // need to take care of dups - previous.clear(); - for (int i = 0; i < valuesCount; ++i) { - final BytesRef bytes = values.nextValue(); - if (includeExclude != null && false == includeExclude.accept(bytes)) { - continue; - } - if (i > 0 && previous.get().equals(bytes)) { - continue; - } + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + return resultStrategy.wrapCollector( + collectorSource.getLeafCollector( + includeExclude, + ctx, + sub, + this::addRequestCircuitBreakerBytes, + (s, doc, owningBucketOrd, bytes) -> { long bucketOrdinal = bucketOrds.add(owningBucketOrd, bytes); if (bucketOrdinal < 0) { // already seen bucketOrdinal = -1 - bucketOrdinal; - collectExistingBucket(sub, doc, bucketOrdinal); + collectExistingBucket(s, doc, bucketOrdinal); } else { - collectBucket(sub, doc, bucketOrdinal); + collectBucket(s, doc, bucketOrdinal); } - previous.copyBytes(bytes); } - } - }); + ) + ); } @Override @@ -146,7 +130,82 @@ public void collectDebugInfo(BiConsumer<String, Object> add) { @Override public void doClose() { - Releasables.close(bucketOrds, resultStrategy); + Releasables.close(collectorSource, resultStrategy, bucketOrds); + } + + /** + * Abstraction on top of building collectors to fetch values. + */ + public interface CollectorSource extends Releasable { + boolean needsScores(); + + LeafBucketCollector getLeafCollector( + IncludeExclude.StringFilter includeExclude, + LeafReaderContext ctx, + LeafBucketCollector sub, + LongConsumer addRequestCircuitBreakerBytes, + CollectConsumer consumer + ) throws IOException; + } + + @FunctionalInterface + public interface CollectConsumer { + void accept(LeafBucketCollector sub, int doc, long owningBucketOrd, BytesRef bytes) throws IOException; + } + + /** + * Fetch values from a {@link ValuesSource}.
+ */ + public static class ValuesSourceCollectorSource implements CollectorSource { + private final ValuesSource valuesSource; + + public ValuesSourceCollectorSource(ValuesSource valuesSource) { + this.valuesSource = valuesSource; + } + + @Override + public boolean needsScores() { + return valuesSource.needsScores(); + } + + @Override + public LeafBucketCollector getLeafCollector( + IncludeExclude.StringFilter includeExclude, + LeafReaderContext ctx, + LeafBucketCollector sub, + LongConsumer addRequestCircuitBreakerBytes, + CollectConsumer consumer + ) throws IOException { + SortedBinaryDocValues values = valuesSource.bytesValues(ctx); + return new LeafBucketCollectorBase(sub, values) { + final BytesRefBuilder previous = new BytesRefBuilder(); + + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + if (false == values.advanceExact(doc)) { + return; + } + int valuesCount = values.docValueCount(); + + // SortedBinaryDocValues don't guarantee uniqueness so we + // need to take care of dups + previous.clear(); + for (int i = 0; i < valuesCount; ++i) { + BytesRef bytes = values.nextValue(); + if (includeExclude != null && false == includeExclude.accept(bytes)) { + continue; + } + if (i > 0 && previous.get().equals(bytes)) { + continue; + } + previous.copyBytes(bytes); + consumer.accept(sub, doc, owningBucketOrd, bytes); + } + } + }; + } + + @Override + public void close() {} } /** @@ -270,6 +329,12 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws * Builds results for the standard {@code terms} aggregation. */ class StandardTermsResults extends ResultStrategy { + private final ValuesSource valuesSource; + + StandardTermsResults(ValuesSource valuesSource) { + this.valuesSource = valuesSource; + } + @Override String describe() { return "terms"; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificanceLookup.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificanceLookup.java index 4a6764fb4acd2..dbe48bb040f92 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificanceLookup.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificanceLookup.java @@ -36,10 +36,11 @@ import org.elasticsearch.common.util.BytesRefHash; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; -import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import java.io.IOException; @@ -62,14 +63,17 @@ interface BackgroundFrequencyForLong extends Releasable { } private final QueryShardContext context; - private final ValuesSourceConfig config; + private final MappedFieldType fieldType; + private final DocValueFormat format; private final Query backgroundFilter; private final int supersetNumDocs; private TermsEnum termsEnum; - SignificanceLookup(QueryShardContext context, ValuesSourceConfig config, QueryBuilder backgroundFilter) throws IOException { + SignificanceLookup(QueryShardContext context, MappedFieldType fieldType, DocValueFormat format, QueryBuilder backgroundFilter) + throws IOException { this.context = context; - this.config = config; + this.fieldType = 
fieldType; + this.format = format; this.backgroundFilter = backgroundFilter == null ? null : backgroundFilter.toQuery(context); /* * We need to use a superset size that includes deleted docs or we @@ -129,7 +133,7 @@ public void close() { * Get the background frequency of a {@link BytesRef} term. */ private long getBackgroundFrequency(BytesRef term) throws IOException { - return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context)); + return getBackgroundFrequency(fieldType.termQuery(format.format(term).toString(), context)); } /** @@ -174,7 +178,7 @@ public void close() { * Get the background frequency of a {@code long} term. */ private long getBackgroundFrequency(long term) throws IOException { - return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context)); + return getBackgroundFrequency(fieldType.termQuery(format.format(term).toString(), context)); } private long getBackgroundFrequency(Query query) throws IOException { @@ -201,7 +205,7 @@ private TermsEnum getTermsEnum(String field) throws IOException { return termsEnum; } IndexReader reader = context.getIndexReader(); - termsEnum = new FilterableTermsEnum(reader, config.fieldContext().field(), PostingsEnum.NONE, backgroundFilter); + termsEnum = new FilterableTermsEnum(reader, fieldType.name(), PostingsEnum.NONE, backgroundFilter); return termsEnum; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java index 5e92cc9edee1a..174a4c1fcef98 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java @@ -227,7 +227,12 @@ protected Aggregator doCreateInternal(SearchContext searchContext, bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize())); } - SignificanceLookup lookup = new SignificanceLookup(queryShardContext, config, backgroundFilter); + SignificanceLookup lookup = new SignificanceLookup( + queryShardContext, + config.fieldContext().fieldType(), + config.format(), + backgroundFilter + ); return sigTermsAggregatorSupplier.build(name, factories, config.getValuesSource(), config.format(), bucketCountThresholds, includeExclude, executionHint, searchContext, parent, @@ -256,8 +261,8 @@ Aggregator create(String name, return new MapStringTermsAggregator( name, factories, + new MapStringTermsAggregator.ValuesSourceCollectorSource(valuesSource), a -> a.new SignificantTermsResults(lookup, significanceHeuristic, collectsFromSingleBucket), - valuesSource, null, format, bucketCountThresholds, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java deleted file mode 100644 index 87036bf69a987..0000000000000 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.aggregations.bucket.terms; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.TokenStream; -import org.apache.lucene.analysis.miscellaneous.DeDuplicatingTokenFilter; -import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter; -import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefBuilder; -import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.util.BytesRefHash; -import org.elasticsearch.index.mapper.MappedFieldType; -import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.LeafBucketCollector; -import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; -import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; -import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.StringFilter; -import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds; -import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; -import org.elasticsearch.search.internal.ContextIndexSearcher; -import org.elasticsearch.search.internal.SearchContext; -import org.elasticsearch.search.lookup.SourceLookup; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static java.util.Collections.emptyList; - -public class SignificantTextAggregator extends BucketsAggregator { - - private final StringFilter includeExclude; - protected final BucketCountThresholds bucketCountThresholds; - protected long numCollectedDocs; - private final BytesRefHash bucketOrds; - private final SignificanceHeuristic significanceHeuristic; - private SignificantTextAggregatorFactory termsAggFactory; - private final DocValueFormat format = DocValueFormat.RAW; - private final String fieldName; - private final String[] sourceFieldNames; - private DuplicateByteSequenceSpotter dupSequenceSpotter = null ; - private long lastTrieSize; - private static final int MEMORY_GROWTH_REPORTING_INTERVAL_BYTES = 5000; - - - - public SignificantTextAggregator(String name, AggregatorFactories factories, - SearchContext context, Aggregator parent, - BucketCountThresholds bucketCountThresholds, IncludeExclude.StringFilter includeExclude, - SignificanceHeuristic significanceHeuristic, SignificantTextAggregatorFactory termsAggFactory, - String 
fieldName, String [] sourceFieldNames, boolean filterDuplicateText, - Map metadata) throws IOException { - super(name, factories, context, parent, metadata); - this.bucketCountThresholds = bucketCountThresholds; - this.includeExclude = includeExclude; - this.significanceHeuristic = significanceHeuristic; - this.termsAggFactory = termsAggFactory; - this.fieldName = fieldName; - this.sourceFieldNames = sourceFieldNames; - bucketOrds = new BytesRefHash(1, context.bigArrays()); - if(filterDuplicateText){ - dupSequenceSpotter = new DuplicateByteSequenceSpotter(); - lastTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes(); - } - } - - - - - @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { - final BytesRefBuilder previous = new BytesRefBuilder(); - return new LeafBucketCollectorBase(sub, null) { - - @Override - public void collect(int doc, long bucket) throws IOException { - collectFromSource(doc, bucket, fieldName, sourceFieldNames); - numCollectedDocs++; - if (dupSequenceSpotter != null) { - dupSequenceSpotter.startNewSequence(); - } - } - - private void processTokenStream(int doc, long bucket, TokenStream ts, BytesRefHash inDocTerms, String fieldText) - throws IOException{ - if (dupSequenceSpotter != null) { - ts = new DeDuplicatingTokenFilter(ts, dupSequenceSpotter); - } - CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class); - ts.reset(); - try { - while (ts.incrementToken()) { - if (dupSequenceSpotter != null) { - long newTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes(); - long growth = newTrieSize - lastTrieSize; - // Only update the circuitbreaker after - if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) { - addRequestCircuitBreakerBytes(growth); - lastTrieSize = newTrieSize; - } - } - previous.clear(); - previous.copyChars(termAtt); - BytesRef bytes = previous.get(); - if (inDocTerms.add(bytes) >= 0) { - if (includeExclude == null || includeExclude.accept(bytes)) { - long bucketOrdinal = bucketOrds.add(bytes); - if (bucketOrdinal < 0) { // already seen - bucketOrdinal = -1 - bucketOrdinal; - collectExistingBucket(sub, doc, bucketOrdinal); - } else { - collectBucket(sub, doc, bucketOrdinal); - } - } - } - } - - } finally{ - ts.close(); - } - } - - private void collectFromSource(int doc, long bucket, String indexedFieldName, String[] sourceFieldNames) throws IOException { - MappedFieldType fieldType = context.getQueryShardContext().fieldMapper(indexedFieldName); - if(fieldType == null){ - throw new IllegalArgumentException("Aggregation [" + name + "] cannot process field ["+indexedFieldName - +"] since it is not present"); - } - - SourceLookup sourceLookup = context.lookup().source(); - sourceLookup.setSegmentAndDocument(ctx, doc); - BytesRefHash inDocTerms = new BytesRefHash(256, context.bigArrays()); - - try { - for (String sourceField : sourceFieldNames) { - List textsToHighlight = sourceLookup.extractRawValues(sourceField); - textsToHighlight = textsToHighlight.stream().map(obj -> { - if (obj instanceof BytesRef) { - return fieldType.valueForDisplay(obj).toString(); - } else { - return obj; - } - }).collect(Collectors.toList()); - - Analyzer analyzer = fieldType.indexAnalyzer(); - for (Object fieldValue : textsToHighlight) { - String fieldText = fieldValue.toString(); - TokenStream ts = analyzer.tokenStream(indexedFieldName, fieldText); - processTokenStream(doc, bucket, ts, inDocTerms, fieldText); - } - } - } finally{ - Releasables.close(inDocTerms); - } - } - }; - } - - 
@Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0; - - final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); - long supersetSize = termsAggFactory.getSupersetNumDocs(); - long subsetSize = numCollectedDocs; - - BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue<>(size); - SignificantStringTerms.Bucket spare = null; - for (int i = 0; i < bucketOrds.size(); i++) { - final int docCount = bucketDocCount(i); - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; - } - - if (spare == null) { - spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0); - } - - bucketOrds.get(i, spare.termBytes); - spare.subsetDf = docCount; - spare.subsetSize = subsetSize; - spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes); - spare.supersetSize = supersetSize; - // During shard-local down-selection we use subset/superset stats - // that are for this shard only - // Back at the central reducer these properties will be updated with - // global stats - spare.updateScore(significanceHeuristic); - - spare.bucketOrd = i; - spare = ordered.insertWithOverflow(spare); - } - - final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()]; - for (int i = ordered.size() - 1; i >= 0; i--) { - list[i] = ordered.pop(); - // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point - list[i].termBytes = BytesRef.deepCopyOf(list[i].termBytes); - } - buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, a) -> b.aggregations = a); - - return new InternalAggregation[] { new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), - metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list)) - }; - } - - - @Override - public SignificantStringTerms buildEmptyAggregation() { - // We need to account for the significance of a miss in our global stats - provide corpus size as context - ContextIndexSearcher searcher = context.searcher(); - IndexReader topReader = searcher.getIndexReader(); - int supersetSize = topReader.numDocs(); - return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), - metadata(), format, 0, supersetSize, significanceHeuristic, emptyList()); - } - - @Override - public void doClose() { - Releasables.close(bucketOrds, termsAggFactory); - } - -} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java index 65972b4890bee..69af282e75b54 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java @@ -19,52 +19,56 @@ package org.elasticsearch.search.aggregations.bucket.terms; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.PostingsEnum; -import org.apache.lucene.index.Term; -import org.apache.lucene.search.BooleanClause.Occur; -import org.apache.lucene.search.BooleanQuery; -import org.apache.lucene.search.IndexSearcher; -import 
org.apache.lucene.search.Query; -import org.apache.lucene.search.TermQuery; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.miscellaneous.DeDuplicatingTokenFilter; +import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter; +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lucene.index.FilterableTermsEnum; -import org.elasticsearch.common.lucene.index.FreqTermsEnum; +import org.apache.lucene.util.BytesRefBuilder; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BytesRefHash; +import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.BucketUtils; +import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.StringFilter; +import org.elasticsearch.search.aggregations.bucket.terms.MapStringTermsAggregator.CollectConsumer; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds; import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.lookup.SourceLookup; import java.io.IOException; +import java.util.Iterator; import java.util.Map; +import java.util.function.LongConsumer; -public class SignificantTextAggregatorFactory extends AggregatorFactory - implements Releasable { +public class SignificantTextAggregatorFactory extends AggregatorFactory { + private static final int MEMORY_GROWTH_REPORTING_INTERVAL_BYTES = 5000; private final IncludeExclude includeExclude; - private String indexedFieldName; - private MappedFieldType fieldType; + private final String indexedFieldName; + private final MappedFieldType fieldType; private final String[] sourceFieldNames; - private FilterableTermsEnum termsEnum; - private int numberOfAggregatorsCreated; - private final Query filter; - private final int supersetNumDocs; + private final QueryBuilder backgroundFilter; private final TermsAggregator.BucketCountThresholds bucketCountThresholds; private final SignificanceHeuristic significanceHeuristic; - private final DocValueFormat format = DocValueFormat.RAW; private final boolean filterDuplicateText; public SignificantTextAggregatorFactory(String name, IncludeExclude includeExclude, - QueryBuilder filterBuilder, + QueryBuilder backgroundFilter, TermsAggregator.BucketCountThresholds bucketCountThresholds, SignificanceHeuristic significanceHeuristic, QueryShardContext queryShardContext, @@ -84,97 +88,18 @@ public SignificantTextAggregatorFactory(String name, "requires an analyzed field"); } this.indexedFieldName = fieldType 
!= null ? fieldType.name() : fieldName; - this.sourceFieldNames = sourceFieldNames == null - ? new String[] { indexedFieldName } - : sourceFieldNames; + this.sourceFieldNames = sourceFieldNames == null ? new String[] { indexedFieldName } : sourceFieldNames; this.includeExclude = includeExclude; - this.filter = filterBuilder == null - ? null - : filterBuilder.toQuery(queryShardContext); + this.backgroundFilter = backgroundFilter; this.filterDuplicateText = filterDuplicateText; - IndexSearcher searcher = queryShardContext.searcher(); - // Important - need to use the doc count that includes deleted docs - // or we have this issue: https://github.com/elastic/elasticsearch/issues/7951 - this.supersetNumDocs = filter == null - ? searcher.getIndexReader().maxDoc() - : searcher.count(filter); this.bucketCountThresholds = bucketCountThresholds; this.significanceHeuristic = significanceHeuristic; } - /** - * Get the number of docs in the superset. - */ - public long getSupersetNumDocs() { - return supersetNumDocs; - } - - private FilterableTermsEnum getTermsEnum(String field) throws IOException { - if (termsEnum != null) { - return termsEnum; - } - IndexReader reader = queryShardContext.getIndexReader(); - if (numberOfAggregatorsCreated > 1) { - termsEnum = new FreqTermsEnum(reader, field, true, false, filter, queryShardContext.bigArrays()); - } else { - termsEnum = new FilterableTermsEnum(reader, indexedFieldName, PostingsEnum.NONE, filter); - } - return termsEnum; - } - - private long getBackgroundFrequency(String value) throws IOException { - // fieldType can be null if the field is unmapped, but theoretically this method should only be called - // when constructing buckets. Assert to ensure this is the case - // TODO this is a bad setup and it should be refactored - assert fieldType != null; - Query query = fieldType.termQuery(value, queryShardContext); - if (query instanceof TermQuery) { - // for types that use the inverted index, we prefer using a caching terms - // enum that will do a better job at reusing index inputs - Term term = ((TermQuery) query).getTerm(); - FilterableTermsEnum termsEnum = getTermsEnum(term.field()); - if (termsEnum.seekExact(term.bytes())) { - return termsEnum.docFreq(); - } else { - return 0; - } - } - // otherwise do it the naive way - if (filter != null) { - query = new BooleanQuery.Builder() - .add(query, Occur.FILTER) - .add(filter, Occur.FILTER) - .build(); - } - return queryShardContext.searcher().count(query); - } - - public long getBackgroundFrequency(BytesRef termBytes) throws IOException { - String value = format.format(termBytes).toString(); - return getBackgroundFrequency(value); - } - - - @Override - public void close() { - try { - if (termsEnum instanceof Releasable) { - ((Releasable) termsEnum).close(); - } - } finally { - termsEnum = null; - } - } - @Override protected Aggregator createInternal(SearchContext searchContext, Aggregator parent, boolean collectsFromSingleBucket, Map metadata) throws IOException { - if (collectsFromSingleBucket == false) { - return asMultiBucketAggregator(this, searchContext, parent); - } - - numberOfAggregatorsCreated++; BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds); if (bucketCountThresholds.getShardSize() == SignificantTextAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) { // The user has not made a shardSize selection. 
@@ -194,8 +119,166 @@ protected Aggregator createInternal(SearchContext searchContext, Aggregator pare IncludeExclude.StringFilter incExcFilter = includeExclude == null ? null : includeExclude.convertToStringFilter(DocValueFormat.RAW); - return new SignificantTextAggregator(name, factories, searchContext, parent, bucketCountThresholds, - incExcFilter, significanceHeuristic, this, indexedFieldName, sourceFieldNames, filterDuplicateText, metadata); + MapStringTermsAggregator.CollectorSource collectorSource = new SignificantTextCollectorSource( + queryShardContext.lookup().source(), + queryShardContext.bigArrays(), + fieldType, + sourceFieldNames, + filterDuplicateText + ); + SignificanceLookup lookup = new SignificanceLookup(queryShardContext, fieldType, DocValueFormat.RAW, backgroundFilter); + return new MapStringTermsAggregator( + name, + factories, + collectorSource, + a -> a.new SignificantTermsResults(lookup, significanceHeuristic, collectsFromSingleBucket), + null, + DocValueFormat.RAW, + bucketCountThresholds, + incExcFilter, + searchContext, + parent, + SubAggCollectionMode.BREADTH_FIRST, + false, + collectsFromSingleBucket, + metadata + ); + } + + private static class SignificantTextCollectorSource implements MapStringTermsAggregator.CollectorSource { + private final SourceLookup sourceLookup; + private final BigArrays bigArrays; + private final MappedFieldType fieldType; + private final String[] sourceFieldNames; + private ObjectArray<DuplicateByteSequenceSpotter> dupSequenceSpotters; + + SignificantTextCollectorSource( + SourceLookup sourceLookup, + BigArrays bigArrays, + MappedFieldType fieldType, + String[] sourceFieldNames, + boolean filterDuplicateText + ) { + this.sourceLookup = sourceLookup; + this.bigArrays = bigArrays; + this.fieldType = fieldType; + this.sourceFieldNames = sourceFieldNames; + dupSequenceSpotters = filterDuplicateText ?
bigArrays.newObjectArray(1) : null; + } + + @Override + public boolean needsScores() { + return false; + } + + @Override + public LeafBucketCollector getLeafCollector( + StringFilter includeExclude, + LeafReaderContext ctx, + LeafBucketCollector sub, + LongConsumer addRequestCircuitBreakerBytes, + CollectConsumer consumer + ) throws IOException { + return new LeafBucketCollectorBase(sub, null) { + private final BytesRefBuilder scratch = new BytesRefBuilder(); + + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + if (dupSequenceSpotters == null) { + collectFromSource(doc, owningBucketOrd, null); + return; + } + dupSequenceSpotters = bigArrays.grow(dupSequenceSpotters, owningBucketOrd + 1); + DuplicateByteSequenceSpotter spotter = dupSequenceSpotters.get(owningBucketOrd); + if (spotter == null) { + spotter = new DuplicateByteSequenceSpotter(); + dupSequenceSpotters.set(owningBucketOrd, spotter); + } + collectFromSource(doc, owningBucketOrd, spotter); + spotter.startNewSequence(); + } + + private void collectFromSource(int doc, long owningBucketOrd, DuplicateByteSequenceSpotter spotter) throws IOException { + sourceLookup.setSegmentAndDocument(ctx, doc); + BytesRefHash inDocTerms = new BytesRefHash(256, bigArrays); + + try { + for (String sourceField : sourceFieldNames) { + Iterator<String> itr = sourceLookup.extractRawValues(sourceField).stream() + .map(obj -> { + if (obj == null) { + return null; + } + if (obj instanceof BytesRef) { + return fieldType.valueForDisplay(obj).toString(); + } + return obj.toString(); + }) + .iterator(); + Analyzer analyzer = fieldType.indexAnalyzer(); + while (itr.hasNext()) { + TokenStream ts = analyzer.tokenStream(fieldType.name(), itr.next()); + processTokenStream(doc, owningBucketOrd, ts, inDocTerms, spotter); + } + } + } finally { + Releasables.close(inDocTerms); + } + } + + private void processTokenStream( + int doc, + long owningBucketOrd, + TokenStream ts, + BytesRefHash inDocTerms, + DuplicateByteSequenceSpotter spotter + ) throws IOException { + long lastTrieSize = 0; + if (spotter != null) { + lastTrieSize = spotter.getEstimatedSizeInBytes(); + ts = new DeDuplicatingTokenFilter(ts, spotter); + } + CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class); + ts.reset(); + try { + while (ts.incrementToken()) { + if (spotter != null) { + long newTrieSize = spotter.getEstimatedSizeInBytes(); + long growth = newTrieSize - lastTrieSize; + // Only update the circuit breaker after a full reporting interval of growth to keep the per-token overhead low + if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) { + addRequestCircuitBreakerBytes.accept(growth); + lastTrieSize = newTrieSize; + } + } + + scratch.clear(); + scratch.copyChars(termAtt); + BytesRef bytes = scratch.get(); + if (includeExclude != null && false == includeExclude.accept(bytes)) { + continue; + } + if (inDocTerms.add(bytes) < 0) { + continue; + } + consumer.accept(sub, doc, owningBucketOrd, bytes); + } + } finally { + ts.close(); + } + if (spotter != null) { + long growth = spotter.getEstimatedSizeInBytes() - lastTrieSize; + if (growth > 0) { + addRequestCircuitBreakerBytes.accept(growth); + } + } + } + }; + } + + @Override + public void close() { + Releasables.close(dupSequenceSpotters); + } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 0a728f439339d..ed0ebd42854f0 100644 ---
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -311,8 +311,8 @@ Aggregator create(String name, return new MapStringTermsAggregator( name, factories, - a -> a.new StandardTermsResults(), - valuesSource, + new MapStringTermsAggregator.ValuesSourceCollectorSource(valuesSource), + a -> a.new StandardTermsResults(valuesSource), order, format, bucketCountThresholds, diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java index 28efafacff9bf..59bdda66dea41 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java @@ -40,7 +40,6 @@ import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.mapper.BinaryFieldMapper; -import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.mapper.NumberFieldMapper.NumberFieldType; @@ -56,7 +55,6 @@ import org.elasticsearch.search.aggregations.bucket.terms.SignificantTermsAggregatorFactory.ExecutionMode; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; import org.elasticsearch.search.aggregations.support.ValuesSourceType; -import org.junit.Before; import java.io.IOException; import java.util.Arrays; @@ -70,15 +68,6 @@ import static org.hamcrest.Matchers.equalTo; public class SignificantTermsAggregatorTests extends AggregatorTestCase { - - private MappedFieldType fieldType; - - @Before - public void setUpTest() throws Exception { - super.setUp(); - fieldType = new KeywordFieldMapper.KeywordFieldType("field"); - } - @Override protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) { return new SignificantTermsAggregationBuilder("foo").field(fieldName); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java index db39925db9da9..543c9429cdd36 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.document.StoredField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; @@ -29,6 +30,7 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; @@ -102,7 +104,7 @@ public void 
testSignificance() throws IOException { indexWriterConfig.setMaxBufferedDocs(100); indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a single segment try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, indexWriterConfig)) { - indexDocuments(w, textFieldType); + indexDocuments(w); SignificantTextAggregationBuilder sigAgg = new SignificantTextAggregationBuilder("sig_text", "text").filterDuplicateText(true); if(randomBoolean()){ @@ -150,7 +152,7 @@ public void testFieldAlias() throws IOException { indexWriterConfig.setMaxBufferedDocs(100); indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a single segment try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, indexWriterConfig)) { - indexDocuments(w, textFieldType); + indexDocuments(w); SignificantTextAggregationBuilder agg = significantText("sig_text", "text") .filterDuplicateText(true); @@ -193,14 +195,50 @@ public void testFieldAlias() throws IOException { } } - private void indexDocuments(IndexWriter writer, TextFieldType textFieldType) throws IOException { + public void testInsideTermsAgg() throws IOException { + TextFieldType textFieldType = new TextFieldType("text"); + textFieldType.setIndexAnalyzer(new NamedAnalyzer("my_analyzer", AnalyzerScope.GLOBAL, new StandardAnalyzer())); + + IndexWriterConfig indexWriterConfig = newIndexWriterConfig(); + indexWriterConfig.setMaxBufferedDocs(100); + indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a single segment + try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, indexWriterConfig)) { + indexDocuments(w); + + SignificantTextAggregationBuilder sigAgg = new SignificantTextAggregationBuilder("sig_text", "text").filterDuplicateText(true); + TermsAggregationBuilder aggBuilder = new TermsAggregationBuilder("terms").field("kwd").subAggregation(sigAgg); + + try (IndexReader reader = DirectoryReader.open(w)) { + assertEquals("test expects a single segment", 1, reader.leaves().size()); + IndexSearcher searcher = new IndexSearcher(reader); + + StringTerms terms = searchAndReduce(searcher, new MatchAllDocsQuery(), aggBuilder, textFieldType, keywordField("kwd")); + SignificantTerms sigOdd = terms.getBucketByKey("odd").getAggregations().get("sig_text"); + assertNull(sigOdd.getBucketByKey("even")); + assertNull(sigOdd.getBucketByKey("duplicate")); + assertNull(sigOdd.getBucketByKey("common")); + assertNotNull(sigOdd.getBucketByKey("odd")); + + SignificantStringTerms sigEven = terms.getBucketByKey("even").getAggregations().get("sig_text"); + assertNull(sigEven.getBucketByKey("odd")); + assertNull(sigEven.getBucketByKey("duplicate")); + assertNull(sigEven.getBucketByKey("common")); + assertNull(sigEven.getBucketByKey("separator2")); + assertNull(sigEven.getBucketByKey("separator4")); + assertNull(sigEven.getBucketByKey("separator6")); + assertNotNull(sigEven.getBucketByKey("even")); + } + } + } + + private void indexDocuments(IndexWriter writer) throws IOException { for (int i = 0; i < 10; i++) { Document doc = new Document(); StringBuilder text = new StringBuilder("common "); if (i % 2 == 0) { - text.append("odd "); - } else { text.append("even separator" + i + " duplicate duplicate duplicate duplicate duplicate duplicate "); + } else { + text.append("odd "); } doc.add(new Field("text", text.toString(), TextFieldMapper.Defaults.FIELD_TYPE)); @@ -208,6 +246,7 @@ private void indexDocuments(IndexWriter writer, TextFieldType textFieldType) thr " \"json_only_field\" : \"" + text.toString() + 
"\"" + " }"; doc.add(new StoredField("_source", new BytesRef(json))); + doc.add(new SortedSetDocValuesField("kwd", i % 2 == 0 ? new BytesRef("even") : new BytesRef("odd"))); writer.addDocument(doc); } }