Skip to content

Commit b94c978

Browse files
committed
add interface to help serialize/deserialize DataFile
1 parent 74a85e7 commit b94c978

File tree

1 file changed

+106
-43
lines changed

1 file changed

+106
-43
lines changed

crates/iceberg/src/spec/manifest.rs

Lines changed: 106 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,38 @@ mod _const_schema {
656656
})
657657
};
658658

659+
fn data_file_fields_v2(partition_type: StructType) -> Vec<NestedFieldRef> {
660+
vec![
661+
CONTENT.clone(),
662+
FILE_PATH.clone(),
663+
FILE_FORMAT.clone(),
664+
Arc::new(NestedField::required(
665+
102,
666+
"partition",
667+
Type::Struct(partition_type),
668+
)),
669+
RECORD_COUNT.clone(),
670+
FILE_SIZE_IN_BYTES.clone(),
671+
COLUMN_SIZES.clone(),
672+
VALUE_COUNTS.clone(),
673+
NULL_VALUE_COUNTS.clone(),
674+
NAN_VALUE_COUNTS.clone(),
675+
LOWER_BOUNDS.clone(),
676+
UPPER_BOUNDS.clone(),
677+
KEY_METADATA.clone(),
678+
SPLIT_OFFSETS.clone(),
679+
EQUALITY_IDS.clone(),
680+
SORT_ORDER_ID.clone(),
681+
]
682+
}
683+
684+
pub(super) fn data_file_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> {
685+
let schema = Schema::builder()
686+
.with_fields(data_file_fields_v2(partition_type))
687+
.build()?;
688+
schema_to_avro_schema("data_file", &schema)
689+
}
690+
659691
pub(super) fn manifest_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> {
660692
let fields = vec![
661693
STATUS.clone(),
@@ -665,62 +697,52 @@ mod _const_schema {
665697
Arc::new(NestedField::required(
666698
2,
667699
"data_file",
668-
Type::Struct(StructType::new(vec![
669-
CONTENT.clone(),
670-
FILE_PATH.clone(),
671-
FILE_FORMAT.clone(),
672-
Arc::new(NestedField::required(
673-
102,
674-
"partition",
675-
Type::Struct(partition_type),
676-
)),
677-
RECORD_COUNT.clone(),
678-
FILE_SIZE_IN_BYTES.clone(),
679-
COLUMN_SIZES.clone(),
680-
VALUE_COUNTS.clone(),
681-
NULL_VALUE_COUNTS.clone(),
682-
NAN_VALUE_COUNTS.clone(),
683-
LOWER_BOUNDS.clone(),
684-
UPPER_BOUNDS.clone(),
685-
KEY_METADATA.clone(),
686-
SPLIT_OFFSETS.clone(),
687-
EQUALITY_IDS.clone(),
688-
SORT_ORDER_ID.clone(),
689-
])),
700+
Type::Struct(StructType::new(data_file_fields_v2(partition_type))),
690701
)),
691702
];
692703
let schema = Schema::builder().with_fields(fields).build()?;
693704
schema_to_avro_schema("manifest_entry", &schema)
694705
}
695706

707+
fn data_file_fields_v1(partition_type: StructType) -> Vec<NestedFieldRef> {
708+
vec![
709+
FILE_PATH.clone(),
710+
FILE_FORMAT.clone(),
711+
Arc::new(NestedField::required(
712+
102,
713+
"partition",
714+
Type::Struct(partition_type),
715+
)),
716+
RECORD_COUNT.clone(),
717+
FILE_SIZE_IN_BYTES.clone(),
718+
BLOCK_SIZE_IN_BYTES.clone(),
719+
COLUMN_SIZES.clone(),
720+
VALUE_COUNTS.clone(),
721+
NULL_VALUE_COUNTS.clone(),
722+
NAN_VALUE_COUNTS.clone(),
723+
LOWER_BOUNDS.clone(),
724+
UPPER_BOUNDS.clone(),
725+
KEY_METADATA.clone(),
726+
SPLIT_OFFSETS.clone(),
727+
SORT_ORDER_ID.clone(),
728+
]
729+
}
730+
731+
pub(super) fn data_file_schema_v1(partition_type: StructType) -> Result<AvroSchema, Error> {
732+
let schema = Schema::builder()
733+
.with_fields(data_file_fields_v1(partition_type))
734+
.build()?;
735+
schema_to_avro_schema("data_file", &schema)
736+
}
737+
696738
pub(super) fn manifest_schema_v1(partition_type: StructType) -> Result<AvroSchema, Error> {
697739
let fields = vec![
698740
STATUS.clone(),
699741
SNAPSHOT_ID_V1.clone(),
700742
Arc::new(NestedField::required(
701743
2,
702744
"data_file",
703-
Type::Struct(StructType::new(vec![
704-
FILE_PATH.clone(),
705-
FILE_FORMAT.clone(),
706-
Arc::new(NestedField::required(
707-
102,
708-
"partition",
709-
Type::Struct(partition_type),
710-
)),
711-
RECORD_COUNT.clone(),
712-
FILE_SIZE_IN_BYTES.clone(),
713-
BLOCK_SIZE_IN_BYTES.clone(),
714-
COLUMN_SIZES.clone(),
715-
VALUE_COUNTS.clone(),
716-
NULL_VALUE_COUNTS.clone(),
717-
NAN_VALUE_COUNTS.clone(),
718-
LOWER_BOUNDS.clone(),
719-
UPPER_BOUNDS.clone(),
720-
KEY_METADATA.clone(),
721-
SPLIT_OFFSETS.clone(),
722-
SORT_ORDER_ID.clone(),
723-
])),
745+
Type::Struct(StructType::new(data_file_fields_v1(partition_type))),
724746
)),
725747
];
726748
let schema = Schema::builder().with_fields(fields).build()?;
@@ -1189,6 +1211,47 @@ impl DataFile {
11891211
self.sort_order_id
11901212
}
11911213
}
1214+
1215+
/// Convert data files to avro bytes.
1216+
pub fn data_files_to_avro(
1217+
data_files: impl IntoIterator<Item = DataFile>,
1218+
partition_type: &StructType,
1219+
version: FormatVersion,
1220+
) -> Result<Vec<u8>> {
1221+
let avro_schema = match version {
1222+
FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone()).unwrap(),
1223+
FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone()).unwrap(),
1224+
};
1225+
let mut writer = AvroWriter::new(&avro_schema, Vec::new());
1226+
1227+
for data_file in data_files {
1228+
let value = to_value(_serde::DataFile::try_from(data_file, partition_type, true)?)?
1229+
.resolve(&avro_schema)?;
1230+
writer.append(value)?;
1231+
}
1232+
1233+
Ok(writer.into_inner()?)
1234+
}
1235+
1236+
/// Parse data files from avro bytes.
1237+
pub fn parse_from_avro(
1238+
bytes: &[u8],
1239+
schema: &Schema,
1240+
partition_type: &StructType,
1241+
version: FormatVersion,
1242+
) -> Result<Vec<DataFile>> {
1243+
let avro_schema = match version {
1244+
FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone()).unwrap(),
1245+
FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone()).unwrap(),
1246+
};
1247+
1248+
let reader = AvroReader::with_schema(&avro_schema, bytes)?;
1249+
reader
1250+
.into_iter()
1251+
.map(|value| from_value::<_serde::DataFile>(&value?)?.try_into(partition_type, schema))
1252+
.collect::<Result<Vec<_>>>()
1253+
}
1254+
11921255
/// Type of content stored by the data file: data, equality deletes, or
11931256
/// position deletes (all v1 files are data files)
11941257
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]

0 commit comments

Comments
 (0)