diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index a7fe9a3974..5cf2afa172 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -41,6 +41,7 @@ pub use manifest_list::*; pub use partition::*; pub use schema::*; pub use snapshot::*; +pub use snapshot_summary::*; pub use sort::*; pub use statistic_file::*; pub use table_metadata::*; diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs index fae0486356..05f9fb8e63 100644 --- a/crates/iceberg/src/spec/snapshot_summary.rs +++ b/crates/iceberg/src/spec/snapshot_summary.rs @@ -48,8 +48,10 @@ const TOTAL_FILE_SIZE: &str = "total-files-size"; const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count"; const CHANGED_PARTITION_PREFIX: &str = "partitions."; +/// `SnapshotSummaryCollector` collects and aggregates snapshot update metrics. +/// It gathers metrics about added or removed data files and manifests, and tracks +/// partition-specific updates. #[derive(Default)] -#[allow(dead_code)] pub struct SnapshotSummaryCollector { metrics: UpdateMetrics, partition_metrics: HashMap, @@ -58,17 +60,19 @@ pub struct SnapshotSummaryCollector { trust_partition_metrics: bool, } -#[allow(dead_code)] impl SnapshotSummaryCollector { - // Set properties + /// Set properties for snapshot summary pub fn set(&mut self, key: &str, value: &str) { self.properties.insert(key.to_string(), value.to_string()); } + /// Sets the limit for including partition summaries. Summaries are not + /// included if the number of partitions is exceeded. pub fn set_partition_summary_limit(&mut self, limit: u64) { self.max_changed_partitions_for_summaries = limit; } + /// Adds a data file to the summary collector pub fn add_file( &mut self, data_file: &DataFile, @@ -81,6 +85,7 @@ impl SnapshotSummaryCollector { } } + /// Removes a data file from the summary collector pub fn remove_file( &mut self, data_file: &DataFile, @@ -93,12 +98,14 @@ impl SnapshotSummaryCollector { } } + /// Adds a manifest to the summary collector pub fn add_manifest(&mut self, manifest: &ManifestFile) { self.trust_partition_metrics = false; self.partition_metrics.clear(); self.metrics.add_manifest(manifest); } + /// Updates partition-specific metrics for a data file. pub fn update_partition_metrics( &mut self, schema: SchemaRef, @@ -116,6 +123,7 @@ impl SnapshotSummaryCollector { } } + /// Merges another `SnapshotSummaryCollector` into the current one pub fn merge(&mut self, summary: SnapshotSummaryCollector) { self.metrics.merge(&summary.metrics); self.properties.extend(summary.properties); @@ -133,6 +141,7 @@ impl SnapshotSummaryCollector { } } + /// Builds final map of summaries pub fn build(&self) -> HashMap { let mut properties = self.metrics.to_map(); let changed_partitions_count = self.partition_metrics.len() as u64; @@ -507,8 +516,6 @@ fn update_totals( .insert(total_property.to_string(), new_total.to_string()); } -// TODO: ancestors of function - #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index e9e9960552..59d69e0f0d 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -77,6 +77,11 @@ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previo /// Default value for max number of previous versions to keep. pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100; +/// Property key for max number of partitions to keep summary stats for. +pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit"; +/// Default value for the max number of partitions to keep summary stats for. +pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0; + /// Reserved Iceberg table properties list. /// /// Reserved table properties are only used to control behaviors when creating or updating a diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index e1a2fa0d3b..f266491dba 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -24,9 +24,11 @@ use uuid::Uuid; use crate::error::Result; use crate::io::OutputFile; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter, - ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention, Struct, - StructType, Summary, MAIN_BRANCH, + update_snapshot_summaries, DataFile, DataFileFormat, FormatVersion, ManifestEntry, + ManifestFile, ManifestListWriter, ManifestWriterBuilder, Operation, Snapshot, + SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, + MAIN_BRANCH, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, + PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, }; use crate::transaction::Transaction; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; @@ -221,13 +223,55 @@ impl<'a> SnapshotProduceAction<'a> { Ok(manifest_files) } - // # TODO - // Fulfill this function - fn summary(&self, snapshot_produce_operation: &OP) -> Summary { - Summary { - operation: snapshot_produce_operation.operation(), - additional_properties: self.snapshot_properties.clone(), + // Returns a `Summary` of the current snapshot + fn summary( + &self, + snapshot_produce_operation: &OP, + ) -> Result { + let mut summary_collector = SnapshotSummaryCollector::default(); + let table_metadata = self.tx.table.metadata_ref(); + + let partition_summary_limit = if let Some(limit) = table_metadata + .properties() + .get(PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT) + { + if let Ok(limit) = limit.parse::() { + limit + } else { + PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + } + } else { + PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + }; + + summary_collector.set_partition_summary_limit(partition_summary_limit); + + for data_file in &self.added_data_files { + summary_collector.add_file( + data_file, + table_metadata.current_schema().clone(), + table_metadata.default_partition_spec().clone(), + ); } + + let previous_snapshot = table_metadata + .snapshot_by_id(self.snapshot_id) + .and_then(|snapshot| snapshot.parent_snapshot_id()) + .and_then(|parent_id| table_metadata.snapshot_by_id(parent_id)); + + let mut additional_properties = summary_collector.build(); + additional_properties.extend(self.snapshot_properties.clone()); + + let summary = Summary { + operation: snapshot_produce_operation.operation(), + additional_properties, + }; + + update_snapshot_summaries( + summary, + previous_snapshot.map(|s| s.summary()), + snapshot_produce_operation.operation() == Operation::Overwrite, + ) } fn generate_manifest_list_file_path(&self, attempt: i64) -> String { @@ -253,7 +297,13 @@ impl<'a> SnapshotProduceAction<'a> { .await?; let next_seq_num = self.tx.table.metadata().next_sequence_number(); - let summary = self.summary(&snapshot_produce_operation); + let summary = self + .summary(&snapshot_produce_operation) + .map_err(|err| { + Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.") + .with_source(err) + }) + .unwrap(); let manifest_list_path = self.generate_manifest_list_file_path(0);