diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 4aea3218c2..079673be89 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -39,7 +39,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; -use crate::arrow::record_batch_transformer::RecordBatchTransformer; +use super::record_batch_transformer::RecordBatchTransformer; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::error::Result; use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; @@ -48,7 +48,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type}; +use crate::spec::{DataContentType, Datum, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -254,13 +254,16 @@ impl ArrowReader { // Build the batch stream and send all the RecordBatches that it generates // to the requester. - let record_batch_stream = - record_batch_stream_builder - .build()? - .map(move |batch| match batch { + let record_batch_stream = record_batch_stream_builder.build()?.map(move |batch| { + if matches!(task.data_file_content, DataContentType::PositionDeletes) { + Ok(batch?) 
+ } else { + match batch { Ok(batch) => record_batch_transformer.process_record_batch(batch), Err(err) => Err(err.into()), - }); + } + } + }); Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) } @@ -1408,6 +1411,8 @@ message schema { project_field_ids: vec![1], predicate: Some(predicate.bind(schema, true).unwrap()), deletes: vec![], + sequence_number: 0, + equality_ids: vec![], })] .into_iter(), )) as FileScanTaskStream; diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 8a534d1fd3..a1df83e783 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -26,7 +26,7 @@ use futures::channel::mpsc::{channel, Sender}; use futures::StreamExt; use crate::runtime::spawn; -use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; +use crate::scan::{DeleteFileContext, FileScanTask}; use crate::spec::{DataContentType, DataFile, Struct}; use crate::{Error, ErrorKind, Result}; @@ -110,10 +110,11 @@ impl PopulatedDeleteFileIndex { // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes". if partition.fields().is_empty() { // TODO: confirm we're good to skip here if we encounter a pos del - if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes { - global_deletes.push(arc_ctx); - return; - } + // FIXME(Dylan): allow putting position delete to global deletes. 
+ // if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes { + global_deletes.push(arc_ctx); + return; + // } } let destination_map = match arc_ctx.manifest_entry.content_type() { @@ -142,7 +143,7 @@ impl PopulatedDeleteFileIndex { &self, data_file: &DataFile, seq_num: Option<i64>, - ) -> Vec<FileScanTaskDeleteFile> { + ) -> Vec<FileScanTask> { let mut results = vec![]; self.global_deletes @@ -161,7 +162,7 @@ impl PopulatedDeleteFileIndex { - // filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num` + // filter that returns true if the provided delete file's sequence number is **greater than** `seq_num` .filter(|&delete| { seq_num - .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num)) + .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) .unwrap_or_else(|| true) }) .for_each(|delete| results.push(delete.as_ref().into())); @@ -177,7 +178,7 @@ impl PopulatedDeleteFileIndex { - // filter that returns true if the provided delete file's sequence number is **greater thano** `seq_num` + // filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num` .filter(|&delete| { seq_num - .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) + .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num)) .unwrap_or_else(|| true) }) .for_each(|delete| results.push(delete.as_ref().into())); @@ -195,7 +196,7 @@ pub(crate) struct DeletesForDataFile<'a> { } impl Future for DeletesForDataFile<'_> { - type Output = Result<Vec<FileScanTaskDeleteFile>>; + type Output = Result<Vec<FileScanTask>>; fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { match self.state.try_read() { diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index bfa1266dd0..a1f53fcfc6 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -584,6 +584,8 @@ impl TableScan { .send(DeleteFileContext { manifest_entry: manifest_entry_context.manifest_entry.clone(), partition_spec_id: manifest_entry_context.partition_spec_id, + snapshot_schema: manifest_entry_context.snapshot_schema.clone(), + field_ids: manifest_entry_context.field_ids.clone(), }) .await?; @@ -695,6 +697,8 @@ impl 
ManifestEntryContext { .map(|x| x.as_ref().snapshot_bound_predicate.clone()), deletes, + sequence_number: self.manifest_entry.sequence_number().unwrap_or(0), + equality_ids: self.manifest_entry.data_file().equality_ids().to_vec(), }) } } @@ -1056,7 +1060,11 @@ pub struct FileScanTask { pub predicate: Option<BoundPredicate>, /// The list of delete files that may need to be applied to this data file - pub deletes: Vec<FileScanTaskDeleteFile>, + pub deletes: Vec<FileScanTask>, + /// Sequence number of the manifest entry this task was built from; `0` when the entry has none. + pub sequence_number: i64, + /// Equality field ids copied from the underlying data file's `equality_ids()`. + pub equality_ids: Vec<i32>, } /// A task to scan part of file. @@ -1076,6 +1084,8 @@ pub struct FileScanTaskDeleteFile { pub(crate) struct DeleteFileContext { pub(crate) manifest_entry: ManifestEntryRef, pub(crate) partition_spec_id: i32, + pub(crate) snapshot_schema: SchemaRef, + pub(crate) field_ids: Arc<Vec<i32>>, } impl From<&DeleteFileContext> for FileScanTaskDeleteFile { @@ -1088,6 +1098,27 @@ impl From<&DeleteFileContext> for FileScanTaskDeleteFile { } } +impl From<&DeleteFileContext> for FileScanTask { + fn from(ctx: &DeleteFileContext) -> Self { + FileScanTask { + start: 0, + length: ctx.manifest_entry.file_size_in_bytes(), + record_count: Some(ctx.manifest_entry.record_count()), + + data_file_path: ctx.manifest_entry.file_path().to_string(), + data_file_content: ctx.manifest_entry.content_type(), + data_file_format: ctx.manifest_entry.file_format(), + + schema: ctx.snapshot_schema.clone(), + project_field_ids: ctx.field_ids.to_vec(), + predicate: None, + deletes: vec![], + sequence_number: ctx.manifest_entry.sequence_number().unwrap_or(0), + equality_ids: ctx.manifest_entry.data_file().equality_ids().to_vec(), + } + } +} + impl FileScanTask { /// Returns the data file path of this file scan task. 
pub fn data_file_path(&self) -> &str { @@ -2283,6 +2314,8 @@ pub mod tests { record_count: Some(100), data_file_format: DataFileFormat::Parquet, deletes: vec![], + sequence_number: 0, + equality_ids: vec![], }; test_fn(task); @@ -2298,6 +2331,8 @@ pub mod tests { record_count: None, data_file_format: DataFileFormat::Avro, deletes: vec![], + sequence_number: 0, + equality_ids: vec![], }; test_fn(task); } diff --git a/crates/iceberg/src/writer/function_writer/mod.rs b/crates/iceberg/src/writer/function_writer/mod.rs index 4d608d6571..58e2f7671c 100644 --- a/crates/iceberg/src/writer/function_writer/mod.rs +++ b/crates/iceberg/src/writer/function_writer/mod.rs @@ -17,5 +17,6 @@ //! This module contains the functional writer. +pub mod equality_delta_writer; pub mod fanout_partition_writer; pub mod precompute_partition_writer;