Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,25 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;

public class BucketSelectorPipelineAggregator extends PipelineAggregator {
private GapPolicy gapPolicy;
private final GapPolicy gapPolicy;

private Script script;
private final Script script;

private Map<String, String> bucketsPathsMap;
private final Predicate<Object> keepBucket;

private final Map<String, String> bucketsPathsMap;

public BucketSelectorPipelineAggregator(String name, Map<String, String> bucketsPathsMap, Script script, GapPolicy gapPolicy,
Map<String, Object> metadata) {
super(name, bucketsPathsMap.values().toArray(new String[bucketsPathsMap.size()]), metadata);
this.bucketsPathsMap = bucketsPathsMap;
this.script = script;
keepBucket = buildKeepBucket(script);
this.gapPolicy = gapPolicy;
}

Expand All @@ -60,10 +64,19 @@ public BucketSelectorPipelineAggregator(String name, Map<String, String> buckets
public BucketSelectorPipelineAggregator(StreamInput in) throws IOException {
super(in);
script = new Script(in);
keepBucket = buildKeepBucket(script);
gapPolicy = GapPolicy.readFrom(in);
bucketsPathsMap = (Map<String, String>) in.readGenericValue();
}

private static Predicate<Object> buildKeepBucket(Script script) {
if ("expression".equals(script.getLang())) {
return val -> (double) val == 1.0;
} else {
return val -> (boolean) val;
}
}

@Override
protected void doWriteTo(StreamOutput out) throws IOException {
script.writeTo(out);
Expand Down Expand Up @@ -98,15 +111,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
// TODO: can we use one instance of the script for all buckets? it should be stateless?
ExecutableScript executableScript = factory.newInstance(vars);
Object scriptReturnValue = executableScript.run();
final boolean keepBucket;
// TODO: WTF!!!!!
if ("expression".equals(script.getLang())) {
double scriptDoubleValue = (double) scriptReturnValue;
keepBucket = scriptDoubleValue == 1.0;
} else {
keepBucket = (boolean) scriptReturnValue;
}
if (keepBucket) {
if (keepBucket.test(scriptReturnValue)) {
newBuckets.add(bucket);
}
}
Expand Down