Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/iceberg/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub use manifest_list::*;
pub use partition::*;
pub use schema::*;
pub use snapshot::*;
pub use snapshot_summary::*;
pub use sort::*;
pub use statistic_file::*;
pub use table_metadata::*;
Expand Down
17 changes: 12 additions & 5 deletions crates/iceberg/src/spec/snapshot_summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ const TOTAL_FILE_SIZE: &str = "total-files-size";
const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count";
const CHANGED_PARTITION_PREFIX: &str = "partitions.";

/// `SnapshotSummaryCollector` collects and aggregates snapshot update metrics.
/// It gathers metrics about added or removed data files and manifests, and tracks
/// partition-specific updates.
#[derive(Default)]
#[allow(dead_code)]
pub struct SnapshotSummaryCollector {
metrics: UpdateMetrics,
partition_metrics: HashMap<String, UpdateMetrics>,
Expand All @@ -58,17 +60,19 @@ pub struct SnapshotSummaryCollector {
trust_partition_metrics: bool,
}

#[allow(dead_code)]
impl SnapshotSummaryCollector {
// Set properties
/// Records a custom `key`/`value` property on this collector.
///
/// Both strings are copied into the collector's `properties` map; a value
/// stored earlier under the same key is replaced.
pub fn set(&mut self, key: &str, value: &str) {
    self.properties.insert(key.to_owned(), value.to_owned());
}

/// Sets the maximum number of changed partitions for which partition
/// summaries are included.
///
/// Summaries are not included if the number of partitions exceeds `limit`.
// NOTE(review): the comparison against this limit is not visible in this
// hunk — confirm the exact boundary behavior where the summary is built.
pub fn set_partition_summary_limit(&mut self, limit: u64) {
// Stored as-is; no validation is performed here.
self.max_changed_partitions_for_summaries = limit;
}

/// Adds a data file to the summary collector
pub fn add_file(
&mut self,
data_file: &DataFile,
Expand All @@ -81,6 +85,7 @@ impl SnapshotSummaryCollector {
}
}

/// Removes a data file from the summary collector
pub fn remove_file(
&mut self,
data_file: &DataFile,
Expand All @@ -93,12 +98,14 @@ impl SnapshotSummaryCollector {
}
}

/// Folds a whole manifest into the collector's aggregate metrics.
///
/// Any partition-level metrics gathered so far are discarded and marked as
/// untrusted; only the top-level metrics are updated from `manifest`.
pub fn add_manifest(&mut self, manifest: &ManifestFile) {
    // NOTE(review): presumably a manifest carries no per-partition breakdown,
    // so per-partition numbers can no longer be complete — confirm.
    self.partition_metrics.clear();
    self.trust_partition_metrics = false;

    self.metrics.add_manifest(manifest);
}

/// Updates partition-specific metrics for a data file.
pub fn update_partition_metrics(
&mut self,
schema: SchemaRef,
Expand All @@ -116,6 +123,7 @@ impl SnapshotSummaryCollector {
}
}

/// Merges another `SnapshotSummaryCollector` into the current one
pub fn merge(&mut self, summary: SnapshotSummaryCollector) {
self.metrics.merge(&summary.metrics);
self.properties.extend(summary.properties);
Expand All @@ -133,6 +141,7 @@ impl SnapshotSummaryCollector {
}
}

/// Builds final map of summaries
pub fn build(&self) -> HashMap<String, String> {
let mut properties = self.metrics.to_map();
let changed_partitions_count = self.partition_metrics.len() as u64;
Expand Down Expand Up @@ -507,8 +516,6 @@ fn update_totals(
.insert(total_property.to_string(), new_total.to_string());
}

// TODO: ancestors of function

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down
5 changes: 5 additions & 0 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previo
/// Default value for max number of previous versions to keep.
pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;

/// Property key for max number of partitions to keep summary stats for.
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
/// Default value for the max number of partitions to keep summary stats for.
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;

/// Reserved Iceberg table properties list.
///
/// Reserved table properties are only used to control behaviors when creating or updating a
Expand Down
70 changes: 60 additions & 10 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ use uuid::Uuid;
use crate::error::Result;
use crate::io::OutputFile;
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention, Struct,
StructType, Summary, MAIN_BRANCH,
update_snapshot_summaries, DataFile, DataFileFormat, FormatVersion, ManifestEntry,
ManifestFile, ManifestListWriter, ManifestWriterBuilder, Operation, Snapshot,
SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary,
MAIN_BRANCH, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
};
use crate::transaction::Transaction;
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
Expand Down Expand Up @@ -221,13 +223,55 @@ impl<'a> SnapshotProduceAction<'a> {
Ok(manifest_files)
}

// # TODO
// Fulfill this function
fn summary<OP: SnapshotProduceOperation>(&self, snapshot_produce_operation: &OP) -> Summary {
Summary {
operation: snapshot_produce_operation.operation(),
additional_properties: self.snapshot_properties.clone(),
/// Builds the [`Summary`] for the snapshot being produced.
///
/// Metrics are collected for every added data file, the user-supplied
/// snapshot properties are layered on top (overwriting collected keys on
/// collision), and the result is passed to `update_snapshot_summaries`
/// together with the parent snapshot's summary, if one is found.
///
/// # Errors
///
/// Returns whatever error `update_snapshot_summaries` reports.
fn summary<OP: SnapshotProduceOperation>(
    &self,
    snapshot_produce_operation: &OP,
) -> Result<Summary> {
    let metadata = self.tx.table.metadata_ref();

    // A missing or unparsable table property falls back to the default limit.
    let partition_limit = metadata
        .properties()
        .get(PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT)
        .and_then(|raw| raw.parse::<u64>().ok())
        .unwrap_or(PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT);

    let mut collector = SnapshotSummaryCollector::default();
    collector.set_partition_summary_limit(partition_limit);

    for added in &self.added_data_files {
        collector.add_file(
            added,
            metadata.current_schema().clone(),
            metadata.default_partition_spec().clone(),
        );
    }

    // NOTE(review): this looks up `self.snapshot_id` in the current table
    // metadata. If that id belongs to the snapshot being produced it may not
    // be present yet, in which case no previous summary is used — confirm.
    let parent_snapshot = metadata
        .snapshot_by_id(self.snapshot_id)
        .and_then(|current| current.parent_snapshot_id())
        .and_then(|parent_id| metadata.snapshot_by_id(parent_id));

    // User-provided properties take precedence over collected metrics.
    let mut merged_properties = collector.build();
    merged_properties.extend(self.snapshot_properties.clone());

    let new_summary = Summary {
        operation: snapshot_produce_operation.operation(),
        additional_properties: merged_properties,
    };

    let parent_summary = parent_snapshot.map(|snapshot| snapshot.summary());
    let is_overwrite = snapshot_produce_operation.operation() == Operation::Overwrite;

    update_snapshot_summaries(new_summary, parent_summary, is_overwrite)
}

fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
Expand All @@ -253,7 +297,13 @@ impl<'a> SnapshotProduceAction<'a> {
.await?;
let next_seq_num = self.tx.table.metadata().next_sequence_number();

let summary = self.summary(&snapshot_produce_operation);
// Propagate a summary failure to the caller instead of panicking: the error
// is already wrapped with context, so `?` returns it like the surrounding
// fallible calls do.
let summary = self
    .summary(&snapshot_produce_operation)
    .map_err(|err| {
        Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.")
            .with_source(err)
    })?;

let manifest_list_path = self.generate_manifest_list_file_path(0);

Expand Down
Loading