
Commit 444d86a

chenzl25 authored and Li0k committed
fix: cherry-pick #27 (#41)
1 parent 4ec0dbf commit 444d86a

11 files changed: +157 -78 lines

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 9 additions & 9 deletions
@@ -19,14 +19,14 @@ use std::collections::HashMap;
 
 use arrow_array::{Int64Array, StringArray};
 use futures::{StreamExt, TryStreamExt};
-use tokio::sync::oneshot::{Receiver, channel};
+use tokio::sync::oneshot::{channel, Receiver};
 
 use super::delete_filter::DeleteFilter;
 use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
 use crate::delete_vector::DeleteVector;
 use crate::expr::Predicate;
 use crate::io::FileIO;
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
+use crate::scan::{ArrowRecordBatchStream, FileScanTask};
 use crate::spec::{DataContentType, SchemaRef};
 use crate::{Error, ErrorKind, Result};
 
@@ -129,7 +129,7 @@ impl CachingDeleteFileLoader {
     /// ```
     pub(crate) fn load_deletes(
         &self,
-        delete_file_entries: &[FileScanTaskDeleteFile],
+        delete_file_entries: &[FileScanTask],
         schema: SchemaRef,
     ) -> Receiver<Result<DeleteFilter>> {
         let (tx, rx) = channel();
@@ -195,30 +195,30 @@ impl CachingDeleteFileLoader {
     }
 
     async fn load_file_for_task(
-        task: &FileScanTaskDeleteFile,
+        task: &FileScanTask,
        basic_delete_file_loader: BasicDeleteFileLoader,
        del_filter: DeleteFilter,
        schema: SchemaRef,
    ) -> Result<DeleteFileContext> {
-        match task.file_type {
+        match task.data_file_content {
            DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
                basic_delete_file_loader
-                    .parquet_to_batch_stream(&task.file_path)
+                    .parquet_to_batch_stream(task.data_file_path())
                    .await?,
            )),

            DataContentType::EqualityDeletes => {
-                let Some(notify) = del_filter.try_start_eq_del_load(&task.file_path) else {
+                let Some(notify) = del_filter.try_start_eq_del_load(task.data_file_path()) else {
                    return Ok(DeleteFileContext::ExistingEqDel);
                };

                let (sender, receiver) = channel();
-                del_filter.insert_equality_delete(&task.file_path, receiver);
+                del_filter.insert_equality_delete(task.data_file_path(), receiver);

                Ok(DeleteFileContext::FreshEqDel {
                    batch_stream: BasicDeleteFileLoader::evolve_schema(
                        basic_delete_file_loader
-                            .parquet_to_batch_stream(&task.file_path)
+                            .parquet_to_batch_stream(task.data_file_path())
                            .await?,
                        schema,
                    )
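
The hunks above replace the dedicated FileScanTaskDeleteFile argument with FileScanTask itself: the loader now branches on task.data_file_content and reads the path through the task.data_file_path() accessor instead of a file_path field. Below is a minimal, self-contained sketch of that dispatch pattern, using simplified stand-in types rather than the crate's own definitions:

enum DataContentType {
    Data,
    PositionDeletes,
    EqualityDeletes,
}

struct FileScanTask {
    data_file_path: String,
    data_file_content: DataContentType,
}

impl FileScanTask {
    // Accessor mirroring the task.data_file_path() calls in the hunks above.
    fn data_file_path(&self) -> &str {
        &self.data_file_path
    }
}

// Stand-in for load_file_for_task: the same match-on-content-type shape, minus the I/O.
fn describe_delete(task: &FileScanTask) -> String {
    match task.data_file_content {
        DataContentType::PositionDeletes => {
            format!("positional deletes from {}", task.data_file_path())
        }
        DataContentType::EqualityDeletes => {
            format!("equality deletes from {}", task.data_file_path())
        }
        DataContentType::Data => format!("plain data file {}", task.data_file_path()),
    }
}

fn main() {
    let tasks = vec![
        FileScanTask {
            data_file_path: "s3://bucket/data-1.parquet".to_string(),
            data_file_content: DataContentType::Data,
        },
        FileScanTask {
            data_file_path: "s3://bucket/pos-del-1.parquet".to_string(),
            data_file_content: DataContentType::PositionDeletes,
        },
        FileScanTask {
            data_file_path: "s3://bucket/eq-del-1.parquet".to_string(),
            data_file_content: DataContentType::EqualityDeletes,
        },
    ];
    for task in &tasks {
        println!("{}", describe_delete(task));
    }
}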

crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 9 additions & 4 deletions
@@ -115,6 +115,7 @@ mod tests {
 
     use super::*;
     use crate::arrow::delete_filter::tests::setup;
+    use crate::spec::DataContentType;
 
     #[tokio::test]
     async fn test_basic_delete_file_loader_read_delete_file() {
@@ -129,11 +130,15 @@ mod tests {
 
        let file_scan_tasks = setup(table_location);
 
+        let delete_task = FileScanTaskDeleteFile {
+            file_path: file_scan_tasks[0].deletes[0].data_file_path.clone(),
+            file_type: DataContentType::PositionDeletes,
+            partition_spec_id: 0,
+            equality_ids: vec![],
+        };
+
        let result = delete_file_loader
-            .read_delete_file(
-                &file_scan_tasks[0].deletes[0],
-                file_scan_tasks[0].schema_ref(),
-            )
+            .read_delete_file(&delete_task, file_scan_tasks[0].schema_ref())
            .await
            .unwrap();

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 27 additions & 26 deletions
@@ -18,13 +18,13 @@
 use std::collections::HashMap;
 use std::sync::{Arc, Mutex, RwLock};
 
-use tokio::sync::Notify;
 use tokio::sync::oneshot::Receiver;
+use tokio::sync::Notify;
 
 use crate::delete_vector::DeleteVector;
 use crate::expr::Predicate::AlwaysTrue;
 use crate::expr::{Bind, BoundPredicate, Predicate};
-use crate::scan::{FileScanTask, FileScanTaskDeleteFile};
+use crate::scan::FileScanTask;
 use crate::spec::DataContentType;
 use crate::{Error, ErrorKind, Result};
 
@@ -120,14 +120,14 @@ impl DeleteFilter {
        }
 
        let Some(predicate) = self
-            .get_equality_delete_predicate_for_delete_file_path(&delete.file_path)
+            .get_equality_delete_predicate_for_delete_file_path(delete.data_file_path())
            .await
        else {
            return Err(Error::new(
                ErrorKind::Unexpected,
                format!(
                    "Missing predicate for equality delete file '{}'",
-                    delete.file_path
+                    delete.data_file_path()
                ),
            ));
        };
@@ -190,8 +190,8 @@ impl DeleteFilter {
    }
 }
 
-pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
-    matches!(f.file_type, DataContentType::EqualityDeletes)
+pub(crate) fn is_equality_delete(f: &FileScanTask) -> bool {
+    matches!(f.data_file_content, DataContentType::EqualityDeletes)
 }
 
 #[cfg(test)]
@@ -307,24 +307,19 @@ pub(crate) mod tests {
            writer.close().unwrap();
        }
 
-        let pos_del_1 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
-            equality_ids: vec![],
-        };
-
-        let pos_del_2 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
-            equality_ids: vec![],
-        };
-
-        let pos_del_3 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
+        // Helper to build a positional delete task with minimal fields
+        let make_pos_del_task = |n: u8| FileScanTask {
+            start: 0,
+            length: 0,
+            record_count: None,
+            data_file_path: format!("{}/pos-del-{}.parquet", table_location.to_str().unwrap(), n),
+            data_file_content: DataContentType::PositionDeletes,
+            data_file_format: DataFileFormat::Parquet,
+            schema: data_file_schema.clone(),
+            project_field_ids: vec![],
+            predicate: None,
+            deletes: vec![],
+            sequence_number: 0,
             equality_ids: vec![],
        };
 
@@ -334,22 +329,28 @@ pub(crate) mod tests {
            length: 0,
            record_count: None,
            data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
+            data_file_content: DataContentType::Data,
            data_file_format: DataFileFormat::Parquet,
            schema: data_file_schema.clone(),
            project_field_ids: vec![],
            predicate: None,
-            deletes: vec![pos_del_1, pos_del_2.clone()],
+            deletes: vec![make_pos_del_task(1), make_pos_del_task(2)],
+            sequence_number: 0,
+            equality_ids: vec![],
        },
        FileScanTask {
            start: 0,
            length: 0,
            record_count: None,
            data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
+            data_file_content: DataContentType::Data,
            data_file_format: DataFileFormat::Parquet,
            schema: data_file_schema.clone(),
            project_field_ids: vec![],
            predicate: None,
-            deletes: vec![pos_del_3],
+            deletes: vec![make_pos_del_task(3)],
+            sequence_number: 0,
+            equality_ids: vec![],
        },
    ];
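
The test hunks above collapse three near-identical FileScanTaskDeleteFile literals into a single make_pos_del_task closure that stamps out positional-delete FileScanTasks, varying only the file name, and add the new data_file_content, sequence_number and equality_ids fields to the data-file tasks. Here is a small self-contained sketch of the same closure-based fixture-builder idea, with simplified stand-in types rather than the crate's:

#[derive(Debug, Clone, Copy)]
enum DataContentType {
    PositionDeletes,
}

#[derive(Debug)]
struct FileScanTask {
    data_file_path: String,
    data_file_content: DataContentType,
    sequence_number: i64,
    equality_ids: Vec<i32>,
}

fn main() {
    let table_location = "/tmp/table";

    // One closure replaces several hand-written, mostly identical literals.
    let make_pos_del_task = |n: u8| FileScanTask {
        data_file_path: format!("{}/pos-del-{}.parquet", table_location, n),
        data_file_content: DataContentType::PositionDeletes,
        sequence_number: 0,
        equality_ids: vec![],
    };

    let deletes = vec![make_pos_del_task(1), make_pos_del_task(2), make_pos_del_task(3)];
    for d in &deletes {
        println!("{:?}", d);
    }
}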

crates/iceberg/src/arrow/reader.rs

Lines changed: 32 additions & 20 deletions
@@ -33,12 +33,12 @@ use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
 use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
+use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{
     ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
 use parquet::arrow::async_reader::AsyncFileReader;
-use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
 use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
@@ -47,13 +47,13 @@ use crate::arrow::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::delete_vector::DeleteVector;
 use crate::error::Result;
-use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
+use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
+use crate::spec::{DataContentType, Datum, NestedField, PrimitiveType, Schema, Type};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
 
@@ -312,13 +312,16 @@ impl ArrowReader {
 
        // Build the batch stream and send all the RecordBatches that it generates
        // to the requester.
-        let record_batch_stream =
-            record_batch_stream_builder
-                .build()?
-                .map(move |batch| match batch {
+        let record_batch_stream = record_batch_stream_builder.build()?.map(move |batch| {
+            if matches!(task.data_file_content, DataContentType::PositionDeletes) {
+                Ok(batch?)
+            } else {
+                match batch {
                    Ok(batch) => record_batch_transformer.process_record_batch(batch),
                    Err(err) => Err(err.into()),
-                });
+                }
+            }
+        });
 
        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
    }
@@ -1443,15 +1446,15 @@ mod tests {
    use roaring::RoaringTreemap;
    use tempfile::TempDir;
 
-    use crate::ErrorKind;
    use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
    use crate::arrow::{ArrowReader, ArrowReaderBuilder};
    use crate::delete_vector::DeleteVector;
    use crate::expr::visitors::bound_predicate_visitor::visit;
    use crate::expr::{Bind, Predicate, Reference};
    use crate::io::FileIO;
    use crate::scan::{FileScanTask, FileScanTaskStream};
-    use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type};
+    use crate::spec::{DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type};
+    use crate::ErrorKind;
 
    fn table_schema_simple() -> SchemaRef {
        Arc::new(
@@ -1740,11 +1743,14 @@ message schema {
            length: 0,
            record_count: None,
            data_file_path: format!("{}/1.parquet", table_location),
+            data_file_content: DataContentType::Data,
            data_file_format: DataFileFormat::Parquet,
            schema: schema.clone(),
            project_field_ids: vec![1],
            predicate: Some(predicate.bind(schema, true).unwrap()),
            deletes: vec![],
+            sequence_number: 0,
+            equality_ids: vec![],
        })]
        .into_iter(),
    )) as FileScanTaskStream;
@@ -1774,19 +1780,25 @@ message schema {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
-                .with_fields(vec![
-                    NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)).into(),
-                ])
+                .with_fields(vec![NestedField::optional(
+                    1,
+                    "a",
+                    Type::Primitive(PrimitiveType::String),
+                )
+                .into()])
                .build()
                .unwrap(),
        );
 
-        let arrow_schema = Arc::new(ArrowSchema::new(vec![
-            Field::new("a", col_a_type.clone(), true).with_metadata(HashMap::from([(
-                PARQUET_FIELD_ID_META_KEY.to_string(),
-                "1".to_string(),
-            )])),
-        ]));
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "a",
+            col_a_type.clone(),
+            true,
+        )
+        .with_metadata(HashMap::from([(
+            PARQUET_FIELD_ID_META_KEY.to_string(),
+            "1".to_string(),
+        )]))]));
 
        let tmp_dir = TempDir::new().unwrap();
        let table_location = tmp_dir.path().to_str().unwrap().to_string();
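
The behavioral change in this file is the record_batch_stream hunk: batches read from positional delete files are now forwarded as-is instead of being run through RecordBatchTransformer::process_record_batch. Below is a hedged sketch of that conditional map over a stream, assuming only the futures crate; Batch and transform are illustrative stand-ins, not Arrow or iceberg-rust types:

use futures::{executor::block_on, stream, StreamExt};

#[derive(Debug, Clone)]
struct Batch(Vec<i64>);

// Stand-in for the record batch transformer applied to ordinary data files.
fn transform(b: Batch) -> Batch {
    Batch(b.0.iter().map(|x| x * 2).collect())
}

fn main() {
    // Analogous to matching task.data_file_content against DataContentType::PositionDeletes.
    let is_position_delete = true;

    let batches = stream::iter(vec![Batch(vec![1, 2]), Batch(vec![3])]).map(move |batch| {
        if is_position_delete {
            // Positional delete files are read verbatim; no transformation step.
            batch
        } else {
            transform(batch)
        }
    });

    let out: Vec<Batch> = block_on(batches.collect::<Vec<_>>());
    println!("{:?}", out);
}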

crates/iceberg/src/delete_file_index.rs

Lines changed: 9 additions & 8 deletions
@@ -19,12 +19,12 @@ use std::collections::HashMap;
 use std::ops::Deref;
 use std::sync::{Arc, RwLock};
 
+use futures::channel::mpsc::{channel, Sender};
 use futures::StreamExt;
-use futures::channel::mpsc::{Sender, channel};
 use tokio::sync::Notify;
 
 use crate::runtime::spawn;
-use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
+use crate::scan::{DeleteFileContext, FileScanTask};
 use crate::spec::{DataContentType, DataFile, Struct};
 
 /// Index of delete files
@@ -85,7 +85,7 @@ impl DeleteFileIndex {
        &self,
        data_file: &DataFile,
        seq_num: Option<i64>,
-    ) -> Vec<FileScanTaskDeleteFile> {
+    ) -> Vec<FileScanTask> {
        let notifier = {
            let guard = self.state.read().unwrap();
            match *guard {
@@ -132,10 +132,11 @@ impl PopulatedDeleteFileIndex {
            // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
            if partition.fields().is_empty() {
                // TODO: confirm we're good to skip here if we encounter a pos del
-                if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
-                    global_deletes.push(arc_ctx);
-                    return;
-                }
+                // FIXME(Dylan): allow putting position delete to global deletes.
+                // if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
+                global_deletes.push(arc_ctx);
+                return;
+                // }
            }
 
            let destination_map = match arc_ctx.manifest_entry.content_type() {
@@ -164,7 +165,7 @@ impl PopulatedDeleteFileIndex {
        &self,
        data_file: &DataFile,
        seq_num: Option<i64>,
-    ) -> Vec<FileScanTaskDeleteFile> {
+    ) -> Vec<FileScanTask> {
        let mut results = vec![];
 
        self.global_deletes
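
The FIXME hunk changes how deletes written with an unpartitioned spec are indexed: previously positional deletes fell through to per-partition routing, while now any unpartitioned delete file is pushed to the global delete list. An illustrative, self-contained sketch of that routing decision, using simplified stand-in types rather than the crate's index:

use std::collections::HashMap;

#[derive(Debug)]
enum DeleteKind {
    PositionDeletes,
    EqualityDeletes,
}

#[derive(Debug)]
struct DeleteEntry {
    path: String,
    kind: DeleteKind,
    // None models a delete file written with an unpartitioned spec.
    partition: Option<String>,
}

fn main() {
    let entries = vec![
        DeleteEntry {
            path: "pos-del-1.parquet".to_string(),
            kind: DeleteKind::PositionDeletes,
            partition: None,
        },
        DeleteEntry {
            path: "eq-del-1.parquet".to_string(),
            kind: DeleteKind::EqualityDeletes,
            partition: Some("dt=2024-01-01".to_string()),
        },
    ];

    let mut global_deletes: Vec<DeleteEntry> = vec![];
    let mut by_partition: HashMap<String, Vec<DeleteEntry>> = HashMap::new();

    for entry in entries {
        match entry.partition.clone() {
            // Unpartitioned: applied globally, regardless of delete kind (the change above).
            None => global_deletes.push(entry),
            Some(p) => by_partition.entry(p).or_default().push(entry),
        }
    }

    println!("global: {:?}", global_deletes);
    println!("partitioned: {:?}", by_partition);
}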

crates/iceberg/src/scan/context.rs

Lines changed: 3 additions & 0 deletions
@@ -127,6 +127,7 @@ impl ManifestEntryContext {
            record_count: Some(self.manifest_entry.record_count()),
 
            data_file_path: self.manifest_entry.file_path().to_string(),
+            data_file_content: self.manifest_entry.data_file().content_type(),
            data_file_format: self.manifest_entry.file_format(),
 
            schema: self.snapshot_schema,
@@ -136,6 +137,8 @@
                .map(|x| x.as_ref().snapshot_bound_predicate.clone()),
 
            deletes,
+            sequence_number: self.manifest_entry.sequence_number().unwrap_or(0),
+            equality_ids: self.manifest_entry.data_file().equality_ids().to_vec(),
        })
    }
 }

0 commit comments
