From 007d86c84d7b10f62ee16fe9e31cd3a20e492762 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 18 Mar 2025 21:41:55 -0400 Subject: [PATCH 1/4] feat: Support `PartitionStats` --- crates/iceberg/src/spec/statistic_file.rs | 145 ++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/crates/iceberg/src/spec/statistic_file.rs b/crates/iceberg/src/spec/statistic_file.rs index 4d806f6e4d..7c3b3237b1 100644 --- a/crates/iceberg/src/spec/statistic_file.rs +++ b/crates/iceberg/src/spec/statistic_file.rs @@ -21,6 +21,11 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; +use crate::spec::{DataContentType, DataFile, Snapshot, Struct}; +use crate::{Error, ErrorKind}; + +static STATS_COUNT: u32 = 12; + #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] /// Represents a statistics file @@ -69,6 +74,146 @@ pub struct PartitionStatisticsFile { pub file_size_in_bytes: i64, } +#[derive(Clone, Debug, PartialEq, Eq)] +/// Statistics for partition pruning +pub struct PartitionStats { + partition: Struct, + spec_id: i32, + data_record_count: u64, + data_file_count: u32, + total_data_file_size_in_bytes: u64, + position_delete_record_count: u64, + position_delete_file_count: u32, + equality_delete_record_count: u64, + equality_delete_file_count: u32, + total_record_count: u64, + last_updated_at: Option, + last_updated_snapshot_id: Option, +} + +impl PartitionStats { + /// Creates new `PartitionStats` instance based on partition struct + /// spec id + pub fn new(partition: Struct, spec_id: i32) -> Self { + Self { + partition, + spec_id, + data_record_count: 0, + data_file_count: 0, + total_data_file_size_in_bytes: 0, + position_delete_record_count: 0, + position_delete_file_count: 0, + equality_delete_record_count: 0, + equality_delete_file_count: 0, + total_record_count: 0, + last_updated_at: None, + last_updated_snapshot_id: None, + } + } + + /// Returns partition struct + pub fn partition(&self) -> &Struct { + &self.partition + } + + /// Returns partition spec id + pub fn spec_id(&self) -> i32 { + self.spec_id + } + + /// Returns the total number of data records in the partition. + pub fn data_record_count(&self) -> u64 { + self.data_record_count + } + + /// Returns the number of data files in the partition + pub fn data_file_count(&self) -> u32 { + self.data_file_count + } + + /// Returns the total size in bytes of all data files in the partition + pub fn total_data_file_size_in_bytes(&self) -> u64 { + self.total_data_file_size_in_bytes + } + + /// Returns the total number of records in position delete files + pub fn position_delete_record_count(&self) -> u64 { + self.position_delete_record_count + } + + /// Returns the number of position delete files + pub fn position_delete_file_count(&self) -> u32 { + self.position_delete_file_count + } + + /// Returns the total number of records in equality delete files + pub fn equality_delete_record_count(&self) -> u64 { + self.equality_delete_record_count + } + + /// Returns the number of equality delete files + pub fn equality_delete_file_count(&self) -> u32 { + self.equality_delete_file_count + } + + /// Returns the total record count in the partition + pub fn total_record_count(&self) -> u64 { + self.total_record_count + } + + /// Returns the timestamp of the last snapshot update + pub fn last_updated_at(&self) -> Option { + self.last_updated_at + } + + /// Returns the snapshot id of the last update + pub fn last_updated_snapshot_id(&self) -> Option { + self.last_updated_snapshot_id + } + + /// Updates the partition statistics based on the given `DataFile` and its corresponding `Snapshot`. + pub fn live_entry(&mut self, file: DataFile, snapshot: Snapshot) -> Result<(), Error> { + if file.partition_spec_id != self.spec_id { + return Err(Error::new( + ErrorKind::Unexpected, + "Spec IDs must match.", + )); + } + + match file.content_type() { + DataContentType::Data => { + self.data_record_count += file.record_count(); + self.data_file_count += 1; + self.total_data_file_size_in_bytes += file.file_size_in_bytes(); + } + DataContentType::PositionDeletes => { + self.position_delete_record_count += file.record_count(); + self.position_delete_file_count += 1; + } + DataContentType::EqualityDeletes => { + self.equality_delete_record_count += file.record_count(); + self.equality_delete_file_count += 1; + } + _ => { + return Err(Error::new( + ErrorKind::Unexpected, + "Unsupported file content type:" + )); + } + } + + self.update_snapshot_info(snapshot.snapshot_id(), snapshot.timestamp_ms()); + Ok(()) + } + + fn update_snapshot_info(&mut self, snapshot_id: i64, updated_at: i64) { + if self.last_updated_at.is_none() || self.last_updated_at.unwrap() < updated_at { + self.last_updated_at = Some(updated_at); + self.last_updated_snapshot_id = Some(snapshot_id); + } + } +} + #[cfg(test)] mod test { use std::fmt::Debug; From c46da88be94a4e7616c16376150308177a3464af Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 18 Mar 2025 22:03:30 -0400 Subject: [PATCH 2/4] test + clippy + fmt --- crates/iceberg/src/spec/statistic_file.rs | 186 ++++++++++++++++++++-- 1 file changed, 171 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/spec/statistic_file.rs b/crates/iceberg/src/spec/statistic_file.rs index 7c3b3237b1..ca8a7b6276 100644 --- a/crates/iceberg/src/spec/statistic_file.rs +++ b/crates/iceberg/src/spec/statistic_file.rs @@ -24,8 +24,6 @@ use serde::{Deserialize, Serialize}; use crate::spec::{DataContentType, DataFile, Snapshot, Struct}; use crate::{Error, ErrorKind}; -static STATS_COUNT: u32 = 12; - #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] /// Represents a statistics file @@ -92,7 +90,7 @@ pub struct PartitionStats { } impl PartitionStats { - /// Creates new `PartitionStats` instance based on partition struct + /// Creates new `PartitionStats` instance based on partition struct /// spec id pub fn new(partition: Struct, spec_id: i32) -> Self { Self { @@ -174,12 +172,9 @@ impl PartitionStats { /// Updates the partition statistics based on the given `DataFile` and its corresponding `Snapshot`. pub fn live_entry(&mut self, file: DataFile, snapshot: Snapshot) -> Result<(), Error> { if file.partition_spec_id != self.spec_id { - return Err(Error::new( - ErrorKind::Unexpected, - "Spec IDs must match.", - )); + return Err(Error::new(ErrorKind::Unexpected, "Spec IDs must match.")); } - + match file.content_type() { DataContentType::Data => { self.data_record_count += file.record_count(); @@ -194,14 +189,8 @@ impl PartitionStats { self.equality_delete_record_count += file.record_count(); self.equality_delete_file_count += 1; } - _ => { - return Err(Error::new( - ErrorKind::Unexpected, - "Unsupported file content type:" - )); - } } - + self.update_snapshot_info(snapshot.snapshot_id(), snapshot.timestamp_ms()); Ok(()) } @@ -222,6 +211,7 @@ mod test { use serde_json::json; use super::*; + use crate::spec::{DataFileFormat, Datum, Literal, Operation, Summary}; fn test_serde_json( json: serde_json::Value, @@ -338,4 +328,170 @@ mod test { }, ); } + + #[test] + fn test_partition_stats() -> Result<(), Error> { + let partition = Struct::from_iter(vec![Some(Literal::string("x"))]); + + let spec_id = 0; + let mut stats = PartitionStats::new(partition.clone(), spec_id); + + // test data file + let snapshot1 = Snapshot::builder() + .with_snapshot_id(1) + .with_sequence_number(0) + .with_timestamp_ms(1000) + .with_manifest_list("manifest_list_path".to_string()) + .with_summary(Summary { + operation: Operation::default(), + additional_properties: HashMap::new(), + }) + .build(); + + let data_file = DataFile { + content: DataContentType::Data, + file_path: "test.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: partition.clone(), + record_count: 1, + file_size_in_bytes: 874, + column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]), + value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]), + null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::from([ + (1, Datum::long(1)), + (2, Datum::string("a")), + (3, Datum::string("x")), + ]), + upper_bounds: HashMap::from([ + (1, Datum::long(1)), + (2, Datum::string("a")), + (3, Datum::string("x")), + ]), + key_metadata: None, + split_offsets: vec![4], + equality_ids: vec![], + sort_order_id: Some(0), + partition_spec_id: spec_id, + }; + stats.live_entry(data_file, snapshot1.clone())?; + assert_eq!(stats.data_record_count(), 1); + assert_eq!(stats.data_file_count(), 1); + assert_eq!(stats.total_data_file_size_in_bytes(), 874); + assert_eq!(stats.last_updated_snapshot_id(), Some(1)); + assert_eq!(stats.last_updated_at(), Some(1000)); + + // test position delete file + let snapshot2 = Snapshot::builder() + .with_snapshot_id(2) + .with_sequence_number(1) + .with_timestamp_ms(2000) + .with_manifest_list("manifest_list_path".to_string()) + .with_summary(Summary { + operation: Operation::default(), + additional_properties: HashMap::new(), + }) + .build(); + + let posdel_file = DataFile { + content: DataContentType::PositionDeletes, + file_path: "test.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: partition.clone(), + record_count: 5, + file_size_in_bytes: 500, + column_sizes: HashMap::new(), + value_counts: HashMap::new(), + null_value_counts: HashMap::new(), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: None, + split_offsets: vec![10], + equality_ids: vec![], + sort_order_id: None, + partition_spec_id: spec_id, + }; + + stats.live_entry(posdel_file, snapshot2.clone())?; + assert_eq!(stats.position_delete_record_count(), 5); + assert_eq!(stats.position_delete_file_count(), 1); + + assert_eq!(stats.last_updated_snapshot_id(), Some(2)); + assert_eq!(stats.last_updated_at(), Some(2000)); + + // test equality delete file + let snapshot3 = Snapshot::builder() + .with_snapshot_id(3) + .with_sequence_number(2) + .with_timestamp_ms(3000) + .with_manifest_list("manifest_list_path".to_string()) + .with_summary(Summary { + operation: Operation::default(), + additional_properties: HashMap::new(), + }) + .build(); + + let eqdel_file = DataFile { + content: DataContentType::EqualityDeletes, + file_path: "test.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: partition.clone(), + record_count: 3, + file_size_in_bytes: 300, + column_sizes: HashMap::new(), + value_counts: HashMap::new(), + null_value_counts: HashMap::new(), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: None, + split_offsets: vec![15], + equality_ids: vec![], + sort_order_id: None, + partition_spec_id: spec_id, + }; + stats.live_entry(eqdel_file, snapshot3.clone())?; + assert_eq!(stats.equality_delete_record_count(), 3); + assert_eq!(stats.equality_delete_file_count(), 1); + assert_eq!(stats.last_updated_snapshot_id(), Some(3)); + assert_eq!(stats.last_updated_at(), Some(3000)); + + let snapshot4 = Snapshot::builder() + .with_snapshot_id(4) + .with_sequence_number(3) + .with_timestamp_ms(4000) + .with_manifest_list("manifest_list_path".to_string()) + .with_summary(Summary { + operation: Operation::default(), + additional_properties: HashMap::new(), + }) + .build(); + + let wrong_file = DataFile { + content: DataContentType::Data, + file_path: "test.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition, + record_count: 2, + file_size_in_bytes: 900, + column_sizes: HashMap::new(), + value_counts: HashMap::new(), + null_value_counts: HashMap::new(), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: None, + split_offsets: vec![20], + equality_ids: vec![], + sort_order_id: Some(0), + partition_spec_id: spec_id + 1, // mismatch spec id. + }; + + let result = stats.live_entry(wrong_file, snapshot4); + assert!(result.is_err()); + + Ok(()) + } } From b9f5a35b668888529105a21dddffa502e90f6c51 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 7 May 2025 13:24:21 -0700 Subject: [PATCH 3/4] clippy --- crates/iceberg/src/spec/statistic_file.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/crates/iceberg/src/spec/statistic_file.rs b/crates/iceberg/src/spec/statistic_file.rs index ca8a7b6276..ac62a36fe4 100644 --- a/crates/iceberg/src/spec/statistic_file.rs +++ b/crates/iceberg/src/spec/statistic_file.rs @@ -374,6 +374,10 @@ mod test { equality_ids: vec![], sort_order_id: Some(0), partition_spec_id: spec_id, + content_offset: None, + content_size_in_bytes: None, + first_row_id: None, + referenced_data_file: None, }; stats.live_entry(data_file, snapshot1.clone())?; assert_eq!(stats.data_record_count(), 1); @@ -412,6 +416,10 @@ mod test { equality_ids: vec![], sort_order_id: None, partition_spec_id: spec_id, + content_offset: None, + content_size_in_bytes: None, + first_row_id: None, + referenced_data_file: None, }; stats.live_entry(posdel_file, snapshot2.clone())?; @@ -451,6 +459,10 @@ mod test { equality_ids: vec![], sort_order_id: None, partition_spec_id: spec_id, + content_offset: None, + content_size_in_bytes: None, + first_row_id: None, + referenced_data_file: None, }; stats.live_entry(eqdel_file, snapshot3.clone())?; assert_eq!(stats.equality_delete_record_count(), 3); @@ -487,6 +499,10 @@ mod test { equality_ids: vec![], sort_order_id: Some(0), partition_spec_id: spec_id + 1, // mismatch spec id. + content_offset: None, + content_size_in_bytes: None, + first_row_id: None, + referenced_data_file: None, }; let result = stats.live_entry(wrong_file, snapshot4); From b8ed5d5cf520dda455aedca25d2b231d9f9b26bf Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 20 May 2025 18:57:39 -0400 Subject: [PATCH 4/4] minor fixes --- crates/iceberg/src/spec/statistic_file.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/iceberg/src/spec/statistic_file.rs b/crates/iceberg/src/spec/statistic_file.rs index ac62a36fe4..e60fb35e40 100644 --- a/crates/iceberg/src/spec/statistic_file.rs +++ b/crates/iceberg/src/spec/statistic_file.rs @@ -24,9 +24,9 @@ use serde::{Deserialize, Serialize}; use crate::spec::{DataContentType, DataFile, Snapshot, Struct}; use crate::{Error, ErrorKind}; +/// Represents a statistics file #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] -/// Represents a statistics file pub struct StatisticsFile { /// The snapshot id of the statistics file. pub snapshot_id: i64, @@ -43,9 +43,9 @@ pub struct StatisticsFile { pub blob_metadata: Vec, } +/// Represents a blob of metadata, which is a part of a statistics file #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] -/// Represents a blob of metadata, which is a part of a statistics file pub struct BlobMetadata { /// Type of the blob. pub r#type: String, @@ -60,9 +60,9 @@ pub struct BlobMetadata { pub properties: HashMap, } +/// Statistics file for a partition #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] -/// Statistics file for a partition pub struct PartitionStatisticsFile { /// The snapshot id of the statistics file. pub snapshot_id: i64, @@ -72,8 +72,8 @@ pub struct PartitionStatisticsFile { pub file_size_in_bytes: i64, } -#[derive(Clone, Debug, PartialEq, Eq)] /// Statistics for partition pruning +#[derive(Clone, Debug, PartialEq, Eq)] pub struct PartitionStats { partition: Struct, spec_id: i32, @@ -170,7 +170,7 @@ impl PartitionStats { } /// Updates the partition statistics based on the given `DataFile` and its corresponding `Snapshot`. - pub fn live_entry(&mut self, file: DataFile, snapshot: Snapshot) -> Result<(), Error> { + pub fn add_stats_for_file(&mut self, file: DataFile, snapshot: Snapshot) -> Result<(), Error> { if file.partition_spec_id != self.spec_id { return Err(Error::new(ErrorKind::Unexpected, "Spec IDs must match.")); } @@ -379,7 +379,7 @@ mod test { first_row_id: None, referenced_data_file: None, }; - stats.live_entry(data_file, snapshot1.clone())?; + stats.add_stats_for_file(data_file, snapshot1.clone())?; assert_eq!(stats.data_record_count(), 1); assert_eq!(stats.data_file_count(), 1); assert_eq!(stats.total_data_file_size_in_bytes(), 874); @@ -422,7 +422,7 @@ mod test { referenced_data_file: None, }; - stats.live_entry(posdel_file, snapshot2.clone())?; + stats.add_stats_for_file(posdel_file, snapshot2.clone())?; assert_eq!(stats.position_delete_record_count(), 5); assert_eq!(stats.position_delete_file_count(), 1); @@ -464,7 +464,7 @@ mod test { first_row_id: None, referenced_data_file: None, }; - stats.live_entry(eqdel_file, snapshot3.clone())?; + stats.add_stats_for_file(eqdel_file, snapshot3.clone())?; assert_eq!(stats.equality_delete_record_count(), 3); assert_eq!(stats.equality_delete_file_count(), 1); assert_eq!(stats.last_updated_snapshot_id(), Some(3)); @@ -505,7 +505,7 @@ mod test { referenced_data_file: None, }; - let result = stats.live_entry(wrong_file, snapshot4); + let result = stats.add_stats_for_file(wrong_file, snapshot4); assert!(result.is_err()); Ok(())