Commit adc5d20

Add missing arrow predicate pushdown implementations for StartsWith, NotStartsWith, In, and NotIn (#404)
* feat: add [not_]starts_with and [not_]in arrow predicate pushdown
* fixes from issues highlighted in review
1 parent 070576b commit adc5d20
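
For orientation: the new pushdowns are driven through the scan builder, as the tests in crates/iceberg/src/scan.rs below show. A minimal sketch, assuming an already-loaded iceberg::table::Table with a string column `a` (the helper name and the crate's re-export paths are assumptions, not part of this commit):

use futures::TryStreamExt;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::table::Table;

// Hypothetical helper: count rows matching a STARTS_WITH predicate that is
// now evaluated inside the Arrow/Parquet reader rather than after the read.
async fn count_ice_rows(table: &Table) -> iceberg::Result<usize> {
    let scan = table
        .scan()
        .filter(Reference::new("a").starts_with(Datum::string("Ice")))
        .build()?;
    let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}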

6 files changed (+187 −22 lines)


Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ arrow-array = { version = "52" }
 arrow-ord = { version = "52" }
 arrow-schema = { version = "52" }
 arrow-select = { version = "52" }
+arrow-string = { version = "52" }
 async-stream = "0.3.5"
 async-trait = "0.1"
 aws-config = "1.1.8"

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -44,6 +44,7 @@ arrow-array = { workspace = true }
 arrow-ord = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
+arrow-string = { workspace = true }
 async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }

crates/iceberg/src/arrow/reader.rs

Lines changed: 75 additions & 17 deletions
@@ -22,6 +22,7 @@ use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
 use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
+use arrow_string::like::starts_with;
 use async_stream::try_stream;
 use bytes::Bytes;
 use fnv::FnvHashSet;
@@ -741,50 +742,107 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
 
     fn starts_with(
         &mut self,
-        _reference: &BoundReference,
-        _literal: &Datum,
+        reference: &BoundReference,
+        literal: &Datum,
         _predicate: &BoundPredicate,
     ) -> Result<Box<PredicateResult>> {
-        // TODO: Implement starts_with
-        self.build_always_true()
+        if let Some(idx) = self.bound_reference(reference)? {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(move |batch| {
+                let left = project_column(&batch, idx)?;
+                starts_with(&left, literal.as_ref())
+            }))
+        } else {
+            // A missing column, treating it as null.
+            self.build_always_false()
+        }
     }
 
     fn not_starts_with(
         &mut self,
-        _reference: &BoundReference,
-        _literal: &Datum,
+        reference: &BoundReference,
+        literal: &Datum,
         _predicate: &BoundPredicate,
     ) -> Result<Box<PredicateResult>> {
-        // TODO: Implement not_starts_with
-        self.build_always_true()
+        if let Some(idx) = self.bound_reference(reference)? {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(move |batch| {
+                let left = project_column(&batch, idx)?;
+
+                // update here if arrow ever adds a native not_starts_with
+                not(&starts_with(&left, literal.as_ref())?)
+            }))
+        } else {
+            // A missing column, treating it as null.
+            self.build_always_true()
+        }
     }
 
     fn r#in(
         &mut self,
-        _reference: &BoundReference,
-        _literals: &FnvHashSet<Datum>,
+        reference: &BoundReference,
+        literals: &FnvHashSet<Datum>,
         _predicate: &BoundPredicate,
     ) -> Result<Box<PredicateResult>> {
-        // TODO: Implement in
-        self.build_always_true()
+        if let Some(idx) = self.bound_reference(reference)? {
+            let literals: Vec<_> = literals
+                .iter()
+                .map(|lit| get_arrow_datum(lit).unwrap())
+                .collect();
+
+            Ok(Box::new(move |batch| {
+                // update this if arrow ever adds a native is_in kernel
+                let left = project_column(&batch, idx)?;
+                let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
+                for literal in &literals {
+                    acc = or(&acc, &eq(&left, literal.as_ref())?)?
+                }
+
+                Ok(acc)
+            }))
+        } else {
+            // A missing column, treating it as null.
+            self.build_always_false()
+        }
     }
 
     fn not_in(
         &mut self,
-        _reference: &BoundReference,
-        _literals: &FnvHashSet<Datum>,
+        reference: &BoundReference,
+        literals: &FnvHashSet<Datum>,
         _predicate: &BoundPredicate,
     ) -> Result<Box<PredicateResult>> {
-        // TODO: Implement not_in
-        self.build_always_true()
+        if let Some(idx) = self.bound_reference(reference)? {
+            let literals: Vec<_> = literals
+                .iter()
+                .map(|lit| get_arrow_datum(lit).unwrap())
+                .collect();
+
+            Ok(Box::new(move |batch| {
+                // update this if arrow ever adds a native not_in kernel
+                let left = project_column(&batch, idx)?;
+                let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
+                for literal in &literals {
+                    acc = and(&acc, &neq(&left, literal.as_ref())?)?
+                }
+
+                Ok(acc)
+            }))
+        } else {
+            // A missing column, treating it as null.
+            self.build_always_true()
+        }
     }
 }
 
 /// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
 ///
 /// # TODO
 ///
-/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64) contains the following hints to speed up metadata loading, we can consider adding them to this struct:
+/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
+/// contains the following hints to speed up metadata loading, we can consider adding them to this struct:
 ///
 /// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
 /// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
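
As the inline comments note, arrow has no native is_in kernel, so `r#in` accumulates an OR of per-literal equality masks over the projected column, and `not_in` does the dual with AND over `neq`. A standalone sketch of the same accumulator against a plain Arrow array, assuming the arrow 52 crates pinned above (`is_in` is an illustrative name, not crate API):

use arrow_arith::boolean::or;
use arrow_array::{Array, BooleanArray, StringArray};
use arrow_ord::cmp::eq;
use arrow_schema::ArrowError;

// Illustrative sketch mirroring the accumulator in `r#in` above.
fn is_in(col: &StringArray, literals: &[&str]) -> Result<BooleanArray, ArrowError> {
    // Start from an all-false mask and OR in one equality mask per literal.
    let mut acc = BooleanArray::from(vec![false; col.len()]);
    for lit in literals {
        let scalar = StringArray::new_scalar(*lit);
        acc = or(&acc, &eq(col, &scalar)?)?;
    }
    Ok(acc)
}

// is_in(&StringArray::from(vec!["Apache", "Iceberg"]), &["Sioux", "Iceberg"])
// yields [false, true]; `not_in` starts from all-true and ANDs `neq` masks.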

crates/iceberg/src/arrow/schema.rs

Lines changed: 2 additions & 0 deletions
@@ -26,6 +26,7 @@ use crate::{Error, ErrorKind};
 use arrow_array::types::{validate_decimal_precision_and_scale, Decimal128Type};
 use arrow_array::{
     BooleanArray, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray,
 };
 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
 use bitvec::macros::internal::funty::Fundamental;
@@ -605,6 +606,7 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send
         PrimitiveLiteral::Long(value) => Ok(Box::new(Int64Array::new_scalar(*value))),
         PrimitiveLiteral::Float(value) => Ok(Box::new(Float32Array::new_scalar(value.as_f32()))),
         PrimitiveLiteral::Double(value) => Ok(Box::new(Float64Array::new_scalar(value.as_f64()))),
+        PrimitiveLiteral::String(value) => Ok(Box::new(StringArray::new_scalar(value.as_str()))),
         l => Err(Error::new(
             ErrorKind::FeatureUnsupported,
             format!(
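
The new `PrimitiveLiteral::String` arm is what lets the reader pass string literals to the arrow kernels: `new_scalar` wraps the literal in a one-element array that broadcasts against a whole column in a comparison. A small sketch of that behavior, assuming arrow 52 (`demo` is illustrative):

use arrow_array::{BooleanArray, StringArray};
use arrow_ord::cmp::eq;

fn demo() {
    let col = StringArray::from(vec!["Apache", "Iceberg"]);
    // A one-element "scalar" array, the same shape get_arrow_datum returns.
    let lit = StringArray::new_scalar("Iceberg");
    // The scalar side broadcasts: one comparison result per row of `col`.
    let mask: BooleanArray = eq(&col, &lit).unwrap();
    assert!(!mask.value(0));
    assert!(mask.value(1));
}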

crates/iceberg/src/scan.rs

Lines changed: 106 additions & 4 deletions
@@ -523,7 +523,7 @@ mod tests {
     };
     use crate::table::Table;
     use crate::TableIdent;
-    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
     use futures::TryStreamExt;
     use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
     use parquet::basic::Compression;
@@ -705,10 +705,15 @@
                         PARQUET_FIELD_ID_META_KEY.to_string(),
                         "3".to_string(),
                     )])),
+                arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "4".to_string(),
+                    )])),
             ];
             Arc::new(arrow_schema::Schema::new(fields))
         };
-        // 3 columns:
+        // 4 columns:
         // x: [1, 1, 1, 1, ...]
         let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
 
@@ -725,7 +730,14 @@
 
         // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
         let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
-        let to_write = RecordBatch::try_new(schema.clone(), vec![col1, col2, col3]).unwrap();
+
+        // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
+        let mut values = vec!["Apache"; 512];
+        values.append(vec!["Iceberg"; 512].as_mut());
+        let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
+
+        let to_write =
+            RecordBatch::try_new(schema.clone(), vec![col1, col2, col3, col4]).unwrap();
 
         // Write the Parquet files
         let props = WriterProperties::builder()
@@ -773,7 +785,7 @@
     fn test_select_no_exist_column() {
         let table = TableTestFixture::new().table;
 
-        let table_scan = table.scan().select(["x", "y", "z", "a"]).build();
+        let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
         assert!(table_scan.is_err());
     }
 
@@ -1040,4 +1052,94 @@
         let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
         assert_eq!(col, &expected_z);
     }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_startswith() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: a STARTSWITH "Ice"
+        let mut builder = fixture.table.scan();
+        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
+        builder = builder.filter(predicate);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_rows(), 512);
+
+        let col = batches[0].column_by_name("a").unwrap();
+        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(string_arr.value(0), "Iceberg");
+    }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_not_startswith() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: a NOT STARTSWITH "Ice"
+        let mut builder = fixture.table.scan();
+        let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
+        builder = builder.filter(predicate);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_rows(), 512);
+
+        let col = batches[0].column_by_name("a").unwrap();
+        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(string_arr.value(0), "Apache");
+    }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_in() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: a IN ("Sioux", "Iceberg")
+        let mut builder = fixture.table.scan();
+        let predicate =
+            Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
+        builder = builder.filter(predicate);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_rows(), 512);
+
+        let col = batches[0].column_by_name("a").unwrap();
+        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(string_arr.value(0), "Iceberg");
+    }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_not_in() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: a NOT IN ("Sioux", "Iceberg")
+        let mut builder = fixture.table.scan();
+        let predicate =
+            Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
+        builder = builder.filter(predicate);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_rows(), 512);
+
+        let col = batches[0].column_by_name("a").unwrap();
+        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(string_arr.value(0), "Apache");
+    }
 }

crates/iceberg/testdata/example_table_metadata_v2.json

Lines changed: 2 additions & 1 deletion
@@ -15,7 +15,8 @@
       "fields": [
         {"id": 1, "name": "x", "required": true, "type": "long"},
         {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"},
-        {"id": 3, "name": "z", "required": true, "type": "long"}
+        {"id": 3, "name": "z", "required": true, "type": "long"},
+        {"id": 4, "name": "a", "required": true, "type": "string"}
       ]
     }
   ],
