diff --git a/docs/reference/search/aggregations/bucket/significantterms-aggregation.asciidoc b/docs/reference/search/aggregations/bucket/significantterms-aggregation.asciidoc
new file mode 100644
index 0000000000000..88712142e8b3b
--- /dev/null
+++ b/docs/reference/search/aggregations/bucket/significantterms-aggregation.asciidoc
@@ -0,0 +1,420 @@
+[[search-aggregations-bucket-significantterms-aggregation]]
+=== Significant Terms
+
+An aggregation that returns interesting or unusual occurrences of terms in a set.
+
+.Experimental!
+[IMPORTANT]
+=====
+This feature is marked as experimental, and may be subject to change in the
+future. If you use this feature, please let us know your experience with it!
+=====
+
+coming[1.1.0]
+
+
+.Example use cases:
+* Suggesting "H5N1" when users search for "bird flu" in text
+* Identifying the merchant that is the "common point of compromise" from the transaction history of credit card owners reporting loss
+* Suggesting keywords relating to stock symbol $ATI for an automated news classifier
+* Spotting the fraudulent doctor who is diagnosing more than his fair share of whiplash injuries
+* Spotting the tire manufacturer who has a disproportionate number of blow-outs
+
+In all these cases the terms being selected are not simply the most popular terms in a set.
+They are the terms that have undergone a significant change in popularity measured between a _foreground_ and _background_ set.
+If the term "H5N1" only exists in 5 documents in a 10 million document index and yet is found in 4 of the 100 documents that make up a user's search results,
+that is significant and probably very relevant to their search. 5/10,000,000 vs 4/100 is a big swing in frequency.
+
+==== Single-set analysis
+
+In the simplest case, the _foreground_ set of interest is the search results matched by a query and the _background_
+set used for statistical comparisons is the index or indices from which the results were gathered.
+
+Example:
+
+[source,js]
+--------------------------------------------------
+{
+    "query" : {
+        "terms" : {"force" : [ "British Transport Police" ]}
+    },
+    "aggregations" : {
+        "significantCrimeTypes" : {
+            "significant_terms" : { "field" : "crime_type" }
+        }
+    }
+}
+--------------------------------------------------
+
+Response:
+
+[source,js]
+--------------------------------------------------
+{
+    ...
+
+    "aggregations" : {
+        "significantCrimeTypes" : {
+            "doc_count": 47347,
+            "buckets" : [
+                {
+                    "key": "Bicycle theft",
+                    "doc_count": 3640,
+                    "score": 0.371235374214817,
+                    "bg_count": 66799
+                }
+                ...
+            ]
+        }
+    }
+}
+--------------------------------------------------
+
+When querying an index of all crimes from all police forces, what these results show is that the British Transport Police force
+stands out as a force dealing with a disproportionately large number of bicycle thefts. Ordinarily, bicycle thefts represent only 1% of crimes (66799/5064554)
+but for the British Transport Police, who handle crime on railways and stations, 7% of crimes (3640/47347) are
+bicycle thefts. This is a significant seven-fold increase in frequency and so this anomaly was highlighted as the top crime type.
+
+The problem with using a query to spot anomalies is that it only gives us one subset to use for comparisons.
+To discover all the other police forces' anomalies we would have to repeat the query for each of the different forces.
+
+This can be a tedious way to look for unusual patterns in an index.
+
+
+
+==== Multi-set analysis
+A simpler way to perform analysis across multiple categories is to use a parent-level aggregation to segment the data ready for analysis.
+
+
+Example using a parent aggregation for segmentation:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggregations": {
+        "forces": {
+            "terms": {"field": "force"},
+            "aggregations": {
+                "significantCrimeTypes": {
+                    "significant_terms": {"field": "crime_type"}
+                }
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+Response:
+
+[source,js]
+--------------------------------------------------
+{
+    ...
+
+    "aggregations": {
+        "forces": {
+            "buckets": [
+                {
+                    "key": "Metropolitan Police Service",
+                    "doc_count": 894038,
+                    "significantCrimeTypes": {
+                        "doc_count": 894038,
+                        "buckets": [
+                            {
+                                "key": "Robbery",
+                                "doc_count": 27617,
+                                "score": 0.0599,
+                                "bg_count": 53182
+                            },
+                            ...
+                        ]
+                    }
+                },
+                {
+                    "key": "British Transport Police",
+                    "doc_count": 47347,
+                    "significantCrimeTypes": {
+                        "doc_count": 47347,
+                        "buckets": [
+                            {
+                                "key": "Bicycle theft",
+                                "doc_count": 3640,
+                                "score": 0.371,
+                                "bg_count": 66799
+                            },
+                            ...
+                        ]
+                    }
+                }
+            ]
+        }
+    }
+}
+--------------------------------------------------
+
+Now we have anomaly detection for each of the police forces using a single request.
+
+We can use other forms of top-level aggregations to segment our data, for example segmenting by geographic
+area to identify unusual hot-spots of a particular crime type:
+
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs": {
+        "hotspots": {
+            "geohash_grid" : {
+                "field": "location",
+                "precision": 5
+            },
+            "aggs": {
+                "significantCrimeTypes": {
+                    "significant_terms": {"field": "crime_type"}
+                }
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+This example uses the `geohash_grid` aggregation to create result buckets that represent geographic areas, and inside each
+bucket we can identify anomalous levels of a crime type in these tightly-focused areas, e.g.
+
+* Airports exhibit unusual numbers of weapon confiscations
+* Universities show uplifts of bicycle thefts
+
+At a higher geohash_grid zoom-level with larger coverage areas we would start to see where an entire police force may be
+tackling an unusual volume of a particular crime type.
+
+
+Obviously a time-based top-level segmentation would help identify current trends for each point in time
+where a simple `terms` aggregation would typically show the very popular "constants" that persist across all time slots.
+
+
+
+.How are the scores calculated?
+**********************************
+The numbers returned for scores are primarily intended for ranking different suggestions sensibly rather than something easily understood by end users.
+The scores are derived from the doc frequencies in _foreground_ and _background_ sets. The _absolute_ change in popularity (foregroundPercent - backgroundPercent) would favour
+common terms whereas the _relative_ change in popularity (foregroundPercent / backgroundPercent) would favour rare terms.
+Rare vs common is essentially a precision vs recall balance and so the absolute and relative changes are multiplied to provide a sweet spot between precision and recall.
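+
+As a rough worked illustration of how these two factors combine (reusing the "H5N1" numbers from the introduction): the term appears in 4 of 100
+foreground documents (a probability of 0.04) but in only 5 of 10,000,000 background documents (0.0000005). The absolute change is roughly 0.04,
+the relative change is roughly 80,000, and the blended score is therefore in the region of 0.04 x 80,000 = 3,200. A common word that merely
+doubled in frequency would score far lower, because its relative change would only be 2.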
+
+**********************************
+
+
+
+=== Use on free-text fields
+
+The significant_terms aggregation can be used effectively on tokenized free-text fields to suggest:
+
+* keywords for refining end-user searches
+* keywords for use in percolator queries
+
+WARNING: Picking a free-text field as the subject of a significant terms analysis can be expensive! It will attempt
+to load every unique word into RAM. It is recommended to only use this on smaller indices.
+
+.Use the _"like this but not this"_ pattern
+**********************************
+You can spot mis-categorized content by first searching a structured field e.g. `category:adultMovie` and then using significant_terms on the
+free-text "movie_description" field. Take the suggested words (I'll leave them to your imagination) and then search for all movies NOT marked as category:adultMovie but containing these keywords.
+You now have a ranked list of badly-categorized movies that you should reclassify or at least remove from the "familyFriendly" category.
+
+The significance score from each term can also provide a useful `boost` setting to sort matches.
+Using the `minimum_should_match` setting of the `terms` query with the keywords will help control the balance of precision/recall in the result set i.e.
+a high setting would have a small number of relevant results packed full of keywords and a setting of "1" would produce a more exhaustive result set with all documents containing _any_ keyword.
+
+**********************************
+
+[TIP]
+============
+.Show significant_terms in context
+
+Free-text significant_terms are much more easily understood when viewed in context. Take the results of `significant_terms` suggestions from a
+free-text field and use them in a `terms` query on the same field with a `highlight` clause to present users with example snippets of documents. When the terms
+are presented unstemmed, highlighted, with the right case, in the right order and with some context, their significance/meaning is more readily apparent.
+============
+
+
+=== Limitations
+
+===== Single _background_ comparison base
+The above examples show how to select the _foreground_ set for analysis using a query or parent aggregation to filter, but currently there is no means of specifying
+a _background_ set other than the index from which all results are ultimately drawn. Sometimes it may prove useful to use a different
+background set as the basis for comparisons e.g. to first select the tweets for the TV show "XFactor" and then look
+for significant terms in a subset of that content which is from this week.
+
+===== Significant terms must be indexed values
+Unlike the terms aggregation, it is currently not possible to use script-generated terms for counting purposes.
+Because of the way the significant_terms aggregation must consider both _foreground_ and _background_ frequencies
+it would be prohibitively expensive to use a script on the entire index to obtain background frequencies for comparisons.
+Also, DocValues are not supported as sources of term data for similar reasons.
+
+===== No analysis of floating point fields
+Floating point fields are currently not supported as the subject of significant_terms analysis.
+While integer or long fields can be used to represent concepts like bank account numbers or category numbers which
+can be interesting to track, floating point fields are usually used to represent quantities of something.
+As such, individual floating point terms are not useful for this form of frequency analysis.
+
+===== Use as a parent aggregation
+If there is the equivalent of a `match_all` query or no query criteria providing a subset of the index, the significant_terms aggregation should not be used as the
+top-most aggregation - in this scenario the _foreground_ set is exactly the same as the _background_ set and
+so there is no difference in document frequencies to observe and from which to make sensible suggestions.
+
+Another consideration is that the significant_terms aggregation produces many candidate results at shard level
+that are only later pruned on the reducing node once all statistics from all shards are merged. As a result,
+it can be inefficient and costly in terms of RAM to embed large child aggregations under a significant_terms
+aggregation that later discards many candidate terms. It is advisable in these cases to perform two searches - the first to provide a rationalized list of
+significant_terms and the second to add this shortlist of terms to the query and fetch the required child aggregations.
+
+===== Approximate counts
+The counts of how many documents contain a term, as provided in the results, are based on summing the samples returned from each shard and
+as such may be:
+
+* low if certain shards did not provide figures for a given term in their top sample
+* high when considering the background frequency as it may count occurrences found in deleted documents
+
+Like most design decisions, this is the basis of a trade-off in which we have chosen to provide fast performance at the cost of some (typically small) inaccuracies.
+However, the `size` and `shard_size` settings covered in the next section provide tools to help control the accuracy levels.
+
+
+=== Parameters
+
+
+==== Size & Shard Size
+
+The `size` parameter can be set to define how many term buckets should be returned out of the overall terms list. By
+default, the node coordinating the search process will request each shard to provide its own top term buckets
+and once all shards respond, it will reduce the results to the final list that will then be returned to the client.
+If the number of unique terms is greater than `size`, the returned list can be slightly off and not accurate
+(it could be that the term counts are slightly off and it could even be that a term that should have been in the top
+`size` buckets was not returned).
+
+To ensure better accuracy, a multiple of the final `size` is used as the number of terms to request from each shard
+using a heuristic based on the number of shards. To take manual control of this setting, the `shard_size` parameter
+can be used to control the volumes of candidate terms produced by each shard.
+
+Low-frequency terms can turn out to be the most interesting ones once all results are combined so the
+significant_terms aggregation can produce higher-quality results when the `shard_size` parameter is set to
+values significantly higher than the `size` setting. This ensures that a bigger volume of promising candidate terms are given
+a consolidated review by the reducing node before the final selection. Obviously large candidate term lists
+will cause extra network traffic and RAM usage, so this is a quality/cost trade-off that needs to be balanced.
+
+NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense). When it is, Elasticsearch will
+      override it and reset it to be equal to `size`.
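+
+For illustration, a request of this shape (the field is reused from the earlier crime example and the numbers are arbitrary) would ask each shard for
+its top 100 candidate terms while returning only the 10 strongest after the reduce phase:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "significantCrimeTypes" : {
+            "significant_terms" : {
+                "field" : "crime_type",
+                "size" : 10,
+                "shard_size" : 100
+            }
+        }
+    }
+}
+--------------------------------------------------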
+
+==== Minimum document count
+
+It is possible to only return terms that match a configured minimum number of hits using the `min_doc_count` option:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "tags" : {
+            "significant_terms" : {
+                "field" : "tag",
+                "min_doc_count": 10
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+The above aggregation would only return tags which have been found in 10 hits or more. The default value is `3`.
+
+WARNING: Setting `min_doc_count` to `1` is generally not advised as it tends to return terms that
+         are typos or other bizarre curiosities. Finding more than one instance of a term helps
+         reinforce that, while still rare, the term was not the result of a one-off accident. The
+         default value of 3 is used to provide a minimum weight-of-evidence.
+
+
+==== Filtering Values
+
+It is possible (although rarely required) to filter the values for which buckets will be created. This can be done using the `include` and
+`exclude` parameters which are based on regular expressions. This functionality mirrors the features
+offered by the `terms` aggregation.
+
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "tags" : {
+            "significant_terms" : {
+                "field" : "tags",
+                "include" : ".*sport.*",
+                "exclude" : "water_.*"
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+In the above example, buckets will be created for all the tags that have the word `sport` in them, except those starting
+with `water_` (so the tag `water_sports` will not be aggregated). The `include` regular expression will determine what
+values are "allowed" to be aggregated, while the `exclude` determines the values that should not be aggregated. When
+both are defined, the `exclude` has precedence, meaning the `include` is evaluated first and only then the `exclude`.
+
+The regular expressions are based on the Java(TM) http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html[Pattern],
+and as such, it is also possible to pass in flags that will determine how the compiled regular expression will work:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "tags" : {
+            "significant_terms" : {
+                "field" : "tags",
+                "include" : {
+                    "pattern" : ".*sport.*",
+                    "flags" : "CANON_EQ|CASE_INSENSITIVE" <1>
+                },
+                "exclude" : {
+                    "pattern" : "water_.*",
+                    "flags" : "CANON_EQ|CASE_INSENSITIVE"
+                }
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+<1> the flags are concatenated using the `|` character as a separator
+
+The possible flags that can be used are:
+http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html#CANON_EQ[`CANON_EQ`],
+http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html#CASE_INSENSITIVE[`CASE_INSENSITIVE`],
+http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html#COMMENTS[`COMMENTS`],
+http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html#DOTALL[`DOTALL`],
+http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html#LITERAL[`LITERAL`],
+http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html#MULTILINE[`MULTILINE`],
+http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html#UNICODE_CASE[`UNICODE_CASE`],
+http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html#UNICODE_CHARACTER_CLASS[`UNICODE_CHARACTER_CLASS`] and
+http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html#UNIX_LINES[`UNIX_LINES`].
+
+==== Execution hint
+
+There are two mechanisms by which terms aggregations can be executed: either by using field values directly in order to aggregate
+data per-bucket (`map`), or by using ordinals of the field values instead of the values themselves (`ordinals`). Although the
+latter execution mode can be expected to be slightly faster, it is only available for use when the underlying data source exposes
+those terms ordinals. Moreover, it may actually be slower if most field values are unique. Elasticsearch tries to have sensible
+defaults when it comes to the execution mode that should be used, but in case you know that an execution mode may perform better
+than the other one, you have the ability to provide Elasticsearch with a hint:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "tags" : {
+            "significant_terms" : {
+                "field" : "tags",
+                "execution_hint": "map" <1>
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+<1> the possible values are `map` and `ordinals`
+
+Please note that Elasticsearch will ignore this execution hint if it is not applicable.
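+
+For reference, the parameters described above can be combined in a single request. The following is purely illustrative (the field name and values are arbitrary):
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "tags" : {
+            "significant_terms" : {
+                "field" : "tags",
+                "size" : 10,
+                "shard_size" : 50,
+                "min_doc_count" : 5,
+                "include" : ".*sport.*",
+                "execution_hint" : "map"
+            }
+        }
+    }
+}
+--------------------------------------------------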
diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java index be0ad1eaa4861..d5f45f9c99d5a 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java @@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeBuilder; import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanceBuilder; import org.elasticsearch.search.aggregations.bucket.range.ipv4.IPv4RangeBuilder; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsBuilder; import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; import org.elasticsearch.search.aggregations.metrics.avg.AvgBuilder; import org.elasticsearch.search.aggregations.metrics.max.MaxBuilder; @@ -103,6 +104,10 @@ public static GeoHashGridBuilder geohashGrid(String name) { return new GeoHashGridBuilder(name); } + public static SignificantTermsBuilder significantTerms(String name) { + return new SignificantTermsBuilder(name); + } + public static DateHistogramBuilder dateHistogram(String name) { return new DateHistogramBuilder(name); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java index 3475f3de9a1c4..66ca55fefc532 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java @@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeParser; import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanceParser; import org.elasticsearch.search.aggregations.bucket.range.ipv4.IpRangeParser; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsParser; import org.elasticsearch.search.aggregations.bucket.terms.TermsParser; import org.elasticsearch.search.aggregations.metrics.avg.AvgParser; import org.elasticsearch.search.aggregations.metrics.max.MaxParser; @@ -65,6 +66,7 @@ public AggregationModule() { parsers.add(MissingParser.class); parsers.add(FilterParser.class); parsers.add(TermsParser.class); + parsers.add(SignificantTermsParser.class); parsers.add(RangeParser.class); parsers.add(DateRangeParser.class); parsers.add(IpRangeParser.class); diff --git a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java index b97145f357ddd..5a3089304f377 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java @@ -30,6 +30,9 @@ import org.elasticsearch.search.aggregations.bucket.range.date.InternalDateRange; import org.elasticsearch.search.aggregations.bucket.range.geodistance.InternalGeoDistance; import org.elasticsearch.search.aggregations.bucket.range.ipv4.InternalIPv4Range; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantLongTerms; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantStringTerms; +import org.elasticsearch.search.aggregations.bucket.significant.UnmappedSignificantTerms; import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms; import 
org.elasticsearch.search.aggregations.bucket.terms.LongTerms; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; @@ -67,6 +70,9 @@ protected void configure() { InternalMissing.registerStreams(); StringTerms.registerStreams(); LongTerms.registerStreams(); + SignificantStringTerms.registerStreams(); + SignificantLongTerms.registerStreams(); + UnmappedSignificantTerms.registerStreams(); InternalGeoHashGrid.registerStreams(); DoubleTerms.registerStreams(); UnmappedTerms.registerStreams(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/BucketSignificancePriorityQueue.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/BucketSignificancePriorityQueue.java new file mode 100644 index 0000000000000..8cc51428c4988 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/BucketSignificancePriorityQueue.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.significant; + +import org.apache.lucene.util.PriorityQueue; + +public class BucketSignificancePriorityQueue extends PriorityQueue { + + + public BucketSignificancePriorityQueue(int size) { + super(size); + } + + @Override + protected boolean lessThan(SignificantTerms.Bucket o1, SignificantTerms.Bucket o2) { + return o1.getSignificanceScore() < o2.getSignificanceScore(); + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java new file mode 100644 index 0000000000000..1466439083cf4 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java @@ -0,0 +1,283 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.significant; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.elasticsearch.cache.recycler.CacheRecycler; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; + +import java.util.*; + +/** + * + */ +public abstract class InternalSignificantTerms extends InternalAggregation implements SignificantTerms, ToXContent, Streamable { + + public static abstract class Bucket extends SignificantTerms.Bucket { + + long bucketOrd; + protected InternalAggregations aggregations; + double score; + + protected Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) { + super(subsetDf, subsetSize, supersetDf, supersetSize); + this.aggregations = aggregations; + assert subsetDf <= supersetDf; + updateScore(); + } + + @Override + public long getSubsetDf(){ + return subsetDf; + } + + @Override + public long getSupersetDf(){ + return supersetDf; + } + + @Override + public long getSupersetSize(){ + return supersetSize; + } + + @Override + public long getSubsetSize(){ + return subsetSize; + } + + /** + * Calculates the significance of a term in a sample against a background of + * normal distributions by comparing the changes in frequency. This is the heart + * of the significant terms feature. + * + * TODO - allow pluggable scoring implementations + * + * @param subsetFreq The frequency of the term in the selected sample + * @param subsetSize The size of the selected sample (typically number of docs) + * @param supersetFreq The frequency of the term in the superset from which the sample was taken + * @param supersetSize The size of the superset from which the sample was taken (typically number of docs) + * @return a "significance" score + */ + public static final double getSampledTermSignificance(long subsetFreq, long subsetSize, long supersetFreq, long supersetSize) { + if ((subsetSize == 0) || (supersetSize == 0)) { + // avoid any divide by zero issues + return 0; + } + + double subsetProbability = (double) subsetFreq / (double) subsetSize; + double supersetProbability = (double) supersetFreq / (double) supersetSize; + + // Using absoluteProbabilityChange alone favours very common words e.g. you, we etc + // because a doubling in popularity of a common term is a big percent difference + // whereas a rare term would have to achieve a hundred-fold increase in popularity to + // achieve the same difference measure. + // In favouring common words as suggested features for search we would get high + // recall but low precision. + double absoluteProbabilityChange = subsetProbability - supersetProbability; + if (absoluteProbabilityChange <= 0) { + return 0; + } + // Using relativeProbabilityChange tends to favour rarer terms e.g.mis-spellings or + // unique URLs. + // A very low-probability term can very easily double in popularity due to the low + // numbers required to do so whereas a high-probability term would have to add many + // extra individual sightings to achieve the same shift. + // In favouring rare words as suggested features for search we would get high + // precision but low recall. 
+ double relativeProbabilityChange = (subsetProbability / supersetProbability); + + // A blend of the above metrics - favours medium-rare terms to strike a useful + // balance between precision and recall. + double score = absoluteProbabilityChange * relativeProbabilityChange; + return score; + } + + public void updateScore() { + score = getSampledTermSignificance(subsetDf, subsetSize, supersetDf, supersetSize); + } + + @Override + public long getDocCount() { + return subsetDf; + } + + @Override + public Aggregations getAggregations() { + return aggregations; + } + + public Bucket reduce(List buckets, CacheRecycler cacheRecycler) { + if (buckets.size() == 1) { + return buckets.get(0); + } + Bucket reduced = null; + List aggregationsList = new ArrayList(buckets.size()); + for (Bucket bucket : buckets) { + if (reduced == null) { + reduced = bucket; + } else { + reduced.subsetDf += bucket.subsetDf; + reduced.supersetDf += bucket.supersetDf; + reduced.updateScore(); + } + aggregationsList.add(bucket.aggregations); + } + assert reduced.subsetDf <= reduced.supersetDf; + reduced.aggregations = InternalAggregations.reduce(aggregationsList, cacheRecycler); + return reduced; + } + + @Override + public double getSignificanceScore() { + return score; + } + } + + protected int requiredSize; + protected long minDocCount; + protected Collection buckets; + protected Map bucketMap; + protected long subsetSize; + protected long supersetSize; + + protected InternalSignificantTerms() {} // for serialization + + protected InternalSignificantTerms(long subsetSize, long supersetSize, String name, int requiredSize, long minDocCount, Collection buckets) { + super(name); + this.requiredSize = requiredSize; + this.minDocCount = minDocCount; + this.buckets = buckets; + this.subsetSize = subsetSize; + this.supersetSize = supersetSize; + } + + @Override + public Iterator iterator() { + Object o = buckets.iterator(); + return (Iterator) o; + } + + @Override + public Collection getBuckets() { + Object o = buckets; + return (Collection) o; + } + + @Override + public SignificantTerms.Bucket getBucketByKey(String term) { + if (bucketMap == null) { + bucketMap = Maps.newHashMapWithExpectedSize(buckets.size()); + for (Bucket bucket : buckets) { + bucketMap.put(bucket.getKey(), bucket); + } + } + return bucketMap.get(term); + } + + @Override + public InternalSignificantTerms reduce(ReduceContext reduceContext) { + List aggregations = reduceContext.aggregations(); + if (aggregations.size() == 1) { + InternalSignificantTerms terms = (InternalSignificantTerms) aggregations.get(0); + terms.trimExcessEntries(); + return terms; + } + InternalSignificantTerms reduced = null; + + long globalSubsetSize = 0; + long globalSupersetSize = 0; + // Compute the overall result set size and the corpus size using the + // top-level Aggregations from each shard + for (InternalAggregation aggregation : aggregations) { + InternalSignificantTerms terms = (InternalSignificantTerms) aggregation; + globalSubsetSize += terms.subsetSize; + globalSupersetSize += terms.supersetSize; + } + Map> buckets = null; + for (InternalAggregation aggregation : aggregations) { + InternalSignificantTerms terms = (InternalSignificantTerms) aggregation; + if (terms instanceof UnmappedSignificantTerms) { + continue; + } + if (reduced == null) { + reduced = terms; + } + if (buckets == null) { + buckets = new HashMap>(terms.buckets.size()); + } + for (Bucket bucket : terms.buckets) { + List existingBuckets = buckets.get(bucket.getKey()); + if (existingBuckets == null) { + 
existingBuckets = new ArrayList(aggregations.size()); + buckets.put(bucket.getKey(), existingBuckets); + } + // Adjust the buckets with the global stats representing the + // total size of the pots from which the stats are drawn + bucket.subsetSize = globalSubsetSize; + bucket.supersetSize = globalSupersetSize; + bucket.updateScore(); + existingBuckets.add(bucket); + } + } + + if (reduced == null) { + // there are only unmapped terms, so we just return the first one + // (no need to reduce) + return (UnmappedSignificantTerms) aggregations.get(0); + } + + final int size = Math.min(requiredSize, buckets.size()); + BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size); + for (Map.Entry> entry : buckets.entrySet()) { + List sameTermBuckets = entry.getValue(); + final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.cacheRecycler()); + if ((b.score > 0) && (b.subsetDf >= minDocCount)) { + ordered.insertWithOverflow(b); + } + } + Bucket[] list = new Bucket[ordered.size()]; + for (int i = ordered.size() - 1; i >= 0; i--) { + list[i] = (Bucket) ordered.pop(); + } + reduced.buckets = Arrays.asList(list); + reduced.subsetSize = globalSubsetSize; + reduced.supersetSize = globalSupersetSize; + return reduced; + } + + final void trimExcessEntries() { + final List newBuckets = Lists.newArrayList(); + for (Bucket b : buckets) { + if (newBuckets.size() >= requiredSize) { + break; + } + if (b.subsetDf >= minDocCount) { + newBuckets.add(b); + } + } + buckets = newBuckets; + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java new file mode 100644 index 0000000000000..a6aff73cab1cb --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java @@ -0,0 +1,165 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.significant; + +import com.google.common.primitives.Longs; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.text.StringText; +import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.support.numeric.ValueFormatter; +import org.elasticsearch.search.aggregations.support.numeric.ValueFormatterStreams; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * + */ +public class SignificantLongTerms extends InternalSignificantTerms { + + public static final Type TYPE = new Type("significant_terms", "siglterms"); + + public static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public SignificantLongTerms readResult(StreamInput in) throws IOException { + SignificantLongTerms buckets = new SignificantLongTerms(); + buckets.readFrom(in); + return buckets; + } + }; + + public static void registerStreams() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + + static class Bucket extends InternalSignificantTerms.Bucket { + + long term; + + public Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations) { + super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations); + this.term = term; + } + + @Override + public Text getKeyAsText() { + return new StringText(String.valueOf(term)); + } + + @Override + public Number getKeyAsNumber() { + return term; + } + + @Override + int compareTerm(SignificantTerms.Bucket other) { + return Longs.compare(term, other.getKeyAsNumber().longValue()); + } + + @Override + public String getKey() { + return Long.toString(term); + } + + } + + private ValueFormatter valueFormatter; + + SignificantLongTerms() {} // for serialization + + public SignificantLongTerms(long subsetSize, long supersetSize, String name, ValueFormatter valueFormatter, int requiredSize, long minDocCount, Collection buckets) { + super(subsetSize, supersetSize,name, requiredSize, minDocCount, buckets); + this.valueFormatter = valueFormatter; + } + + @Override + public Type type() { + return TYPE; + } + + + @Override + public void readFrom(StreamInput in) throws IOException { + this.name = in.readString(); + this.valueFormatter = ValueFormatterStreams.readOptional(in); + this.requiredSize = readSize(in); + this.minDocCount = in.readVLong(); + this.subsetSize = in.readVLong(); + this.supersetSize = in.readVLong(); + + int size = in.readVInt(); + List buckets = new ArrayList(size); + for (int i = 0; i < size; i++) { + long subsetDf=in.readVLong(); + long supersetDf=in.readVLong(); + long term=in.readLong(); + buckets.add(new Bucket(subsetDf, subsetSize, supersetDf, + supersetSize, term, InternalAggregations.readAggregations(in))); + } + this.buckets = buckets; + this.bucketMap = null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + ValueFormatterStreams.writeOptional(valueFormatter, out); + writeSize(requiredSize, out); + out.writeVLong(minDocCount); + out.writeVLong(subsetSize); + out.writeVLong(supersetSize); + out.writeVInt(buckets.size()); + for (InternalSignificantTerms.Bucket bucket : 
buckets) { + out.writeVLong(((Bucket) bucket).subsetDf); + out.writeVLong(((Bucket) bucket).supersetDf); + out.writeLong(((Bucket) bucket).term); + ((InternalAggregations) bucket.getAggregations()).writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(name); + builder.field("doc_count", subsetSize); + builder.startArray(CommonFields.BUCKETS); + for (InternalSignificantTerms.Bucket bucket : buckets) { + builder.startObject(); + builder.field(CommonFields.KEY, ((Bucket) bucket).term); + if (valueFormatter != null) { + builder.field(CommonFields.KEY_AS_STRING, valueFormatter.format(((Bucket) bucket).term)); + } + builder.field(CommonFields.DOC_COUNT, bucket.getDocCount()); + builder.field("score", bucket.score); + builder.field("bg_count", bucket.supersetDf); + ((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + return builder; + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java new file mode 100644 index 0000000000000..72db7961b0649 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.significant; + +import org.apache.lucene.index.IndexReader; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.terms.LongTermsAggregator; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource; +import org.elasticsearch.search.internal.ContextIndexSearcher; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +/** + * + */ +public class SignificantLongTermsAggregator extends LongTermsAggregator { + + public SignificantLongTermsAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource, + long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, + AggregationContext aggregationContext, Aggregator parent,SignificantTermsAggregatorFactory termsAggFactory) { + super(name, factories, valuesSource, estimatedBucketCount, null, requiredSize, shardSize, minDocCount, aggregationContext, parent); + this.termsAggFactory = termsAggFactory; + } + + protected long numCollectedDocs; + private SignificantTermsAggregatorFactory termsAggFactory; + + @Override + public void collect(int doc, long owningBucketOrdinal) throws IOException { + super.collect(doc,owningBucketOrdinal); + numCollectedDocs++; + } + + @Override + public SignificantLongTerms buildAggregation(long owningBucketOrdinal) { + assert owningBucketOrdinal == 0; + + final int size = (int) Math.min(bucketOrds.size(), shardSize); + + ContextIndexSearcher searcher = context.searchContext().searcher(); + IndexReader topReader = searcher.getIndexReader(); + long supersetSize = topReader.numDocs(); + long subsetSize = numCollectedDocs; + + BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size); + SignificantLongTerms.Bucket spare = null; + for (long i = 0; i < bucketOrds.capacity(); ++i) { + final long ord = bucketOrds.id(i); + if (ord < 0) { + // slot is not allocated + continue; + } + + if (spare == null) { + spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null); + } + spare.term = bucketOrds.key(i); + spare.subsetDf = bucketDocCount(ord); + spare.subsetSize = subsetSize; + spare.supersetDf = termsAggFactory.getBackgroundFrequency(topReader, spare.term); + spare.supersetSize = supersetSize; + assert spare.subsetDf <= spare.supersetDf; + // During shard-local down-selection we use subset/superset stats + // that are for this shard only + // Back at the central reducer these properties will be updated with + // global stats + spare.updateScore(); + + spare.bucketOrd = ord; + spare = (SignificantLongTerms.Bucket) ordered.insertWithOverflow(spare); + } + + final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()]; + for (int i = ordered.size() - 1; i >= 0; --i) { + final SignificantLongTerms.Bucket bucket = (SignificantLongTerms.Bucket) ordered.pop(); + bucket.aggregations = bucketAggregations(bucket.bucketOrd); + list[i] = bucket; + } + return new SignificantLongTerms(subsetSize, supersetSize, name, valuesSource.formatter(), requiredSize, minDocCount, + Arrays.asList(list)); + } + + @Override + public SignificantLongTerms buildEmptyAggregation() { + // We need to account for the significance of a miss in our global stats + // - provide corpus size as context + 
ContextIndexSearcher searcher = context.searchContext().searcher(); + IndexReader topReader = searcher.getIndexReader(); + int supersetSize = topReader.numDocs(); + return new SignificantLongTerms(0, supersetSize, name, valuesSource.formatter(), requiredSize, minDocCount, Collections.emptyList()); + } + + @Override + public void doRelease() { + Releasables.release(bucketOrds, termsAggFactory); + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java new file mode 100644 index 0000000000000..37d51c93b8f02 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java @@ -0,0 +1,163 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.significant; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.text.BytesText; +import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * + */ +public class SignificantStringTerms extends InternalSignificantTerms { + + public static final InternalAggregation.Type TYPE = new Type("significant_terms", "sigsterms"); + + public static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public SignificantStringTerms readResult(StreamInput in) throws IOException { + SignificantStringTerms buckets = new SignificantStringTerms(); + buckets.readFrom(in); + return buckets; + } + }; + + public static void registerStreams() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + + public static class Bucket extends InternalSignificantTerms.Bucket { + + BytesRef termBytes; + + + public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize,InternalAggregations aggregations) { + super(subsetDf, subsetSize,supersetDf,supersetSize,aggregations); + this.termBytes = term; + } + + + @Override + public Text getKeyAsText() { + return new BytesText(new BytesArray(termBytes)); + } + + @Override + public Number getKeyAsNumber() { + // this method is needed for scripted numeric aggregations + return Double.parseDouble(termBytes.utf8ToString()); + } + 
+ @Override + int compareTerm(SignificantTerms.Bucket other) { + return BytesRef.getUTF8SortedAsUnicodeComparator().compare(termBytes, ((Bucket) other).termBytes); + } + + + @Override + public String getKey() { + return termBytes.utf8ToString(); + } + + } + + SignificantStringTerms() {} // for serialization + + public SignificantStringTerms(long subsetSize, long supersetSize,String name, int requiredSize, long minDocCount, Collection buckets) { + super(subsetSize, supersetSize, name, requiredSize, minDocCount, buckets); + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + this.name = in.readString(); + this.requiredSize = readSize(in); + this.minDocCount = in.readVLong(); + this.subsetSize = in.readVLong(); + this.supersetSize = in.readVLong(); + int size = in.readVInt(); + List buckets = new ArrayList(size); + for (int i = 0; i < size; i++) { + BytesRef term = in.readBytesRef(); + long subsetDf= in.readVLong(); + long supersetDf= in.readVLong(); + buckets.add(new Bucket(term,subsetDf, subsetSize, supersetDf, + supersetSize, InternalAggregations.readAggregations(in))); + } + this.buckets = buckets; + this.bucketMap = null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + writeSize(requiredSize, out); + out.writeVLong(minDocCount); + out.writeVLong(subsetSize); + out.writeVLong(supersetSize); + out.writeVInt(buckets.size()); + for (InternalSignificantTerms.Bucket bucket : buckets) { + out.writeBytesRef(((Bucket) bucket).termBytes); + out.writeVLong(((Bucket) bucket).subsetDf); + out.writeVLong(((Bucket) bucket).supersetDf); + ((InternalAggregations) bucket.getAggregations()).writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(name); + builder.field("doc_count", subsetSize); + builder.startArray(CommonFields.BUCKETS); + for (InternalSignificantTerms.Bucket bucket : buckets) { + //There is a condition (presumably when only one shard has a bucket?) where reduce is not called + // and I end up with buckets that contravene the user's min_doc_count criteria in my reducer + if(bucket.subsetDf>=minDocCount){ + builder.startObject(); + builder.field(CommonFields.KEY, ((Bucket) bucket).termBytes); + builder.field(CommonFields.DOC_COUNT, bucket.getDocCount()); + builder.field("score", bucket.score); + builder.field("bg_count", bucket.supersetDf); + ((InternalAggregations) bucket.getAggregations()).toXContentInternal(builder, params); + builder.endObject(); + } + } + builder.endArray(); + builder.endObject(); + return builder; + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsAggregator.java new file mode 100644 index 0000000000000..6b8ce91ff7f74 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsAggregator.java @@ -0,0 +1,188 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.significant; + +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.index.fielddata.BytesValues; +import org.elasticsearch.index.fielddata.ordinals.Ordinals; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.terms.StringTermsAggregator; +import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.bytes.BytesValuesSource; +import org.elasticsearch.search.internal.ContextIndexSearcher; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +/** + * An aggregator of significant string values. + */ +public class SignificantStringTermsAggregator extends StringTermsAggregator { + + protected long numCollectedDocs; + protected SignificantTermsAggregatorFactory termsAggFactory; + + public SignificantStringTermsAggregator(String name, AggregatorFactories factories, ValuesSource valuesSource, + long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, + IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) { + super(name, factories, valuesSource, estimatedBucketCount, null, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, + parent); + this.termsAggFactory=termsAggFactory; + } + + @Override + public void collect(int doc, long owningBucketOrdinal) throws IOException { + super.collect(doc,owningBucketOrdinal); + numCollectedDocs++; + } + + @Override + public SignificantStringTerms buildAggregation(long owningBucketOrdinal) { + assert owningBucketOrdinal == 0; + + final int size = (int) Math.min(bucketOrds.size(), shardSize); + + ContextIndexSearcher searcher = context.searchContext().searcher(); + IndexReader topReader = searcher.getIndexReader(); + long supersetSize = topReader.numDocs(); + long subsetSize = numCollectedDocs; + + BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size); + SignificantStringTerms.Bucket spare = null; + for (int i = 0; i < bucketOrds.size(); i++) { + if (spare == null) { + spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null); + } + + bucketOrds.get(i, spare.termBytes); + spare.subsetDf = bucketDocCount(i); + spare.subsetSize = subsetSize; + spare.supersetDf = termsAggFactory.getBackgroundFrequency(topReader, spare.termBytes); + spare.supersetSize = supersetSize; + assert spare.subsetDf <= spare.supersetDf; + // During shard-local down-selection we use subset/superset stats + // that 
are for this shard only + // Back at the central reducer these properties will be updated with + // global stats + spare.updateScore(); + + spare.bucketOrd = i; + spare = (SignificantStringTerms.Bucket) ordered.insertWithOverflow(spare); + } + + final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()]; + for (int i = ordered.size() - 1; i >= 0; --i) { + final SignificantStringTerms.Bucket bucket = (SignificantStringTerms.Bucket) ordered.pop(); + // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point + bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes); + bucket.aggregations = bucketAggregations(bucket.bucketOrd); + list[i] = bucket; + } + + return new SignificantStringTerms(subsetSize, supersetSize, name, requiredSize, minDocCount, Arrays.asList(list)); + } + + @Override + public SignificantStringTerms buildEmptyAggregation() { + // We need to account for the significance of a miss in our global stats + // - provide corpus size as context + ContextIndexSearcher searcher = context.searchContext().searcher(); + IndexReader topReader = searcher.getIndexReader(); + int supersetSize = topReader.numDocs(); + return new SignificantStringTerms(0, supersetSize, name, requiredSize, minDocCount, Collections. emptyList()); + } + + @Override + public void doRelease() { + Releasables.release(bucketOrds, termsAggFactory); + } + + /** + * Extension of SignificantStringTermsAggregator that caches bucket ords using terms ordinals. + */ + public static class WithOrdinals extends SignificantStringTermsAggregator { + + private final BytesValuesSource.WithOrdinals valuesSource; + private BytesValues.WithOrdinals bytesValues; + private Ordinals.Docs ordinals; + private LongArray ordinalToBucket; + + public WithOrdinals(String name, AggregatorFactories factories, BytesValuesSource.WithOrdinals valuesSource, + long esitmatedBucketCount, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent,SignificantTermsAggregatorFactory termsAggFactory) { + super(name, factories, valuesSource, esitmatedBucketCount, requiredSize, shardSize, minDocCount, null, aggregationContext, parent, termsAggFactory); + this.valuesSource = valuesSource; + } + + @Override + public void setNextReader(AtomicReaderContext reader) { + bytesValues = valuesSource.bytesValues(); + ordinals = bytesValues.ordinals(); + final long maxOrd = ordinals.getMaxOrd(); + if (ordinalToBucket == null || ordinalToBucket.size() < maxOrd) { + if (ordinalToBucket != null) { + ordinalToBucket.release(); + } + ordinalToBucket = context().bigArrays().newLongArray(BigArrays.overSize(maxOrd), false); + } + ordinalToBucket.fill(0, maxOrd, -1L); + } + + @Override + public void collect(int doc, long owningBucketOrdinal) throws IOException { + assert owningBucketOrdinal == 0 : "this is a per_bucket aggregator"; + numCollectedDocs++; + final int valuesCount = ordinals.setDocument(doc); + + for (int i = 0; i < valuesCount; ++i) { + final long ord = ordinals.nextOrd(); + long bucketOrd = ordinalToBucket.get(ord); + if (bucketOrd < 0) { // unlikely condition on a low-cardinality + // field + final BytesRef bytes = bytesValues.getValueByOrd(ord); + final int hash = bytesValues.currentValueHash(); + assert hash == bytes.hashCode(); + bucketOrd = bucketOrds.add(bytes, hash); + if (bucketOrd < 0) { // already seen in another segment + bucketOrd = -1 - bucketOrd; + } + ordinalToBucket.set(ord, bucketOrd); + } + + 
collectBucket(doc, bucketOrd); + } + } + + @Override + public void doRelease() { + Releasables.release(bucketOrds, termsAggFactory, ordinalToBucket); + } + + } + +} + diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTerms.java new file mode 100644 index 0000000000000..cff13bbfa9787 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTerms.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.significant; + +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; + +import java.util.Collection; + +/** + * + */ +public interface SignificantTerms extends MultiBucketsAggregation, Iterable { + + + static abstract class Bucket implements MultiBucketsAggregation.Bucket { + + long subsetDf; + long subsetSize; + long supersetDf; + long supersetSize; + + Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize) { + super(); + this.subsetDf = subsetDf; + this.subsetSize = subsetSize; + this.supersetDf = supersetDf; + this.supersetSize = supersetSize; + } + + public abstract Number getKeyAsNumber(); + + abstract int compareTerm(SignificantTerms.Bucket other); + + public abstract double getSignificanceScore(); + + public long getSubsetDf(){ + return subsetDf; + } + + public long getSupersetDf(){ + return supersetDf; + } + + public long getSupersetSize(){ + return supersetSize; + } + + public long getSubsetSize(){ + return subsetSize; + } + + } + + @Override + Collection getBuckets(); + + @Override + Bucket getBucketByKey(String key); + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java new file mode 100644 index 0000000000000..b4aed7d21738d --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java @@ -0,0 +1,202 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.significant; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.Term; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.BytesRefHash; +import org.elasticsearch.common.util.IntArray; +import org.elasticsearch.index.mapper.FieldMapper; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.Aggregator.BucketAggregationMode; +import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.bytes.BytesValuesSource; +import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; + +/** + * + */ +public class SignificantTermsAggregatorFactory extends ValueSourceAggregatorFactory implements Releasable { + + public static final String EXECUTION_HINT_VALUE_MAP = "map"; + public static final String EXECUTION_HINT_VALUE_ORDINALS = "ordinals"; + static final int INITIAL_NUM_TERM_FREQS_CACHED = 512; + + private final int requiredSize; + private final int shardSize; + private final long minDocCount; + private final IncludeExclude includeExclude; + private final String executionHint; + private String indexedFieldName; + private FieldMapper mapper; + private IntArray termDocFreqs; + private BytesRefHash cachedTermOrds; + private BigArrays bigArrays; + + public SignificantTermsAggregatorFactory(String name, ValuesSourceConfig valueSourceConfig, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude, String executionHint) { + super(name, SignificantStringTerms.TYPE.name(), valueSourceConfig); + this.requiredSize = requiredSize; + this.shardSize = shardSize; + this.minDocCount = minDocCount; + this.includeExclude = includeExclude; + this.executionHint = executionHint; + if (!valueSourceConfig.unmapped()) { + this.indexedFieldName = valuesSourceConfig.fieldContext().field(); + mapper = SearchContext.current().smartNameFieldMapper(indexedFieldName); + } + bigArrays = SearchContext.current().bigArrays(); + termDocFreqs = bigArrays.newIntArray(INITIAL_NUM_TERM_FREQS_CACHED, true); + cachedTermOrds = new BytesRefHash(INITIAL_NUM_TERM_FREQS_CACHED, bigArrays); + } + + @Override + protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent) { + return new 
UnmappedSignificantTermsAggregator(name, requiredSize, minDocCount, aggregationContext, parent, this); + } + + private static boolean hasParentBucketAggregator(Aggregator parent) { + if (parent == null) { + return false; + } else if (parent.bucketAggregationMode() == BucketAggregationMode.PER_BUCKET) { + return true; + } else { + return hasParentBucketAggregator(parent.parent()); + } + } + + @Override + protected Aggregator create(ValuesSource valuesSource, long expectedBucketsCount, AggregationContext aggregationContext, Aggregator parent) { + long estimatedBucketCount = valuesSource.metaData().maxAtomicUniqueValuesCount(); + if (estimatedBucketCount < 0) { + // there isn't an estimation available.. 50 should be a good start + estimatedBucketCount = 50; + } + + // adding an upper bound on the estimation as some atomic field data in the future (binary doc values) and not + // going to know their exact cardinality and will return upper bounds in AtomicFieldData.getNumberUniqueValues() + // that may be largely over-estimated.. the value chosen here is arbitrary just to play nice with typical CPU cache + // + // Another reason is that it may be faster to resize upon growth than to start directly with the appropriate size. + // And that all values are not necessarily visited by the matches. + estimatedBucketCount = Math.min(estimatedBucketCount, 512); + + if (valuesSource instanceof BytesValuesSource) { + if (executionHint != null && !executionHint.equals(EXECUTION_HINT_VALUE_MAP) && !executionHint.equals(EXECUTION_HINT_VALUE_ORDINALS)) { + throw new ElasticsearchIllegalArgumentException("execution_hint can only be '" + EXECUTION_HINT_VALUE_MAP + "' or '" + EXECUTION_HINT_VALUE_ORDINALS + "', not " + executionHint); + } + String execution = executionHint; + if (!(valuesSource instanceof BytesValuesSource.WithOrdinals)) { + execution = EXECUTION_HINT_VALUE_MAP; + } else if (includeExclude != null) { + execution = EXECUTION_HINT_VALUE_MAP; + } + if (execution == null) { + if ((valuesSource instanceof BytesValuesSource.WithOrdinals) + && !hasParentBucketAggregator(parent)) { + execution = EXECUTION_HINT_VALUE_ORDINALS; + } else { + execution = EXECUTION_HINT_VALUE_MAP; + } + } + assert execution != null; + + if (execution.equals(EXECUTION_HINT_VALUE_ORDINALS)) { + assert includeExclude == null; + return new SignificantStringTermsAggregator.WithOrdinals(name, factories, (BytesValuesSource.WithOrdinals) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, this ); + } else { + return new SignificantStringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent, this); + } + } + + if (includeExclude != null) { + throw new AggregationExecutionException("Aggregation [" + name + "] cannot support the include/exclude " + + "settings as it can only be applied to string values"); + } + + if (valuesSource instanceof NumericValuesSource) { + + if (((NumericValuesSource) valuesSource).isFloatingPoint()) { + throw new UnsupportedOperationException("No support for examining floating point numerics"); + } + return new SignificantLongTermsAggregator(name, factories, (NumericValuesSource) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent,this); + } + + throw new AggregationExecutionException("sigfnificant_terms aggregation cannot be applied to field [" + valuesSourceConfig.fieldContext().field() + + "]. 
It can only be applied to numeric or string fields."); + } + + // Many child aggs may ask for the same docFreq information so here we cache docFreq + // values for these terms. + // TODO this should be re-factored into a more generic system for efficiently checking frequencies of things + // In future we may need to a) check the frequency in a set other than the index e.g. a subset and b) check + // the frequency of an entity other than an a single indexed term e.g. a numeric range. + // This is likely to require some careful design. + public long getBackgroundFrequency(IndexReader topReader, BytesRef termBytes) { + int result = 0; + long termOrd = cachedTermOrds.add(termBytes); + if (termOrd < 0) { // already seen, return the cached docFreq + termOrd = -1 - termOrd; + result = termDocFreqs.get(termOrd); + } else { // cache miss - read the terms' frequency in this shard and cache it + try { + result = topReader.docFreq(new Term(indexedFieldName, termBytes)); + } catch (IOException e) { + throw new ElasticsearchException("IOException reading document frequency", e); + } + termDocFreqs = bigArrays.grow(termDocFreqs, termOrd + 1); + termDocFreqs.set(termOrd, result); + } + return result; + } + + + + // Many child aggs may ask for the same docFreq information so cache docFreq + // values for these terms + public long getBackgroundFrequency(IndexReader topReader, long term) { + BytesRef indexedVal = mapper.indexedValueForSearch(term); + return getBackgroundFrequency(topReader, indexedVal); + } + + @Override + public boolean release() throws ElasticsearchException { + try { + Releasables.release(cachedTermOrds, termDocFreqs); + } finally { + cachedTermOrds = null; + termDocFreqs = null; + } + return true; + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsBuilder.java new file mode 100644 index 0000000000000..183ef6645f958 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsBuilder.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.significant; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilder; + +import java.io.IOException; + +/** + * Creates an aggregation that finds interesting or unusual occurrences of terms in a result set. + * + * This feature is marked as experimental, and may be subject to change in the future. If you + * use this feature, please let us know your experience with it! 
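+ * <p>
+ * A minimal usage sketch (the aggregation name, the field and the {@code searchRequestBuilder}
+ * variable below are illustrative only, mirroring the integration test added with this change,
+ * and are not a fixed contract):
+ * <pre>
+ * searchRequestBuilder.addAggregation(
+ *         new SignificantTermsBuilder("mySignificantTerms")
+ *                 .field("description")
+ *                 .minDocCount(2));
+ * </pre>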
+ */ +public class SignificantTermsBuilder extends AggregationBuilder { + + + private String field; + private int requiredSize = SignificantTermsParser.DEFAULT_REQUIRED_SIZE; + private int shardSize = SignificantTermsParser.DEFAULT_SHARD_SIZE; + private int minDocCount = SignificantTermsParser.DEFAULT_MIN_DOC_COUNT; + + public SignificantTermsBuilder(String name) { + super(name, SignificantStringTerms.TYPE.name()); + } + + public SignificantTermsBuilder field(String field) { + this.field = field; + return this; + } + + public SignificantTermsBuilder size(int requiredSize) { + this.requiredSize = requiredSize; + return this; + } + public SignificantTermsBuilder shardSize(int shardSize) { + this.shardSize = shardSize; + return this; + } + public SignificantTermsBuilder minDocCount(int minDocCount) { + this.minDocCount = minDocCount; + return this; + } + + + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (field != null) { + builder.field("field", field); + } + if (minDocCount != SignificantTermsParser.DEFAULT_MIN_DOC_COUNT) { + builder.field("minDocCount", minDocCount); + } + if (requiredSize != SignificantTermsParser.DEFAULT_REQUIRED_SIZE) { + builder.field("size", requiredSize); + } + if (shardSize != SignificantTermsParser.DEFAULT_SHARD_SIZE) { + builder.field("shard_size", shardSize); + } + + return builder.endObject(); + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsParser.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsParser.java new file mode 100644 index 0000000000000..1f498ea3e2da8 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsParser.java @@ -0,0 +1,213 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.significant; + +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.IndexNumericFieldData; +import org.elasticsearch.index.mapper.FieldMapper; +import org.elasticsearch.index.mapper.core.DateFieldMapper; +import org.elasticsearch.index.mapper.ip.IpFieldMapper; +import org.elasticsearch.search.SearchParseException; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.bucket.BucketUtils; +import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; +import org.elasticsearch.search.aggregations.support.FieldContext; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.bytes.BytesValuesSource; +import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource; +import org.elasticsearch.search.aggregations.support.numeric.ValueFormatter; +import org.elasticsearch.search.aggregations.support.numeric.ValueParser; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.regex.Pattern; + +/** + * + */ +public class SignificantTermsParser implements Aggregator.Parser { + + @Override + public String type() { + return SignificantStringTerms.TYPE.name(); + } + + public static final int DEFAULT_REQUIRED_SIZE=10; + public static final int DEFAULT_SHARD_SIZE=0; + //Typically need more than one occurrence of something for it to be statistically significant + public static final int DEFAULT_MIN_DOC_COUNT = 3; + + @Override + public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException { + + String field = null; + int requiredSize = DEFAULT_REQUIRED_SIZE; + int shardSize = DEFAULT_SHARD_SIZE; + String format = null; + String include = null; + int includeFlags = 0; // 0 means no flags + String exclude = null; + int excludeFlags = 0; // 0 means no flags + String executionHint = null; + long minDocCount = DEFAULT_MIN_DOC_COUNT; + + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if ("field".equals(currentFieldName)) { + field = parser.text(); + } else if ("format".equals(currentFieldName)) { + format = parser.text(); + } else if ("include".equals(currentFieldName)) { + include = parser.text(); + } else if ("exclude".equals(currentFieldName)) { + exclude = parser.text(); + } else if ("execution_hint".equals(currentFieldName) || "executionHint".equals(currentFieldName)) { + executionHint = parser.text(); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("size".equals(currentFieldName)) { + requiredSize = parser.intValue(); + } else if ("shard_size".equals(currentFieldName) || "shardSize".equals(currentFieldName)) { + shardSize = parser.intValue(); + } else if ("min_doc_count".equals(currentFieldName) || "minDocCount".equals(currentFieldName)) { + minDocCount = 
parser.intValue(); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if ("include".equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if ("pattern".equals(currentFieldName)) { + include = parser.text(); + } else if ("flags".equals(currentFieldName)) { + includeFlags = Regex.flagsFromString(parser.text()); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("flags".equals(currentFieldName)) { + includeFlags = parser.intValue(); + } + } + } + } else if ("exclude".equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if ("pattern".equals(currentFieldName)) { + exclude = parser.text(); + } else if ("flags".equals(currentFieldName)) { + excludeFlags = Regex.flagsFromString(parser.text()); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("flags".equals(currentFieldName)) { + excludeFlags = parser.intValue(); + } + } + } + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); + } + } else { + throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "]."); + } + } + + if (shardSize == DEFAULT_SHARD_SIZE) { + //The user has not made a shardSize selection . + //Use default heuristic to avoid any wrong-ranking caused by distributed counting + //but request double the usual amount. + //We typically need more than the number of "top" terms requested by other aggregations + //as the significance algorithm is in less of a position to down-select at shard-level - + //some of the things we want to find have only one occurrence on each shard and as + // such are impossible to differentiate from non-significant terms at that early stage. + shardSize = 2 * BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards()); + + } + + // shard_size cannot be smaller than size as we need to at least fetch entries from every shards in order to return + if (shardSize < requiredSize) { + shardSize = requiredSize; + } + + IncludeExclude includeExclude = null; + if (include != null || exclude != null) { + Pattern includePattern = include != null ? Pattern.compile(include, includeFlags) : null; + Pattern excludePattern = exclude != null ? Pattern.compile(exclude, excludeFlags) : null; + includeExclude = new IncludeExclude(includePattern, excludePattern); + } + + + FieldMapper mapper = context.smartNameFieldMapper(field); + if (mapper == null) { + ValuesSourceConfig config = new ValuesSourceConfig(BytesValuesSource.class); + config.unmapped(true); + return new SignificantTermsAggregatorFactory(aggregationName, config, requiredSize, shardSize, minDocCount, includeExclude, executionHint); + } + IndexFieldData indexFieldData = context.fieldData().getForField(mapper); + + ValuesSourceConfig config; + + if (mapper instanceof DateFieldMapper) { + DateFieldMapper dateMapper = (DateFieldMapper) mapper; + ValueFormatter formatter = format == null ? 
+ new ValueFormatter.DateTime(dateMapper.dateTimeFormatter()) : + new ValueFormatter.DateTime(format); + config = new ValuesSourceConfig(NumericValuesSource.class) + .formatter(formatter) + .parser(new ValueParser.DateMath(dateMapper.dateMathParser())); + + } else if (mapper instanceof IpFieldMapper) { + config = new ValuesSourceConfig(NumericValuesSource.class) + .formatter(ValueFormatter.IPv4) + .parser(ValueParser.IPv4); + + } else if (indexFieldData instanceof IndexNumericFieldData) { + config = new ValuesSourceConfig(NumericValuesSource.class); + if (format != null) { + config.formatter(new ValueFormatter.Number.Pattern(format)); + } + + } else { + config = new ValuesSourceConfig(BytesValuesSource.class); + // TODO: it will make sense to set false instead here if the aggregator factory uses + // ordinals instead of hash tables + config.needsHashes(true); + } + + config.fieldContext(new FieldContext(field, indexFieldData)); + // We need values to be unique to be able to run terms aggs efficiently + config.ensureUnique(true); + + return new SignificantTermsAggregatorFactory(aggregationName, config, requiredSize, shardSize, minDocCount, includeExclude, executionHint); + } + + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java new file mode 100644 index 0000000000000..683c6048b5247 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.significant; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationStreams; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +/** + * + */ +public class UnmappedSignificantTerms extends InternalSignificantTerms { + + public static final Type TYPE = new Type("significant_terms", "umsigterms"); + + private static final Collection BUCKETS = Collections.emptyList(); + private static final Map BUCKETS_MAP = Collections.emptyMap(); + + public static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public UnmappedSignificantTerms readResult(StreamInput in) throws IOException { + UnmappedSignificantTerms buckets = new UnmappedSignificantTerms(); + buckets.readFrom(in); + return buckets; + } + }; + + public static void registerStreams() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + UnmappedSignificantTerms() {} // for serialization + + public UnmappedSignificantTerms(String name, int requiredSize, long minDocCount) { + //We pass zero for index/subset sizes because for the purpose of significant term analysis + // we assume an unmapped index's size is irrelevant to the proceedings. + super(0,0,name, requiredSize, minDocCount, BUCKETS); + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + this.name = in.readString(); + this.requiredSize = readSize(in); + this.minDocCount = in.readVLong(); + this.buckets = BUCKETS; + this.bucketMap = BUCKETS_MAP; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + writeSize(requiredSize, out); + out.writeVLong(minDocCount); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(name); + builder.startArray(CommonFields.BUCKETS).endArray(); + builder.endObject(); + return builder; + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTermsAggregator.java new file mode 100644 index 0000000000000..d415a7517f069 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTermsAggregator.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.significant; + +import org.apache.lucene.index.AtomicReaderContext; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.support.AggregationContext; + +import java.io.IOException; + +/** + * + */ +public class UnmappedSignificantTermsAggregator extends Aggregator { + + private final int requiredSize; + private final long minDocCount; + private SignificantTermsAggregatorFactory termsAggFactory; + + public UnmappedSignificantTermsAggregator(String name, int requiredSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) { + super(name, BucketAggregationMode.PER_BUCKET, AggregatorFactories.EMPTY, 0, aggregationContext, parent); + this.requiredSize = requiredSize; + this.minDocCount = minDocCount; + this.termsAggFactory = termsAggFactory; + } + + @Override + public boolean shouldCollect() { + return false; + } + + @Override + public void setNextReader(AtomicReaderContext reader) { + } + + @Override + public void collect(int doc, long owningBucketOrdinal) throws IOException { + } + + @Override + public InternalAggregation buildAggregation(long owningBucketOrdinal) { + assert owningBucketOrdinal == 0; + return new UnmappedSignificantTerms(name, requiredSize, minDocCount); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new UnmappedSignificantTerms(name, requiredSize, minDocCount); + } + + @Override + protected void doRelease() { + Releasables.release(termsAggFactory); + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java index 798f4e3c8fc3e..dff1a184cfeef 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java @@ -23,6 +23,7 @@ import org.elasticsearch.index.fielddata.LongValues; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue; @@ -39,11 +40,11 @@ public class LongTermsAggregator extends BucketsAggregator { private final InternalOrder order; - private final int requiredSize; - private final int shardSize; - private final long minDocCount; - private final NumericValuesSource valuesSource; - private final LongHash bucketOrds; + protected final int requiredSize; + protected final int shardSize; + protected final long minDocCount; + protected final NumericValuesSource valuesSource; + protected final LongHash bucketOrds; private LongValues values; public LongTermsAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource, long estimatedBucketCount, @@ -83,7 +84,7 @@ public void collect(int doc, long owningBucketOrdinal) throws IOException { } @Override - public LongTerms buildAggregation(long owningBucketOrdinal) { + public InternalAggregation 
buildAggregation(long owningBucketOrdinal) { assert owningBucketOrdinal == 0; if (minDocCount == 0 && (order != InternalOrder.COUNT_DESC || bucketOrds.size() < requiredSize)) { @@ -130,7 +131,7 @@ public LongTerms buildAggregation(long owningBucketOrdinal) { } @Override - public LongTerms buildEmptyAggregation() { + public InternalAggregation buildEmptyAggregation() { return new LongTerms(name, order, valuesSource.formatter(), requiredSize, minDocCount, Collections.emptyList()); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java index 74f1f5a65fe1f..1e7d48f936ff5 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.common.util.BytesRefHash; import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue; @@ -50,9 +51,9 @@ public class StringTermsAggregator extends BucketsAggregator { private final ValuesSource valuesSource; private final InternalOrder order; - private final int requiredSize; - private final int shardSize; - private final long minDocCount; + protected final int requiredSize; + protected final int shardSize; + protected final long minDocCount; protected final BytesRefHash bucketOrds; private final IncludeExclude includeExclude; private BytesValues values; @@ -142,7 +143,7 @@ public BytesRef next() { } @Override - public StringTerms buildAggregation(long owningBucketOrdinal) { + public InternalAggregation buildAggregation(long owningBucketOrdinal) { assert owningBucketOrdinal == 0; if (minDocCount == 0 && (order != InternalOrder.COUNT_DESC || bucketOrds.size() < requiredSize)) { @@ -244,7 +245,7 @@ public boolean apply(BytesRef input) { } @Override - public StringTerms buildEmptyAggregation() { + public InternalAggregation buildEmptyAggregation() { return new StringTerms(name, order, requiredSize, minDocCount, Collections.emptyList()); } diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsTests.java new file mode 100644 index 0000000000000..89cf5e46a6374 --- /dev/null +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsTests.java @@ -0,0 +1,171 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket; + +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms.Bucket; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsBuilder; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +/** + * + */ +public class SignificantTermsTests extends ElasticsearchIntegrationTest { + + @Override + public Settings indexSettings() { + return ImmutableSettings.builder() + .put("index.number_of_shards", between(1, 5)) + .put("index.number_of_replicas", between(0, 1)) + .build(); + } + + public static final int MUSIC_CATEGORY=1; + public static final int OTHER_CATEGORY=2; + public static final int SNOWBOARDING_CATEGORY=3; + + @Before + public void init() throws Exception { + assertAcked(prepareCreate("test").setSettings(SETTING_NUMBER_OF_SHARDS, 5, SETTING_NUMBER_OF_REPLICAS, 0).addMapping("fact", + "_routing", "required=true,path=routing_id", "routing_id", "type=string,index=not_analyzed", "fact_category", + "type=integer,index=not_analyzed", "description", "type=string,index=analyzed")); + createIndex("idx_unmapped"); + + ensureGreen(); + String data[] = { + "A\t1\tpaul weller was lead singer of the jam before the style council", + "B\t1\tpaul weller left the jam to form the style council", + "A\t2\tpaul smith is a designer in the fashion industry", + "B\t1\tthe stranglers are a group originally from guildford", + "A\t1\tafter disbanding the style council in 1985 paul weller became a solo artist", + "B\t1\tjean jaques burnel is a bass player in the stranglers and has a black belt in karate", + "A\t1\tmalcolm owen was the lead singer of the ruts", + "B\t1\tpaul weller has denied any possibility of a reunion of the jam", + "A\t1\tformer frontman of the jam paul weller became the father of twins", + "B\t2\tex-england football star paul gascoigne has re-emerged following recent disappearance", + "A\t2\tdavid smith has recently denied connections with the mafia", + "B\t1\tthe damned's new rose single was considered the first 'punk' single in the UK", + "A\t1\tthe sex pistols broke up after a few short years together", + "B\t1\tpaul gascoigne was a midfielder for england football team", + "A\t3\tcraig kelly became the first world champion snowboarder and has a memorial at baldface lodge", + "B\t3\tterje haakonsen has credited craig kelly as his snowboard mentor", + "A\t3\tterje haakonsen and craig kelly were some of the first snowboarders sponsored by burton snowboards", + "B\t3\tlike craig kelly before him terje won the mt baker banked slalom many times - once riding switch", 
+ "A\t3\tterje haakonsen has been a team rider for burton snowboards for over 20 years" + }; + + for (int i = 0; i < data.length; i++) { + String[] parts = data[i].split("\t"); + client().prepareIndex("test", "fact", "" + i) + .setSource("routing_id", parts[0], "fact_category", parts[1], "description", parts[2]).get(); + } + client().admin().indices().refresh(new RefreshRequest("test")).get(); + } + + @Test + public void structuredAnalysis() throws Exception { + SearchResponse response = client().prepareSearch("test") + .setSearchType(SearchType.QUERY_AND_FETCH) + .setQuery(new TermQueryBuilder("_all", "terje")) + .setFrom(0).setSize(60).setExplain(true) + .addAggregation(new SignificantTermsBuilder("mySignificantTerms").field("fact_category") + .minDocCount(2)) + .execute() + .actionGet(); + SignificantTerms topTerms = response.getAggregations().get("mySignificantTerms"); + Number topCategory = topTerms.getBuckets().iterator().next().getKeyAsNumber(); + assertTrue(topCategory.equals(new Long(SNOWBOARDING_CATEGORY))); + } + + @Test + public void unmapped() throws Exception { + SearchResponse response = client().prepareSearch("idx_unmapped") + .setSearchType(SearchType.QUERY_AND_FETCH) + .setQuery(new TermQueryBuilder("_all", "terje")) + .setFrom(0).setSize(60).setExplain(true) + .addAggregation(new SignificantTermsBuilder("mySignificantTerms").field("fact_category") + .minDocCount(2)) + .execute() + .actionGet(); + SignificantTerms topTerms = response.getAggregations().get("mySignificantTerms"); + assertThat(topTerms.getBuckets().size(), equalTo(0)); + } + + @Test + public void textAnalysis() throws Exception { + SearchResponse response = client().prepareSearch("test") + .setSearchType(SearchType.QUERY_AND_FETCH) + .setQuery(new TermQueryBuilder("_all", "terje")) + .setFrom(0).setSize(60).setExplain(true) + .addAggregation(new SignificantTermsBuilder("mySignificantTerms").field("description") + .minDocCount(2)) + .execute() + .actionGet(); + SignificantTerms topTerms = response.getAggregations().get("mySignificantTerms"); + checkExpectedStringTermsFound(topTerms); + } + + @Test + public void partiallyUnmapped() throws Exception { + SearchResponse response = client().prepareSearch("idx_unmapped","test") + .setSearchType(SearchType.QUERY_AND_FETCH) + .setQuery(new TermQueryBuilder("_all", "terje")) + .setFrom(0).setSize(60).setExplain(true) + .addAggregation(new SignificantTermsBuilder("mySignificantTerms").field("description") + .minDocCount(2)) + .execute() + .actionGet(); + SignificantTerms topTerms = response.getAggregations().get("mySignificantTerms"); + checkExpectedStringTermsFound(topTerms); + } + + + private void checkExpectedStringTermsFound(SignificantTerms topTerms) { + HashMaptopWords=new HashMap(); + for (Bucket topTerm : topTerms ){ + topWords.put(topTerm.getKey(),topTerm); + } + assertTrue( topWords.containsKey("haakonsen")); + assertTrue( topWords.containsKey("craig")); + assertTrue( topWords.containsKey("kelly")); + assertTrue( topWords.containsKey("burton")); + assertTrue( topWords.containsKey("snowboards")); + Bucket kellyTerm=topWords.get("kelly"); + assertEquals(3, kellyTerm.getSubsetDf()); + assertEquals(4, kellyTerm.getSupersetDf()); + } + + + +}