diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 6fcd592973..166baa521d 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -163,6 +163,14 @@ impl ArrowReader { row_group_filtering_enabled: bool, row_selection_enabled: bool, ) -> Result { + // TODO: add support for delete files + if !task.deletes.is_empty() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Delete files are not yet supported", + )); + } + // Get the metadata for the Parquet file we need to read and build // a reader for the data within let parquet_file = file_io.new_input(&task.data_file_path)?; @@ -751,10 +759,14 @@ impl PredicateConverter<'_> { let index = self .column_indices .iter() - .position(|&idx| idx == *column_idx).ok_or(Error::new(ErrorKind::DataInvalid, format!( - "Leave column `{}` in predicates cannot be found in the required column indices.", - reference.field().name - )))?; + .position(|&idx| idx == *column_idx) + .ok_or(Error::new( + ErrorKind::DataInvalid, + format!( + "Leave column `{}` in predicates cannot be found in the required column indices.", + reference.field().name + ), + ))?; Ok(Some(index)) } else { diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs new file mode 100644 index 0000000000..8a534d1fd3 --- /dev/null +++ b/crates/iceberg/src/delete_file_index.rs @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::future::Future; +use std::ops::Deref; +use std::pin::Pin; +use std::sync::{Arc, RwLock}; +use std::task::{Context, Poll}; + +use futures::channel::mpsc::{channel, Sender}; +use futures::StreamExt; + +use crate::runtime::spawn; +use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; +use crate::spec::{DataContentType, DataFile, Struct}; +use crate::{Error, ErrorKind, Result}; + +/// Index of delete files +#[derive(Clone, Debug)] +pub(crate) struct DeleteFileIndex { + state: Arc>, +} + +#[derive(Debug)] +enum DeleteFileIndexState { + Populating, + Populated(PopulatedDeleteFileIndex), +} + +#[derive(Debug)] +struct PopulatedDeleteFileIndex { + #[allow(dead_code)] + global_deletes: Vec>, + eq_deletes_by_partition: HashMap>>, + pos_deletes_by_partition: HashMap>>, + // TODO: do we need this? + // pos_deletes_by_path: HashMap>>, + + // TODO: Deletion Vector support +} + +impl DeleteFileIndex { + /// create a new `DeleteFileIndex` along with the sender that populates it with delete files + pub(crate) fn new() -> (DeleteFileIndex, Sender) { + // TODO: what should the channel limit be? 
+ let (tx, rx) = channel(10); + let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating)); + let delete_file_stream = rx.boxed(); + + spawn({ + let state = state.clone(); + async move { + let delete_files = delete_file_stream.collect::>().await; + + let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files); + + let mut guard = state.write().unwrap(); + *guard = DeleteFileIndexState::Populated(populated_delete_file_index); + } + }); + + (DeleteFileIndex { state }, tx) + } + + /// Gets all the delete files that apply to the specified data file. + /// + /// Returns a future that resolves to a Result> + pub(crate) fn get_deletes_for_data_file<'a>( + &self, + data_file: &'a DataFile, + seq_num: Option, + ) -> DeletesForDataFile<'a> { + DeletesForDataFile { + state: self.state.clone(), + data_file, + seq_num, + } + } +} + +impl PopulatedDeleteFileIndex { + fn new(files: Vec) -> PopulatedDeleteFileIndex { + let mut eq_deletes_by_partition: HashMap>> = + HashMap::default(); + let mut pos_deletes_by_partition: HashMap>> = + HashMap::default(); + + let mut global_deletes: Vec> = vec![]; + + files.into_iter().for_each(|ctx| { + let arc_ctx = Arc::new(ctx); + + let partition = arc_ctx.manifest_entry.data_file().partition(); + + // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes". 
+ if partition.fields().is_empty() { + // TODO: confirm we're good to skip here if we encounter a pos del + if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes { + global_deletes.push(arc_ctx); + return; + } + } + + let destination_map = match arc_ctx.manifest_entry.content_type() { + DataContentType::PositionDeletes => &mut pos_deletes_by_partition, + DataContentType::EqualityDeletes => &mut eq_deletes_by_partition, + _ => unreachable!(), + }; + + destination_map + .entry(partition.clone()) + .and_modify(|entry| { + entry.push(arc_ctx.clone()); + }) + .or_insert(vec![arc_ctx.clone()]); + }); + + PopulatedDeleteFileIndex { + global_deletes, + eq_deletes_by_partition, + pos_deletes_by_partition, + } + } + + /// Determine all the delete files that apply to the provided `DataFile`. + fn get_deletes_for_data_file( + &self, + data_file: &DataFile, + seq_num: Option, + ) -> Vec { + let mut results = vec![]; + + self.global_deletes + .iter() + // filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num` + .filter(|&delete| { + seq_num + .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num)) + .unwrap_or_else(|| true) + }) + .for_each(|delete| results.push(delete.as_ref().into())); + + if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) { + deletes + .iter() + // filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num` + .filter(|&delete| { + seq_num + .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num)) + .unwrap_or_else(|| true) + }) + .for_each(|delete| results.push(delete.as_ref().into())); + } + + // TODO: the spec states that: + // "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null". + // we're not yet doing that here. The referenced data file's name will also be present in the positional + // delete file's file path column. 
+ if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) { + deletes + .iter() + // filter that returns true if the provided delete file's sequence number is **greater than** `seq_num` + .filter(|&delete| { + seq_num + .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num)) + .unwrap_or_else(|| true) + }) + .for_each(|delete| results.push(delete.as_ref().into())); + } + + results + } +} + +/// Future for the `DeleteFileIndex::get_deletes_for_data_file` method +pub(crate) struct DeletesForDataFile<'a> { + state: Arc>, + data_file: &'a DataFile, + seq_num: Option, +} + +impl Future for DeletesForDataFile<'_> { + type Output = Result>; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + match self.state.try_read() { + Ok(guard) => match guard.deref() { + DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok( + idx.get_deletes_for_data_file(self.data_file, self.seq_num) + )), + _ => Poll::Pending, + }, + Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))), + } + } +} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index fe5a529998..d684be54c8 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -83,6 +83,7 @@ pub mod transform; mod runtime; pub mod arrow; +pub(crate) mod delete_file_index; mod utils; pub mod writer; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 7e05da59a3..30e45a074d 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -27,6 +27,7 @@ use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use crate::arrow::ArrowReaderBuilder; +use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::expression_evaluator::ExpressionEvaluator; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::visitors::inclusive_projection::InclusiveProjection; @@ -62,6 +63,11 @@ pub struct
TableScanBuilder<'a> { concurrency_limit_manifest_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, + + // TODO: defaults to false for now whilst delete file processing + // is still being worked on but will switch to a default of true + // once this work is complete + delete_file_processing_enabled: bool, } impl<'a> TableScanBuilder<'a> { @@ -80,6 +86,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: num_cpus, row_group_filtering_enabled: true, row_selection_enabled: false, + delete_file_processing_enabled: false, } } @@ -186,6 +193,17 @@ impl<'a> TableScanBuilder<'a> { self } + /// Determines whether to enable delete file processing (currently disabled by default) + /// + /// When disabled, delete files are ignored. + pub fn with_delete_file_processing_enabled( + mut self, + delete_file_processing_enabled: bool, + ) -> Self { + self.delete_file_processing_enabled = delete_file_processing_enabled; + self + } + /// Build the table scan. pub fn build(self) -> Result { let snapshot = match self.snapshot_id { @@ -294,6 +312,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + delete_file_processing_enabled: self.delete_file_processing_enabled, }) } } @@ -319,6 +338,7 @@ pub struct TableScan { row_group_filtering_enabled: bool, row_selection_enabled: bool, + delete_file_processing_enabled: bool, } /// PlanContext wraps a [`SnapshotRef`] alongside all the other @@ -347,18 +367,33 @@ impl TableScan { let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries; // used to stream ManifestEntryContexts between stages of the file plan operation - let (manifest_entry_ctx_tx, manifest_entry_ctx_rx) = + let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) = + channel(concurrency_limit_manifest_files); + let 
(manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) = channel(concurrency_limit_manifest_files); + // used to stream the results back to the caller let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); + let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender)> = + if self.delete_file_processing_enabled { + Some(DeleteFileIndex::new()) + } else { + None + }; + let manifest_list = self.plan_context.get_manifest_list().await?; - // get the [`ManifestFile`]s from the [`ManifestList`], filtering out - // partitions cannot match the scan's filter - let manifest_file_contexts = self - .plan_context - .build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?; + // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any + // whose partitions cannot match this + // scan's filter + let manifest_file_contexts = self.plan_context.build_manifest_file_contexts( + manifest_list, + manifest_entry_data_ctx_tx, + delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| { + (delete_file_idx.clone(), manifest_entry_delete_ctx_tx) + }), + )?; let mut channel_for_manifest_error = file_scan_task_tx.clone(); @@ -375,17 +410,45 @@ impl TableScan { } }); - let mut channel_for_manifest_entry_error = file_scan_task_tx.clone(); + let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); + + if let Some((_, delete_file_tx)) = delete_file_idx_and_tx { + let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); + + // Process the delete file [`ManifestEntry`] stream in parallel + spawn(async move { + let result = manifest_entry_delete_ctx_rx + .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| async move { + spawn(async move { + Self::process_delete_manifest_entry(manifest_entry_context, tx) + .await + }) + .await + }, + ) + .await; + + if let Err(error) = result { + let _ = 
channel_for_delete_manifest_entry_error + .send(Err(error)) + .await; + } + }) + .await; + } - // Process the [`ManifestEntry`] stream in parallel + // Process the data file [`ManifestEntry`] stream in parallel spawn(async move { - let result = manifest_entry_ctx_rx + let result = manifest_entry_data_ctx_rx .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) .try_for_each_concurrent( concurrency_limit_manifest_entries, |(manifest_entry_context, tx)| async move { spawn(async move { - Self::process_manifest_entry(manifest_entry_context, tx).await + Self::process_data_manifest_entry(manifest_entry_context, tx).await }) .await }, @@ -393,7 +456,7 @@ impl TableScan { .await; if let Err(error) = result { - let _ = channel_for_manifest_entry_error.send(Err(error)).await; + let _ = channel_for_data_manifest_entry_error.send(Err(error)).await; } }); @@ -427,7 +490,7 @@ impl TableScan { &self.plan_context.snapshot } - async fn process_manifest_entry( + async fn process_data_manifest_entry( manifest_entry_context: ManifestEntryContext, mut file_scan_task_tx: Sender>, ) -> Result<()> { @@ -436,12 +499,11 @@ impl TableScan { return Ok(()); } - // abort the plan if we encounter a manifest entry whose data file's - // content type is currently unsupported + // abort the plan if we encounter a manifest entry for a delete file if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data { return Err(Error::new( ErrorKind::FeatureUnsupported, - "Only Data files currently supported", + "Encountered an entry for a delete file in a data file manifest", )); } @@ -479,7 +541,50 @@ impl TableScan { // entire plan without getting filtered out. 
Create a corresponding // FileScanTask and push it to the result stream file_scan_task_tx - .send(Ok(manifest_entry_context.into_file_scan_task())) + .send(Ok(manifest_entry_context.into_file_scan_task().await?)) + .await?; + + Ok(()) + } + + async fn process_delete_manifest_entry( + manifest_entry_context: ManifestEntryContext, + mut delete_file_ctx_tx: Sender, + ) -> Result<()> { + // skip processing this manifest entry if it has been marked as deleted + if !manifest_entry_context.manifest_entry.is_alive() { + return Ok(()); + } + + // abort the plan if we encounter a manifest entry that is not for a delete file + if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Encountered an entry for a data file in a delete manifest", + )); + } + + if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates { + let expression_evaluator_cache = + manifest_entry_context.expression_evaluator_cache.as_ref(); + + let expression_evaluator = expression_evaluator_cache.get( + manifest_entry_context.partition_spec_id, + &bound_predicates.partition_bound_predicate, + )?; + + // skip any data file whose partition data indicates that it can't contain + // any data that matches this scan's filter + if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? 
{ + return Ok(()); + } + } + + delete_file_ctx_tx + .send(DeleteFileContext { + manifest_entry: manifest_entry_context.manifest_entry.clone(), + partition_spec_id: manifest_entry_context.partition_spec_id, + }) .await?; Ok(()) @@ -503,6 +608,7 @@ struct ManifestFileContext { object_cache: Arc, snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, + delete_file_index: Option, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -515,6 +621,7 @@ struct ManifestEntryContext { bound_predicates: Option>, partition_spec_id: i32, snapshot_schema: SchemaRef, + delete_file_index: Option, } impl ManifestFileContext { @@ -529,6 +636,7 @@ impl ManifestFileContext { field_ids, mut sender, expression_evaluator_cache, + delete_file_index, .. } = self; @@ -536,13 +644,14 @@ impl ManifestFileContext { for manifest_entry in manifest.entries() { let manifest_entry_context = ManifestEntryContext { - // TODO: refactor to avoid clone + // TODO: refactor to avoid the expensive ManifestEntry clone manifest_entry: manifest_entry.clone(), expression_evaluator_cache: expression_evaluator_cache.clone(), field_ids: field_ids.clone(), partition_spec_id: manifest_file.partition_spec_id, bound_predicates: bound_predicates.clone(), snapshot_schema: snapshot_schema.clone(), + delete_file_index: delete_file_index.clone(), }; sender @@ -558,8 +667,19 @@ impl ManifestFileContext { impl ManifestEntryContext { /// consume this `ManifestEntryContext`, returning a `FileScanTask` /// created from it - fn into_file_scan_task(self) -> FileScanTask { - FileScanTask { + async fn into_file_scan_task(self) -> Result { + let deletes = if let Some(delete_file_index) = self.delete_file_index { + delete_file_index + .get_deletes_for_data_file( + self.manifest_entry.data_file(), + self.manifest_entry.sequence_number(), + ) + .await? 
+ } else { + vec![] + }; + + Ok(FileScanTask { start: 0, length: self.manifest_entry.file_size_in_bytes(), record_count: Some(self.manifest_entry.record_count()), @@ -573,7 +693,9 @@ impl ManifestEntryContext { predicate: self .bound_predicates .map(|x| x.as_ref().snapshot_bound_predicate.clone()), - } + + deletes, + }) } } @@ -609,29 +731,33 @@ impl PlanContext { fn build_manifest_file_contexts( &self, manifest_list: Arc, - sender: Sender, + tx_data: Sender, + delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender)>, ) -> Result>>> { - let entries = manifest_list.entries(); - - if entries - .iter() - .any(|e| e.content != ManifestContentType::Data) - { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Merge-on-read is not yet supported", - )); - } + let manifest_files = manifest_list.entries().iter(); // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; - if self.predicate.is_some() { - for manifest_file in entries { + + for manifest_file in manifest_files { + let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes { + let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else { + continue; + }; + (Some(delete_file_idx.clone()), tx.clone()) + } else { + ( + delete_file_idx_and_tx.as_ref().map(|x| x.0.clone()), + tx_data.clone(), + ) + }; + + let partition_bound_predicate = if self.predicate.is_some() { let partition_bound_predicate = self.get_partition_filter(manifest_file)?; // evaluate the ManifestFile against the partition filter. Skip // if it cannot contain any matching rows - if self + if !self .manifest_evaluator_cache .get( manifest_file.partition_spec_id, @@ -639,19 +765,22 @@ impl PlanContext { ) .eval(manifest_file)? 
{ - let mfc = self.create_manifest_file_context( - manifest_file, - Some(partition_bound_predicate), - sender.clone(), - ); - filtered_mfcs.push(Ok(mfc)); + continue; } - } - } else { - for manifest_file in entries { - let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone()); - filtered_mfcs.push(Ok(mfc)); - } + + Some(partition_bound_predicate) + } else { + None + }; + + let mfc = self.create_manifest_file_context( + manifest_file, + partition_bound_predicate, + tx, + delete_file_idx, + ); + + filtered_mfcs.push(Ok(mfc)); } Ok(Box::new(filtered_mfcs.into_iter())) @@ -662,6 +791,7 @@ impl PlanContext { manifest_file: &ManifestFile, partition_filter: Option>, sender: Sender, + delete_file_index: Option, ) -> ManifestFileContext { let bound_predicates = if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) = @@ -683,6 +813,7 @@ impl PlanContext { snapshot_schema: self.snapshot_schema.clone(), field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), + delete_file_index, } } } @@ -909,8 +1040,10 @@ pub struct FileScanTask { /// The data file path corresponding to the task. pub data_file_path: String, + /// The content type of the file to scan. pub data_file_content: DataContentType, + /// The format of the file to scan. pub data_file_format: DataFileFormat, @@ -921,6 +1054,38 @@ pub struct FileScanTask { /// The predicate to filter. #[serde(skip_serializing_if = "Option::is_none")] pub predicate: Option, + + /// The list of delete files that may need to be applied to this data file + pub deletes: Vec, +} + +/// A delete file that may need to be applied to the data file of a [`FileScanTask`].
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FileScanTaskDeleteFile { + /// The delete file path + pub file_path: String, + + /// delete file type + pub file_type: DataContentType, + + /// partition id + pub partition_spec_id: i32, +} + +#[derive(Debug)] +pub(crate) struct DeleteFileContext { + pub(crate) manifest_entry: ManifestEntryRef, + pub(crate) partition_spec_id: i32, +} + +impl From<&DeleteFileContext> for FileScanTaskDeleteFile { + fn from(ctx: &DeleteFileContext) -> Self { + FileScanTaskDeleteFile { + file_path: ctx.manifest_entry.file_path().to_string(), + file_type: ctx.manifest_entry.content_type(), + partition_spec_id: ctx.partition_spec_id, + } + } } impl FileScanTask { @@ -1409,16 +1574,16 @@ pub mod tests { .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .await .unwrap(); - let batche1: Vec<_> = batch_stream.try_collect().await.unwrap(); + let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap(); let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); let batch_stream = reader .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .await .unwrap(); - let batche2: Vec<_> = batch_stream.try_collect().await.unwrap(); + let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap(); - assert_eq!(batche1, batche2); + assert_eq!(batch_1, batch_2); } #[tokio::test] @@ -1860,6 +2025,7 @@ pub mod tests { schema: schema.clone(), record_count: Some(100), data_file_format: DataFileFormat::Parquet, + deletes: vec![], }; test_fn(task); @@ -1874,6 +2040,7 @@ pub mod tests { schema, record_count: None, data_file_format: DataFileFormat::Avro, + deletes: vec![], }; test_fn(task); } diff --git a/crates/iceberg/testdata/example_table_metadata_v2.json b/crates/iceberg/testdata/example_table_metadata_v2.json index 35230966ae..17bbd7d99a 100644 --- a/crates/iceberg/testdata/example_table_metadata_v2.json +++ b/crates/iceberg/testdata/example_table_metadata_v2.json @@ -7,7 +7,12 @@ 
"last-column-id": 3, "current-schema-id": 1, "schemas": [ - {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]}, + { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"} + ]}, { "type": "struct", "schema-id": 1, @@ -25,7 +30,14 @@ } ], "default-spec-id": 0, - "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + {"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000} + ] + } + ], "last-partition-id": 1000, "default-sort-order-id": 3, "sort-orders": [ diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/read_positional_deletes.rs index 41ca057a6b..9c6e3689d4 100644 --- a/crates/integration_tests/tests/read_positional_deletes.rs +++ b/crates/integration_tests/tests/read_positional_deletes.rs @@ -17,6 +17,7 @@ //! Integration tests for rest catalog. +use futures::TryStreamExt; use iceberg::ErrorKind::FeatureUnsupported; use iceberg::{Catalog, TableIdent}; use iceberg_integration_tests::set_test_fixture; @@ -35,16 +36,31 @@ async fn test_read_table_with_positional_deletes() { .await .unwrap(); - let scan = table.scan().build().unwrap(); + let scan = table + .scan() + .with_delete_file_processing_enabled(true) + .build() + .unwrap(); println!("{:?}", scan); - assert!(scan - .to_arrow() + let plan: Vec<_> = scan + .plan_files() + .await + .unwrap() + .try_collect() .await - .is_err_and(|e| e.kind() == FeatureUnsupported)); + .unwrap(); + println!("{:?}", plan); + + // Scan plan phase should include delete files in file plan + // when with_delete_file_processing_enabled == true + assert_eq!(plan[0].deletes.len(), 2); // 😱 If we don't support positional deletes, we should fail when we try to read a table that // has positional deletes. 
The table has 12 rows, and 2 are deleted, see provision.py + let result = scan.to_arrow().await.unwrap().try_collect::>().await; + + assert!(result.is_err_and(|e| e.kind() == FeatureUnsupported)); // When we get support for it: // let batch_stream = scan.to_arrow().await.unwrap();