[WIP] [SPARK-4273] [SQL] Providing ExternalSet to avoid OOM when count(distinct) #3137
A task may run out of memory (OOM) during count(distinct) if it needs to process many records. CombineSetsAndCountFunction puts all records into an OpenHashSet, so if it fetches many records, the set may occupy a large amount of memory.
I think a data structure, ExternalSet, similar to ExternalAppendOnlyMap, could be provided to spill OpenHashSet data to disk when its capacity exceeds some threshold.
For example, OpenHashSet1 (ohs1) holds [d, b, c, a]. It is sorted by hashCode and spilled to file1, so file1 contains [a, b, c, d]. The procedure is as follows:
ohs1 [d, b, c, a] => [a, b, c, d] => file1
ohs2 [e, f, g, a] => [a, e, f, g] => file2
ohs3 [e, h, i, g] => [e, g, h, i] => file3
ohs4 [j, h, a] => [a, h, j] => sortedSet
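The spill step above can be sketched as follows. This is a minimal, language-neutral illustration in Python, not the code in this PR: `ExternalSetSketch`, `max_in_memory`, `hash_code`, and `read_spill` are hypothetical names, a plain `set` stands in for OpenHashSet, and CRC32 stands in for the JVM `hashCode`.

```python
import os
import pickle
import tempfile
import zlib


def hash_code(key: str) -> int:
    """Stable stand-in for the JVM hashCode (an assumption of this sketch)."""
    return zlib.crc32(key.encode("utf-8"))


class ExternalSetSketch:
    """Hypothetical spillable set: keeps keys in memory up to a threshold."""

    def __init__(self, max_in_memory: int):
        self.max_in_memory = max_in_memory
        self.current = set()      # plays the role of the in-memory OpenHashSet
        self.spill_files = []     # paths of the sorted runs written so far

    def insert(self, key: str) -> None:
        self.current.add(key)
        if len(self.current) >= self.max_in_memory:
            self._spill()

    def _spill(self) -> None:
        # Sort by hash code (ties broken by key) so that runs can later be
        # merged with equal hash codes appearing next to each other.
        ordered = sorted(self.current, key=lambda k: (hash_code(k), k))
        fd, path = tempfile.mkstemp(suffix=".spill")
        with os.fdopen(fd, "wb") as out:
            for key in ordered:
                pickle.dump(key, out)
        self.spill_files.append(path)
        self.current = set()


def read_spill(path: str) -> list:
    """Read one spilled run back in the order it was written."""
    keys = []
    with open(path, "rb") as src:
        while True:
            try:
                keys.append(pickle.load(src))
            except EOFError:
                return keys


# Same input as the example: three full sets are spilled, the last stays in memory.
ext = ExternalSetSketch(max_in_memory=4)
for key in "dbca" "efga" "ehig" "jha":
    ext.insert(key)
```

With a threshold of 4, the first three groups each trigger a spill and the remaining keys [j, h, a] stay in memory, matching ohs1 through ohs4 above. The real threshold would presumably be based on memory usage rather than element count.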
On output, all keys with the same hashCode are put into one OpenHashSet, and the iterator of that OpenHashSet is then traversed. The procedure is as follows:
file1 -> a -> ohsA; file2 -> a -> ohsA; sortedSet -> a -> ohsA; ohsA -> a;
file1 -> b -> ohsB; ohsB -> b;
file1 -> c -> ohsC; ohsC -> c;
file1 -> d -> ohsD; ohsD -> d;
file2 -> e -> ohsE; file3 -> e -> ohsE; ohsE -> e;
...
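The output step can be sketched in the same spirit. This is an illustrative Python sketch under stated assumptions, not the PR's implementation: in-memory lists stand in for file1, file2, file3, and sortedSet, and `hash_code`, `sort_by_hash`, and `merge_distinct` are hypothetical names.

```python
import heapq
import itertools
import zlib


def hash_code(key: str) -> int:
    """Stable stand-in for the JVM hashCode (an assumption of this sketch)."""
    return zlib.crc32(key.encode("utf-8"))


def sort_by_hash(keys):
    """Produce one sorted run, as a spill would write it."""
    return sorted(set(keys), key=lambda k: (hash_code(k), k))


def merge_distinct(sorted_runs):
    """Yield each distinct key once from runs that are sorted by hash code."""
    # A k-way merge keeps the combined stream ordered by hash code, so all
    # keys sharing a hash code arrive consecutively.
    merged = heapq.merge(*sorted_runs, key=hash_code)
    for _, group in itertools.groupby(merged, key=hash_code):
        # The per-hash-code set (ohsA, ohsB, ...) removes duplicates that
        # came from different runs; only this small set is held in memory.
        bucket = set(group)
        yield from bucket


runs = [
    sort_by_hash("dbca"),  # file1
    sort_by_hash("efga"),  # file2
    sort_by_hash("ehig"),  # file3
    sort_by_hash("jha"),   # sortedSet (still in memory)
]
distinct_keys = list(merge_distinct(runs))
distinct_count = len(distinct_keys)
```

Because only the keys sharing the current hash code are held in memory at once, peak memory during output depends on the largest group of colliding keys rather than on the total number of distinct values.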
I think using ExternalSet could avoid OOM during count(distinct). Comments are welcome.