1313import org .elasticsearch .search .aggregations .Aggregator ;
1414import org .elasticsearch .search .aggregations .LeafBucketCollector ;
1515import org .elasticsearch .search .aggregations .LeafBucketCollectorBase ;
16+ import org .elasticsearch .search .aggregations .bucket .DocCountProvider ;
1617import org .elasticsearch .search .aggregations .metrics .CompensatedSum ;
1718import org .elasticsearch .search .aggregations .support .AggregationContext ;
1819import org .elasticsearch .search .aggregations .support .ValuesSource ;
2223import java .util .Map ;
2324
2425public class NumericRateAggregator extends AbstractRateAggregator {
26+
27+ private final DocCountProvider docCountProvider ;
28+
2529 public NumericRateAggregator (
2630 String name ,
2731 ValuesSourceConfig valuesSourceConfig ,
@@ -32,42 +36,68 @@ public NumericRateAggregator(
3236 Map <String , Object > metadata
3337 ) throws IOException {
3438 super (name , valuesSourceConfig , rateUnit , rateMode , context , parent , metadata );
39+ docCountProvider = computeWithDocCount ? new DocCountProvider () : null ;
3540 }
3641
3742 @ Override
3843 public LeafBucketCollector getLeafCollector (LeafReaderContext ctx , final LeafBucketCollector sub ) throws IOException {
3944 final CompensatedSum kahanSummation = new CompensatedSum (0 , 0 );
40- final SortedNumericDoubleValues values = ((ValuesSource .Numeric ) valuesSource ).doubleValues (ctx );
41- return new LeafBucketCollectorBase (sub , values ) {
42- @ Override
43- public void collect (int doc , long bucket ) throws IOException {
44- sums = bigArrays ().grow (sums , bucket + 1 );
45- compensations = bigArrays ().grow (compensations , bucket + 1 );
46-
47- if (values .advanceExact (doc )) {
48- final int valuesCount = values .docValueCount ();
45+ if (computeWithDocCount ) {
46+ // No field or script has been set at the rate agg. So, rate will be computed based on the doc_counts.
47+ // This implementation hard-wires the DocCountProvider and reads the _doc_count fields when available.
48+ // A better approach would be to create a DOC_COUNT ValuesSource type and use that as valuesSource
49+ // In that case the computeRateOnDocs variable and this branch of the if-statement are not required.
50+ docCountProvider .setLeafReaderContext (ctx );
51+ return new LeafBucketCollectorBase (sub , null ) {
52+ @ Override
53+ public void collect (int doc , long bucket ) throws IOException {
54+ sums = bigArrays ().grow (sums , bucket + 1 );
55+ compensations = bigArrays ().grow (compensations , bucket + 1 );
4956 // Compute the sum of double values with Kahan summation algorithm which is more
5057 // accurate than naive summation.
5158 double sum = sums .get (bucket );
5259 double compensation = compensations .get (bucket );
5360 kahanSummation .reset (sum , compensation );
54- switch (rateMode ) {
55- case SUM :
56- for (int i = 0 ; i < valuesCount ; i ++) {
57- kahanSummation .add (values .nextValue ());
58- }
59- break ;
60- case VALUE_COUNT :
61- kahanSummation .add (valuesCount );
62- break ;
63- default :
64- throw new IllegalArgumentException ("Unsupported rate mode " + rateMode );
65- }
6661
62+ final int docCount = docCountProvider .getDocCount (doc );
63+ kahanSummation .add (docCount );
6764 compensations .set (bucket , kahanSummation .delta ());
6865 sums .set (bucket , kahanSummation .value ());
6966 }
70- }
71- };
67+ };
68+ } else {
69+ final SortedNumericDoubleValues values = ((ValuesSource .Numeric ) valuesSource ).doubleValues (ctx );
70+ return new LeafBucketCollectorBase (sub , values ) {
71+ @ Override
72+ public void collect (int doc , long bucket ) throws IOException {
73+ sums = bigArrays ().grow (sums , bucket + 1 );
74+ compensations = bigArrays ().grow (compensations , bucket + 1 );
75+
76+ if (values .advanceExact (doc )) {
77+ final int valuesCount = values .docValueCount ();
78+ // Compute the sum of double values with Kahan summation algorithm which is more
79+ // accurate than naive summation.
80+ double sum = sums .get (bucket );
81+ double compensation = compensations .get (bucket );
82+ kahanSummation .reset (sum , compensation );
83+ switch (rateMode ) {
84+ case SUM :
85+ for (int i = 0 ; i < valuesCount ; i ++) {
86+ kahanSummation .add (values .nextValue ());
87+ }
88+ break ;
89+ case VALUE_COUNT :
90+ kahanSummation .add (valuesCount );
91+ break ;
92+ default :
93+ throw new IllegalArgumentException ("Unsupported rate mode " + rateMode );
94+ }
95+
96+ compensations .set (bucket , kahanSummation .delta ());
97+ sums .set (bucket , kahanSummation .value ());
98+ }
99+ }
100+ };
101+ }
72102 }
73103}
0 commit comments