[SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap #16844
Conversation
cc @JoshRosen, @viirya

Test build #72541 has finished for PR 16844 at commit
LGTM
```diff
  isDefined = true;

- if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+ if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
```
The re-allocated space might not be used if there are no further insertions. Shall we do growAndRehash at the beginning of append when numKeys == growthThreshold && !isDefined?
Unfortunately, we can't grow at the beginning; otherwise the pos would be wrong.
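A simplified illustration of why (a hypothetical sketch, not the actual Spark code): the probe position is computed from the current array size, so growing before the write would invalidate pos:

```java
// Simplified sketch: the slot position depends on the array's current size
// (hash & mask), so rehashing/growing before writing at `pos` would leave
// `pos` pointing at the wrong slot in the new, larger array.
public class WhyGrowAfterInsert {
  public static void main(String[] args) {
    int hash = 0x9E3779B9;                        // arbitrary key hash
    int capacityBefore = 16;
    int capacityAfter = 32;
    int posBefore = hash & (capacityBefore - 1);  // probe slot in old array
    int posAfter = hash & (capacityAfter - 1);    // same key, grown array
    // In general posBefore != posAfter, so the code inserts first and only
    // then calls growAndRehash(), which recomputes positions for all keys.
    System.out.println("pos before grow: " + posBefore);  // 9
    System.out.println("pos after grow:  " + posAfter);   // 25
  }
}
```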
OK. LGTM.
…fix a bug that the array will never grow if it fails to grow once (stays at the initial capacity).
@viirya Addressed your comment; also fixed another bug (updated the PR description).

Test build #72594 has finished for PR 16844 at commit

Test build #3566 has finished for PR 16844 at commit
```java
try {
  growAndRehash();
} catch (OutOfMemoryError oom) {
  return false;
```
Unrelated, but this OutOfMemoryError will not be useful, at least not in YARN mode; it will simply cause the JVM to exit.
```diff
- || !canGrowArray && numKeys > growthThreshold) {
-   return false;
+ if (numKeys >= growthThreshold) {
+   if (longArray.size() / 2 == MAX_CAPACITY) {
```
This does not look correct as per the documentation of MAX_CAPACITY: the actual number of keys == MAX_CAPACITY (so that the total number of entries in longArray is MAX_CAPACITY * 2).
We grow the array when numKeys >= growthThreshold, and growthThreshold = capacity * 0.5, but we actually allocate capacity * 2 slots for the array. So numKeys < growthThreshold = capacity * 0.5 < array length = capacity * 2 should hold.

Because numKeys < growthThreshold is always true, if numKeys == MAX_CAPACITY the capacity would be at least MAX_CAPACITY * 2 and the length of the array would be more than MAX_CAPACITY * 4. But in allocate there is an assert that capacity <= MAX_CAPACITY. Those conditions look inconsistent.
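For concreteness, the size relationships being discussed with small illustrative numbers (a sketch, not the actual Spark code):

```java
// Illustrative only: two long entries per key slot (address + hash),
// growth triggered at a 0.5 load factor.
public class CapacityInvariant {
  public static void main(String[] args) {
    long capacity = 8;                    // key slots
    long growthThreshold = capacity / 2;  // 4: grow once numKeys reaches it
    long arrayLength = capacity * 2;      // 16 long entries in the array
    // Invariant: numKeys <= capacity / 2 < capacity * 2 = arrayLength,
    // i.e. at least half of the long array stays free for radix sort.
    System.out.println("threshold=" + growthThreshold
        + " capacity=" + capacity + " arrayLength=" + arrayLength);
  }
}
```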
Also, we need to move the appropriate validation check into growAndRehash(), not keep it here.
There are two reasons it can fail to grow: 1) the current capacity (longArray.size() / 2) has reached MAX_CAPACITY; 2) it can't allocate a new array (OOM). So I think the check here is correct.
@davies You are right that the check for longArray.size() / 2 == MAX_CAPACITY is the upper bound beyond which we can't grow. It is simply confusing to do it outside growAndRehash, which is what threw me off.
Please move the check into growAndRehash() and have it return true in case it could successfully grow the map.
@mridulm Does that mean we should also rename growAndRehash to tryGrowAndRehash? I think that is not necessary.
The invariant in question is for growAndRehash(), not append, and as such should live there. If code evolution causes grow to be invoked from elsewhere, the invariant would have to be duplicated everywhere.
Btw, this is in line with all the other data structures Spark (and other frameworks) have.
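A minimal sketch of the shape being suggested (hypothetical; the class, fields, and name tryGrowAndRehash are illustrative, not the actual BytesToBytesMap code):

```java
// Hypothetical sketch, not the real BytesToBytesMap: a grow method that owns
// its own capacity check and reports success, so callers need not duplicate
// the invariant.
final class GrowableMapSketch {
  static final int MAX_CAPACITY = 1 << 29;   // illustrative limit
  private long[] longArray = new long[16];   // 2 long entries per key slot

  /** Doubles the backing array; returns false if at max or out of memory. */
  boolean tryGrowAndRehash() {
    int currentCapacity = longArray.length / 2;
    if (currentCapacity >= MAX_CAPACITY) {
      return false;                          // cannot grow past the limit
    }
    try {
      long[] bigger = new long[longArray.length * 2];
      System.arraycopy(longArray, 0, bigger, 0, longArray.length);
      // (the real map would re-hash entries here rather than copy blindly)
      longArray = bigger;
      return true;
    } catch (OutOfMemoryError oom) {
      return false;                          // growth failed; map unchanged
    }
  }

  public static void main(String[] args) {
    GrowableMapSketch m = new GrowableMapSketch();
    System.out.println("grew: " + m.tryGrowAndRehash());
  }
}
```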
```java
longArray.set(pos * 2 + 1, keyHashcode);
isDefined = true;

if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
```
This longArray.size() < MAX_CAPACITY looks like the wrong condition.
longArray.size() is the next capacity for the current grow strategy, so it should be longArray.size() <= MAX_CAPACITY.
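Spelled out with illustrative numbers (a sketch, not the actual Spark code): since the array holds two longs per key slot, its current length equals the capacity after one doubling, i.e. the next capacity:

```java
// Illustrative arithmetic for the guard above (not the actual Spark code).
public class NextCapacityGuard {
  public static void main(String[] args) {
    long maxCapacity = 1L << 29;           // stand-in for MAX_CAPACITY
    long capacity = 8;                     // current key slots
    long arraySize = capacity * 2;         // 16 long entries
    long nextCapacity = capacity * 2;      // 16 == arraySize after doubling
    // Growth is legal while the *next* capacity still fits, hence
    // longArray.size() <= MAX_CAPACITY rather than a strict <.
    System.out.println((nextCapacity == arraySize)
        + ", can grow: " + (arraySize <= maxCapacity));
  }
}
```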
```diff
- || !canGrowArray && numKeys > growthThreshold) {
-   return false;
+ if (numKeys >= growthThreshold) {
+   if (longArray.size() / 2 == MAX_CAPACITY) {
```
Is MAX_CAPACITY still the maximum number of keys as per its documentation? If longArray.size() / 2 == MAX_CAPACITY is the most we can have for the capacity, the actual numKeys should be MAX_CAPACITY / 2, because we need two long array entries per key, right?
@davies is correct, but it is a slightly unintuitive way to write the condition:

```
val currentSize = longArray.size()
val newSize = currentSize * 2
val currentKeysLen = currentSize / 2
val newKeysLen = currentKeysLen * 2

if (newKeysLen > MAX_CAPACITY) then fail
// that is, if (currentKeysLen == MAX_CAPACITY) then fail,
//   since we allow only powers of 2 for all these values;
// that is, if (longArray.size() / 2 == MAX_CAPACITY)
```

Particularly given its location (in append as opposed to grow), it is a bit more confusing than expected.
If the currentKeysLen above is the number of keys, it never equals currentSize / 2. currentSize / 2 is actually the capacity we want to allocate (though we actually allocate double that for the array).
Once the number of keys reaches growthThreshold (i.e., capacity * 0.5), we either grow the array or fail the append. So the number of keys is always less than or equal to capacity * 0.5, which is currentSize * 0.5 * 0.5.
To clarify, the lengths @davies and I mentioned are not the actual number of keys in the map, but the maximum number of keys possible in the map.
Ah, I see. That makes sense.
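A short recap of the thread with concrete numbers (an illustrative sketch, not the actual Spark code):

```java
// Illustrative recap: the lengths discussed above are the maximum number of
// keys possible, not the current key count.
public class MaxKeysRecap {
  public static void main(String[] args) {
    long maxCapacity = 1L << 29;            // stand-in for MAX_CAPACITY
    long arraySize = maxCapacity * 2;       // largest backing long array
    long maxKeysEver = maxCapacity / 2;     // growthThreshold at max capacity
    // The guard longArray.size() / 2 == MAX_CAPACITY fires exactly when the
    // capacity hits its maximum; with the 0.5 load factor, at most
    // MAX_CAPACITY / 2 keys can then ever be stored.
    System.out.println("array=" + arraySize + " maxKeys=" + maxKeysEver);
  }
}
```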
Test build #3571 has finished for PR 16844 at commit
```diff
  // then we don't try to grow again if hit the `growthThreshold`.
- || !canGrowArray && numKeys > growthThreshold) {
-   return false;
+ if (numKeys >= growthThreshold) {
```
I think we need to grow the array only if isDefined == false.
…ever grow". This reverts commit d9aa208.
Test build #72946 has finished for PR 16844 at commit

retest this please.

Test build #72977 has finished for PR 16844 at commit
JoshRosen left a comment:

LGTM as well.
```diff
  // The map could be reused from last spill (because of no enough memory to grow),
  // then we don't try to grow again if hit the `growthThreshold`.
- || !canGrowArray && numKeys > growthThreshold) {
+ || !canGrowArray && numKeys >= growthThreshold) {
```
This change makes sense to me because growthThreshold's Scaladoc says "The map will be expanded once the number of keys exceeds this threshold", and here we're considering the impact of adding one more key (so this could also have been written as (numKeys + 1) > growthThreshold).
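For integer counts the two forms are equivalent; a tiny standalone check (illustrative only, not Spark code):

```java
// Illustrative: for integers, numKeys >= growthThreshold is the same test
// as (numKeys + 1) > growthThreshold, matching the Scaladoc's "expanded
// once the number of keys exceeds this threshold" after one more insert.
public class ThresholdEquivalence {
  public static void main(String[] args) {
    for (int threshold = 1; threshold <= 8; threshold++) {
      for (int numKeys = 0; numKeys <= 16; numKeys++) {
        boolean a = numKeys >= threshold;
        boolean b = numKeys + 1 > threshold;
        if (a != b) throw new AssertionError("mismatch");
      }
    }
    System.out.println("equivalent for all tested values");
  }
}
```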
Merging into master, 2.1, and 2.0 branches.
Author: Davies Liu <[email protected]>
Closes #16844 from davies/off_by_one.
(cherry picked from commit 3d0c3af)
Signed-off-by: Davies Liu <[email protected]>
What changes were proposed in this pull request?

Radix sort requires that half of the array be free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap will not have more items than 1/2 of its capacity. It turned out this was not true: the current implementation of append() could leave one more item than the threshold (1/2 of capacity) in the array, which breaks the requirement of radix sort (failing the assert in 2.2, or failing to insert into InMemorySorter in 2.1).

This PR fixes the off-by-one bug in BytesToBytesMap.

This PR also fixes a bug, introduced by #15722, that the array would never grow if it failed to grow once (staying at the initial capacity).

How was this patch tested?

Added a regression test.
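A sketch of the kind of regression test this describes (hypothetical; the model map and names are illustrative, not the test actually added in this PR):

```java
// Hypothetical sketch (not the actual test added in this PR): a tiny model
// map that enforces the "numKeys <= capacity / 2" invariant the regression
// test checks, including growth across the threshold.
public class OffByOneRegressionSketch {
  static final int MAX_CAPACITY = 64;        // illustrative limit
  static int capacity = 16;
  static int numKeys = 0;

  /** Models append(): grows after insert once the threshold is reached. */
  static boolean append() {
    if (numKeys >= capacity / 2 && capacity == MAX_CAPACITY) {
      return false;                          // full and cannot grow
    }
    numKeys++;
    if (numKeys >= capacity / 2 && capacity < MAX_CAPACITY) {
      capacity *= 2;                         // grow, keeping half free
    }
    return true;
  }

  public static void main(String[] args) {
    for (int i = 0; i < 100; i++) {
      if (!append()) break;
      // The fixed code must never leave more than capacity / 2 items.
      if (numKeys > capacity / 2) throw new AssertionError("off by one!");
    }
    System.out.println("invariant held; numKeys=" + numKeys
        + ", capacity=" + capacity);
  }
}
```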