diff --git a/crates/iceberg/src/scan/incremental/context.rs b/crates/iceberg/src/scan/incremental/context.rs index 3f6169270..fbd9219d5 100644 --- a/crates/iceberg/src/scan/incremental/context.rs +++ b/crates/iceberg/src/scan/incremental/context.rs @@ -27,8 +27,7 @@ use crate::io::object_cache::ObjectCache; use crate::scan::ExpressionEvaluatorCache; use crate::scan::context::{ManifestEntryContext, ManifestEntryFilterFn, ManifestFileContext}; use crate::spec::{ - ManifestContentType, ManifestEntryRef, ManifestFile, Operation, SchemaRef, SnapshotRef, - TableMetadataRef, + ManifestContentType, ManifestEntryRef, ManifestFile, SchemaRef, SnapshotRef, TableMetadataRef, }; #[derive(Debug)] @@ -65,28 +64,8 @@ impl IncrementalPlanContext { delete_file_idx: DeleteFileIndex, delete_file_tx: Sender, ) -> Result> + 'static>> { - // Validate that all snapshots are Append or Delete operations and collect their IDs - let snapshot_ids: HashSet = { - let mut ids = HashSet::new(); - for snapshot in self.snapshots.iter() { - let operation = &snapshot.summary().operation; - if !matches!( - operation, - Operation::Append | Operation::Overwrite | Operation::Delete - ) { - return Err(crate::Error::new( - crate::ErrorKind::FeatureUnsupported, - format!( - "Incremental scan only supports Append, Overwrite and Delete operations, but snapshot {} has operation {:?}", - snapshot.snapshot_id(), - operation - ), - )); - } - ids.insert(snapshot.snapshot_id()); - } - ids - }; + // Collect all snapshot IDs (all operation types are supported) + let snapshot_ids: HashSet = self.snapshots.iter().map(|s| s.snapshot_id()).collect(); let (manifest_files, filter_fn) = { let mut manifest_files = HashSet::::new(); diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs index 32f007873..0ae836b53 100644 --- a/crates/iceberg/src/scan/incremental/tests.rs +++ b/crates/iceberg/src/scan/incremental/tests.rs @@ -33,7 +33,7 @@ use crate::TableIdent; use crate::io::{FileIO, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, ManifestEntry, ManifestListWriter, - ManifestStatus, ManifestWriterBuilder, PartitionSpec, Struct, TableMetadata, + ManifestStatus, ManifestWriterBuilder, PartitionSpec, SchemaRef, Struct, TableMetadata, }; use crate::table::Table; @@ -72,6 +72,20 @@ pub enum Operation { Vec<(i64, String)>, Vec, ), + + /// Replace operation for file compaction/reorganization. + /// The logical table content does NOT change - only the physical file representation changes. + /// + /// Parameters: + /// 1. Files to compact: Vec of existing file names that are being compacted + /// 2. Target file: String name of the new compacted file + /// + /// Example: `Replace(vec!["file-a.parquet", "file-b.parquet"], "file-a-b-compacted.parquet")` + /// This compacts two existing files into one new file with the same logical content. + /// + /// For an incremental scan that only contains Replace operations, the result should be + /// zero additions and zero deletions, because the logical data hasn't changed. + Replace(Vec, String), } /// Tracks the state of data files across snapshots @@ -81,6 +95,8 @@ struct DataFileInfo { snapshot_id: i64, sequence_number: i64, n_values: Vec, + data_values: Vec, + file_size: u64, } /// Test fixture that creates a table with custom snapshots based on operations. @@ -110,7 +126,6 @@ pub struct IncrementalTestFixture { impl IncrementalTestFixture { /// Create a new test fixture with the given operations. pub async fn new(operations: Vec) -> Self { - // Use pwd let tmp_dir = TempDir::new().unwrap(); let table_location = tmp_dir.path().join("incremental_test_table"); @@ -142,6 +157,7 @@ impl IncrementalTestFixture { Operation::Add(..) => "append", Operation::Delete(..) => "delete", Operation::Overwrite(..) => "overwrite", + Operation::Replace(..) => "replace", }; let manifest_list_location = @@ -282,7 +298,7 @@ impl IncrementalTestFixture { // Track all data files and their contents across snapshots let mut data_files: Vec = Vec::new(); #[allow(clippy::type_complexity)] - let mut delete_files: Vec<(String, i64, i64, Vec<(String, i64)>)> = Vec::new(); // (path, snapshot_id, sequence_number, [(data_file_path, position)]) + let mut delete_files: Vec<(String, i64, i64, Vec<(String, i64)>, u64)> = Vec::new(); // (path, snapshot_id, sequence_number, [(data_file_path, position)], file_size) for (snapshot_idx, operation) in operations.iter().enumerate() { let snapshot_id = (snapshot_idx + 1) as i64; @@ -324,7 +340,7 @@ impl IncrementalTestFixture { .content(DataContentType::Data) .file_path(data_file.path.clone()) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(1024) + .file_size_in_bytes(data_file.file_size) .record_count(data_file.n_values.len() as u64) .partition(empty_partition.clone()) .key_metadata(None) @@ -339,7 +355,8 @@ impl IncrementalTestFixture { // Add new data if not empty if !n_values.is_empty() { let data_file_path = format!("{}/data/{}", &self.table_location, file_name); - self.write_parquet_file(&data_file_path, &n_values, &data_values) + let file_size = self + .write_parquet_file(&data_file_path, &n_values, &data_values) .await; data_writer @@ -352,7 +369,7 @@ impl IncrementalTestFixture { .content(DataContentType::Data) .file_path(data_file_path.clone()) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(1024) + .file_size_in_bytes(file_size) .record_count(n_values.len() as u64) .partition(empty_partition.clone()) .key_metadata(None) @@ -369,6 +386,8 @@ impl IncrementalTestFixture { snapshot_id, sequence_number, n_values, + data_values, + file_size, }); } @@ -386,12 +405,13 @@ impl IncrementalTestFixture { ) .build_v2_deletes(); - for (delete_path, del_snapshot_id, del_sequence_number, _) in &delete_files + for (delete_path, del_snapshot_id, del_sequence_number, _, del_file_size) in + &delete_files { let delete_count = delete_files .iter() - .filter(|(p, _, _, _)| p == delete_path) - .map(|(_, _, _, deletes)| deletes.len()) + .filter(|(p, _, _, _, _)| p == delete_path) + .map(|(_, _, _, deletes, _)| deletes.len()) .sum::(); delete_writer @@ -407,7 +427,7 @@ impl IncrementalTestFixture { .content(DataContentType::PositionDeletes) .file_path(delete_path.clone()) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(512) + .file_size_in_bytes(*del_file_size) .record_count(delete_count as u64) .partition(empty_partition.clone()) .key_metadata(None) @@ -477,7 +497,7 @@ impl IncrementalTestFixture { .content(DataContentType::Data) .file_path(data_file.path.clone()) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(1024) + .file_size_in_bytes(data_file.file_size) .record_count(data_file.n_values.len() as u64) .partition(empty_partition.clone()) .key_metadata(None) @@ -502,11 +522,13 @@ impl IncrementalTestFixture { .build_v2_deletes(); // Add existing delete files - for (delete_path, del_snapshot_id, del_sequence_number, _) in &delete_files { + for (delete_path, del_snapshot_id, del_sequence_number, _, del_file_size) in + &delete_files + { let delete_count = delete_files .iter() - .filter(|(p, _, _, _)| p == delete_path) - .map(|(_, _, _, deletes)| deletes.len()) + .filter(|(p, _, _, _, _)| p == delete_path) + .map(|(_, _, _, deletes, _)| deletes.len()) .sum::(); delete_writer @@ -522,7 +544,7 @@ impl IncrementalTestFixture { .content(DataContentType::PositionDeletes) .file_path(delete_path.clone()) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(512) + .file_size_in_bytes(*del_file_size) .record_count(delete_count as u64) .partition(empty_partition.clone()) .key_metadata(None) @@ -542,12 +564,13 @@ impl IncrementalTestFixture { snapshot_id, Uuid::new_v4() ); - self.write_positional_delete_file( - &delete_file_path, - &data_file_path, - &positions, - ) - .await; + let delete_file_size = self + .write_positional_delete_file( + &delete_file_path, + &data_file_path, + &positions, + ) + .await; delete_writer .add_entry( @@ -559,7 +582,7 @@ impl IncrementalTestFixture { .content(DataContentType::PositionDeletes) .file_path(delete_file_path.clone()) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(512) + .file_size_in_bytes(delete_file_size) .record_count(positions.len() as u64) .partition(empty_partition.clone()) .key_metadata(None) @@ -579,6 +602,7 @@ impl IncrementalTestFixture { .into_iter() .map(|pos| (data_file_path.clone(), pos)) .collect(), + delete_file_size, )); } @@ -642,7 +666,7 @@ impl IncrementalTestFixture { .content(DataContentType::Data) .file_path(data_file.path.clone()) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(1024) + .file_size_in_bytes(data_file.file_size) .record_count(data_file.n_values.len() as u64) .partition(empty_partition.clone()) .key_metadata(None) @@ -667,7 +691,7 @@ impl IncrementalTestFixture { .content(DataContentType::Data) .file_path(data_file.path.clone()) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(1024) + .file_size_in_bytes(data_file.file_size) .record_count(data_file.n_values.len() as u64) .partition(empty_partition.clone()) .key_metadata(None) @@ -687,7 +711,8 @@ impl IncrementalTestFixture { rows.iter().map(|(_, d)| d.clone()).collect(); let data_file_path = format!("{}/data/{}", &self.table_location, file_name); - self.write_parquet_file(&data_file_path, &n_values, &data_values) + let file_size = self + .write_parquet_file(&data_file_path, &n_values, &data_values) .await; data_writer @@ -700,7 +725,7 @@ impl IncrementalTestFixture { .content(DataContentType::Data) .file_path(data_file_path.clone()) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(1024) + .file_size_in_bytes(file_size) .record_count(n_values.len() as u64) .partition(empty_partition.clone()) .key_metadata(None) @@ -717,6 +742,8 @@ impl IncrementalTestFixture { snapshot_id, sequence_number, n_values, + data_values, + file_size, }); } @@ -739,12 +766,13 @@ impl IncrementalTestFixture { .build_v2_deletes(); // Add existing delete files - for (delete_path, del_snapshot_id, del_sequence_number, _) in &delete_files + for (delete_path, del_snapshot_id, del_sequence_number, _, del_file_size) in + &delete_files { let delete_count = delete_files .iter() - .filter(|(p, _, _, _)| p == delete_path) - .map(|(_, _, _, deletes)| deletes.len()) + .filter(|(p, _, _, _, _)| p == delete_path) + .map(|(_, _, _, deletes, _)| deletes.len()) .sum::(); delete_writer @@ -760,7 +788,7 @@ impl IncrementalTestFixture { .content(DataContentType::PositionDeletes) .file_path(delete_path.clone()) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(512) + .file_size_in_bytes(*del_file_size) .record_count(delete_count as u64) .partition(empty_partition.clone()) .key_metadata(None) @@ -792,12 +820,13 @@ impl IncrementalTestFixture { snapshot_id, Uuid::new_v4() ); - self.write_positional_delete_file( - &delete_file_path, - &data_file_path, - &positions, - ) - .await; + let delete_file_size = self + .write_positional_delete_file( + &delete_file_path, + &data_file_path, + &positions, + ) + .await; delete_writer .add_entry( @@ -809,7 +838,7 @@ impl IncrementalTestFixture { .content(DataContentType::PositionDeletes) .file_path(delete_file_path.clone()) .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(512) + .file_size_in_bytes(delete_file_size) .record_count(positions.len() as u64) .partition(empty_partition.clone()) .key_metadata(None) @@ -829,6 +858,7 @@ impl IncrementalTestFixture { .into_iter() .map(|pos| (data_file_path.clone(), pos)) .collect(), + delete_file_size, )); } } @@ -855,11 +885,227 @@ impl IncrementalTestFixture { manifest_list_write.close().await.unwrap(); } + + Operation::Replace(files_to_compact, target_file) => { + // Replace operation: compact existing files into a new file + // The logical content doesn't change, only the physical representation + self.handle_replace_operation( + files_to_compact, + target_file, + &mut data_files, + &delete_files, + current_schema.clone(), + &partition_spec, + snapshot_id, + sequence_number, + parent_snapshot_id, + ) + .await + .unwrap(); + } } } } - async fn write_parquet_file(&self, path: &str, n_values: &[i32], data_values: &[String]) { + #[allow(clippy::too_many_arguments, clippy::type_complexity)] + async fn handle_replace_operation( + &mut self, + files_to_compact: &[String], + target_file: &str, + data_files: &mut Vec, + delete_files: &[(String, i64, i64, Vec<(String, i64)>, u64)], + current_schema: SchemaRef, + partition_spec: &PartitionSpec, + snapshot_id: i64, + sequence_number: i64, + parent_snapshot_id: Option, + ) -> Result<(), Box> { + let empty_partition = Struct::empty(); + + // Create data manifest + let mut data_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema, + partition_spec.clone(), + ) + .build_v2_data(); + + // Determine which files are being compacted + let files_to_compact_set: std::collections::HashSet = files_to_compact + .iter() + .map(|f| format!("{}/data/{}", &self.table_location, f)) + .collect(); + + // Build a set of deleted positions for each file being compacted + let mut deleted_positions: std::collections::HashMap< + String, + std::collections::HashSet, + > = std::collections::HashMap::new(); + for (_, _, _, delete_records, _) in delete_files { + for (file_path, position) in delete_records { + deleted_positions + .entry(file_path.clone()) + .or_default() + .insert(*position); + } + } + + // Track the data being compacted + let mut compacted_data: Vec<(i32, String)> = Vec::new(); + let mut compacted_record_count: u64 = 0; + + // Add existing data files (mark compacted ones as DELETED, others as EXISTING) + for data_file in data_files.iter() { + if files_to_compact_set.contains(&data_file.path) { + // Mark file as deleted (being compacted away) + data_writer + .add_delete_entry( + ManifestEntry::builder() + .status(ManifestStatus::Deleted) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(data_file.file_size) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Collect data from compacted files, filtering out deleted records + let file_deleted_positions = deleted_positions.get(&data_file.path); + for (position, (n, d)) in data_file + .n_values + .iter() + .zip(data_file.data_values.iter()) + .enumerate() + { + // Skip this record if it was deleted via positional delete + if let Some(deleted) = file_deleted_positions { + if deleted.contains(&(position as i64)) { + continue; + } + } + compacted_data.push((*n, d.clone())); + } + compacted_record_count += data_file.n_values.len() as u64; + } else { + // Keep existing file + data_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(data_file.file_size) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + } + + // Create the compacted file with the collected data + if !compacted_data.is_empty() { + let compacted_n_values: Vec = compacted_data.iter().map(|(n, _)| *n).collect(); + let compacted_data_values: Vec = + compacted_data.iter().map(|(_, d)| d.clone()).collect(); + let compacted_file_path = format!("{}/data/{}", &self.table_location, target_file); + + let file_size = self + .write_parquet_file( + &compacted_file_path, + &compacted_n_values, + &compacted_data_values, + ) + .await; + + data_writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(compacted_file_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(file_size) + .record_count(compacted_record_count) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Update data_files tracking: remove compacted, add new + data_files.retain(|df| !files_to_compact_set.contains(&df.path)); + data_files.push(DataFileInfo { + path: compacted_file_path, + snapshot_id, + sequence_number, + n_values: compacted_n_values, + data_values: compacted_data_values, + file_size, + }); + } + + let data_manifest = data_writer.write_manifest_file().await.unwrap(); + + // Write manifest list (no delete manifests for Replace) + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(format!( + "{}/metadata/snap-{}-manifest-list.avro", + self.table_location, snapshot_id + )) + .unwrap(), + snapshot_id, + parent_snapshot_id, + sequence_number, + ); + manifest_list_write + .add_manifests(vec![data_manifest].into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + + Ok(()) + } + + async fn write_parquet_file( + &self, + path: &str, + n_values: &[i32], + data_values: &[String], + ) -> u64 { let schema = { let fields = vec![ arrow_schema::Field::new("n", arrow_schema::DataType::Int32, false).with_metadata( @@ -887,6 +1133,9 @@ impl IncrementalTestFixture { let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); + + // Return the actual file size + fs::metadata(path).unwrap().len() } async fn write_positional_delete_file( @@ -894,7 +1143,7 @@ impl IncrementalTestFixture { path: &str, data_file_path: &str, positions: &[i64], - ) { + ) -> u64 { let schema = { let fields = vec![ arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false) @@ -925,6 +1174,9 @@ impl IncrementalTestFixture { let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); + + // Return the actual file size + fs::metadata(path).unwrap().len() } /// Verify incremental scan results. @@ -1743,6 +1995,111 @@ async fn test_incremental_scan_positional_deletes_then_file_delete() { .await; } +#[tokio::test] +async fn test_incremental_scan_with_replace_operation() { + // This test verifies the Replace operation semantics: + // - A Replace operation compacts multiple files into one + // - The logical table content does NOT change (same data) + // - But physically, files are reorganized: old files are DELETED, new file is ADDED + // - For incremental scans, we report these physical changes (additions and deletions) + // even though the logical data is identical + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Add file-a with 3 rows + Operation::Add( + vec![ + (1, "a".to_string()), + (2, "b".to_string()), + (3, "c".to_string()), + ], + "file-a.parquet".to_string(), + ), + // Snapshot 3: Add file-b with 2 rows + Operation::Add( + vec![(10, "x".to_string()), (20, "y".to_string())], + "file-b.parquet".to_string(), + ), + // Snapshot 4: Replace (compact file-a and file-b into file-ab-compact) + // This will delete file-a and file-b, and add file-ab-compact with the same data + Operation::Replace( + vec!["file-a.parquet".to_string(), "file-b.parquet".to_string()], + "file-ab-compact.parquet".to_string(), + ), + ]) + .await; + + // Verify we have 4 snapshots + let mut snapshots = fixture.table.metadata().snapshots().collect::>(); + snapshots.sort_by_key(|s| s.snapshot_id()); + assert_eq!(snapshots.len(), 4); + + // Verify snapshot operations + assert_eq!( + snapshots[3].summary().operation, + crate::spec::Operation::Replace, + "Snapshot 4 should be a Replace operation" + ); + + let file_a_path = format!("{}/data/file-a.parquet", fixture.table_location); + let file_b_path = format!("{}/data/file-b.parquet", fixture.table_location); + + // Test 1: Incremental scan from snapshot 3 to 4 (ONLY Replace operation) + // Physical changes: file-a and file-b are deleted, file-ab-compact is added + // But the data is the same, so: + // - Additions: all 5 rows from file-ab-compact (1,2,3,10,20) + // - Deletions: all 5 rows from file-a and file-b (positions sorted by (position, file_path)) + fixture + .verify_incremental_scan( + 3, + 4, + vec![(1, "a"), (2, "b"), (3, "c"), (10, "x"), (20, "y")], + vec![ + (0, &file_a_path), + (0, &file_b_path), + (1, &file_a_path), + (1, &file_b_path), + (2, &file_a_path), + ], + ) + .await; + + // Test 2: Incremental scan from snapshot 1 to 4 (includes Appends and Replace) + // Snapshot 2 adds file-a (1,2,3) + // Snapshot 3 adds file-b (10,20) + // Snapshot 4 replaces both files (deletes file-a, file-b; adds file-ab-compact) + // Since file-a and file-b are added in the scan range and then deleted in the Replace: + // - The additions of file-a and file-b cancel with their deletions + // - Net result: only file-ab-compact is added (1,2,3,10,20) + // - Deletions: empty (file-a and file-b deletions cancelled by their additions) + fixture + .verify_incremental_scan( + 1, + 4, + vec![(1, "a"), (2, "b"), (3, "c"), (10, "x"), (20, "y")], + vec![], + ) + .await; + + // Test 3: Incremental scan from snapshot 2 to 4 + // Starting from snapshot 2: file-a (1,2,3) exists (added before scan start) + // Snapshot 3: adds file-b (10,20) + // Snapshot 4: replaces both files with file-ab-compact + // Since file-a was added before the scan started, its deletion is not cancelled + // But file-b is added and deleted within the scan, so cancels out + // Net result: + // - Additions: file-ab-compact (1,2,3,10,20) + // - Deletions: file-a (0,1,2) since it existed at scan start + fixture + .verify_incremental_scan( + 2, + 4, + vec![(1, "a"), (2, "b"), (3, "c"), (10, "x"), (20, "y")], + vec![(0, &file_a_path), (1, &file_a_path), (2, &file_a_path)], + ) + .await; +} + #[tokio::test] async fn test_incremental_scan_with_deleted_files_cancellation() { // This test verifies that incremental scans properly handle file deletions with cancellation logic: @@ -1848,3 +2205,166 @@ async fn test_incremental_scan_with_deleted_files_cancellation() { ) .await; } + +#[tokio::test] +async fn test_incremental_scan_with_replace_and_positional_deletes() { + // This test verifies Replace operations with positional deletes before and after the replace. + // + // Test scenario: + // Snapshot 1: Empty starting point + // Snapshot 2: Add file-a with 5 records (1,2,3,4,5) + // Snapshot 3: Add file-b with 3 records (10,11,12) + // Snapshot 4: Delete record at position 1 in file-a (delete "2") + // Snapshot 5: Replace - compact file-a and file-b into file-ab-compact + // (containing records 1,3,4,5 from file-a and 10,11,12 from file-b after the previous delete) + // Snapshot 6: Delete record at position 2 in file-ab-compact (delete "4") + + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Add file-a with 5 rows + Operation::Add( + vec![ + (1, "1".to_string()), + (2, "2".to_string()), + (3, "3".to_string()), + (4, "4".to_string()), + (5, "5".to_string()), + ], + "file-a.parquet".to_string(), + ), + // Snapshot 3: Add file-b with 3 rows + Operation::Add( + vec![ + (10, "10".to_string()), + (11, "11".to_string()), + (12, "12".to_string()), + ], + "file-b.parquet".to_string(), + ), + // Snapshot 4: Delete position 1 (record "2") from file-a + Operation::Delete(vec![(1, "file-a.parquet".to_string())]), + // Snapshot 5: Replace - compact file-a and file-b into file-ab-compact + Operation::Replace( + vec!["file-a.parquet".to_string(), "file-b.parquet".to_string()], + "file-ab-compact.parquet".to_string(), + ), + // Snapshot 6: Delete position 2 (record "4") from file-ab-compact + Operation::Delete(vec![(2, "file-ab-compact.parquet".to_string())]), + ]) + .await; + + // Test 1: Full scan from snapshot 1 to 6 + // Snapshot 2: Add file-a (1,2,3,4,5) + // Snapshot 3: Add file-b (10,11,12) + // Snapshot 4: Delete position 1 from file-a (record "2" deleted) + // Snapshot 5: Replace both files with file-ab-compact (compacts to 1,3,4,5,10,11,12, filtering out position 1) + // Snapshot 6: Delete position 2 from file-ab-compact (record "4" deleted) + // Net result: + // - Additions: records from compacted file with deleted positions filtered (1,3,5,10,11,12) + // Note: position 1 from file-a (record "2") is filtered during compaction + // position 2 from compacted file (record "4") is never added since it's deleted in snapshot 6 + // - Deletions: empty (deletes from Replace are absorbed into the additions of the compacted file) + fixture + .verify_incremental_scan( + 1, + 6, + vec![ + (1, "1"), + (3, "3"), + (5, "5"), + (10, "10"), + (11, "11"), + (12, "12"), + ], + vec![], + ) + .await; + + // Test 2: Scan from snapshot 3 to 6 (after file-b added, through replace and final delete) + // Snapshot 3: Starting point with file-a and file-b (both exist in starting snapshot) + // Snapshot 4: Delete position 1 from file-a (record "2" deleted) + // Snapshot 5: Replace both files with file-ab-compact (filters out position 1 from file-a) + // Snapshot 6: Delete position 2 from file-ab-compact (record "4" deleted) + // Net result: + // - Additions: compacted file records with filtered positions (1,3,5,10,11,12) + // - Deletions: All positions from file-a (0-4) and file-b (0-2) because these files + // existed in the starting snapshot (3) and are deleted/replaced in snapshot 5. + // Deletes are sorted by (position, file_path) tuple ordering + let file_a_path = format!("{}/data/file-a.parquet", fixture.table_location); + let file_b_path = format!("{}/data/file-b.parquet", fixture.table_location); + let test2_deletes = vec![ + (0, file_a_path.as_str()), + (0, file_b_path.as_str()), + (1, file_a_path.as_str()), + (1, file_b_path.as_str()), + (2, file_a_path.as_str()), + (2, file_b_path.as_str()), + (3, file_a_path.as_str()), + (4, file_a_path.as_str()), + ]; + fixture + .verify_incremental_scan( + 3, + 6, + vec![ + (1, "1"), + (3, "3"), + (5, "5"), + (10, "10"), + (11, "11"), + (12, "12"), + ], + test2_deletes, + ) + .await; + + // Test 3: Scan from snapshot 4 to 6 (after first delete, through replace and final delete) + // Snapshot 4: Starting point - file-a already has position 1 deleted (record "2") + // Snapshot 5: Replace both files with file-ab-compact (filters out position 1) + // Snapshot 6: Delete position 2 from file-ab-compact (record "4" deleted) + // Net result: + // - Additions: compacted records with position 1 filtered from file-a (1,3,5,10,11,12) + // - Deletions: All positions from file-a (0-4) and file-b (0-2) because these files + // existed in the starting snapshot (4) and are deleted/replaced in snapshot 5. + // Sorted by (position, file_path) tuple ordering + let file_a_path = format!("{}/data/file-a.parquet", fixture.table_location); + let file_b_path = format!("{}/data/file-b.parquet", fixture.table_location); + let test3_deletes = vec![ + (0, file_a_path.as_str()), + (0, file_b_path.as_str()), + (1, file_a_path.as_str()), + (1, file_b_path.as_str()), + (2, file_a_path.as_str()), + (2, file_b_path.as_str()), + (3, file_a_path.as_str()), + (4, file_a_path.as_str()), + ]; + fixture + .verify_incremental_scan( + 4, + 6, + vec![ + (1, "1"), + (3, "3"), + (5, "5"), + (10, "10"), + (11, "11"), + (12, "12"), + ], + test3_deletes, + ) + .await; + + // Test 4: Scan from snapshot 5 to 6 (after replace, only sees the final delete) + // Snapshot 5: Starting point - file-ab-compact is newly created with records (1,3,4,5,10,11,12) + // Snapshot 6: Delete position 2 from file-ab-compact (record "4" deleted) + // Net result: + // - Additions: empty (file-ab-compact was added in snapshot 5, not in this scan range) + // - Deletions: position 2 from file-ab-compact (the positional delete in snapshot 6) + let file_ab_path = format!("{}/data/file-ab-compact.parquet", fixture.table_location); + let test4_deletes = vec![(2, file_ab_path.as_str())]; + fixture + .verify_incremental_scan(5, 6, vec![], test4_deletes) + .await; +}