Reduce memory for big aggs run against many shards #54758
This changes the behavior of aggregations when a search is performed against enough shards to enable "batch reduce" mode. In that case we force aggregations to be stored in serialized form rather than as traditional Java references. This should shrink the memory usage of large aggregations at the cost of slightly slowing down aggregations where the coordinating node is also a data node. Because we only do this when there are many shards, that case is likely to be fairly rare. As a side effect this lets us add logs for the memory usage of the aggs buffer:

```
[2020-04-03T17:03:57,052][TRACE][o.e.a.s.SearchPhaseController] [runTask-0] aggs partial reduction [1320->448] max [1320]
[2020-04-03T17:03:57,089][TRACE][o.e.a.s.SearchPhaseController] [runTask-0] aggs partial reduction [1328->448] max [1328]
[2020-04-03T17:03:57,102][TRACE][o.e.a.s.SearchPhaseController] [runTask-0] aggs partial reduction [1328->448] max [1328]
[2020-04-03T17:03:57,103][TRACE][o.e.a.s.SearchPhaseController] [runTask-0] aggs partial reduction [1328->448] max [1328]
[2020-04-03T17:03:57,105][TRACE][o.e.a.s.SearchPhaseController] [runTask-0] aggs final reduction [888] max [1328]
```

These are useful, but you need to keep some things in mind before trusting them:

1. The buffers are oversized, à la Lucene's `ArrayUtil`. This means that we are using more space than we need, but probably not much more.
2. Before they are merged, the aggregations are inflated into their traditional Java objects, which *probably* take up a lot more space than the serialized form. That is, after all, the reason why we store them in serialized form in the first place.

And, just because I can, here is another example of the log:

```
[2020-04-03T17:06:18,731][TRACE][o.e.a.s.SearchPhaseController] [runTask-0] aggs partial reduction [147528->49176] max [147528]
[2020-04-03T17:06:18,750][TRACE][o.e.a.s.SearchPhaseController] [runTask-0] aggs partial reduction [147528->49176] max [147528]
[2020-04-03T17:06:18,809][TRACE][o.e.a.s.SearchPhaseController] [runTask-0] aggs partial reduction [147528->49176] max [147528]
[2020-04-03T17:06:18,827][TRACE][o.e.a.s.SearchPhaseController] [runTask-0] aggs partial reduction [147528->49176] max [147528]
[2020-04-03T17:06:18,829][TRACE][o.e.a.s.SearchPhaseController] [runTask-0] aggs final reduction [98352] max [147528]
```

I got that last one by building a ten-shard index with a million docs in it and running a `sum` inside three layers of `terms` aggregations, all on `long` fields, and with a `batched_reduce_size` of `3`.
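To make the batch-reduce mechanics above concrete, here is a minimal, self-contained sketch. None of these names or formats are Elasticsearch's real classes; the "aggregation" is just a long-valued sum and the serialization is invented for illustration. The point is the shape of the loop: shard results sit in the buffer as `byte[]`, and the Java objects only exist while a batch is actually being reduced.

```java
// Hypothetical sketch, not Elasticsearch's real classes: shard results stay
// serialized (byte[]) in the buffer and are only inflated while a batch is
// actually being reduced, so the Java objects are short-lived garbage.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class BatchReduceSketch {
    // Serialize a single long-valued "sum" aggregation result.
    static byte[] serialize(long sum) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(sum);
        }
        return bytes.toByteArray();
    }

    static long deserialize(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readLong();
        }
    }

    // Partially reduce a batch: inflate each result, merge, and immediately
    // re-serialize so only bytes survive the reduction.
    static byte[] partialReduce(List<byte[]> batch) throws IOException {
        long merged = 0;
        for (byte[] shardResult : batch) {
            merged += deserialize(shardResult); // inflated value is short-lived
        }
        return serialize(merged);
    }

    public static void main(String[] args) throws IOException {
        int batchedReduceSize = 3;
        List<byte[]> buffer = new ArrayList<>();
        for (int shard = 0; shard < 10; shard++) {
            buffer.add(serialize(5L)); // each shard reports a sum of 5
            if (buffer.size() == batchedReduceSize) {
                byte[] reduced = partialReduce(buffer);
                buffer.clear();
                buffer.add(reduced); // reduced result re-enters the buffer, serialized
            }
        }
        System.out.println(deserialize(partialReduce(buffer))); // prints 50
    }
}
```

The key property is that the inflated objects only live for the duration of `partialReduce`; between reductions the buffer holds nothing but `byte[]`s, which is what the `[before->after]` numbers in the logs are measuring.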
Pinging @elastic/es-analytics-geo (:Analytics/Aggregations)
Just getting the logs from this seems super useful.
I think I should add the task id to the output. That'd help a bit with debugging because setting the task manager to trace logging logs the query. Not that it is a good choice on a busy system, but it could be useful.
I did look into returning this data in other ways but I couldn't come up with the "right" way. And it is super useful to be able to see the partial reduction memory usage. I mean, it'd probably be useful in production. But I think it'll be super useful for me when I'm just hacking on things.
Now that we have SearchProgressListener I think it'd make more sense to forward this information through that interface and have a default implementation that logs the reduction. That is a little more work but makes it so I can test that we make these calls in a sane way which is nice.
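As a sketch of that idea (the names here are invented for illustration, not the real `org.elasticsearch` `SearchProgressListener` API), a listener interface with a default implementation that logs the reduction might look like this. Because it is an interface, a test can plug in its own implementation and assert the calls are made sanely:

```java
// Illustrative sketch only -- these names are invented, not the real
// org.elasticsearch SearchProgressListener API. The idea: route reduction
// memory numbers through a listener whose default implementation logs them,
// so tests can substitute their own implementation and verify the calls.
public class ProgressListenerSketch {
    interface ReductionListener {
        // Called after each partial reduction with the buffer size before
        // and after, mirroring the "[1328->448]" numbers in the logs above.
        default void onPartialReduce(long bytesBefore, long bytesAfter, int reducePhase) {
            System.out.printf("aggs partial reduction [%d->%d] phase [%d]%n",
                    bytesBefore, bytesAfter, reducePhase);
        }

        default void onFinalReduce(long bytes, int reducePhase) {
            System.out.printf("aggs final reduction [%d] phase [%d]%n", bytes, reducePhase);
        }
    }

    public static void main(String[] args) {
        ReductionListener listener = new ReductionListener() {}; // defaults: just log
        listener.onPartialReduce(1328, 448, 1);
        listener.onFinalReduce(888, 2);
    }
}
```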
Another option is to move this information into SearchTask so you can see it in the tasks API. That seems useful.
On the other other hand maybe this is a good start and adding it to the SearchTask would be a good change for a follow up.
+1 to add in the SearchTask as a follow up
Woah! That test failure is because ScriptedMetric can't be serialized without first being reduced. I wonder if that is a bug in CCS.
being final reduced.
I've verified that this is indeed a bug with the scripted metric agg used in cross cluster search. I'll fix that in a separate PR because it'll also fix this.
`scripted_metric` did not work with cross cluster search because it assumed that you'd never perform a partial reduction, serialize the results, and then perform a final reduction. That serialized-after-partial-reduction step was broken. This is also required to support elastic#54758.
Blocked on #54776.
run elasticsearch-ci/docs
```diff
 if (hasAggs || hasTopDocs) {
     progressListener.notifyPartialReduce(SearchProgressListener.buildSearchShards(processedShards),
-        topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0].get() : null, numReducePhases);
+        topDocsStats.getTotalHits(), reducedAggs, numReducePhases);
```
I talked to @jimczi and @javanna and this line is a release blocker. We're pretty ok merging it, but not releasing it, because async_search keeps a hard reference to the aggs passed to it. Actually, async search has all kinds of trouble with aggs because it doesn't perform the final reduction until sync search would. But it does return aggs without the final reduction applied if you get the "progress" of the search. Those aggs are going to be "funny". They'll be missing pipeline aggs, for instance. And scripted_metric will be borked in some way. As will a lot of other things. But you'll mostly get something.
> Actually async search has all kinds of trouble with aggs because it doesn't perform the final reduction until sync search would. But it does return aggs without the final reduction applied if you get the "progress" of the search. These aggs are going to be "funny". They'll be missing pipeline aggs, for instance. And scripted_metric will be borked in some way. As will a lot of other things. But you'll mostly get something.
Scratch that - it does perform the final reduction when you fetch the result. You could still get weird results because things are missing, but they'll be a lot less weird than I was thinking.
jimczi left a comment
This change looks good to me.
As discussed offline, we'll need to expose the serialized result to the search progress listener in order to avoid doubling the memory on async search.
The PR description for this isn't quite right. I mean, it is all true, but the most important part of this PR is missing. It should be more like this: This attempts to save memory on the coordinating node by delaying deserialization of the shard results for the aggregation until the last second. This is nice because it makes the shard-aggregation results "short lived" garbage. It also should shrink the memory usage of aggs when they are "waiting" to be merged. Additionally, when the search is in "batched reduce mode" we force the results to be serialized between batch reduces in an attempt to keep the memory usage as low as possible between reductions.
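A toy sketch of that delayed-deserialization idea (all names invented for illustration, not Elasticsearch's real wrapper): the coordinating node holds only the wire bytes, and the Java object is inflated on demand when the reduce asks for it, so it can become short-lived garbage right after.

```java
// Toy sketch of delayed deserialization -- invented names, not the real
// Elasticsearch wrapper. The coordinating node holds only the wire bytes;
// the Java object is inflated on demand and can die young after the reduce.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

public class DelayedSketch {
    interface Inflater<R> {
        R inflate(DataInputStream in) throws IOException;
    }

    // Holds serialized bytes; deserialization only happens on get().
    static final class Delayed<T> implements Supplier<T> {
        private final byte[] bytes;
        private final Inflater<T> inflater;

        Delayed(byte[] bytes, Inflater<T> inflater) {
            this.bytes = bytes;
            this.inflater = inflater;
        }

        long ramBytesUsed() {
            return bytes.length; // what the buffer "costs" while waiting to be merged
        }

        @Override
        public T get() {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
                return inflater.inflate(in);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeLong(888L); // pretend this is a serialized shard agg
        }
        Delayed<Long> delayed = new Delayed<>(buf.toByteArray(), DataInputStream::readLong);
        System.out.println(delayed.ramBytesUsed()); // prints 8, the serialized cost
        System.out.println(delayed.get());          // prints 888, inflated on demand
    }
}
```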