From c00a80ffa9531dede619e3d1ab465704f06e8ebe Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 26 Mar 2025 15:54:28 -0400 Subject: [PATCH 1/2] feat: snapshot summaries --- crates/iceberg/src/spec/mod.rs | 1 + crates/iceberg/src/spec/snapshot.rs | 10 +++ crates/iceberg/src/spec/snapshot_summary.rs | 34 ++++++++-- crates/iceberg/src/spec/table_metadata.rs | 5 ++ crates/iceberg/src/transaction/snapshot.rs | 70 ++++++++++++++++++--- 5 files changed, 105 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index a7fe9a3974..5cf2afa172 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -41,6 +41,7 @@ pub use manifest_list::*; pub use partition::*; pub use schema::*; pub use snapshot::*; +pub use snapshot_summary::*; pub use sort::*; pub use statistic_file::*; pub use table_metadata::*; diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index e73b8abaa3..93f87411e3 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -76,6 +76,16 @@ pub struct Summary { pub additional_properties: HashMap, } +impl Summary { + /// Create new snapshot summary + pub fn new(operation: Operation) -> Self { + Self { + operation, + additional_properties: HashMap::new(), + } + } +} + impl Default for Operation { fn default() -> Operation { Self::Append diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs index fae0486356..847c1b4b80 100644 --- a/crates/iceberg/src/spec/snapshot_summary.rs +++ b/crates/iceberg/src/spec/snapshot_summary.rs @@ -16,10 +16,11 @@ // under the License. use std::collections::HashMap; +use std::sync::Arc; use itertools::Itertools; -use super::{DataContentType, DataFile, PartitionSpecRef}; +use super::{DataContentType, DataFile, PartitionSpecRef, Snapshot, SnapshotRef, TableMetadata}; use crate::spec::{ManifestContentType, ManifestFile, Operation, SchemaRef, Summary}; use crate::{Error, ErrorKind, Result}; @@ -48,8 +49,10 @@ const TOTAL_FILE_SIZE: &str = "total-files-size"; const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count"; const CHANGED_PARTITION_PREFIX: &str = "partitions."; +/// `SnapshotSummaryCollector` collects and aggregates snapshot update metrics. +/// It gathers metrics about added or removed data files and manifests, and tracks +/// partition-specific updates. #[derive(Default)] -#[allow(dead_code)] pub struct SnapshotSummaryCollector { metrics: UpdateMetrics, partition_metrics: HashMap, @@ -58,17 +61,19 @@ pub struct SnapshotSummaryCollector { trust_partition_metrics: bool, } -#[allow(dead_code)] impl SnapshotSummaryCollector { - // Set properties + /// Set properties for snapshot summary pub fn set(&mut self, key: &str, value: &str) { self.properties.insert(key.to_string(), value.to_string()); } + /// Sets the limit for including partition summaries. Summaries are not + /// included if the number of partitions is exceeded. pub fn set_partition_summary_limit(&mut self, limit: u64) { self.max_changed_partitions_for_summaries = limit; } + /// Adds a data file to the summary collector pub fn add_file( &mut self, data_file: &DataFile, @@ -81,6 +86,7 @@ impl SnapshotSummaryCollector { } } + /// Removes a data file from the summary collector pub fn remove_file( &mut self, data_file: &DataFile, @@ -93,12 +99,14 @@ impl SnapshotSummaryCollector { } } + /// Adds a manifest to the summary collector pub fn add_manifest(&mut self, manifest: &ManifestFile) { self.trust_partition_metrics = false; self.partition_metrics.clear(); self.metrics.add_manifest(manifest); } + /// Updates partition-specific metrics for a data file. pub fn update_partition_metrics( &mut self, schema: SchemaRef, @@ -116,6 +124,7 @@ impl SnapshotSummaryCollector { } } + /// Merges another `SnapshotSummaryCollector` into the current one pub fn merge(&mut self, summary: SnapshotSummaryCollector) { self.metrics.merge(&summary.metrics); self.properties.extend(summary.properties); @@ -133,6 +142,7 @@ impl SnapshotSummaryCollector { } } + /// Builds final map of summaries pub fn build(&self) -> HashMap { let mut properties = self.metrics.to_map(); let changed_partitions_count = self.partition_metrics.len() as u64; @@ -507,7 +517,21 @@ fn update_totals( .insert(total_property.to_string(), new_total.to_string()); } -// TODO: ancestors of function +// Returns the ancestors of the current snapshot +#[allow(dead_code)] +fn ancestors_of(current_snapshot: Snapshot, table_metadata: TableMetadata) -> Vec> { + let mut ancestors = Vec::new(); + let mut current: Option = Some(Arc::new(current_snapshot)); + + while let Some(snapshot) = current { + ancestors.push(snapshot.clone()); + current = snapshot + .parent_snapshot_id() + .and_then(|parent_id| table_metadata.snapshot_by_id(parent_id).cloned()); + } + + ancestors +} #[cfg(test)] mod tests { diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 94f1191b26..3b0d46e740 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -78,6 +78,11 @@ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previo /// Default value for max number of previous versions to keep. pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100; +/// Property key for max number of partitions to keep summary stats for. +pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit"; +/// Default value for the max number of partitions to keep summary stats for. +pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0; + /// Reserved Iceberg table properties list. /// /// Reserved table properties are only used to control behaviors when creating or updating a diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index e1a2fa0d3b..f266491dba 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -24,9 +24,11 @@ use uuid::Uuid; use crate::error::Result; use crate::io::OutputFile; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter, - ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention, Struct, - StructType, Summary, MAIN_BRANCH, + update_snapshot_summaries, DataFile, DataFileFormat, FormatVersion, ManifestEntry, + ManifestFile, ManifestListWriter, ManifestWriterBuilder, Operation, Snapshot, + SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, + MAIN_BRANCH, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, + PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, }; use crate::transaction::Transaction; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; @@ -221,13 +223,55 @@ impl<'a> SnapshotProduceAction<'a> { Ok(manifest_files) } - // # TODO - // Fulfill this function - fn summary(&self, snapshot_produce_operation: &OP) -> Summary { - Summary { - operation: snapshot_produce_operation.operation(), - additional_properties: self.snapshot_properties.clone(), + // Returns a `Summary` of the current snapshot + fn summary( + &self, + snapshot_produce_operation: &OP, + ) -> Result { + let mut summary_collector = SnapshotSummaryCollector::default(); + let table_metadata = self.tx.table.metadata_ref(); + + let partition_summary_limit = if let Some(limit) = table_metadata + .properties() + .get(PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT) + { + if let Ok(limit) = limit.parse::() { + limit + } else { + PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + } + } else { + PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + }; + + summary_collector.set_partition_summary_limit(partition_summary_limit); + + for data_file in &self.added_data_files { + summary_collector.add_file( + data_file, + table_metadata.current_schema().clone(), + table_metadata.default_partition_spec().clone(), + ); } + + let previous_snapshot = table_metadata + .snapshot_by_id(self.snapshot_id) + .and_then(|snapshot| snapshot.parent_snapshot_id()) + .and_then(|parent_id| table_metadata.snapshot_by_id(parent_id)); + + let mut additional_properties = summary_collector.build(); + additional_properties.extend(self.snapshot_properties.clone()); + + let summary = Summary { + operation: snapshot_produce_operation.operation(), + additional_properties, + }; + + update_snapshot_summaries( + summary, + previous_snapshot.map(|s| s.summary()), + snapshot_produce_operation.operation() == Operation::Overwrite, + ) } fn generate_manifest_list_file_path(&self, attempt: i64) -> String { @@ -253,7 +297,13 @@ impl<'a> SnapshotProduceAction<'a> { .await?; let next_seq_num = self.tx.table.metadata().next_sequence_number(); - let summary = self.summary(&snapshot_produce_operation); + let summary = self + .summary(&snapshot_produce_operation) + .map_err(|err| { + Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.") + .with_source(err) + }) + .unwrap(); let manifest_list_path = self.generate_manifest_list_file_path(0); From 45edea096dc873bcc256ebbb8fed05ba605edf06 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 2 Apr 2025 17:33:50 -0400 Subject: [PATCH 2/2] fix --- crates/iceberg/src/spec/snapshot.rs | 10 ---------- crates/iceberg/src/spec/snapshot_summary.rs | 19 +------------------ 2 files changed, 1 insertion(+), 28 deletions(-) diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index e3efab0cec..922e7bab95 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -76,16 +76,6 @@ pub struct Summary { pub additional_properties: HashMap, } -impl Summary { - /// Create new snapshot summary - pub fn new(operation: Operation) -> Self { - Self { - operation, - additional_properties: HashMap::new(), - } - } -} - impl Default for Operation { fn default() -> Operation { Self::Append diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs index 847c1b4b80..05f9fb8e63 100644 --- a/crates/iceberg/src/spec/snapshot_summary.rs +++ b/crates/iceberg/src/spec/snapshot_summary.rs @@ -16,11 +16,10 @@ // under the License. use std::collections::HashMap; -use std::sync::Arc; use itertools::Itertools; -use super::{DataContentType, DataFile, PartitionSpecRef, Snapshot, SnapshotRef, TableMetadata}; +use super::{DataContentType, DataFile, PartitionSpecRef}; use crate::spec::{ManifestContentType, ManifestFile, Operation, SchemaRef, Summary}; use crate::{Error, ErrorKind, Result}; @@ -517,22 +516,6 @@ fn update_totals( .insert(total_property.to_string(), new_total.to_string()); } -// Returns the ancestors of the current snapshot -#[allow(dead_code)] -fn ancestors_of(current_snapshot: Snapshot, table_metadata: TableMetadata) -> Vec> { - let mut ancestors = Vec::new(); - let mut current: Option = Some(Arc::new(current_snapshot)); - - while let Some(snapshot) = current { - ancestors.push(snapshot.clone()); - current = snapshot - .parent_snapshot_id() - .and_then(|parent_id| table_metadata.snapshot_by_id(parent_id).cloned()); - } - - ancestors -} - #[cfg(test)] mod tests { use std::collections::HashMap;