From b94c978a7573f2583fb90f4069e25fef0c1f920e Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 18 Dec 2024 19:40:19 +0800 Subject: [PATCH 1/3] add interface to help serialize/deserialize DataFile --- crates/iceberg/src/spec/manifest.rs | 149 ++++++++++++++++++++-------- 1 file changed, 106 insertions(+), 43 deletions(-) diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 086c63080f..2491630181 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -656,6 +656,38 @@ mod _const_schema { }) }; + fn data_file_fields_v2(partition_type: StructType) -> Vec { + vec![ + CONTENT.clone(), + FILE_PATH.clone(), + FILE_FORMAT.clone(), + Arc::new(NestedField::required( + 102, + "partition", + Type::Struct(partition_type), + )), + RECORD_COUNT.clone(), + FILE_SIZE_IN_BYTES.clone(), + COLUMN_SIZES.clone(), + VALUE_COUNTS.clone(), + NULL_VALUE_COUNTS.clone(), + NAN_VALUE_COUNTS.clone(), + LOWER_BOUNDS.clone(), + UPPER_BOUNDS.clone(), + KEY_METADATA.clone(), + SPLIT_OFFSETS.clone(), + EQUALITY_IDS.clone(), + SORT_ORDER_ID.clone(), + ] + } + + pub(super) fn data_file_schema_v2(partition_type: StructType) -> Result { + let schema = Schema::builder() + .with_fields(data_file_fields_v2(partition_type)) + .build()?; + schema_to_avro_schema("data_file", &schema) + } + pub(super) fn manifest_schema_v2(partition_type: StructType) -> Result { let fields = vec![ STATUS.clone(), @@ -665,34 +697,44 @@ mod _const_schema { Arc::new(NestedField::required( 2, "data_file", - Type::Struct(StructType::new(vec![ - CONTENT.clone(), - FILE_PATH.clone(), - FILE_FORMAT.clone(), - Arc::new(NestedField::required( - 102, - "partition", - Type::Struct(partition_type), - )), - RECORD_COUNT.clone(), - FILE_SIZE_IN_BYTES.clone(), - COLUMN_SIZES.clone(), - VALUE_COUNTS.clone(), - NULL_VALUE_COUNTS.clone(), - NAN_VALUE_COUNTS.clone(), - LOWER_BOUNDS.clone(), - UPPER_BOUNDS.clone(), - KEY_METADATA.clone(), - SPLIT_OFFSETS.clone(), - EQUALITY_IDS.clone(), - SORT_ORDER_ID.clone(), - ])), + Type::Struct(StructType::new(data_file_fields_v2(partition_type))), )), ]; let schema = Schema::builder().with_fields(fields).build()?; schema_to_avro_schema("manifest_entry", &schema) } + fn data_file_fields_v1(partition_type: StructType) -> Vec { + vec![ + FILE_PATH.clone(), + FILE_FORMAT.clone(), + Arc::new(NestedField::required( + 102, + "partition", + Type::Struct(partition_type), + )), + RECORD_COUNT.clone(), + FILE_SIZE_IN_BYTES.clone(), + BLOCK_SIZE_IN_BYTES.clone(), + COLUMN_SIZES.clone(), + VALUE_COUNTS.clone(), + NULL_VALUE_COUNTS.clone(), + NAN_VALUE_COUNTS.clone(), + LOWER_BOUNDS.clone(), + UPPER_BOUNDS.clone(), + KEY_METADATA.clone(), + SPLIT_OFFSETS.clone(), + SORT_ORDER_ID.clone(), + ] + } + + pub(super) fn data_file_schema_v1(partition_type: StructType) -> Result { + let schema = Schema::builder() + .with_fields(data_file_fields_v1(partition_type)) + .build()?; + schema_to_avro_schema("data_file", &schema) + } + pub(super) fn manifest_schema_v1(partition_type: StructType) -> Result { let fields = vec![ STATUS.clone(), @@ -700,27 +742,7 @@ mod _const_schema { Arc::new(NestedField::required( 2, "data_file", - Type::Struct(StructType::new(vec![ - FILE_PATH.clone(), - FILE_FORMAT.clone(), - Arc::new(NestedField::required( - 102, - "partition", - Type::Struct(partition_type), - )), - RECORD_COUNT.clone(), - FILE_SIZE_IN_BYTES.clone(), - BLOCK_SIZE_IN_BYTES.clone(), - COLUMN_SIZES.clone(), - VALUE_COUNTS.clone(), - NULL_VALUE_COUNTS.clone(), - NAN_VALUE_COUNTS.clone(), - LOWER_BOUNDS.clone(), - UPPER_BOUNDS.clone(), - KEY_METADATA.clone(), - SPLIT_OFFSETS.clone(), - SORT_ORDER_ID.clone(), - ])), + Type::Struct(StructType::new(data_file_fields_v1(partition_type))), )), ]; let schema = Schema::builder().with_fields(fields).build()?; @@ -1189,6 +1211,47 @@ impl DataFile { self.sort_order_id } } + +/// Convert data files to avro bytes. +pub fn data_files_to_avro( + data_files: impl IntoIterator, + partition_type: &StructType, + version: FormatVersion, +) -> Result> { + let avro_schema = match version { + FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone()).unwrap(), + FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone()).unwrap(), + }; + let mut writer = AvroWriter::new(&avro_schema, Vec::new()); + + for data_file in data_files { + let value = to_value(_serde::DataFile::try_from(data_file, partition_type, true)?)? + .resolve(&avro_schema)?; + writer.append(value)?; + } + + Ok(writer.into_inner()?) +} + +/// Parse data files from avro bytes. +pub fn parse_from_avro( + bytes: &[u8], + schema: &Schema, + partition_type: &StructType, + version: FormatVersion, +) -> Result> { + let avro_schema = match version { + FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone()).unwrap(), + FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone()).unwrap(), + }; + + let reader = AvroReader::with_schema(&avro_schema, bytes)?; + reader + .into_iter() + .map(|value| from_value::<_serde::DataFile>(&value?)?.try_into(partition_type, schema)) + .collect::>>() +} + /// Type of content stored by the data file: data, equality deletes, or /// position deletes (all v1 files are data files) #[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)] From 9cdd7620b740e2e46f03db3d5430ca121a777936 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 31 Dec 2024 16:39:25 +0800 Subject: [PATCH 2/3] refine interface and add test --- crates/iceberg/src/spec/manifest.rs | 115 ++++++++++++++++++++++------ 1 file changed, 91 insertions(+), 24 deletions(-) diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 2491630181..622d4c4b5c 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -18,6 +18,7 @@ //! Manifest for Iceberg. use std::cmp::min; use std::collections::HashMap; +use std::io::{Read, Write}; use std::str::FromStr; use std::sync::Arc; @@ -61,7 +62,7 @@ impl Manifest { let entries = match metadata.format_version { FormatVersion::V1 => { - let schema = manifest_schema_v1(partition_type.clone())?; + let schema = manifest_schema_v1(&partition_type)?; let reader = AvroReader::with_schema(&schema, bs)?; reader .into_iter() @@ -72,7 +73,7 @@ impl Manifest { .collect::>>()? } FormatVersion::V2 => { - let schema = manifest_schema_v2(partition_type.clone())?; + let schema = manifest_schema_v2(&partition_type)?; let reader = AvroReader::with_schema(&schema, bs)?; reader .into_iter() @@ -241,8 +242,8 @@ impl ManifestWriter { .partition_type(&manifest.metadata.schema)?; let table_schema = &manifest.metadata.schema; let avro_schema = match manifest.metadata.format_version { - FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?, - FormatVersion::V2 => manifest_schema_v2(partition_type.clone())?, + FormatVersion::V1 => manifest_schema_v1(&partition_type)?, + FormatVersion::V2 => manifest_schema_v2(&partition_type)?, }; let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new()); avro_writer.add_user_metadata( @@ -656,7 +657,7 @@ mod _const_schema { }) }; - fn data_file_fields_v2(partition_type: StructType) -> Vec { + fn data_file_fields_v2(partition_type: &StructType) -> Vec { vec![ CONTENT.clone(), FILE_PATH.clone(), @@ -664,7 +665,7 @@ mod _const_schema { Arc::new(NestedField::required( 102, "partition", - Type::Struct(partition_type), + Type::Struct(partition_type.clone()), )), RECORD_COUNT.clone(), FILE_SIZE_IN_BYTES.clone(), @@ -681,14 +682,14 @@ mod _const_schema { ] } - pub(super) fn data_file_schema_v2(partition_type: StructType) -> Result { + pub(super) fn data_file_schema_v2(partition_type: &StructType) -> Result { let schema = Schema::builder() .with_fields(data_file_fields_v2(partition_type)) .build()?; schema_to_avro_schema("data_file", &schema) } - pub(super) fn manifest_schema_v2(partition_type: StructType) -> Result { + pub(super) fn manifest_schema_v2(partition_type: &StructType) -> Result { let fields = vec![ STATUS.clone(), SNAPSHOT_ID_V2.clone(), @@ -704,14 +705,14 @@ mod _const_schema { schema_to_avro_schema("manifest_entry", &schema) } - fn data_file_fields_v1(partition_type: StructType) -> Vec { + fn data_file_fields_v1(partition_type: &StructType) -> Vec { vec![ FILE_PATH.clone(), FILE_FORMAT.clone(), Arc::new(NestedField::required( 102, "partition", - Type::Struct(partition_type), + Type::Struct(partition_type.clone()), )), RECORD_COUNT.clone(), FILE_SIZE_IN_BYTES.clone(), @@ -728,14 +729,14 @@ mod _const_schema { ] } - pub(super) fn data_file_schema_v1(partition_type: StructType) -> Result { + pub(super) fn data_file_schema_v1(partition_type: &StructType) -> Result { let schema = Schema::builder() .with_fields(data_file_fields_v1(partition_type)) .build()?; schema_to_avro_schema("data_file", &schema) } - pub(super) fn manifest_schema_v1(partition_type: StructType) -> Result { + pub(super) fn manifest_schema_v1(partition_type: &StructType) -> Result { let fields = vec![ STATUS.clone(), SNAPSHOT_ID_V1.clone(), @@ -1212,17 +1213,19 @@ impl DataFile { } } -/// Convert data files to avro bytes. -pub fn data_files_to_avro( +/// Convert data files to avro bytes and write to writer. +/// Return the bytes written. +pub fn write_data_files_to_avro( + writer: &mut W, data_files: impl IntoIterator, partition_type: &StructType, version: FormatVersion, -) -> Result> { +) -> Result { let avro_schema = match version { - FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone()).unwrap(), - FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone()).unwrap(), + FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type).unwrap(), + FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type).unwrap(), }; - let mut writer = AvroWriter::new(&avro_schema, Vec::new()); + let mut writer = AvroWriter::new(&avro_schema, writer); for data_file in data_files { let value = to_value(_serde::DataFile::try_from(data_file, partition_type, true)?)? @@ -1230,22 +1233,22 @@ pub fn data_files_to_avro( writer.append(value)?; } - Ok(writer.into_inner()?) + Ok(writer.flush()?) } /// Parse data files from avro bytes. -pub fn parse_from_avro( - bytes: &[u8], +pub fn parse_data_file_from_avro( + reader: &mut R, schema: &Schema, partition_type: &StructType, version: FormatVersion, ) -> Result> { let avro_schema = match version { - FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone()).unwrap(), - FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone()).unwrap(), + FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type).unwrap(), + FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type).unwrap(), }; - let reader = AvroReader::with_schema(&avro_schema, bytes)?; + let reader = AvroReader::with_schema(&avro_schema, reader)?; reader .into_iter() .map(|value| from_value::<_serde::DataFile>(&value?)?.try_into(partition_type, schema)) @@ -1614,6 +1617,7 @@ mod _serde { #[cfg(test)] mod tests { use std::fs; + use std::io::Cursor; use std::sync::Arc; use tempfile::TempDir; @@ -2399,4 +2403,67 @@ mod tests { // Verify manifest (fs::read(path).expect("read_file must succeed"), res) } + + #[tokio::test] + async fn test_data_file_serialize_deserialize() { + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "v1", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "v2", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 3, + "v3", + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let data_files = vec![DataFile { + content: DataContentType::Data, + file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count: 1, + file_size_in_bytes: 875, + column_sizes: HashMap::from([(1,47),(2,48),(3,52)]), + value_counts: HashMap::from([(1,1),(2,1),(3,1)]), + null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]), + upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]), + key_metadata: None, + split_offsets: vec![4], + equality_ids: vec![], + sort_order_id: Some(0), + }]; + + let mut buffer = Vec::new(); + let _ = write_data_files_to_avro( + &mut buffer, + data_files.clone().into_iter(), + &StructType::new(vec![]), + FormatVersion::V2, + ) + .unwrap(); + + let actual_data_file = parse_data_file_from_avro( + &mut Cursor::new(buffer), + &schema, + &StructType::new(vec![]), + FormatVersion::V2, + ) + .unwrap(); + + assert_eq!(data_files, actual_data_file); + } } From 92871afc404c6fa67696752954cdc67dfa840283 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 2 Jan 2025 13:22:30 +0800 Subject: [PATCH 3/3] rename interface --- crates/iceberg/src/spec/manifest.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 622d4c4b5c..f517b8e0d9 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -1237,7 +1237,7 @@ pub fn write_data_files_to_avro( } /// Parse data files from avro bytes. -pub fn parse_data_file_from_avro( +pub fn read_data_files_from_avro( reader: &mut R, schema: &Schema, partition_type: &StructType, @@ -2456,7 +2456,7 @@ mod tests { ) .unwrap(); - let actual_data_file = parse_data_file_from_avro( + let actual_data_file = read_data_files_from_avro( &mut Cursor::new(buffer), &schema, &StructType::new(vec![]),