From 3c0e53e4984200235f78f399ed8bbb68e11795b5 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Sun, 2 Mar 2025 23:21:13 +0800
Subject: [PATCH 1/3] .

Signed-off-by: xxchan
---
 crates/iceberg/src/scan.rs | 207 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 204 insertions(+), 3 deletions(-)

diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index a1f53fcfc6..2f8be448ff 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -17,13 +17,14 @@
 
 //! Table scan api.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock};
 
 use arrow_array::RecordBatch;
 use futures::channel::mpsc::{channel, Sender};
 use futures::stream::BoxStream;
 use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 
 use crate::arrow::ArrowReaderBuilder;
@@ -38,7 +39,7 @@ use crate::io::FileIO;
 use crate::runtime::spawn;
 use crate::spec::{
     DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
-    ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
+    ManifestList, ManifestStatus, Operation, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
 };
 use crate::table::Table;
 use crate::utils::available_parallelism;
@@ -55,6 +56,10 @@ pub struct TableScanBuilder<'a> {
     // Defaults to none which means select all columns
     column_names: Option<Vec<String>>,
     snapshot_id: Option<i64>,
+    /// Exclusive. Used for incremental scan.
+    from_snapshot_id: Option<i64>,
+    /// Inclusive. Used for incremental scan.
+    to_snapshot_id: Option<i64>,
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
@@ -78,6 +83,8 @@ impl<'a> TableScanBuilder<'a> {
             table,
             column_names: None,
             snapshot_id: None,
+            from_snapshot_id: None,
+            to_snapshot_id: None,
             batch_size: None,
             case_sensitive: true,
             filter: None,
@@ -140,6 +147,18 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Set the starting snapshot id (exclusive) for incremental scan.
+    pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
+        self.from_snapshot_id = Some(from_snapshot_id);
+        self
+    }
+
+    /// Set the ending snapshot id (inclusive) for incremental scan.
+    pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
+        self.to_snapshot_id = Some(to_snapshot_id);
+        self
+    }
+
     /// Sets the concurrency limit for both manifest files and manifest
     /// entries for this scan
     pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
@@ -206,6 +225,25 @@ impl<'a> TableScanBuilder<'a> {
 
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
+        // Validate that we have either a snapshot scan or an incremental scan configuration
+        if self.from_snapshot_id.is_some() || self.to_snapshot_id.is_some() {
+            // For incremental scan, we need to_snapshot_id to be set. from_snapshot_id is optional.
+            if self.to_snapshot_id.is_none() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Incremental scan requires to_snapshot_id to be set",
+                ));
+            }
+
+            // snapshot_id should not be set for incremental scan
+            if self.snapshot_id.is_some() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.",
+                ));
+            }
+        }
+
         let snapshot = match self.snapshot_id {
             Some(snapshot_id) => self
                 .table
@@ -227,7 +265,6 @@ impl<'a> TableScanBuilder<'a> {
                 })?
                 .clone(),
         };
-
         let schema = snapshot.schema(self.table.metadata())?;
 
         // Check that all column names exist in the schema.
@@ -297,6 +334,8 @@ impl<'a> TableScanBuilder<'a> {
             snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
             object_cache: self.table.object_cache(),
             field_ids: Arc::new(field_ids),
+            from_snapshot_id: self.from_snapshot_id,
+            to_snapshot_id: self.to_snapshot_id,
             partition_filter_cache: Arc::new(PartitionFilterCache::new()),
             manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
             expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
@@ -358,6 +397,11 @@ struct PlanContext {
     partition_filter_cache: Arc<PartitionFilterCache>,
     manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
+
+    // for incremental scan.
+    // If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`.
+    from_snapshot_id: Option<i64>,
+    to_snapshot_id: Option<i64>,
 }
 
 impl TableScan {
@@ -375,6 +419,65 @@ impl TableScan {
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
 
+        if let Some(to_snapshot_id) = self.plan_context.to_snapshot_id {
+            // Incremental scan mode
+            let added_files = added_files_between(
+                &self.plan_context.object_cache,
+                &self.plan_context.table_metadata,
+                to_snapshot_id,
+                self.plan_context.from_snapshot_id,
+            )
+            .await?;
+
+            for entry in added_files {
+                let manifest_entry_context = ManifestEntryContext {
+                    manifest_entry: entry,
+                    expression_evaluator_cache: self
+                        .plan_context
+                        .expression_evaluator_cache
+                        .clone(),
+                    field_ids: self.plan_context.field_ids.clone(),
+                    bound_predicates: None, // TODO: support predicates in incremental scan
+                    partition_spec_id: 0, // TODO: get correct partition spec id
+                    // It's used to skip any data file whose partition data indicates that it can't contain
+                    // any data that matches this scan's filter
+                    snapshot_schema: self.plan_context.snapshot_schema.clone(),
+                    // delete is not supported in incremental scan
+                    delete_file_index: None,
+                };
+
+                manifest_entry_data_ctx_tx
+                    .clone()
+                    .send(manifest_entry_context)
+                    .await
+                    .map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError"))?;
+            }
+
+            let mut channel_for_manifest_entry_error = file_scan_task_tx.clone();
+
+            // Process the [`ManifestEntry`] stream in parallel
+            spawn(async move {
+                let result = manifest_entry_data_ctx_rx
+                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
+                    .try_for_each_concurrent(
+                        concurrency_limit_manifest_entries,
+                        |(manifest_entry_context, tx)| async move {
+                            spawn(async move {
+                                Self::process_data_manifest_entry(manifest_entry_context, tx).await
+                            })
+                            .await
+                        },
+                    )
+                    .await;
+
+                if let Err(error) = result {
+                    let _ = channel_for_manifest_entry_error.send(Err(error)).await;
+                }
+            });
+
+            return Ok(file_scan_task_rx.boxed());
+        }
+
         let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
             if self.delete_file_processing_enabled {
                 Some(DeleteFileIndex::new())
@@ -1146,6 +1249,104 @@ impl FileScanTask {
     }
 }
 
+struct Ancestors {
+    next: Option<SnapshotRef>,
+    get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
+}
+
+impl Iterator for Ancestors {
+    type Item = SnapshotRef;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let snapshot = self.next.take()?;
+        let result = snapshot.clone();
+        self.next = snapshot
+            .parent_snapshot_id()
+            .and_then(|id| (self.get_snapshot)(id));
+        Some(result)
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
+fn ancestors_of(
+    table_metadata: &TableMetadataRef,
+    snapshot: i64,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
+        let table_metadata = table_metadata.clone();
+        Box::new(Ancestors {
+            next: Some(snapshot.clone()),
+            get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
+        })
+    } else {
+        Box::new(std::iter::empty())
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
+fn ancestors_between(
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: Option<i64>,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    let Some(oldest_snapshot_id) = oldest_snapshot_id else {
+        return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
+    };
+
+    if latest_snapshot_id == oldest_snapshot_id {
+        return Box::new(std::iter::empty());
+    }
+
+    Box::new(
+        ancestors_of(table_metadata, latest_snapshot_id)
+            .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
+    )
+}
+
+/// Get all added files between two snapshots.
+/// The files in `latest_snapshot_id` (inclusive) but not in `oldest_snapshot_id` (exclusive).
+async fn added_files_between(
+    object_cache: &ObjectCache,
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: Option<i64>,
+) -> Result<Vec<ManifestEntryRef>> {
+    let mut added_files = vec![];
+
+    let append_snapshots =
+        ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
+            .filter(|snapshot| matches!(snapshot.summary().operation, Operation::Append))
+            .collect_vec();
+    let snapshot_ids: HashSet<i64> = append_snapshots
+        .iter()
+        .map(|snapshot| snapshot.snapshot_id())
+        .collect();
+
+    for snapshot in append_snapshots {
+        let manifest_list = object_cache
+            .get_manifest_list(&snapshot, table_metadata)
+            .await?;
+
+        for manifest_file in manifest_list.entries() {
+            if !snapshot_ids.contains(&manifest_file.added_snapshot_id) {
+                continue;
+            }
+            let manifest = object_cache.get_manifest(manifest_file).await?;
+            let entries = manifest.entries().iter().filter(|entry| {
+                matches!(entry.status(), ManifestStatus::Added)
+                    && (
+                        // Is it possible that the snapshot id here is not contained?
+                        entry.snapshot_id().is_none()
+                            || snapshot_ids.contains(&entry.snapshot_id().unwrap())
+                    )
+            });
+            added_files.extend(entries.cloned());
+        }
+    }
+
+    Ok(added_files)
+}
+
 #[cfg(test)]
 pub mod tests {
     use std::collections::HashMap;

From c59b7383cdd7efc5aba6eb6819e552020d2229c8 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Mon, 17 Mar 2025 23:17:41 +0800
Subject: [PATCH 2/3] fix: check delete file

Signed-off-by: xxchan
---
 crates/iceberg/src/scan.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 2f8be448ff..5153991877 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -1334,6 +1334,7 @@ async fn added_files_between(
             let manifest = object_cache.get_manifest(manifest_file).await?;
             let entries = manifest.entries().iter().filter(|entry| {
                 matches!(entry.status(), ManifestStatus::Added)
+                    && matches!(entry.data_file().content_type(), DataContentType::Data)
                     && (
                         // Is it possible that the snapshot id here is not contained?
                         entry.snapshot_id().is_none()

From 44472419573c9a5bf6392ce0079bab92181ecfb0 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Tue, 18 Mar 2025 16:47:25 +0800
Subject: [PATCH 3/3] include Operation::Overwrite

Signed-off-by: xxchan
---
 crates/iceberg/src/scan.rs | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 5153991877..ab9de6a403 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -1304,7 +1304,11 @@ fn ancestors_between(
 }
 
 /// Get all added files between two snapshots.
-/// The files in `latest_snapshot_id` (inclusive) but not in `oldest_snapshot_id` (exclusive).
+/// - data files in `Append` and `Overwrite` snapshots are included.
+/// - delete files are ignored.
+/// - `Replace` snapshots (e.g., compaction) are ignored.
+///
+/// `latest_snapshot_id` is inclusive, `oldest_snapshot_id` is exclusive.
 async fn added_files_between(
     object_cache: &ObjectCache,
     table_metadata: &TableMetadataRef,
@@ -1313,16 +1317,20 @@ async fn added_files_between(
 ) -> Result<Vec<ManifestEntryRef>> {
     let mut added_files = vec![];
 
-    let append_snapshots =
-        ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
-            .filter(|snapshot| matches!(snapshot.summary().operation, Operation::Append))
-            .collect_vec();
-    let snapshot_ids: HashSet<i64> = append_snapshots
+    let snapshots = ancestors_between(table_metadata, latest_snapshot_id, oldest_snapshot_id)
+        .filter(|snapshot| {
+            matches!(
+                snapshot.summary().operation,
+                Operation::Append | Operation::Overwrite
+            )
+        })
+        .collect_vec();
+    let snapshot_ids: HashSet<i64> = snapshots
         .iter()
         .map(|snapshot| snapshot.snapshot_id())
         .collect();
 
-    for snapshot in append_snapshots {
+    for snapshot in snapshots {
         let manifest_list = object_cache
             .get_manifest_list(&snapshot, table_metadata)
             .await?;
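
For reviewers, here is a rough usage sketch of the API this series adds. It assumes an already-loaded `Table` (e.g. obtained from a catalog) and snapshot ids tracked by the caller; `incremental_file_tasks` is a hypothetical helper, and the surrounding calls (`Table::scan`, `TableScan::plan_files`, `FileScanTask::data_file_path`) are the crate's existing API rather than part of this change.

```rust
use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

/// Hypothetical helper: plan an incremental scan that yields only the data files
/// added after `from_id` (exclusive) up to `to_id` (inclusive).
async fn incremental_file_tasks(table: &Table, from_id: i64, to_id: i64) -> Result<()> {
    let scan = table
        .scan()
        .from_snapshot_id(from_id) // optional; omit to start from the oldest ancestor
        .to_snapshot_id(to_id) // required to enable incremental mode
        .build()?;

    // Stream the planned tasks; per this series, only data files from `Append`/`Overwrite`
    // snapshots are returned, and delete files are skipped.
    let mut tasks = scan.plan_files().await?;
    while let Some(task) = tasks.try_next().await? {
        println!("added data file: {}", task.data_file_path());
    }
    Ok(())
}
```

Note that, per the validation added in patch 1, setting `snapshot_id` together with `to_snapshot_id` is rejected in `build()`, so a caller picks either a point-in-time scan or an incremental scan, never both.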