@@ -25,20 +25,18 @@
* A Trie structure for analysing byte streams for duplicate sequences. Bytes
* from a stream are added one at a time using the addByte method and the number
* of times it has been seen as part of a sequence is returned.
*
* <p>
* The minimum required length for a duplicate sequence detected is 6 bytes.
*
* <p>
* The design goals are to maximize speed of lookup while minimizing the space
* required to do so. This has led to a hybrid solution for representing the
* bytes that make up a sequence in the trie.
*
* <p>
* If we have 6 bytes in sequence e.g. abcdef then they are represented as
* object nodes in the tree as follows:
* <p>
* (a)-(b)-(c)-(def as an int)
* <p>
*
*
* {@link RootTreeNode} objects are used for the first two levels of the tree
* (representing bytes a and b in the example sequence). The combinations of
* objects at these 2 levels are few so internally these objects allocate an
@@ -61,11 +59,9 @@
* reached
* <li>halting any growth of the tree
* </ol>
*
* Tests on real-world text show that the size of the tree is a multiple of the
* input text, where that multiplier decreases from roughly 10 to 5 times as the
* content size increases from 10 to 100 megabytes of content.
*
*/
public class DuplicateByteSequenceSpotter {
public static final int TREE_DEPTH = 6;
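
A minimal usage sketch of the spotter described in the Javadoc above, for illustration only. The spotDuplicates helper and the no-argument constructor are assumptions for the sketch; the Javadoc states that addByte returns how many times the trailing sequence has been seen, so the return value is simply widened to a long here.

// Hypothetical driver, not part of this change.
static void spotDuplicates(byte[] bytes) {
    DuplicateByteSequenceSpotter spotter = new DuplicateByteSequenceSpotter();
    for (int i = 0; i < bytes.length; i++) {
        // count of earlier sightings of the TREE_DEPTH (6) byte window ending at i
        long timesSeen = spotter.addByte(bytes[i]);
        if (timesSeen > 0) {
            System.out.println("duplicate 6-byte sequence ends at offset " + i);
        }
    }
}
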
@@ -47,22 +47,23 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.LongConsumer;

/**
* An aggregator of string values that hashes the strings on the fly rather
* than up front like the {@link GlobalOrdinalsStringTermsAggregator}.
*/
public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
private final CollectorSource collectorSource;
private final ResultStrategy<?, ?> resultStrategy;
private final ValuesSource valuesSource;
private final BytesKeyedBucketOrds bucketOrds;
private final IncludeExclude.StringFilter includeExclude;

public MapStringTermsAggregator(
String name,
AggregatorFactories factories,
CollectorSource collectorSource,
Function<MapStringTermsAggregator, ResultStrategy<?, ?>> resultStrategy,
ValuesSource valuesSource,
BucketOrder order,
DocValueFormat format,
BucketCountThresholds bucketCountThresholds,
@@ -75,56 +76,39 @@ public MapStringTermsAggregator(
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
this.collectorSource = collectorSource;
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
this.valuesSource = valuesSource;
this.includeExclude = includeExclude;
bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}

@Override
public ScoreMode scoreMode() {
if (valuesSource != null && valuesSource.needsScores()) {
if (collectorSource.needsScores()) {
return ScoreMode.COMPLETE;
}
return super.scoreMode();
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, values) {
final BytesRefBuilder previous = new BytesRefBuilder();

@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (false == values.advanceExact(doc)) {
return;
}
int valuesCount = values.docValueCount();

// SortedBinaryDocValues don't guarantee uniqueness so we
// need to take care of dups
previous.clear();
for (int i = 0; i < valuesCount; ++i) {
final BytesRef bytes = values.nextValue();
if (includeExclude != null && false == includeExclude.accept(bytes)) {
continue;
}
if (i > 0 && previous.get().equals(bytes)) {
continue;
}
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
return resultStrategy.wrapCollector(
collectorSource.getLeafCollector(
includeExclude,
ctx,
sub,
this::addRequestCircuitBreakerBytes,
(s, doc, owningBucketOrd, bytes) -> {
long bucketOrdinal = bucketOrds.add(owningBucketOrd, bytes);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
collectExistingBucket(s, doc, bucketOrdinal);
} else {
collectBucket(sub, doc, bucketOrdinal);
collectBucket(s, doc, bucketOrdinal);
}
previous.copyBytes(bytes);
}
}
});
)
);
}

@Override
@@ -146,7 +130,82 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {

@Override
public void doClose() {
Releasables.close(bucketOrds, resultStrategy);
Releasables.close(collectorSource, resultStrategy, bucketOrds);
}

/**
* Abstraction on top of building collectors to fetch values.
*/
public interface CollectorSource extends Releasable {
boolean needsScores();

LeafBucketCollector getLeafCollector(
IncludeExclude.StringFilter includeExclude,
LeafReaderContext ctx,
LeafBucketCollector sub,
LongConsumer addRequestCircuitBreakerBytes,
CollectConsumer consumer
) throws IOException;
}
@FunctionalInterface
public interface CollectConsumer {
void accept(LeafBucketCollector sub, int doc, long owningBucketOrd, BytesRef bytes) throws IOException;
}
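
As a hedged illustration of this contract (not part of the change): a CollectorSource only has to hand each term's bytes to the CollectConsumer, which is what lets callers plug in value sources other than doc values. The class name and fixed term below are invented for the example.

// Hypothetical CollectorSource that emits one fixed term per document.
class FixedTermCollectorSource implements MapStringTermsAggregator.CollectorSource {
    private final BytesRef term = new BytesRef("example");

    @Override
    public boolean needsScores() {
        return false; // nothing here reads scores
    }

    @Override
    public LeafBucketCollector getLeafCollector(
        IncludeExclude.StringFilter includeExclude,
        LeafReaderContext ctx,
        LeafBucketCollector sub,
        LongConsumer addRequestCircuitBreakerBytes,
        MapStringTermsAggregator.CollectConsumer consumer
    ) throws IOException {
        return new LeafBucketCollector() {
            @Override
            public void collect(int doc, long owningBucketOrd) throws IOException {
                if (includeExclude == null || includeExclude.accept(term)) {
                    // the consumer resolves the bucket ordinal and collects the doc
                    consumer.accept(sub, doc, owningBucketOrd, term);
                }
            }
        };
    }

    @Override
    public void close() {}
}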

/**
* Fetch values from a {@link ValuesSource}.
*/
public static class ValuesSourceCollectorSource implements CollectorSource {
private final ValuesSource valuesSource;

public ValuesSourceCollectorSource(ValuesSource valuesSource) {
this.valuesSource = valuesSource;
}

@Override
public boolean needsScores() {
return valuesSource.needsScores();
}

@Override
public LeafBucketCollector getLeafCollector(
IncludeExclude.StringFilter includeExclude,
LeafReaderContext ctx,
LeafBucketCollector sub,
LongConsumer addRequestCircuitBreakerBytes,
CollectConsumer consumer
) throws IOException {
SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
final BytesRefBuilder previous = new BytesRefBuilder();

@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (false == values.advanceExact(doc)) {
return;
}
int valuesCount = values.docValueCount();

// SortedBinaryDocValues don't guarantee uniqueness so we
// need to take care of dups
previous.clear();
for (int i = 0; i < valuesCount; ++i) {
BytesRef bytes = values.nextValue();
if (includeExclude != null && false == includeExclude.accept(bytes)) {
continue;
}
if (i > 0 && previous.get().equals(bytes)) {
continue;
}
previous.copyBytes(bytes);
consumer.accept(sub, doc, owningBucketOrd, bytes);
}
}
};
}

@Override
public void close() {}
}

/**
@@ -270,6 +329,12 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
* Builds results for the standard {@code terms} aggregation.
*/
class StandardTermsResults extends ResultStrategy<StringTerms, StringTerms.Bucket> {
private final ValuesSource valuesSource;

StandardTermsResults(ValuesSource valuesSource) {
this.valuesSource = valuesSource;
}

@Override
String describe() {
return "terms";
@@ -36,10 +36,11 @@
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;

import java.io.IOException;

@@ -62,14 +63,17 @@ interface BackgroundFrequencyForLong extends Releasable {
}

private final QueryShardContext context;
private final ValuesSourceConfig config;
private final MappedFieldType fieldType;
private final DocValueFormat format;
private final Query backgroundFilter;
private final int supersetNumDocs;
private TermsEnum termsEnum;

SignificanceLookup(QueryShardContext context, ValuesSourceConfig config, QueryBuilder backgroundFilter) throws IOException {
SignificanceLookup(QueryShardContext context, MappedFieldType fieldType, DocValueFormat format, QueryBuilder backgroundFilter)
throws IOException {
this.context = context;
this.config = config;
this.fieldType = fieldType;
this.format = format;
this.backgroundFilter = backgroundFilter == null ? null : backgroundFilter.toQuery(context);
/*
* We need to use a superset size that includes deleted docs or we
@@ -129,7 +133,7 @@ public void close() {
* Get the background frequency of a {@link BytesRef} term.
*/
private long getBackgroundFrequency(BytesRef term) throws IOException {
return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context));
return getBackgroundFrequency(fieldType.termQuery(format.format(term).toString(), context));
}

/**
@@ -174,7 +178,7 @@ public void close() {
* Get the background frequency of a {@code long} term.
*/
private long getBackgroundFrequency(long term) throws IOException {
return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context));
return getBackgroundFrequency(fieldType.termQuery(format.format(term).toString(), context));
}

private long getBackgroundFrequency(Query query) throws IOException {
@@ -201,7 +205,7 @@ private TermsEnum getTermsEnum(String field) throws IOException {
return termsEnum;
}
IndexReader reader = context.getIndexReader();
termsEnum = new FilterableTermsEnum(reader, config.fieldContext().field(), PostingsEnum.NONE, backgroundFilter);
termsEnum = new FilterableTermsEnum(reader, fieldType.name(), PostingsEnum.NONE, backgroundFilter);
return termsEnum;
}

@@ -227,7 +227,12 @@ protected Aggregator doCreateInternal(SearchContext searchContext,
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
}

SignificanceLookup lookup = new SignificanceLookup(queryShardContext, config, backgroundFilter);
SignificanceLookup lookup = new SignificanceLookup(
queryShardContext,
config.fieldContext().fieldType(),
config.format(),
backgroundFilter
);

return sigTermsAggregatorSupplier.build(name, factories, config.getValuesSource(), config.format(),
bucketCountThresholds, includeExclude, executionHint, searchContext, parent,
@@ -256,8 +261,8 @@ Aggregator create(String name,
return new MapStringTermsAggregator(
name,
factories,
new MapStringTermsAggregator.ValuesSourceCollectorSource(valuesSource),
a -> a.new SignificantTermsResults(lookup, significanceHeuristic, collectsFromSingleBucket),
valuesSource,
null,
format,
bucketCountThresholds,