19 changes: 12 additions & 7 deletions crates/iceberg/src/arrow/reader.rs
@@ -39,7 +39,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI
 use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
-use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+use super::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
@@ -48,7 +48,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
+use crate::spec::{DataContentType, Datum, NestedField, PrimitiveType, Schema, Type};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
 
@@ -254,13 +254,16 @@ impl ArrowReader {
 
         // Build the batch stream and send all the RecordBatches that it generates
         // to the requester.
-        let record_batch_stream =
-            record_batch_stream_builder
-                .build()?
-                .map(move |batch| match batch {
+        let record_batch_stream = record_batch_stream_builder.build()?.map(move |batch| {
+            if matches!(task.data_file_content, DataContentType::PositionDeletes) {
+                Ok(batch?)
+            } else {
+                match batch {
                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
                    Err(err) => Err(err.into()),
-                });
+                }
+            }
+        });
 
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
@@ -1408,6 +1411,8 @@ message schema {
            project_field_ids: vec![1],
            predicate: Some(predicate.bind(schema, true).unwrap()),
            deletes: vec![],
+           sequence_number: 0,
+           equality_ids: vec![],
        })]
        .into_iter(),
    )) as FileScanTaskStream;
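
Note on the reader change above: batches read from a position-delete file now bypass the RecordBatchTransformer, since such a file carries the delete schema (file_path, pos) rather than the table schema the transformer projects onto. A minimal, self-contained sketch of the same pass-through-or-transform stream pattern, built on the futures crate alone; ContentType and transform are illustrative stand-ins, not types from this PR:

use futures::executor::block_on;
use futures::stream::{self, StreamExt};

// Only one variant is exercised in this sketch.
#[allow(dead_code)]
#[derive(Clone, Copy, PartialEq)]
enum ContentType {
    Data,
    PositionDeletes,
}

// Stand-in for RecordBatchTransformer::process_record_batch; doubles each
// value so the transformation would be visible in the output.
fn transform(batch: Vec<i32>) -> Result<Vec<i32>, String> {
    Ok(batch.into_iter().map(|v| v * 2).collect())
}

fn main() {
    let content = ContentType::PositionDeletes;
    let batches = stream::iter(vec![Ok::<_, String>(vec![1, 2, 3])]);

    // Mirrors the reader change: position-delete batches pass through
    // untouched; everything else is reshaped by the transformer.
    let out = batches.map(move |batch| {
        if content == ContentType::PositionDeletes {
            batch
        } else {
            batch.and_then(transform)
        }
    });

    let collected: Vec<_> = block_on(out.collect::<Vec<_>>());
    println!("{collected:?}"); // [Ok([1, 2, 3])]
}
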
19 changes: 10 additions & 9 deletions crates/iceberg/src/delete_file_index.rs
@@ -26,7 +26,7 @@ use futures::channel::mpsc::{channel, Sender};
 use futures::StreamExt;
 
 use crate::runtime::spawn;
-use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
+use crate::scan::{DeleteFileContext, FileScanTask};
 use crate::spec::{DataContentType, DataFile, Struct};
 use crate::{Error, ErrorKind, Result};
 
@@ -110,10 +110,11 @@ impl PopulatedDeleteFileIndex {
             // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
             if partition.fields().is_empty() {
                 // TODO: confirm we're good to skip here if we encounter a pos del
-                if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
-                    global_deletes.push(arc_ctx);
-                    return;
-                }
+                // FIXME(Dylan): allow putting position delete to global deletes.
+                // if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
+                global_deletes.push(arc_ctx);
+                return;
+                // }
             }
 
             let destination_map = match arc_ctx.manifest_entry.content_type() {
@@ -142,7 +143,7 @@ impl PopulatedDeleteFileIndex {
         &self,
         data_file: &DataFile,
         seq_num: Option<i64>,
-    ) -> Vec<FileScanTaskDeleteFile> {
+    ) -> Vec<FileScanTask> {
         let mut results = vec![];
 
         self.global_deletes
@@ -161,7 +162,7 @@
            // filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num`
            .filter(|&delete| {
                seq_num
-                   .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
+                   .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
                    .unwrap_or_else(|| true)
            })
            .for_each(|delete| results.push(delete.as_ref().into()));
@@ -177,7 +178,7 @@
            // filter that returns true if the provided delete file's sequence number is **greater than** `seq_num`
            .filter(|&delete| {
                seq_num
-                   .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
+                   .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
                    .unwrap_or_else(|| true)
            })
            .for_each(|delete| results.push(delete.as_ref().into()));
@@ -195,7 +196,7 @@ pub(crate) struct DeletesForDataFile<'a> {
 }
 
 impl Future for DeletesForDataFile<'_> {
-    type Output = Result<Vec<FileScanTaskDeleteFile>>;
+    type Output = Result<Vec<FileScanTask>>;
 
     fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
         match self.state.try_read() {
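
The two swapped comparisons above bring the filters in line with Iceberg's sequence-number rules: an equality delete applies only to data files with a strictly smaller data sequence number, while a position delete also applies to rows written at the same sequence number. A standalone sketch of the two predicates under that reading (function names are illustrative):

/// Equality deletes apply only to data files whose data sequence number is
/// strictly less than the delete's.
fn equality_delete_applies(delete_seq: i64, data_seq: Option<i64>) -> bool {
    data_seq.map(|seq| delete_seq > seq).unwrap_or(true)
}

/// Position deletes also apply to rows written at the same sequence number,
/// so the comparison is inclusive.
fn position_delete_applies(delete_seq: i64, data_seq: Option<i64>) -> bool {
    data_seq.map(|seq| delete_seq >= seq).unwrap_or(true)
}

fn main() {
    // A delete file and a data file both written at sequence number 5:
    assert!(!equality_delete_applies(5, Some(5))); // strictly newer only
    assert!(position_delete_applies(5, Some(5))); // same sequence counts
    println!("sequence-number predicates hold");
}
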
37 changes: 36 additions & 1 deletion crates/iceberg/src/scan.rs
@@ -584,6 +584,8 @@ impl TableScan {
                    .send(DeleteFileContext {
                        manifest_entry: manifest_entry_context.manifest_entry.clone(),
                        partition_spec_id: manifest_entry_context.partition_spec_id,
+                       snapshot_schema: manifest_entry_context.snapshot_schema.clone(),
+                       field_ids: manifest_entry_context.field_ids.clone(),
                    })
                    .await?;
 
@@ -695,6 +697,8 @@ impl ManifestEntryContext {
                .map(|x| x.as_ref().snapshot_bound_predicate.clone()),
 
            deletes,
+           sequence_number: self.manifest_entry.sequence_number().unwrap_or(0),
+           equality_ids: self.manifest_entry.data_file().equality_ids().to_vec(),
        })
    }
 }
@@ -1056,7 +1060,11 @@ pub struct FileScanTask {
     pub predicate: Option<BoundPredicate>,
 
     /// The list of delete files that may need to be applied to this data file
-    pub deletes: Vec<FileScanTaskDeleteFile>,
+    pub deletes: Vec<FileScanTask>,
+    /// sequence number
+    pub sequence_number: i64,
+    /// equality ids
+    pub equality_ids: Vec<i32>,
 }
 
 /// A task to scan part of file.
@@ -1076,6 +1084,8 @@ pub struct FileScanTaskDeleteFile {
 pub(crate) struct DeleteFileContext {
     pub(crate) manifest_entry: ManifestEntryRef,
     pub(crate) partition_spec_id: i32,
+    pub(crate) snapshot_schema: SchemaRef,
+    pub(crate) field_ids: Arc<Vec<i32>>,
 }
 
 impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
@@ -1088,6 +1098,27 @@ impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
     }
 }
 
+impl From<&DeleteFileContext> for FileScanTask {
+    fn from(ctx: &DeleteFileContext) -> Self {
+        FileScanTask {
+            start: 0,
+            length: ctx.manifest_entry.file_size_in_bytes(),
+            record_count: Some(ctx.manifest_entry.record_count()),
+
+            data_file_path: ctx.manifest_entry.file_path().to_string(),
+            data_file_content: ctx.manifest_entry.content_type(),
+            data_file_format: ctx.manifest_entry.file_format(),
+
+            schema: ctx.snapshot_schema.clone(),
+            project_field_ids: ctx.field_ids.to_vec(),
+            predicate: None,
+            deletes: vec![],
+            sequence_number: ctx.manifest_entry.sequence_number().unwrap_or(0),
+            equality_ids: ctx.manifest_entry.data_file().equality_ids().to_vec(),
+        }
+    }
+}
+
 impl FileScanTask {
     /// Returns the data file path of this file scan task.
     pub fn data_file_path(&self) -> &str {
@@ -2283,6 +2314,8 @@ pub mod tests {
            record_count: Some(100),
            data_file_format: DataFileFormat::Parquet,
            deletes: vec![],
+           sequence_number: 0,
+           equality_ids: vec![],
        };
        test_fn(task);
 
@@ -2298,6 +2331,8 @@
            record_count: None,
            data_file_format: DataFileFormat::Avro,
            deletes: vec![],
+           sequence_number: 0,
+           equality_ids: vec![],
        };
        test_fn(task);
    }
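
With delete files now modeled as FileScanTasks themselves, carrying sequence_number and equality_ids, a consumer can split a data task's deletes by content type before applying them. A self-contained sketch using a pared-down mirror of the struct rather than the crate's real types:

#[derive(Clone, Copy, PartialEq, Debug)]
enum DataContentType {
    Data,
    PositionDeletes,
    EqualityDeletes,
}

// Pared-down mirror of FileScanTask, keeping only the fields this sketch uses.
struct FileScanTask {
    data_file_path: String,
    data_file_content: DataContentType,
    sequence_number: i64,
    equality_ids: Vec<i32>,
    deletes: Vec<FileScanTask>,
}

fn main() {
    let task = FileScanTask {
        data_file_path: "data/00000.parquet".into(),
        data_file_content: DataContentType::Data,
        sequence_number: 3,
        equality_ids: vec![],
        deletes: vec![
            FileScanTask {
                data_file_path: "deletes/pos-00001.parquet".into(),
                data_file_content: DataContentType::PositionDeletes,
                sequence_number: 4,
                equality_ids: vec![],
                deletes: vec![],
            },
            FileScanTask {
                data_file_path: "deletes/eq-00002.parquet".into(),
                data_file_content: DataContentType::EqualityDeletes,
                sequence_number: 5,
                equality_ids: vec![1, 2],
                deletes: vec![],
            },
        ],
    };

    // Positional deletes are applied by (file_path, pos); equality deletes by
    // the column set named in equality_ids.
    let (pos, eq): (Vec<_>, Vec<_>) = task
        .deletes
        .iter()
        .partition(|d| d.data_file_content == DataContentType::PositionDeletes);
    println!("positional: {}, equality: {}", pos.len(), eq.len());
    for d in &eq {
        println!(
            "equality delete {} at seq {} on columns {:?}",
            d.data_file_path, d.sequence_number, d.equality_ids
        );
    }
}
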
1 change: 1 addition & 0 deletions crates/iceberg/src/writer/function_writer/mod.rs
@@ -17,5 +17,6 @@
 
 //! This module contains the functional writer.
 
+pub mod equality_delta_writer;
 pub mod fanout_partition_writer;
 pub mod precompute_partition_writer;