Describe the bug
MinMaxBytesAccumulator's update_batch function has runtime that is quadratic in the number of groups accumulated: on each update_batch call, the implementation allocates a new scratch vector sized to the total number of groups accumulated so far. As more groups are added, this per-call allocation grows and eventually dominates the runtime.
datafusion/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs
Lines 438 to 456 in 10a437b
fn update_batch<'a, F, I>(
    &mut self,
    iter: I,
    group_indices: &[usize],
    total_num_groups: usize,
    mut cmp: F,
) -> Result<()>
where
    F: FnMut(&[u8], &[u8]) -> bool + Send + Sync,
    I: IntoIterator<Item = Option<&'a [u8]>>,
{
    self.min_max.resize(total_num_groups, None);
    // Minimize value copies by calculating the new min/maxes for each group
    // in this batch (either the existing min/max or the new input value)
    // and updating the owned values in `self.min_maxes` at most once
    let mut locations = vec![MinMaxLocation::ExistingMinMax; total_num_groups];
    // Figure out the new min value for each group
    for (new_val, group_index) in iter.into_iter().zip(group_indices.iter()) {
This completely kills performance for high-cardinality aggregations that use this accumulator. One such query would be:
select l_orderkey, l_partkey, min(l_comment) from lineitem group by l_orderkey, l_partkey
... run against a TPC-H dataset.
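A minimal standalone sketch (illustrative only, not DataFusion code; `simulate` and its parameters are made up for this example) of why the per-batch scratch allocation adds up to quadratic work: the vector allocated in each batch is proportional to all groups seen so far, so the allocations sum to roughly O(batches × total groups).

```rust
// Standalone sketch (not DataFusion code) of the allocation pattern.
fn simulate(batches: usize, new_groups_per_batch: usize) -> usize {
    let mut total_num_groups = 0;
    let mut scratch_elements_allocated = 0;
    for _ in 0..batches {
        // each batch introduces some new groups
        total_num_groups += new_groups_per_batch;
        // mirrors `vec![MinMaxLocation::ExistingMinMax; total_num_groups]`:
        // the scratch vector is sized by *all* groups seen so far,
        // not by the groups touched in this batch
        let locations = vec![0u8; total_num_groups];
        scratch_elements_allocated += locations.len();
    }
    scratch_elements_allocated
}

fn main() {
    // 1_000 batches x 100 new groups/batch => 100_000 groups total, but
    // ~50 million scratch elements allocated across all batches
    println!("{}", simulate(1_000, 100));
}
```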
To Reproduce
No response
Expected behavior
No response
Additional context
Possible fixes:
- Use a hash table (sized for the number of groups in the batch) instead of a vector (sketched below)
Possible follow-ups:
- Reuse the hash table between update_batch calls
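An illustrative sketch of the hash-table idea (not the actual DataFusion implementation; the free-function signature, the `min_max: &mut Vec<Option<Vec<u8>>>` parameter, and the map-based bookkeeping are simplifications of the accumulator's state):

```rust
use std::collections::HashMap;

// Sketch: track per-batch candidate values in a map keyed by group index,
// so the scratch structure is sized by the groups touched in *this* batch
// rather than by `total_num_groups`.
fn update_batch_sketch<'a, F, I>(
    min_max: &mut Vec<Option<Vec<u8>>>,
    iter: I,
    group_indices: &[usize],
    total_num_groups: usize,
    mut cmp: F,
) where
    F: FnMut(&[u8], &[u8]) -> bool,
    I: IntoIterator<Item = Option<&'a [u8]>>,
{
    // Growing the accumulated state is amortized in the number of *new*
    // groups, unlike a fresh scratch vector allocated every batch.
    min_max.resize(total_num_groups, None);

    // Best candidate seen in this batch for each group the batch touches.
    let mut batch_best: HashMap<usize, &'a [u8]> =
        HashMap::with_capacity(group_indices.len());
    for (new_val, &group_index) in iter.into_iter().zip(group_indices.iter()) {
        let Some(new_val) = new_val else { continue };
        batch_best
            .entry(group_index)
            .and_modify(|best| {
                if cmp(new_val, *best) {
                    *best = new_val;
                }
            })
            .or_insert(new_val);
    }

    // Fold the per-batch winners into the accumulated values, copying each
    // winning value at most once per group.
    for (group_index, candidate) in batch_best {
        let slot = &mut min_max[group_index];
        let replace = match slot {
            Some(existing) => cmp(candidate, existing.as_slice()),
            None => true,
        };
        if replace {
            *slot = Some(candidate.to_vec());
        }
    }
}
```

Under this sketch, the follow-up of reusing the hash table between update_batch calls would amount to keeping `batch_best` as accumulator state and clearing it at the start of each call, avoiding repeated map allocations as well.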