Skip to content

Commit ae1cb47

Browse files
committed
refactor: remove arrow_array_to_datum_iterator and add arrow_primitive_to_literal, which uses existing visitor
1 parent 282a01a commit ae1cb47

File tree

2 files changed

+44
-87
lines changed

2 files changed

+44
-87
lines changed

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 28 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,20 @@
1818
use std::collections::{HashMap, HashSet};
1919
use std::ops::Not;
2020

21-
use arrow_array::{
22-
Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
23-
StringArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
24-
};
21+
use arrow_array::{Array, Int64Array, StringArray};
2522
use futures::{StreamExt, TryStreamExt};
2623
use itertools::Itertools;
2724
use tokio::sync::oneshot::{Receiver, channel};
2825

2926
use super::delete_filter::DeleteFilter;
30-
use crate::arrow::arrow_schema_to_schema;
3127
use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
28+
use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema};
3229
use crate::delete_vector::DeleteVector;
3330
use crate::expr::Predicate::AlwaysTrue;
3431
use crate::expr::{Predicate, Reference};
3532
use crate::io::FileIO;
3633
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
37-
use crate::spec::{DataContentType, Datum, NestedFieldRef, PrimitiveType, SchemaRef};
34+
use crate::spec::{DataContentType, Datum, SchemaRef};
3835
use crate::{Error, ErrorKind, Result};
3936

4037
#[derive(Clone, Debug)]
@@ -342,8 +339,30 @@ impl CachingDeleteFileLoader {
342339
// only use columns that are in the set of equality_ids for this delete file
343340
.filter(|(field, value)| equality_ids.contains(&value.id))
344341
.map(|(column, field)| {
345-
let col_as_datum_vec = arrow_array_to_datum_iterator(column, field);
346-
col_as_datum_vec.map(|c| (c, field.name.to_string()))
342+
let lit_vec = arrow_primitive_to_literal(column, &field.field_type)?;
343+
344+
let primitive_type = field.field_type.as_primitive_type().ok_or(Error::new(
345+
ErrorKind::Unexpected,
346+
"field is not a primitive type",
347+
))?;
348+
349+
let datum_iterator: Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>>> =
350+
Box::new(lit_vec.into_iter().map(move |c| {
351+
c.map(|literal| {
352+
literal
353+
.as_primitive_literal()
354+
.map(|primitive_literal| {
355+
Datum::new(primitive_type.clone(), primitive_literal)
356+
})
357+
.ok_or(Error::new(
358+
ErrorKind::Unexpected,
359+
"failed to convert to primitive literal",
360+
))
361+
})
362+
.transpose()
363+
}));
364+
365+
Ok::<_, Error>((datum_iterator, field.name.to_string()))
347366
})
348367
.try_collect()?;
349368

@@ -371,90 +390,13 @@ impl CachingDeleteFileLoader {
371390
}
372391
}
373392

374-
macro_rules! prim_to_datum {
375-
($column:ident, $arr:ty, $dat:path) => {{
376-
let arr = $column.as_any().downcast_ref::<$arr>().ok_or(Error::new(
377-
ErrorKind::Unexpected,
378-
format!("could not downcast ArrayRef to {}", stringify!($arr)),
379-
))?;
380-
Ok(Box::new(arr.iter().map(|val| Ok(val.map($dat)))))
381-
}};
382-
}
383-
384-
fn eq_col_unsupported(ty: &str) -> Error {
385-
Error::new(
386-
ErrorKind::FeatureUnsupported,
387-
format!(
388-
"Equality deletes where a predicate acts upon a {} column are not yet supported",
389-
ty
390-
),
391-
)
392-
}
393-
394-
fn arrow_array_to_datum_iterator<'a>(
395-
column: &'a ArrayRef,
396-
field: &NestedFieldRef,
397-
) -> Result<Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>> + 'a>> {
398-
match field.field_type.as_primitive_type() {
399-
Some(primitive_type) => match primitive_type {
400-
PrimitiveType::Int => prim_to_datum!(column, Int32Array, Datum::int),
401-
PrimitiveType::Boolean => {
402-
prim_to_datum!(column, BooleanArray, Datum::bool)
403-
}
404-
PrimitiveType::Long => prim_to_datum!(column, Int64Array, Datum::long),
405-
PrimitiveType::Float => {
406-
prim_to_datum!(column, Float32Array, Datum::float)
407-
}
408-
PrimitiveType::Double => {
409-
prim_to_datum!(column, Float64Array, Datum::double)
410-
}
411-
PrimitiveType::String => {
412-
prim_to_datum!(column, StringArray, Datum::string)
413-
}
414-
PrimitiveType::Date => prim_to_datum!(column, Date32Array, Datum::date),
415-
PrimitiveType::Timestamp => {
416-
prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamp_micros)
417-
}
418-
PrimitiveType::Timestamptz => {
419-
prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamptz_micros)
420-
}
421-
PrimitiveType::TimestampNs => {
422-
prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamp_nanos)
423-
}
424-
PrimitiveType::TimestamptzNs => {
425-
prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamptz_nanos)
426-
}
427-
PrimitiveType::Time => {
428-
let arr = column
429-
.as_any()
430-
.downcast_ref::<Time64MicrosecondArray>()
431-
.ok_or(Error::new(
432-
ErrorKind::Unexpected,
433-
"could not downcast ArrayRef to Time64MicrosecondArray",
434-
))?;
435-
Ok(Box::new(arr.iter().map(|val| match val {
436-
None => Ok(None),
437-
Some(val) => Datum::time_micros(val).map(Some),
438-
})))
439-
}
440-
PrimitiveType::Decimal { .. } => Err(eq_col_unsupported("Decimal")),
441-
PrimitiveType::Uuid => Err(eq_col_unsupported("Uuid")),
442-
PrimitiveType::Fixed(_) => Err(eq_col_unsupported("Fixed")),
443-
PrimitiveType::Binary => Err(eq_col_unsupported("Binary")),
444-
},
445-
None => Err(eq_col_unsupported(
446-
"non-primitive (i.e. Struct, List, or Map)",
447-
)),
448-
}
449-
}
450-
451393
#[cfg(test)]
452394
mod tests {
453395
use std::collections::HashMap;
454396
use std::fs::File;
455397
use std::sync::Arc;
456398

457-
use arrow_array::{Int64Array, RecordBatch, StringArray};
399+
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
458400
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
459401
use parquet::basic::Compression;
460402
use parquet::file::properties::WriterProperties;

crates/iceberg/src/arrow/value.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ use uuid::Uuid;
2727
use super::get_field_id;
2828
use crate::spec::{
2929
ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType,
30-
SchemaWithPartnerVisitor, Struct, StructType, visit_struct_with_partner,
30+
SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner,
31+
visit_type_with_partner,
3132
};
3233
use crate::{Error, ErrorKind, Result};
3334

@@ -553,6 +554,20 @@ pub fn arrow_struct_to_literal(
553554
)
554555
}
555556

557+
/// Convert arrow primitive array to iceberg primitive value array.
558+
/// This function will assume the schema of arrow struct array is the same as iceberg struct type.
559+
pub fn arrow_primitive_to_literal(
560+
primitive_array: &ArrayRef,
561+
ty: &Type,
562+
) -> Result<Vec<Option<Literal>>> {
563+
visit_type_with_partner(
564+
ty,
565+
primitive_array,
566+
&mut ArrowArrayToIcebergStructConverter,
567+
&ArrowArrayAccessor,
568+
)
569+
}
570+
556571
#[cfg(test)]
557572
mod test {
558573
use std::collections::HashMap;

0 commit comments

Comments
 (0)