
Commit f2bc57d

feat: add equality delete parsing
1 parent 44cd718 commit f2bc57d

File tree

1 file changed: +219 -29 lines changed


crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 219 additions & 29 deletions
@@ -15,18 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::ops::Not;

+use arrow_array::{
+    Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
+};
 use futures::{StreamExt, TryStreamExt};
+use itertools::Itertools;
 use tokio::sync::oneshot::{Receiver, channel};

 use super::delete_filter::DeleteFilter;
+use crate::arrow::arrow_schema_to_schema;
 use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
 use crate::delete_vector::DeleteVector;
-use crate::expr::Predicate;
+use crate::expr::Predicate::AlwaysTrue;
+use crate::expr::{Predicate, Reference};
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
-use crate::spec::{DataContentType, SchemaRef};
+use crate::spec::{DataContentType, Datum, NestedFieldRef, PrimitiveType, SchemaRef};
 use crate::{Error, ErrorKind, Result};

 #[derive(Clone, Debug)]
@@ -42,6 +50,7 @@ enum DeleteFileContext {
     PosDels(ArrowRecordBatchStream),
     FreshEqDel {
         batch_stream: ArrowRecordBatchStream,
+        equality_ids: HashSet<i32>,
         sender: tokio::sync::oneshot::Sender<Predicate>,
     },
 }
@@ -223,6 +232,7 @@ impl CachingDeleteFileLoader {
                     )
                     .await?,
                     sender,
+                    equality_ids: HashSet::from_iter(task.equality_ids.clone()),
                 })
             }

@@ -246,9 +256,11 @@ impl CachingDeleteFileLoader {
             DeleteFileContext::FreshEqDel {
                 sender,
                 batch_stream,
+                equality_ids,
             } => {
                 let predicate =
-                    Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;
+                    Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids)
+                        .await?;

                 sender
                     .send(predicate)
@@ -277,48 +289,226 @@ impl CachingDeleteFileLoader {
         ))
     }

-    /// Parses record batch streams from individual equality delete files
-    ///
-    /// Returns an unbound Predicate for each batch stream
     async fn parse_equality_deletes_record_batch_stream(
-        streams: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
+        equality_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        // TODO
+        let mut result_predicate = AlwaysTrue;

-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "parsing of equality deletes is not yet supported",
-        ))
+        while let Some(record_batch) = stream.next().await {
+            let record_batch = record_batch?;
+
+            if record_batch.num_columns() == 0 {
+                return Ok(AlwaysTrue);
+            }
+
+            let batch_schema_arrow = record_batch.schema();
+            let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?;
+
+            let mut datum_columns_with_names: Vec<_> = record_batch
+                .columns()
+                .iter()
+                .zip(batch_schema_iceberg.as_struct().fields())
+                // only use columns that are in the set of equality_ids for this delete file
+                .filter(|(field, value)| equality_ids.contains(&value.id))
+                .map(|(column, field)| {
+                    let col_as_datum_vec = arrow_array_to_datum_iterator(column, field);
+                    col_as_datum_vec.map(|c| (c, field.name.to_string()))
+                })
+                .try_collect()?;
+
+            // consume all the iterators in lockstep, creating per-row predicates that get combined
+            // into a single final predicate
+            #[allow(clippy::len_zero)]
+            // (2025-06-12) can't use `is_empty` as it depends on unstable library feature `exact_size_is_empty`
+            while datum_columns_with_names[0].0.len() > 0 {
+                let mut row_predicate = AlwaysTrue;
+                for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names {
+                    if let Some(item) = column.next() {
+                        if let Some(datum) = item? {
+                            row_predicate = row_predicate
+                                .and(Reference::new(field_name.clone()).equal_to(datum.clone()));
+                        }
+                    }
+                }
+                result_predicate = result_predicate.and(row_predicate.not());
+            }
+        }
+        Ok(result_predicate.rewrite_not())
+    }
+}
+
+macro_rules! prim_to_datum {
+    ($column:ident, $arr:ty, $dat:path) => {{
+        let arr = $column.as_any().downcast_ref::<$arr>().ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!("could not downcast ArrayRef to {}", stringify!($arr)),
+        ))?;
+        Ok(Box::new(arr.iter().map(|val| Ok(val.map($dat)))))
+    }};
+}
+
+fn eq_col_unsupported(ty: &str) -> Error {
+    Error::new(
+        ErrorKind::FeatureUnsupported,
+        format!(
+            "Equality deletes where a predicate acts upon a {} column are not yet supported",
+            ty
+        ),
+    )
+}
+
+fn arrow_array_to_datum_iterator<'a>(
+    column: &'a ArrayRef,
+    field: &NestedFieldRef,
+) -> Result<Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>> + 'a>> {
+    match field.field_type.as_primitive_type() {
+        Some(primitive_type) => match primitive_type {
+            PrimitiveType::Int => prim_to_datum!(column, Int32Array, Datum::int),
+            PrimitiveType::Boolean => {
+                prim_to_datum!(column, BooleanArray, Datum::bool)
+            }
+            PrimitiveType::Long => prim_to_datum!(column, Int64Array, Datum::long),
+            PrimitiveType::Float => {
+                prim_to_datum!(column, Float32Array, Datum::float)
+            }
+            PrimitiveType::Double => {
+                prim_to_datum!(column, Float64Array, Datum::double)
+            }
+            PrimitiveType::String => {
+                prim_to_datum!(column, StringArray, Datum::string)
+            }
+            PrimitiveType::Date => prim_to_datum!(column, Date32Array, Datum::date),
+            PrimitiveType::Timestamp => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamp_micros)
+            }
+            PrimitiveType::Timestamptz => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamptz_micros)
+            }
+            PrimitiveType::TimestampNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamp_nanos)
+            }
+            PrimitiveType::TimestamptzNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamptz_nanos)
+            }
+            PrimitiveType::Time => {
+                let arr = column
+                    .as_any()
+                    .downcast_ref::<Time64MicrosecondArray>()
+                    .ok_or(Error::new(
+                        ErrorKind::Unexpected,
+                        "could not downcast ArrayRef to Time64MicrosecondArray",
+                    ))?;
+                Ok(Box::new(arr.iter().map(|val| match val {
+                    None => Ok(None),
+                    Some(val) => Datum::time_micros(val).map(Some),
+                })))
+            }
+            PrimitiveType::Decimal { .. } => Err(eq_col_unsupported("Decimal")),
+            PrimitiveType::Uuid => Err(eq_col_unsupported("Uuid")),
+            PrimitiveType::Fixed(_) => Err(eq_col_unsupported("Fixed")),
+            PrimitiveType::Binary => Err(eq_col_unsupported("Binary")),
+        },
+        None => Err(eq_col_unsupported(
+            "non-primitive (i.e. Struct, List, or Map)",
+        )),
     }
 }

 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+    use std::fs::File;
+    use std::sync::Arc;
+
+    use arrow_array::{Int64Array, RecordBatch, StringArray};
+    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
+    use parquet::basic::Compression;
+    use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;

     use super::*;
-    use crate::arrow::delete_file_loader::tests::setup;

     #[tokio::test]
-    async fn test_delete_file_manager_load_deletes() {
+    async fn test_delete_file_loader_parse_equality_deletes() {
         let tmp_dir = TempDir::new().unwrap();
-        let table_location = tmp_dir.path();
-        let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
-            .unwrap()
-            .build()
-            .unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();

-        // Note that with the delete file parsing not yet in place, all we can test here is that
-        // the call to the loader fails with the expected FeatureUnsupportedError.
-        let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);

-        let file_scan_tasks = setup(table_location);
-
-        let result = delete_file_manager
-            .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
+        let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let record_batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&eq_delete_file_path)
             .await
-            .unwrap();
+            .expect("could not get batch stream");
+
+        let eq_ids = HashSet::from_iter(vec![2, 3, 4]);
+
+        let parsed_eq_delete = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
+            record_batch_stream,
+            eq_ids,
+        )
+        .await
+        .expect("error parsing batch stream");
+        println!("{}", parsed_eq_delete);
+
+        let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (y != 2)".to_string();
+
+        assert_eq!(parsed_eq_delete.to_string(), expected);
+    }

-        assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
+    fn setup_write_equality_delete_file_1(table_location: &str) -> String {
+        let col_y_vals = vec![1, 2];
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let col_z_vals = vec![Some(100), None];
+        let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef;
+
+        let col_a_vals = vec![Some("HELP"), None];
+        let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef;
+
+        let equality_delete_schema = {
+            let fields = vec![
+                arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+                ),
+                arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
+                ),
+                arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
+                ),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+
+        let equality_deletes_to_write =
+            RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a])
+                .unwrap();
+
+        let path = format!("{}/equality-deletes-1.parquet", &table_location);
+
+        let file = File::create(&path).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(
+            file,
+            equality_deletes_to_write.schema(),
+            Some(props.clone()),
+        )
+        .unwrap();
+
+        writer
+            .write(&equality_deletes_to_write)
+            .expect("Writing batch");
+
+        // writer must be closed to write footer
+        writer.close().unwrap();
+
+        path
+    }
 }
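For illustration only (not part of the commit): a minimal sketch of how the per-row predicates combine, built by hand from the same two delete rows the test writes (y=1, z=100, a="HELP", and y=2 with z and a null). It reuses the crate's `Reference`, `Datum`, and `Predicate` APIs that the diff imports, assumes the crate is consumed externally as `iceberg`, and relies on `and` simplifying away the `AlwaysTrue` seed as the test expectation implies.

use std::ops::Not;

use iceberg::expr::Predicate::AlwaysTrue;
use iceberg::expr::Reference;
use iceberg::spec::Datum;

fn main() {
    // Delete-file row 1: y = 1, z = 100, a = "HELP".
    // Every non-null cell becomes an equality term ANDed into the row predicate.
    let row1 = AlwaysTrue
        .and(Reference::new("y").equal_to(Datum::long(1)))
        .and(Reference::new("z").equal_to(Datum::long(100)))
        .and(Reference::new("a").equal_to(Datum::string("HELP")));

    // Delete-file row 2: y = 2; z and a are NULL, so only the y term is kept.
    let row2 = AlwaysTrue.and(Reference::new("y").equal_to(Datum::long(2)));

    // A data row survives only if it differs from every delete row, so each row
    // predicate is negated and the negations are ANDed together.
    let combined = AlwaysTrue.and(row1.not()).and(row2.not()).rewrite_not();

    // Should print the predicate the test asserts:
    // (((y != 1) OR (z != 100)) OR (a != "HELP")) AND (y != 2)
    println!("{combined}");
}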

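Likewise, a hypothetical hand-expansion of one `prim_to_datum!` invocation from the diff, `prim_to_datum!(column, Int64Array, Datum::long)`, to show what the macro generates for the `PrimitiveType::Long` arm; the helper name `long_column_to_datums` is invented for illustration and is not part of the commit.

use arrow_array::{Array, ArrayRef, Int64Array};
use iceberg::spec::Datum;
use iceberg::{Error, ErrorKind, Result};

// Roughly what `prim_to_datum!(column, Int64Array, Datum::long)` expands to
// (an illustrative hand-expansion, not the compiler's exact output).
fn long_column_to_datums<'a>(
    column: &'a ArrayRef,
) -> Result<Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>> + 'a>> {
    // Downcast the dynamically typed Arrow array to the concrete array type.
    let arr = column.as_any().downcast_ref::<Int64Array>().ok_or(Error::new(
        ErrorKind::Unexpected,
        format!("could not downcast ArrayRef to {}", stringify!(Int64Array)),
    ))?;
    // Null slots become Ok(None); non-null i64 values become Ok(Some(Datum::long(v))).
    Ok(Box::new(arr.iter().map(|val| Ok(val.map(Datum::long)))))
}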