diff --git a/crates/iceberg/src/spec/manifest/_serde.rs b/crates/iceberg/src/spec/manifest/_serde.rs
index fd7bc2e69a..99d3128db5 100644
--- a/crates/iceberg/src/spec/manifest/_serde.rs
+++ b/crates/iceberg/src/spec/manifest/_serde.rs
@@ -21,7 +21,7 @@ use serde_derive::{Deserialize, Serialize};
 use serde_with::serde_as;
 
 use super::{Datum, ManifestEntry, Schema, Struct};
-use crate::spec::{Literal, RawLiteral, StructType, Type};
+use crate::spec::{FormatVersion, Literal, RawLiteral, StructType, Type};
 use crate::{Error, ErrorKind};
 
 #[derive(Serialize, Deserialize)]
@@ -40,7 +40,7 @@ impl ManifestEntryV2 {
             snapshot_id: value.snapshot_id,
             sequence_number: value.sequence_number,
             file_sequence_number: value.file_sequence_number,
-            data_file: DataFileSerde::try_from(value.data_file, partition_type, false)?,
+            data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V2)?,
         })
     }
 
@@ -74,7 +74,7 @@ impl ManifestEntryV1 {
         Ok(Self {
             status: value.status as i32,
             snapshot_id: value.snapshot_id.unwrap_or_default(),
-            data_file: DataFileSerde::try_from(value.data_file, partition_type, true)?,
+            data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V1)?,
         })
     }
 
@@ -129,9 +129,13 @@ impl DataFileSerde {
     pub fn try_from(
         value: super::DataFile,
         partition_type: &StructType,
-        is_version_1: bool,
+        format_version: FormatVersion,
     ) -> Result<Self, Error> {
-        let block_size_in_bytes = if is_version_1 { Some(0) } else { None };
+        let block_size_in_bytes = if format_version == FormatVersion::V1 {
+            Some(0)
+        } else {
+            None
+        };
         Ok(Self {
             content: value.content as i32,
             file_path: value.file_path,
@@ -292,8 +296,9 @@ fn parse_i64_entry(v: Vec<I64Entry>) -> Result<HashMap<i32, u64>, Error> {
     Ok(m)
 }
 
+#[allow(unused_mut)]
 fn to_i64_entry(entries: HashMap<i32, u64>) -> Result<Vec<I64Entry>, Error> {
-    entries
+    let mut i64_entries = entries
         .iter()
         .map(|e| {
             Ok(I64Entry {
@@ -301,7 +306,13 @@ fn to_i64_entry(entries: HashMap<i32, u64>) -> Result<Vec<I64Entry>, Error> {
                 value: (*e.1).try_into()?,
             })
         })
-        .collect()
+        .collect::<Result<Vec<I64Entry>, Error>>()?;
+
+    // Ensure that the order is deterministic during testing
+    #[cfg(test)]
+    i64_entries.sort_by_key(|e| e.key);
+
+    Ok(i64_entries)
 }
 
 #[cfg(test)]
diff --git a/crates/iceberg/src/spec/manifest/data_file.rs b/crates/iceberg/src/spec/manifest/data_file.rs
index 1de59a3874..9ea1fcd0a7 100644
--- a/crates/iceberg/src/spec/manifest/data_file.rs
+++ b/crates/iceberg/src/spec/manifest/data_file.rs
@@ -297,8 +297,12 @@ pub fn write_data_files_to_avro<W: Write>(
     let mut writer = AvroWriter::new(&avro_schema, writer);
 
     for data_file in data_files {
-        let value = to_value(DataFileSerde::try_from(data_file, partition_type, true)?)?
-            .resolve(&avro_schema)?;
+        let value = to_value(DataFileSerde::try_from(
+            data_file,
+            partition_type,
+            FormatVersion::V1,
+        )?)?
+        .resolve(&avro_schema)?;
         writer.append(value)?;
     }
 
diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs
index 33b7d38706..da03977324 100644
--- a/crates/iceberg/src/spec/manifest/mod.rs
+++ b/crates/iceberg/src/spec/manifest/mod.rs
@@ -34,6 +34,7 @@ use super::{
     UNASSIGNED_SEQUENCE_NUMBER,
 };
 use crate::error::Result;
+use crate::{Error, ErrorKind};
 
 /// A manifest contains metadata and a list of entries.
 #[derive(Debug, PartialEq, Eq, Clone)]
@@ -119,12 +120,47 @@ impl Manifest {
     }
 }
 
+/// Serialize a DataFile to a JSON string.
+pub fn serialize_data_file_to_json(
+    data_file: DataFile,
+    partition_type: &super::StructType,
+    format_version: FormatVersion,
+) -> Result<String> {
+    let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?;
+    serde_json::to_string(&serde).map_err(|e| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            "Failed to serialize DataFile to JSON!".to_string(),
+        )
+        .with_source(e)
+    })
+}
+
+/// Deserialize a DataFile from a JSON string.
+pub fn deserialize_data_file_from_json(
+    json: &str,
+    partition_spec_id: i32,
+    partition_type: &super::StructType,
+    schema: &Schema,
+) -> Result<DataFile> {
+    let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            "Failed to deserialize JSON to DataFile!".to_string(),
+        )
+        .with_source(e)
+    })?;
+
+    serde.try_into(partition_spec_id, partition_type, schema)
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
     use std::fs;
     use std::sync::Arc;
 
+    use serde_json::Value;
     use tempfile::TempDir;
 
     use super::*;
@@ -1056,4 +1092,159 @@ mod tests {
         assert!(!partitions[2].clone().contains_null);
         assert_eq!(partitions[2].clone().contains_nan, Some(false));
     }
+
+    #[test]
+    fn test_data_file_serialization() {
+        // Create a simple schema
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_identifier_field_ids(vec![1])
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        // Create a partition spec
+        let partition_spec = PartitionSpec::builder(schema.clone())
+            .with_spec_id(1)
+            .add_partition_field("id", "id_partition", Transform::Identity)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Get partition type from the partition spec
+        let partition_type = partition_spec.partition_type(&schema).unwrap();
+
+        // Create a vector of DataFile objects
+        let data_files = vec![
+            DataFileBuilder::default()
+                .content(DataContentType::Data)
+                .file_format(DataFileFormat::Parquet)
+                .file_path("path/to/file1.parquet".to_string())
+                .file_size_in_bytes(1024)
+                .record_count(100)
+                .partition_spec_id(1)
+                .partition(Struct::empty())
+                .column_sizes(HashMap::from([(1, 512), (2, 1024)]))
+                .value_counts(HashMap::from([(1, 100), (2, 500)]))
+                .null_value_counts(HashMap::from([(1, 0), (2, 1)]))
+                .build()
+                .unwrap(),
+            DataFileBuilder::default()
+                .content(DataContentType::Data)
+                .file_format(DataFileFormat::Parquet)
+                .file_path("path/to/file2.parquet".to_string())
+                .file_size_in_bytes(2048)
+                .record_count(200)
+                .partition_spec_id(1)
+                .partition(Struct::empty())
+                .column_sizes(HashMap::from([(1, 1024), (2, 2048)]))
+                .value_counts(HashMap::from([(1, 200), (2, 600)]))
+                .null_value_counts(HashMap::from([(1, 10), (2, 999)]))
+                .build()
+                .unwrap(),
+        ];
+
+        // Serialize the DataFile objects
+        let serialized_files = data_files
+            .clone()
+            .into_iter()
+            .map(|f| serialize_data_file_to_json(f, &partition_type, FormatVersion::V2).unwrap())
+            .collect::<Vec<String>>();
+
+        // Verify we have the expected serialized files
+        assert_eq!(serialized_files.len(), 2);
+        let pretty_json1: Value = serde_json::from_str(serialized_files.first().unwrap()).unwrap();
+        let pretty_json2: Value = serde_json::from_str(serialized_files.get(1).unwrap()).unwrap();
+        let expected_serialized_file1 = serde_json::json!({
+            "content": 0,
+            "file_path": "path/to/file1.parquet",
+            "file_format": "PARQUET",
+            "partition": {},
+            "record_count": 100,
+            "file_size_in_bytes": 1024,
"column_sizes": [ + { "key": 1, "value": 512 }, + { "key": 2, "value": 1024 } + ], + "value_counts": [ + { "key": 1, "value": 100 }, + { "key": 2, "value": 500 } + ], + "null_value_counts": [ + { "key": 1, "value": 0 }, + { "key": 2, "value": 1 } + ], + "nan_value_counts": [], + "lower_bounds": [], + "upper_bounds": [], + "key_metadata": null, + "split_offsets": [], + "equality_ids": [], + "sort_order_id": null, + "first_row_id": null, + "referenced_data_file": null, + "content_offset": null, + "content_size_in_bytes": null + }); + let expected_serialized_file2 = serde_json::json!({ + "content": 0, + "file_path": "path/to/file2.parquet", + "file_format": "PARQUET", + "partition": {}, + "record_count": 200, + "file_size_in_bytes": 2048, + "column_sizes": [ + { "key": 1, "value": 1024 }, + { "key": 2, "value": 2048 } + ], + "value_counts": [ + { "key": 1, "value": 200 }, + { "key": 2, "value": 600 } + ], + "null_value_counts": [ + { "key": 1, "value": 10 }, + { "key": 2, "value": 999 } + ], + "nan_value_counts": [], + "lower_bounds": [], + "upper_bounds": [], + "key_metadata": null, + "split_offsets": [], + "equality_ids": [], + "sort_order_id": null, + "first_row_id": null, + "referenced_data_file": null, + "content_offset": null, + "content_size_in_bytes": null + }); + assert_eq!(pretty_json1, expected_serialized_file1); + assert_eq!(pretty_json2, expected_serialized_file2); + + // Now deserialize the JSON strings back into DataFile objects + let deserialized_files: Vec = serialized_files + .into_iter() + .map(|json| { + deserialize_data_file_from_json( + &json, + partition_spec.spec_id(), + &partition_type, + &schema, + ) + .unwrap() + }) + .collect(); + + // Verify we have the expected number of deserialized files + assert_eq!(deserialized_files.len(), 2); + let deserialized_data_file1 = deserialized_files.first().unwrap(); + let deserialized_data_file2 = deserialized_files.get(1).unwrap(); + let original_data_file1 = data_files.first().unwrap(); + let original_data_file2 = data_files.get(1).unwrap(); + + assert_eq!(deserialized_data_file1, original_data_file1); + assert_eq!(deserialized_data_file2, original_data_file2); + } }