Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 3 additions & 120 deletions crates/iceberg/src/expr/visitors/page_index_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ impl BoundPredicateVisitor for PageIndexEvaluator<'_> {
}

fn or(&mut self, lhs: RowSelection, rhs: RowSelection) -> Result<RowSelection> {
Ok(union_row_selections(&lhs, &rhs))
Ok(lhs.union(&rhs))
}

fn not(&mut self, _: RowSelection) -> Result<RowSelection> {
Expand Down Expand Up @@ -772,99 +772,12 @@ impl BoundPredicateVisitor for PageIndexEvaluator<'_> {
}
}

/// Combine two lists of `RowSelection` return the union of them
/// For example:
/// self: NNYYYYNNYYNYN
/// other: NYNNNNNNY
///
/// returned: NYYYYYNNYYNYN
///
/// This can be removed from here once RowSelection::union is in parquet::arrow
/// (Hopefully once https://github.com/apache/arrow-rs/pull/6308 gets merged)
fn union_row_selections(left: &RowSelection, right: &RowSelection) -> RowSelection {
let mut l_iter = left.iter().copied().peekable();
let mut r_iter = right.iter().copied().peekable();

let iter = std::iter::from_fn(move || {
loop {
let l = l_iter.peek_mut();
let r = r_iter.peek_mut();

match (l, r) {
(Some(a), _) if a.row_count == 0 => {
l_iter.next().unwrap();
}
(_, Some(b)) if b.row_count == 0 => {
r_iter.next().unwrap();
}
(Some(l), Some(r)) => {
return match (l.skip, r.skip) {
// Skip both ranges
(true, true) => {
if l.row_count < r.row_count {
let skip = l.row_count;
r.row_count -= l.row_count;
l_iter.next();
Some(RowSelector::skip(skip))
} else {
let skip = r.row_count;
l.row_count -= skip;
r_iter.next();
Some(RowSelector::skip(skip))
}
}
// Keep rows from left
(false, true) => {
if l.row_count < r.row_count {
r.row_count -= l.row_count;
l_iter.next()
} else {
let r_row_count = r.row_count;
l.row_count -= r_row_count;
r_iter.next();
Some(RowSelector::select(r_row_count))
}
}
// Keep rows from right
(true, false) => {
if l.row_count < r.row_count {
let l_row_count = l.row_count;
r.row_count -= l_row_count;
l_iter.next();
Some(RowSelector::select(l_row_count))
} else {
l.row_count -= r.row_count;
r_iter.next()
}
}
// Keep at least one
_ => {
if l.row_count < r.row_count {
r.row_count -= l.row_count;
l_iter.next()
} else {
l.row_count -= r.row_count;
r_iter.next()
}
}
};
}
(Some(_), None) => return l_iter.next(),
(None, Some(_)) => return r_iter.next(),
(None, None) => return None,
}
}
});

iter.collect()
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::arrow::arrow_reader::RowSelector;
use parquet::basic::{LogicalType as ParquetLogicalType, Type as ParquetPhysicalType};
use parquet::data_type::ByteArray;
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
Expand All @@ -877,41 +790,11 @@ mod tests {
};
use rand::{thread_rng, Rng};

use super::{union_row_selections, PageIndexEvaluator};
use super::PageIndexEvaluator;
use crate::expr::{Bind, Reference};
use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
use crate::{ErrorKind, Result};

#[test]
fn test_union_row_selections() {
let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
let result = union_row_selections(&selection, &selection);
assert_eq!(result, selection);

// NYNYY
let a = RowSelection::from(vec![
RowSelector::skip(10),
RowSelector::select(10),
RowSelector::skip(10),
RowSelector::select(20),
]);

// NNYYN
let b = RowSelection::from(vec![
RowSelector::skip(20),
RowSelector::select(20),
RowSelector::skip(10),
]);

let result = union_row_selections(&a, &b);

// NYYYY
assert_eq!(result.iter().collect::<Vec<_>>(), vec![
&RowSelector::skip(10),
&RowSelector::select(40)
]);
}

#[test]
fn eval_matches_no_rows_for_empty_row_group() -> Result<()> {
let row_group_metadata = create_row_group_metadata(0, 0, None, 0, None)?;
Expand Down
Loading