323 changes: 320 additions & 3 deletions crates/iceberg/src/spec/statistic_file.rs
@@ -21,9 +21,12 @@ use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use crate::spec::{DataContentType, DataFile, Snapshot, Struct};
use crate::{Error, ErrorKind};

/// Represents a statistics file
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct StatisticsFile {
/// The snapshot id of the statistics file.
pub snapshot_id: i64,
@@ -40,9 +43,9 @@ pub struct StatisticsFile {
pub blob_metadata: Vec<BlobMetadata>,
}

/// Represents a blob of metadata, which is a part of a statistics file
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct BlobMetadata {
/// Type of the blob.
pub r#type: String,
@@ -57,9 +60,9 @@ pub struct BlobMetadata {
pub properties: HashMap<String, String>,
}

/// Statistics file for a partition
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionStatisticsFile {
/// The snapshot id of the statistics file.
pub snapshot_id: i64,
@@ -69,6 +72,137 @@ pub struct PartitionStatisticsFile {
pub file_size_in_bytes: i64,
}

/// Statistics for partition pruning
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PartitionStats {
partition: Struct,
spec_id: i32,
data_record_count: u64,
data_file_count: u32,
total_data_file_size_in_bytes: u64,
position_delete_record_count: u64,
position_delete_file_count: u32,
equality_delete_record_count: u64,
equality_delete_file_count: u32,
total_record_count: u64,
last_updated_at: Option<i64>,
last_updated_snapshot_id: Option<i64>,
}

impl PartitionStats {
/// Creates a new `PartitionStats` instance based on the given partition struct
/// and partition spec id.
pub fn new(partition: Struct, spec_id: i32) -> Self {
Self {
partition,
spec_id,
data_record_count: 0,
data_file_count: 0,
total_data_file_size_in_bytes: 0,
position_delete_record_count: 0,
position_delete_file_count: 0,
equality_delete_record_count: 0,
equality_delete_file_count: 0,
total_record_count: 0,
last_updated_at: None,
last_updated_snapshot_id: None,
}
}

/// Returns the partition struct
pub fn partition(&self) -> &Struct {
&self.partition
}

/// Returns the partition spec id
pub fn spec_id(&self) -> i32 {
self.spec_id
}

/// Returns the total number of data records in the partition.
pub fn data_record_count(&self) -> u64 {
self.data_record_count
}

/// Returns the number of data files in the partition
pub fn data_file_count(&self) -> u32 {
self.data_file_count
}

/// Returns the total size in bytes of all data files in the partition
pub fn total_data_file_size_in_bytes(&self) -> u64 {
self.total_data_file_size_in_bytes
}

/// Returns the total number of records in position delete files
pub fn position_delete_record_count(&self) -> u64 {
self.position_delete_record_count
}

/// Returns the number of position delete files
pub fn position_delete_file_count(&self) -> u32 {
self.position_delete_file_count
}

/// Returns the total number of records in equality delete files
pub fn equality_delete_record_count(&self) -> u64 {
self.equality_delete_record_count
}

/// Returns the number of equality delete files
pub fn equality_delete_file_count(&self) -> u32 {
self.equality_delete_file_count
}

/// Returns the total record count in the partition
pub fn total_record_count(&self) -> u64 {
self.total_record_count
}

/// Returns the timestamp of the last snapshot update
pub fn last_updated_at(&self) -> Option<i64> {
self.last_updated_at
}

/// Returns the snapshot id of the last update
pub fn last_updated_snapshot_id(&self) -> Option<i64> {
self.last_updated_snapshot_id
}

/// Updates the partition statistics based on the given `DataFile` and its corresponding `Snapshot`.
pub fn add_stats_for_file(&mut self, file: DataFile, snapshot: Snapshot) -> Result<(), Error> {
if file.partition_spec_id != self.spec_id {
return Err(Error::new(ErrorKind::Unexpected, "Spec IDs must match."));
}

match file.content_type() {
DataContentType::Data => {
self.data_record_count += file.record_count();
self.data_file_count += 1;
self.total_data_file_size_in_bytes += file.file_size_in_bytes();
}
DataContentType::PositionDeletes => {
self.position_delete_record_count += file.record_count();
self.position_delete_file_count += 1;
}
DataContentType::EqualityDeletes => {
self.equality_delete_record_count += file.record_count();
self.equality_delete_file_count += 1;
}
}

self.update_snapshot_info(snapshot.snapshot_id(), snapshot.timestamp_ms());
Ok(())
}

fn update_snapshot_info(&mut self, snapshot_id: i64, updated_at: i64) {
Member:
If we only use update_snapshot_info internally, is it better to have separate APIs like update_with_data_file and refresh_with_snapshot?

Contributor (author):
I believe it's safer to keep update_snapshot_info like this: whenever the statistics are updated, the latest snapshot info should always be updated alongside, and splitting it out is dangerous, since the stats become incorrect if the user forgets to call the refresh API. It will also be used for other partition stat updates, as seen in the Java implementation here.
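For illustration, a minimal sketch of the suggested split. The names update_with_data_file and refresh_with_snapshot come from the review comment and are hypothetical, not part of this PR; the bodies simply reuse the logic of add_stats_for_file above and assume this file's imports and types.

```rust
// Hypothetical alternative API, using the names proposed in the review
// comment above (not part of this PR).
impl PartitionStats {
    /// Folds one file's counts into the stats. Deliberately does NOT touch
    /// last_updated_*; the caller must also call refresh_with_snapshot.
    pub fn update_with_data_file(&mut self, file: &DataFile) -> Result<(), Error> {
        if file.partition_spec_id != self.spec_id {
            return Err(Error::new(ErrorKind::Unexpected, "Spec IDs must match."));
        }
        match file.content_type() {
            DataContentType::Data => {
                self.data_record_count += file.record_count();
                self.data_file_count += 1;
                self.total_data_file_size_in_bytes += file.file_size_in_bytes();
            }
            DataContentType::PositionDeletes => {
                self.position_delete_record_count += file.record_count();
                self.position_delete_file_count += 1;
            }
            DataContentType::EqualityDeletes => {
                self.equality_delete_record_count += file.record_count();
                self.equality_delete_file_count += 1;
            }
        }
        Ok(())
    }

    /// Advances last_updated_at / last_updated_snapshot_id only if the given
    /// snapshot is newer than the last one seen.
    pub fn refresh_with_snapshot(&mut self, snapshot: &Snapshot) {
        let ts = snapshot.timestamp_ms();
        if self.last_updated_at.is_none() || self.last_updated_at.unwrap() < ts {
            self.last_updated_at = Some(ts);
            self.last_updated_snapshot_id = Some(snapshot.snapshot_id());
        }
    }
}
```

The author's concern is visible in this shape: a caller that invokes update_with_data_file but forgets refresh_with_snapshot leaves last_updated_at and last_updated_snapshot_id stale, which the combined add_stats_for_file makes impossible.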

if self.last_updated_at.is_none() || self.last_updated_at.unwrap() < updated_at {
self.last_updated_at = Some(updated_at);
self.last_updated_snapshot_id = Some(snapshot_id);
}
}
}

#[cfg(test)]
mod test {
use std::fmt::Debug;
@@ -77,6 +211,7 @@ mod test {
use serde_json::json;

use super::*;
use crate::spec::{DataFileFormat, Datum, Literal, Operation, Summary};

fn test_serde_json<T: Serialize + DeserializeOwned + PartialEq + Debug>(
json: serde_json::Value,
@@ -193,4 +328,186 @@
},
);
}

#[test]
fn test_partition_stats() -> Result<(), Error> {
let partition = Struct::from_iter(vec![Some(Literal::string("x"))]);

let spec_id = 0;
let mut stats = PartitionStats::new(partition.clone(), spec_id);

// test data file
let snapshot1 = Snapshot::builder()
.with_snapshot_id(1)
.with_sequence_number(0)
.with_timestamp_ms(1000)
.with_manifest_list("manifest_list_path".to_string())
.with_summary(Summary {
operation: Operation::default(),
additional_properties: HashMap::new(),
})
.build();

let data_file = DataFile {
content: DataContentType::Data,
file_path: "test.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: partition.clone(),
record_count: 1,
file_size_in_bytes: 874,
column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]),
null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::from([
(1, Datum::long(1)),
(2, Datum::string("a")),
(3, Datum::string("x")),
]),
upper_bounds: HashMap::from([
(1, Datum::long(1)),
(2, Datum::string("a")),
(3, Datum::string("x")),
]),
key_metadata: None,
split_offsets: vec![4],
equality_ids: vec![],
sort_order_id: Some(0),
partition_spec_id: spec_id,
content_offset: None,
content_size_in_bytes: None,
first_row_id: None,
referenced_data_file: None,
};
stats.add_stats_for_file(data_file, snapshot1.clone())?;
assert_eq!(stats.data_record_count(), 1);
assert_eq!(stats.data_file_count(), 1);
assert_eq!(stats.total_data_file_size_in_bytes(), 874);
assert_eq!(stats.last_updated_snapshot_id(), Some(1));
assert_eq!(stats.last_updated_at(), Some(1000));

// test position delete file
let snapshot2 = Snapshot::builder()
.with_snapshot_id(2)
.with_sequence_number(1)
.with_timestamp_ms(2000)
.with_manifest_list("manifest_list_path".to_string())
.with_summary(Summary {
operation: Operation::default(),
additional_properties: HashMap::new(),
})
.build();

let posdel_file = DataFile {
content: DataContentType::PositionDeletes,
file_path: "test.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: partition.clone(),
record_count: 5,
file_size_in_bytes: 500,
column_sizes: HashMap::new(),
value_counts: HashMap::new(),
null_value_counts: HashMap::new(),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: vec![10],
equality_ids: vec![],
sort_order_id: None,
partition_spec_id: spec_id,
content_offset: None,
content_size_in_bytes: None,
first_row_id: None,
referenced_data_file: None,
};

stats.add_stats_for_file(posdel_file, snapshot2.clone())?;
assert_eq!(stats.position_delete_record_count(), 5);
assert_eq!(stats.position_delete_file_count(), 1);

assert_eq!(stats.last_updated_snapshot_id(), Some(2));
assert_eq!(stats.last_updated_at(), Some(2000));

// test equality delete file
let snapshot3 = Snapshot::builder()
.with_snapshot_id(3)
.with_sequence_number(2)
.with_timestamp_ms(3000)
.with_manifest_list("manifest_list_path".to_string())
.with_summary(Summary {
operation: Operation::default(),
additional_properties: HashMap::new(),
})
.build();

let eqdel_file = DataFile {
content: DataContentType::EqualityDeletes,
file_path: "test.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: partition.clone(),
record_count: 3,
file_size_in_bytes: 300,
column_sizes: HashMap::new(),
value_counts: HashMap::new(),
null_value_counts: HashMap::new(),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: vec![15],
equality_ids: vec![],
sort_order_id: None,
partition_spec_id: spec_id,
content_offset: None,
content_size_in_bytes: None,
first_row_id: None,
referenced_data_file: None,
};
stats.add_stats_for_file(eqdel_file, snapshot3.clone())?;
assert_eq!(stats.equality_delete_record_count(), 3);
assert_eq!(stats.equality_delete_file_count(), 1);
assert_eq!(stats.last_updated_snapshot_id(), Some(3));
assert_eq!(stats.last_updated_at(), Some(3000));

let snapshot4 = Snapshot::builder()
.with_snapshot_id(4)
.with_sequence_number(3)
.with_timestamp_ms(4000)
.with_manifest_list("manifest_list_path".to_string())
.with_summary(Summary {
operation: Operation::default(),
additional_properties: HashMap::new(),
})
.build();

let wrong_file = DataFile {
content: DataContentType::Data,
file_path: "test.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition,
record_count: 2,
file_size_in_bytes: 900,
column_sizes: HashMap::new(),
value_counts: HashMap::new(),
null_value_counts: HashMap::new(),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: vec![20],
equality_ids: vec![],
sort_order_id: Some(0),
partition_spec_id: spec_id + 1, // mismatch spec id.
content_offset: None,
content_size_in_bytes: None,
first_row_id: None,
referenced_data_file: None,
};

let result = stats.add_stats_for_file(wrong_file, snapshot4);
assert!(result.is_err());

Ok(())
}
}