Skip to content

Commit f766445

Browse files
committed
add interface to help serialize/deserialize DataFile
1 parent 2e0b646 commit f766445

File tree

1 file changed

+108
-44
lines changed

1 file changed

+108
-44
lines changed

crates/iceberg/src/spec/manifest.rs

Lines changed: 108 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ use typed_builder::TypedBuilder;
3131
use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
3232
use super::{
3333
BoundPartitionSpec, Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile,
34-
Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER,
34+
Schema, SchemaId, SchemaRef, Struct, StructType, INITIAL_SEQUENCE_NUMBER,
35+
UNASSIGNED_SEQUENCE_NUMBER,
3536
};
3637
use crate::error::Result;
3738
use crate::io::OutputFile;
@@ -625,6 +626,38 @@ mod _const_schema {
625626
})
626627
};
627628

629+
fn data_file_fields_v2(partition_type: StructType) -> Vec<NestedFieldRef> {
630+
vec![
631+
CONTENT.clone(),
632+
FILE_PATH.clone(),
633+
FILE_FORMAT.clone(),
634+
Arc::new(NestedField::required(
635+
102,
636+
"partition",
637+
Type::Struct(partition_type),
638+
)),
639+
RECORD_COUNT.clone(),
640+
FILE_SIZE_IN_BYTES.clone(),
641+
COLUMN_SIZES.clone(),
642+
VALUE_COUNTS.clone(),
643+
NULL_VALUE_COUNTS.clone(),
644+
NAN_VALUE_COUNTS.clone(),
645+
LOWER_BOUNDS.clone(),
646+
UPPER_BOUNDS.clone(),
647+
KEY_METADATA.clone(),
648+
SPLIT_OFFSETS.clone(),
649+
EQUALITY_IDS.clone(),
650+
SORT_ORDER_ID.clone(),
651+
]
652+
}
653+
654+
pub(super) fn data_file_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> {
655+
let schema = Schema::builder()
656+
.with_fields(data_file_fields_v2(partition_type))
657+
.build()?;
658+
schema_to_avro_schema("data_file", &schema)
659+
}
660+
628661
pub(super) fn manifest_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> {
629662
let fields = vec![
630663
STATUS.clone(),
@@ -634,62 +667,52 @@ mod _const_schema {
634667
Arc::new(NestedField::required(
635668
2,
636669
"data_file",
637-
Type::Struct(StructType::new(vec![
638-
CONTENT.clone(),
639-
FILE_PATH.clone(),
640-
FILE_FORMAT.clone(),
641-
Arc::new(NestedField::required(
642-
102,
643-
"partition",
644-
Type::Struct(partition_type),
645-
)),
646-
RECORD_COUNT.clone(),
647-
FILE_SIZE_IN_BYTES.clone(),
648-
COLUMN_SIZES.clone(),
649-
VALUE_COUNTS.clone(),
650-
NULL_VALUE_COUNTS.clone(),
651-
NAN_VALUE_COUNTS.clone(),
652-
LOWER_BOUNDS.clone(),
653-
UPPER_BOUNDS.clone(),
654-
KEY_METADATA.clone(),
655-
SPLIT_OFFSETS.clone(),
656-
EQUALITY_IDS.clone(),
657-
SORT_ORDER_ID.clone(),
658-
])),
670+
Type::Struct(StructType::new(data_file_fields_v2(partition_type))),
659671
)),
660672
];
661673
let schema = Schema::builder().with_fields(fields).build()?;
662674
schema_to_avro_schema("manifest_entry", &schema)
663675
}
664676

677+
fn data_file_fields_v1(partition_type: StructType) -> Vec<NestedFieldRef> {
678+
vec![
679+
FILE_PATH.clone(),
680+
FILE_FORMAT.clone(),
681+
Arc::new(NestedField::required(
682+
102,
683+
"partition",
684+
Type::Struct(partition_type),
685+
)),
686+
RECORD_COUNT.clone(),
687+
FILE_SIZE_IN_BYTES.clone(),
688+
BLOCK_SIZE_IN_BYTES.clone(),
689+
COLUMN_SIZES.clone(),
690+
VALUE_COUNTS.clone(),
691+
NULL_VALUE_COUNTS.clone(),
692+
NAN_VALUE_COUNTS.clone(),
693+
LOWER_BOUNDS.clone(),
694+
UPPER_BOUNDS.clone(),
695+
KEY_METADATA.clone(),
696+
SPLIT_OFFSETS.clone(),
697+
SORT_ORDER_ID.clone(),
698+
]
699+
}
700+
701+
pub(super) fn data_file_schema_v1(partition_type: StructType) -> Result<AvroSchema, Error> {
702+
let schema = Schema::builder()
703+
.with_fields(data_file_fields_v1(partition_type))
704+
.build()?;
705+
schema_to_avro_schema("data_file", &schema)
706+
}
707+
665708
pub(super) fn manifest_schema_v1(partition_type: StructType) -> Result<AvroSchema, Error> {
666709
let fields = vec![
667710
STATUS.clone(),
668711
SNAPSHOT_ID_V1.clone(),
669712
Arc::new(NestedField::required(
670713
2,
671714
"data_file",
672-
Type::Struct(StructType::new(vec![
673-
FILE_PATH.clone(),
674-
FILE_FORMAT.clone(),
675-
Arc::new(NestedField::required(
676-
102,
677-
"partition",
678-
Type::Struct(partition_type),
679-
)),
680-
RECORD_COUNT.clone(),
681-
FILE_SIZE_IN_BYTES.clone(),
682-
BLOCK_SIZE_IN_BYTES.clone(),
683-
COLUMN_SIZES.clone(),
684-
VALUE_COUNTS.clone(),
685-
NULL_VALUE_COUNTS.clone(),
686-
NAN_VALUE_COUNTS.clone(),
687-
LOWER_BOUNDS.clone(),
688-
UPPER_BOUNDS.clone(),
689-
KEY_METADATA.clone(),
690-
SPLIT_OFFSETS.clone(),
691-
SORT_ORDER_ID.clone(),
692-
])),
715+
Type::Struct(StructType::new(data_file_fields_v1(partition_type))),
693716
)),
694717
];
695718
let schema = Schema::builder().with_fields(fields).build()?;
@@ -1158,6 +1181,47 @@ impl DataFile {
11581181
self.sort_order_id
11591182
}
11601183
}
1184+
1185+
/// Convert data files to avro bytes.
1186+
pub fn data_files_to_avro(
1187+
data_files: impl IntoIterator<Item = DataFile>,
1188+
partition_type: &StructType,
1189+
version: FormatVersion,
1190+
) -> Result<Vec<u8>> {
1191+
let avro_schema = match version {
1192+
FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone()).unwrap(),
1193+
FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone()).unwrap(),
1194+
};
1195+
let mut writer = AvroWriter::new(&avro_schema, Vec::new());
1196+
1197+
for data_file in data_files {
1198+
let value = to_value(_serde::DataFile::try_from(data_file, partition_type, true)?)?
1199+
.resolve(&avro_schema)?;
1200+
writer.append(value)?;
1201+
}
1202+
1203+
Ok(writer.into_inner()?)
1204+
}
1205+
1206+
/// Parse data files from avro bytes.
1207+
pub fn parse_from_avro(
1208+
bytes: &[u8],
1209+
schema: &Schema,
1210+
partition_type: &StructType,
1211+
version: FormatVersion,
1212+
) -> Result<Vec<DataFile>> {
1213+
let avro_schema = match version {
1214+
FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone()).unwrap(),
1215+
FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone()).unwrap(),
1216+
};
1217+
1218+
let reader = AvroReader::with_schema(&avro_schema, bytes)?;
1219+
reader
1220+
.into_iter()
1221+
.map(|value| from_value::<_serde::DataFile>(&value?)?.try_into(partition_type, &schema))
1222+
.collect::<Result<Vec<_>>>()
1223+
}
1224+
11611225
/// Type of content stored by the data file: data, equality deletes, or
11621226
/// position deletes (all v1 files are data files)
11631227
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]

0 commit comments

Comments
 (0)