Support vectorized append and compare for multi group by #12996
Conversation
```rust
fn append_non_nullable_val(&mut self, array: &ArrayRef, row: usize) {
    if NULLABLE {
        self.nulls.append(false);
```
This could be optimized to append nulls for entire batch instead of per value
Yes, I plan to refactor the interface to support taking a `rows: &[usize]` input,
make all of the appends vectorized, and then measure the performance again.
(i.e. remove it here and call it in such a way that we use https://docs.rs/arrow/latest/arrow/array/struct.BooleanBufferBuilder.html#method.append_n)
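For illustration, a minimal sketch of that suggestion (`nulls` and `n_rows` are placeholder names, not the PR's actual fields):

```rust
use arrow::array::BooleanBufferBuilder;

// Sketch only: append the validity bits for a whole batch in one call instead
// of calling `append(false)` once per value.
fn append_nulls_for_batch(nulls: &mut BooleanBufferBuilder, n_rows: usize) {
    // one capacity check + bulk fill rather than `n_rows` single-bit appends
    nulls.append_n(n_rows, false);
}
```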
Cool :)
I added the `append_batch` function to better support vectorized append.
But the improvement still doesn't seem obvious. #12996 (comment)
🤔 I guess it is likely due to the newly introduced branch in `equal_to`:
```rust
if *group_idx < group_values_len {
    for (i, group_val) in self.group_values.iter().enumerate() {
        if !check_row_equal(group_val.as_ref(), *group_idx, &cols[i], row) {
            return false;
        }
    }
} else {
    let row_idx_offset = group_idx - group_values_len;
    let row_idx = self.append_rows_buffer[row_idx_offset];
    return is_rows_eq(cols, row, cols, row_idx).unwrap();
}
```
To eliminate this extra branch, I think we need to refactor the intern process mentioned in #12821 (comment).
I am trying it.
The latest benchmark numbers:
```rust
core(array, row);

struct AggregationHashTable<T: AggregationHashTableEntry> {
    /// Raw table storing values in a `Vec`
    raw_table: Vec<T>,
```
Based on some experiments in changing the hash join algorithm, I think it's likely `hashbrown` performs much better than implementing a hashtable ourselves, although I would like to be surprised 🙂
> Based on some experiments in changing the hash join algorithm, I think it's likely `hashbrown` performs much better than implementing a hashtable ourselves, although I would like to be surprised 🙂
🤔 Even if we can perform something like vectorized compare or vectorized append in our own hashtable?
I found that in the multi group by case we perform the compare for each row, leading to the array downcasting again and again... and the downcast operation actually compiles to quite a few asm instructions...
And I found we can't eliminate it and perform the vectorized compare with hashbrown...
```rust
fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
    let array = array.as_byte_view::<B>();
```
We can still do "vectorized compare" by doing the lookup in the hashtable (based on hash value only) and the vectorized equality check separately. That way you still can use the fast hashtable, but move the equality check to a separate/vectorized step.
That's at least what is done in the vectorized hash join implementation :). I changed it before to use a Vec-based index like you did here, but that performed significantly worse.
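A minimal sketch of that two-phase idea (illustrative names, not DataFusion's actual code), using `hashbrown`'s raw table API:

```rust
use hashbrown::raw::RawTable;

// Phase 1: probe the hash table by hash value only and collect candidate
// (row, group_index) pairs; no per-row downcast or value comparison happens here.
fn probe_candidates(
    map: &RawTable<(u64, usize)>,         // entries are (hash, group_index)
    hashes: &[u64],                        // one precomputed hash per input row
    candidates: &mut Vec<(usize, usize)>,  // output: (row, candidate group_index)
) {
    candidates.clear();
    for (row, &hash) in hashes.iter().enumerate() {
        if let Some(&(_, group_idx)) = map.get(hash, |(h, _)| *h == hash) {
            candidates.push((row, group_idx));
        }
    }
    // Phase 2 (not shown): a vectorized, type-specialized equality pass over
    // `candidates`, downcasting each group column once for the whole batch.
}
```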
The reason, I think, is that the lookup is incredibly well optimized using the swiss table design and you get fewer "false" candidates to check for, while we can still use the vectorized/type specialized equality check.
Makes sense, thank you!
The logic is a bit complex; I plan to finish it and benchmark it today.
This is at the top of my list to review tomorrow morning.
jayzhan211
left a comment
👍
I am giving this a final review now
Performance results: 🚀
alamb
left a comment
👏 @Rachelint @jayzhan211 @2010YOUY01 and @Dandandan. What great teamwork
This PR is really nice in my opinion. It makes a super tricky and performance sensitive part of the code about as clear as I could imagine it to be.
I also ran some code coverage on this:

```shell
nice cargo llvm-cov --html test --test fuzz -- aggregate
nice cargo llvm-cov --html test -p datafusion-physical-plan -- group_values
```

And verified that the new code was well covered
```rust
}

impl GroupValuesColumn {
    /// Buffers to store intermediate results in `vectorized_append`
```
👍
```rust
/// Create a new instance of GroupValuesColumn if supported for the specified schema
pub fn try_new(schema: SchemaRef) -> Result<Self> {
    let map = RawTable::with_capacity(0);
```
This `with_capacity` can probably be improved (as a follow-on PR) to avoid some smaller allocations
```rust
/// `Group indices` order are against with their input order, and this will lead to error
/// in `streaming aggregation`.
///
fn scalarized_intern(
```
this is basically the same as `GroupValuesColumn::intern` was previously, which makes sense to me
```rust
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool;

/// Appends the row at `row` in `array` to this builder
fn append_val(&mut self, array: &ArrayRef, row: usize);
```
Maybe as a follow-on we can consider removing `append_val` and `equal_to` and simply change all codepaths to use the vectorized version
I am a bit worried that if we merge them, some extra if/else will be introduced.
That would hurt performance quite a bit for the row-level operations.
A good thing to benchmark (as a follow-on PR), perhaps
```rust
/// it will record the `true` result at the corresponding
/// position in `equal_to_results`.
///
/// And if found nth result in `equal_to_results` is already
```
It is quite clever to pass in the existing "is equal to" results
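For illustration, a minimal sketch of that accumulation pattern (plain slices instead of Arrow arrays; the names are mine, not the PR's):

```rust
// Each group column only needs to check rows that every previous column already
// judged equal, so a `false` written by an earlier column short-circuits the
// remaining columns for that candidate.
fn vectorized_equal_to_sketch(
    stored: &[i64],                // group value for each candidate
    probe: &[i64],                 // incoming column values
    rows: &[usize],                // candidate row indices into `probe`
    equal_to_results: &mut [bool], // results accumulated across group columns
) {
    for (i, &row) in rows.iter().enumerate() {
        if !equal_to_results[i] {
            continue; // an earlier column already proved this candidate differs
        }
        equal_to_results[i] = stored[i] == probe[row];
    }
}
```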
```rust
(false, _) => {
    for &row in rows {
        self.group_values.push(arr.value(row));
```
😆 I think we can even do more, like checking if `rows.len() == array.len()`; if so, we can just perform `extend`.
I think we could already use `extend` instead of `push`? `extend` on `Vec` is somewhat faster than `push`, as the capacity check / allocation is done once instead of once per value.
I think there are several things that could be done to make the append even faster:

1. `extend_from_slice` if `rows.len() == array.len()`
2. use `extend` rather than `push` for values
3. Speed up appending nulls (don't append bits one by one)
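For illustration, a minimal sketch of ideas (1) and (2) for a primitive group column (the names, and the assumption that `rows` is exactly `0..array.len()` whenever the lengths match, are mine, not the PR's):

```rust
use arrow::array::{ArrayRef, AsArray};
use arrow::datatypes::ArrowPrimitiveType;

fn append_rows<T: ArrowPrimitiveType>(
    group_values: &mut Vec<T::Native>, // stands in for the builder's value buffer
    array: &ArrayRef,
    rows: &[usize],
) {
    let arr = array.as_primitive::<T>();
    if rows.len() == arr.len() {
        // (1) the whole batch is appended: copy the values buffer in one shot
        group_values.extend_from_slice(arr.values());
    } else {
        // (2) `extend` does the capacity check / allocation once,
        // instead of once per `push`
        group_values.extend(rows.iter().map(|&row| arr.value(row)));
    }
}
```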
> I think we could already use `extend` instead of `push`? `extend` on `Vec` is somewhat faster than `push`, as the capacity check / allocation is done once instead of once per value.

Ok, I got it. I thought about it again and found it is indeed simple to do!
> I think there are several things that could be done to make the append even faster:
>
> 1. `extend_from_slice` if `rows.len() == array.len()`
> 2. use `extend` rather than `push` for values
> 3. Speed up appending nulls (don't append bits one by one)

I filed an issue to track the potential improvements for vectorized operations:
#13275
```rust
    };
}

fn vectorized_equal_to(
```
What I have been dreaming about with @XiangpengHao is maybe something like adding take / filter to arrow array builders.
I took this opportunity to write up the idea (finally) for your amusement:
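A purely hypothetical sketch of that idea (no such trait exists in arrow-rs today):

```rust
use arrow::array::ArrayRef;

// Instead of materializing an intermediate array with the `take` kernel and then
// appending it row by row, a builder could copy the selected rows directly.
trait TakeAppend {
    /// Append `array[indices[0]], array[indices[1]], ...` to this builder.
    fn append_take(&mut self, array: &ArrayRef, indices: &[u32]);
}
```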
As my admittedly sparse help for this PR, I have filed some additional tickets for follow-on work after this PR is merged:
I don't think we need to wait on this PR anymore, let's merge it in and keep moving forward. Thank you everyone again!
Update here is that this is looking like it results in some sweet ClickBench improvements:
* simple support vectorized append.
* fix tests.
* some logs.
* add `append_n` in `MaybeNullBufferBuilder`.
* impl basic append_batch
* fix equal to.
* define `GroupIndexContext`.
* define the structs useful in vectorizing.
* re-define some structs for vectorized operations.
* impl some vectorized logics.
* impl chekcing hashmap stage.
* fix compile.
* tmp
* define and impl `vectorized_compare`.
* fix compile.
* impl `vectorized_equal_to`.
* impl `vectorized_append`.
* finish the basic vectorized ops logic.
* impl `take_n`.
* fix `renaming clear` and `groups fill`.
* fix death loop due to rehashing.
* fix vectorized append.
* add counter.
* use extend rather than resize.
* remove dbg!.
* remove reserve.
* refactor the codes to make simpler and more performant.
* clear `scalarized_indices` in `intern` to avoid some corner case.
* fix `scalarized_equal_to`.
* fallback to total scalarized `GroupValuesColumn` in streaming aggregation.
* add unit test for `VectorizedGroupValuesColumn`.
* add unit test for emitting first n in `VectorizedGroupValuesColumn`.
* sort out tests codes in for group columns and add vectorized tests for primitives.
* add vectorized test for byte builder.
* add vectorized test for byte view builder.
* add test for the all nulls or not nulls branches in vectorized.
* fix clippy.
* fix fmt.
* fix compile in rust 1.79.
* improve comments.
* fix doc.
* add more comments to explain the really complex vectorized intern process.
* add comments to explain why we still need origin `GroupValuesColumn`.
* remove some stale comments.
* fix clippy.
* add comments for `vectorized_equal_to` and `vectorized_append`.
* fix clippy.
* use zip to simplify codes.
* use izip to simplify codes.
* Update datafusion/physical-plan/src/aggregates/group_values/group_column.rs Co-authored-by: Jay Zhan <[email protected]>
* first_n attempt Signed-off-by: jayzhan211 <[email protected]>
* add test Signed-off-by: jayzhan211 <[email protected]>
* improve hashtable modifying in emit first n test.
* add `emit_group_index_list_buffer` to avoid allocating new `Vec` to store the remaining gourp indices.
* make comments in VectorizedGroupValuesColumn::intern simpler and clearer.
* define `VectorizedOperationBuffers` to hold buffers used in vectorized operations to make code clearer.
* unify `VectorizedGroupValuesColumn` and `GroupValuesColumn`.
* fix fmt.
* fix comments.
* fix clippy.

Signed-off-by: jayzhan211 <[email protected]>
Co-authored-by: Jay Zhan <[email protected]>
(cherry picked from commit 345117b)

Which issue does this PR close?
Closes #.
Related to
Rationale for this change
Although `GroupValuesColumn` stores the multi group by values in a column oriented way, it still uses a row oriented approach to perform `append` and `equal to`.
The most obvious overhead is that we need to downcast the `array` when processing each row; the instructions for the downcast are actually not few, and even worse it introduces branches.
And I guess the row oriented approach will also increase random memory accesses, but I am not sure.
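To illustrate the overhead, here is a minimal sketch (not the PR's actual code) contrasting the two approaches for a single `Int64` group column:

```rust
use arrow::array::{ArrayRef, AsArray};
use arrow::datatypes::Int64Type;

// Row oriented: called once per row, so `as_primitive` (a downcast that compiles
// to a non-trivial amount of branching code) runs N times for N rows.
fn equal_to_row(stored: i64, array: &ArrayRef, row: usize) -> bool {
    let arr = array.as_primitive::<Int64Type>();
    stored == arr.value(row)
}

// Vectorized: downcast once, then a tight comparison loop over all candidate rows.
fn equal_to_rows(stored: &[i64], array: &ArrayRef, rows: &[usize], results: &mut [bool]) {
    let arr = array.as_primitive::<Int64Type>();
    for (i, &row) in rows.iter().enumerate() {
        results[i] = results[i] && stored[i] == arr.value(row);
    }
}
```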
What changes are included in this PR?
This PR introduces vectorized append and vectorized equal to for `GroupValuesColumn`.
But such a vectorized approach is not compatible with streaming aggregation, which depends on the order between input rows and their corresponding group indices.
So I define a new `VectorizedGroupValuesColumn` for optimizing the non streaming aggregation cases, and keep the original `GroupValuesColumn` for the streaming aggregation cases.
Are these changes tested?
Yes, I think enough new unit tests are added.
Are there any user-facing changes?
No.