Add a flag to control in index order execution mode to aggregations #82129
Adds a flag to control in index order execution mode to aggregations and introduces the basic time_series aggregation that triggers that mode. Relates to elastic#74660
Pinging @elastic/es-analytics-geo (Team:Analytics)
romseygeek
left a comment
I did a first pass over the lucene-specific parts and left some comments; I think we may be able to do this using a combination of CollectorManager and leaf segment sorters, but I need to think about it carefully.
server/src/main/java/org/apache/lucene/search/ConcurrentTopScoreDocCollector.java
.../src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java
My main concern with this approach is that the Collector API has a general expectation that doc IDs get collected in global order. So reusing the Collector API as-is this way creates room for bugs, since one could mistakenly use a random collector from Lucene assuming that it would work, when it actually wouldn't because documents wouldn't get collected the way it expects. I guess there are two main ways we could avoid this issue.
@romseygeek, @nik9000 I have made some changes as we discussed. Could you take another look?
@nik9000 it has been a week, do you think you can take a look at some time soon?
nik9000
left a comment
I found a few small things. Otherwise, it's what I expected us to do.
server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java
...pi-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/450_time_series.yml
server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java
        TimeSeriesAggregationBuilder.PARSER
    ).addResultReader(InternalTimeSeries::new),
    builder
);
Note for anyone else scanning this PR - this skips registering the agg if the feature flag is not enabled. The agg is still registered regardless of the index's mode.
 */
public static TimeSeriesAggregationBuilder timeSeries(String name) {
    return new TimeSeriesAggregationBuilder(name);
}
Do we need to keep this class around any more? I thought it was mostly for the transport client and I've been sort of ignoring it for years.
It is useful for integration tests, which I really needed here to make sure that the whole thing works. I am OK with removing the class, but that probably needs a bigger discussion and should happen outside of this PR.
.../src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java
docId = iterator.nextDoc();
if (docId != DocIdSetIterator.NO_MORE_DOCS && (liveDocs == null || liveDocs.get(docId))) {
    if (tsids.advanceExact(docId)) {
        BytesRef tsid = tsids.lookupOrd(tsids.nextOrd());
It'd be nice to be able to avoid ord lookups if we matched the old one, I think. I've not dug into the lookup code in a while though. But, either way, that can wait.
I want to replace this whole thing with generic FieldComparator and index order stuff.
test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java
@Override
public void collect(int doc, long bucket) throws IOException {
    if (tsids.advanceExact(doc)) {
        BytesRef newTsid = tsids.nextValue();
Let's not do value lookups on every document, since these are costly operations. We could take advantage of in-order iteration to keep track of the previous segment ordinal and the previous bucket ordinal. When the current segment ordinal is the same as the previous segment ordinal, we know that the bucket ordinal is the same as well, and we only need to call lookupOrd whenever the ordinal changes, which is bounded by the unique number of time series that exist in the segment (which should be a much smaller number than the number of docs in the segment)?
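For readers skimming the thread, the suggestion above could be sketched roughly as follows. This is only an illustration under stated assumptions: `FakeTsidValues` is a hypothetical stand-in for a per-segment sorted doc-values reader (it is not Lucene's API), tsids are plain strings, and bucket ordinals are handed out from a `HashMap` rather than whatever structure the aggregator really uses.

```java
import java.util.HashMap;
import java.util.Map;

final class OrdinalCachingSketch {

    /** Hypothetical stand-in for a segment's tsid doc values; counts the costly lookups. */
    static final class FakeTsidValues {
        private final int[] ordPerDoc;     // segment ordinal of each doc, in doc-ID order
        private final String[] dictionary; // segment ordinal -> tsid value
        int lookups = 0;

        FakeTsidValues(int[] ordPerDoc, String[] dictionary) {
            this.ordPerDoc = ordPerDoc;
            this.dictionary = dictionary;
        }

        int ordValue(int doc) {
            return ordPerDoc[doc];
        }

        String lookupOrd(int ord) {
            lookups++; // stands in for the expensive ordinal-to-bytes resolution
            return dictionary[ord];
        }
    }

    /** Assigns a bucket ordinal to every doc, resolving the tsid only when the segment ordinal changes. */
    static long[] collect(FakeTsidValues tsids, int docCount) {
        Map<String, Long> bucketOrds = new HashMap<>();
        long[] bucketPerDoc = new long[docCount];
        int previousSegmentOrd = -1;
        long previousBucketOrd = -1;
        for (int doc = 0; doc < docCount; doc++) {
            int segmentOrd = tsids.ordValue(doc);
            if (segmentOrd != previousSegmentOrd) {
                String tsid = tsids.lookupOrd(segmentOrd);
                Long existing = bucketOrds.get(tsid);
                if (existing == null) {
                    existing = (long) bucketOrds.size();
                    bucketOrds.put(tsid, existing);
                }
                previousBucketOrd = existing;
                previousSegmentOrd = segmentOrd;
            }
            // Same segment ordinal as the previous doc: reuse the bucket without any lookup.
            bucketPerDoc[doc] = previousBucketOrd;
        }
        return bucketPerDoc;
    }

    public static void main(String[] args) {
        FakeTsidValues tsids = new FakeTsidValues(new int[] {0, 0, 0, 1, 1, 2}, new String[] {"a", "b", "c"});
        long[] buckets = collect(tsids, 6);
        System.out.println(java.util.Arrays.toString(buckets) + " lookups=" + tsids.lookups);
    }
}
```

In this toy run six docs need only three lookups, one per distinct time series in the segment, which is the bound the comment describes.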
So, I would like to replace this whole part in a follow-up with getting data based on the actually specified index sort order instead of hard-coded tsids and timestamps. Are you OK if I address these optimizations there?
Totally fine with a follow-up.
server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java
docId = iterator.nextDoc();
if (docId != DocIdSetIterator.NO_MORE_DOCS && (liveDocs == null || liveDocs.get(docId))) {
    if (tsids.advanceExact(docId)) {
        BytesRef tsid = tsids.lookupOrd(tsids.nextOrd());
Could we avoid doing lookupOrd on each document? E.g. we could keep track of the previous ordinal and only reload the binary tsid if the ordinal is different from the previous one. Or use global ordinals.
            queue.pop();
        }
    }
}
nit: we'll probably want to optimize the case when there is a single leaf in the priority queue to do a simple for loop, which would happen if:
- the time range filter is thin and matches a single segment
- the shard has been force-merged
Thinking a bit deeper into this: the current implementation of this PR calls updateTop or pop on every document. But these are not cheap because they force at least one comparison, which is not free as the tsid values are arbitrary-length BytesRef objects that could have long common prefixes.
We expect many docs to have the same TSID, so we could do better by popping the top entry of the priority queue. Then there are two cases:
- If the priority queue is empty or the new top entry of the PQ has a different TSID, then we can iterate all documents of the current entry without doing any comparison.
- If the new top entry of the PQ has the same TSID, we look at its timestamp value. And we know that we can collect all documents of the current entry until we find a timestamp that is greater than the timestamp of the top entry. And for all these documents, we only had to compare the timestamp (cheap), not the TSID.
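A rough sketch of the two cases above, which also covers the single-leaf fast path mentioned in the earlier nit. It is a toy model under stated assumptions, not the PR's code: `Leaf` is a hypothetical stand-in for a segment whose docs are already sorted by tsid and then timestamp (ascending is assumed here), tsids are strings, and `java.util.PriorityQueue` replaces Lucene's priority queue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class RunMergeSketch {

    /** Hypothetical stand-in for one segment: docs sorted by (tsid, timestamp) ascending. */
    static final class Leaf {
        final String[] dictionary; // segment ordinal -> tsid
        final int[] ords;          // tsid ordinal per doc
        final long[] timestamps;   // timestamp per doc
        int pos = 0;               // current doc within the leaf

        Leaf(String[] dictionary, int[] ords, long[] timestamps) {
            this.dictionary = dictionary;
            this.ords = ords;
            this.timestamps = timestamps;
        }

        boolean exhausted() { return pos >= ords.length; }
        int ord() { return ords[pos]; }
        String tsid() { return dictionary[ords[pos]]; }
        long timestamp() { return timestamps[pos]; }
    }

    /** Merges the leaves in (tsid, timestamp) order, comparing tsids once per run instead of once per doc. */
    static List<String> merge(List<Leaf> leaves) {
        PriorityQueue<Leaf> queue = new PriorityQueue<>(
            Comparator.comparing(Leaf::tsid).thenComparingLong(Leaf::timestamp));
        for (Leaf leaf : leaves) {
            if (!leaf.exhausted()) {
                queue.add(leaf);
            }
        }
        List<String> collected = new ArrayList<>();
        while (!queue.isEmpty()) {
            Leaf current = queue.poll();
            Leaf next = queue.peek(); // new top entry, or null if current is the only leaf left
            int runOrd = current.ord();
            // Single tsid comparison for the whole run.
            boolean sameTsid = next != null && current.tsid().equals(next.tsid());
            // Same tsid: stop once we pass the other leaf's timestamp. Otherwise no timestamp limit.
            long limit = sameTsid ? next.timestamp() : Long.MAX_VALUE;
            do {
                collected.add(current.tsid() + "@" + current.timestamp());
                current.pos++;
            } while (!current.exhausted()
                && (next == null || (current.ord() == runOrd && current.timestamp() <= limit)));
            if (!current.exhausted()) {
                queue.add(current); // re-enter the queue at its new position
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        Leaf a = new Leaf(new String[] {"a", "b"}, new int[] {0, 0, 1}, new long[] {1, 4, 2});
        Leaf b = new Leaf(new String[] {"a", "c"}, new int[] {0, 0, 1}, new long[] {2, 3, 1});
        System.out.println(merge(List.of(a, b)));
    }
}
```

Within a run only the integer segment ordinal and the timestamp are compared; when the queue is empty after the pop, the remaining leaf is drained with a plain loop.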
I'd mentioned something vague about "we should use tsid's ordinals to skip the tsid comparison" which sounds like the thing you are talking about in the second point.
I'm wondering if these are "for a followup" things. They are 100% good things.
A follow-up would be totally fine with me.
As I mentioned before I would like to switch to actual index settings here and make it more generic.
.../src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java
            queue.pop();
        }
    }
}
I believe that the way we are doing search bypasses the checks for whether the query has been cancelled or timed out, so we'll need to add this logic to this search implementation?
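One possible shape for such a check, sketched under assumptions rather than taken from the PR: the walk polls a cancellation callback every fixed number of documents. `CHECK_INTERVAL`, `WalkCancelledException`, and the `BooleanSupplier` callback are all hypothetical names for illustration; the real search layer has its own cancellation and timeout hooks.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.IntConsumer;

final class CancellableWalkSketch {

    /** Hypothetical interval; a power of two so the periodic check is a cheap bit mask. */
    static final int CHECK_INTERVAL = 1024;

    /** Hypothetical exception type used only in this sketch. */
    static final class WalkCancelledException extends RuntimeException {
        WalkCancelledException(String message) {
            super(message);
        }
    }

    /** Visits docs 0..docCount-1, polling the cancellation callback once every CHECK_INTERVAL docs. */
    static void walk(int docCount, BooleanSupplier cancelledOrTimedOut, IntConsumer collector) {
        for (int doc = 0; doc < docCount; doc++) {
            if ((doc & (CHECK_INTERVAL - 1)) == 0 && cancelledOrTimedOut.getAsBoolean()) {
                throw new WalkCancelledException("cancelled before doc " + doc);
            }
            collector.accept(doc);
        }
    }

    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        int[] collected = {0};
        try {
            walk(5000, cancelled::get, doc -> {
                collected[0]++;
                if (doc == 1500) {
                    cancelled.set(true); // simulate a cancellation arriving mid-search
                }
            });
        } catch (WalkCancelledException e) {
            System.out.println(e.getMessage() + ", collected " + collected[0]);
        }
    }
}
```

Checking on an interval keeps the per-document cost low, at the price of collecting up to one interval's worth of docs after the cancellation arrives.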
That's a good point. I hadn't thought about cancelation when I was looking last.
Indeed, we will need to add this logic. I will do it as a follow-up.
nik9000
left a comment
I left a few tiny things. I think @jpountz's comments are more important. Though I'm not sure if they all should block merging or be important follow ups. We should totally do all the things he mentioned though.
@elasticmachine update branch
…PR, and remove the local TimeSeriesIndexSearcher
/**
 * An IndexSearcher wrapper that executes the searches in time-series indices by traversing them by tsid and timestamp
 * TODO: Convert it to use index sort instead of hard-coded tsid and timestamp values
Note: it is currently difficult to check whether a sorted field is multi-valued. Sorting on a multi-valued field is based on index.sort.mode, so not all values in a doc are in sorted order, and LeafWalker may not work well in that case.
Oh, I hadn't clicked submit review before. I'm starting the review now.
I am not sure I understand your comment.
Sorry, let me describe it more clearly. In the time_series case, the index is sorted by _tsid and @timestamp, and both must be single-valued fields, so it's OK.
But with a more general index sort, it may hit the multi-valued field problem. For example:
the index is sorted by foo, with the settings
index.sort.field = foo
index.sort.order = asc
index.sort.mode = min
and the sample docs:
doc1: {"foo":[1,3,5,7]}
doc2: {"foo":[2,4,6,8]}
doc3: {"foo":[3,4,5]}
With index.sort.mode = min, the sorted doc order is doc1 -> doc2 -> doc3, but doc2 and doc3 contain some values that are smaller than values in doc1,
so traversing the foo field in doc order does not visit its values in truly sequential order.
Issue #80825 aims to restrict the sort field to a single value; once the sorted field is single-valued, it is fine to traverse the values sequentially.
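The example can be checked with a small standalone sketch. It does not use Elasticsearch or Lucene; it simply models each doc as a non-empty `int[]` of `foo` values, orders docs by their minimum (mimicking an ascending sort with mode `min`), and tests whether the values come out non-decreasing when read in doc order. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class MultiValuedSortSketch {

    /** Orders docs by their smallest value, as an ascending sort with mode "min" would. Docs must be non-empty. */
    static List<int[]> sortByMin(List<int[]> docs) {
        List<int[]> sorted = new ArrayList<>(docs);
        sorted.sort(Comparator.comparingInt((int[] values) -> Arrays.stream(values).min().getAsInt()));
        return sorted;
    }

    /** Returns true if reading every value in doc order never goes backwards. */
    static boolean valuesAreNonDecreasing(List<int[]> docsInIndexOrder) {
        int previous = Integer.MIN_VALUE;
        for (int[] values : docsInIndexOrder) {
            for (int value : values) {
                if (value < previous) {
                    return false;
                }
                previous = value;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<int[]> multiValued = sortByMin(List.of(
            new int[] {2, 4, 6, 8}, new int[] {3, 4, 5}, new int[] {1, 3, 5, 7}));
        List<int[]> singleValued = sortByMin(List.of(new int[] {2}, new int[] {3}, new int[] {1}));
        System.out.println("multi-valued sequential: " + valuesAreNonDecreasing(multiValued));
        System.out.println("single-valued sequential: " + valuesAreNonDecreasing(singleValued));
    }
}
```

With the three sample docs the first check fails as soon as doc2's value 2 follows doc1's value 7, while the single-valued case passes.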