diff --git a/Cargo.lock b/Cargo.lock index cf4a23df9a..c5dc688655 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -580,6 +580,28 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "async-task" version = "4.7.1" @@ -2947,6 +2969,7 @@ dependencies = [ "arrow-select", "arrow-string", "async-std", + "async-stream", "async-trait", "bimap", "bitvec", diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 7320c455d1..64f6e3d644 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -53,6 +53,7 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } async-std = { workspace = true, optional = true, features = ["attributes"] } +async-stream = { workspace = true } async-trait = { workspace = true } bimap = { workspace = true } bitvec = { workspace = true } @@ -86,6 +87,7 @@ uuid = { workspace = true } zstd = { workspace = true } [dev-dependencies] +arrow-cast = { workspace = true, features = ["prettyprint"] } ctor = { workspace = true } expect-test = { workspace = true } iceberg-catalog-memory = { workspace = true } diff --git a/crates/iceberg/src/inspect/entries.rs b/crates/iceberg/src/inspect/entries.rs new file mode 100644 index 0000000000..c254523565 --- /dev/null +++ b/crates/iceberg/src/inspect/entries.rs @@ -0,0 +1,603 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
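+//! Entries metadata table.
+//!
+//! A minimal usage sketch (hedged): it assumes a `Table` already loaded from a catalog with a
+//! current snapshot, and uses `futures::TryStreamExt` to drive the returned stream; names other
+//! than `inspect()`, `entries()` and `scan()` are illustrative only.
+//!
+//! ```ignore
+//! use futures::TryStreamExt;
+//!
+//! // `table: iceberg::table::Table` is assumed to exist.
+//! let mut batches = table.inspect().entries().scan().await?;
+//! while let Some(batch) = batches.try_next().await? {
+//!     // Each Arrow `RecordBatch` holds the manifest entries of one manifest file.
+//!     println!("{} entries", batch.num_rows());
+//! }
+//! ```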
+ +use std::any::type_name; +use std::string::ToString; +use std::sync::Arc; + +use arrow_array::builder::{ + ArrayBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, FixedSizeBinaryBuilder, + Float32Builder, Float64Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, ListBuilder, + MapBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, TimestampMicrosecondBuilder, + TimestampNanosecondBuilder, +}; +use arrow_array::types::{Int32Type, Int64Type}; +use arrow_array::{ArrowPrimitiveType, RecordBatch, StructArray}; +use arrow_schema::{DataType, Fields, TimeUnit}; +use async_stream::try_stream; +use futures::StreamExt; +use itertools::Itertools; +use ordered_float::OrderedFloat; + +use crate::arrow::{schema_to_arrow_schema, type_to_arrow_type}; +use crate::inspect::metrics::ReadableMetricsStructBuilder; +use crate::scan::ArrowRecordBatchStream; +use crate::spec::{ + DataFile, Datum, ManifestFile, NestedFieldRef, PrimitiveLiteral, Schema, Struct, TableMetadata, + Type, +}; +use crate::table::Table; +use crate::{Error, ErrorKind, Result}; + +/// Entries table containing the entries of the current snapshot's manifest files. +/// +/// The table has one row for each manifest file entry in the current snapshot's manifest list file. +/// For reference, see the Java implementation of [`ManifestEntry`][1]. +/// +/// [1]: https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/core/src/main/java/org/apache/iceberg/ManifestEntry.java +pub struct EntriesTable<'a> { + table: &'a Table, +} + +impl<'a> EntriesTable<'a> { + /// Create a new Entries table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Scan the manifest entries table. + pub async fn scan(&self) -> Result { + let current_snapshot = self.table.metadata().current_snapshot().ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Cannot scan entries for table without current snapshot", + ) + })?; + + let manifest_list = current_snapshot + .load_manifest_list(self.table.file_io(), self.table.metadata()) + .await?; + + // Copy to ensure that the stream can take ownership of these dependencies + let schema = self.schema(); + let arrow_schema = Arc::new(schema_to_arrow_schema(&schema)?); + let table_metadata = self.table.metadata_ref(); + let file_io = Arc::new(self.table.file_io().clone()); + let readable_metrics_schema = schema + .field_by_name("readable_metrics") + .and_then(|field| field.field_type.clone().to_struct_type()) + .unwrap(); + + Ok(try_stream! 
{ + for manifest_file in manifest_list.entries() { + let mut status = Int32Builder::new(); + let mut snapshot_id = Int64Builder::new(); + let mut sequence_number = Int64Builder::new(); + let mut file_sequence_number = Int64Builder::new(); + let mut data_file = DataFileStructBuilder::new(&table_metadata)?; + let mut readable_metrics = + ReadableMetricsStructBuilder::new( + table_metadata.current_schema(), &readable_metrics_schema)?; + + for manifest_entry in manifest_file.load_manifest(&file_io).await?.entries() { + status.append_value(manifest_entry.status() as i32); + snapshot_id.append_option(manifest_entry.snapshot_id()); + sequence_number.append_option(manifest_entry.sequence_number()); + file_sequence_number.append_option(manifest_entry.file_sequence_number()); + data_file.append(manifest_file, manifest_entry.data_file())?; + readable_metrics.append(manifest_entry.data_file())?; + } + + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(status.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(sequence_number.finish()), + Arc::new(file_sequence_number.finish()), + Arc::new(data_file.finish()), + Arc::new(readable_metrics.finish()), + ])?; + + yield batch; + } + } + .boxed()) + } + + /// Get the schema for the manifest entries table. + pub fn schema(&self) -> Schema { + let partition_type = crate::spec::partition_type(self.table.metadata()).unwrap(); + let schema = Schema::builder() + .with_fields(crate::spec::_const_schema::manifest_schema_fields_v2( + &partition_type, + )) + .build() + .unwrap(); + let readable_metric_schema = ReadableMetricsStructBuilder::readable_metrics_schema( + self.table.metadata().current_schema(), + &schema, + ); + join_schemas(&schema, &readable_metric_schema.unwrap()).unwrap() + } +} + +/// Builds the struct describing data files listed in a table manifest. +/// +/// For reference, see the Java implementation of [`DataFile`][1]. +/// +/// [1]: https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/api/src/main/java/org/apache/iceberg/DataFile.java +struct DataFileStructBuilder<'a> { + /// Builder for data file struct (including the partition struct). + struct_builder: StructBuilder, + /// Arrow fields of the combined partition struct of all partition specs. + /// We require this to reconstruct the field builders in the partition [`StructBuilder`]. + combined_partition_fields: Fields, + /// Table metadata to look up partition specs by partition spec id. + table_metadata: &'a TableMetadata, +} + +impl<'a> DataFileStructBuilder<'a> { + fn new(table_metadata: &'a TableMetadata) -> Result { + let combined_partition_type = crate::spec::partition_type(table_metadata)?; + let data_file_fields = + crate::spec::_const_schema::data_file_fields_v2(&combined_partition_type); + let data_file_schema = Schema::builder().with_fields(data_file_fields).build()?; + let DataType::Struct(combined_partition_fields) = + type_to_arrow_type(&Type::Struct(combined_partition_type))? + else { + panic!("Converted Arrow type was not struct") + }; + + Ok(DataFileStructBuilder { + struct_builder: StructBuilder::from_fields( + schema_to_arrow_schema(&data_file_schema)?.fields, + 0, + ), + combined_partition_fields, + table_metadata, + }) + } + + fn append(&mut self, manifest_file: &ManifestFile, data_file: &DataFile) -> Result<()> { + // Content type + self.field_builder::(0)? + .append_value(data_file.content as i32); + + // File path + self.field_builder::(1)? + .append_value(data_file.file_path()); + + // File format + self.field_builder::(2)? 
+ .append_value(data_file.file_format().to_string().to_uppercase()); + + // Partitions + self.append_partition_values(manifest_file.partition_spec_id, data_file.partition())?; + + // Record count + self.field_builder::(4)? + .append_value(data_file.record_count() as i64); + + // File size in bytes + self.field_builder::(5)? + .append_value(data_file.file_size_in_bytes() as i64); + + // Column sizes + let (column_size_keys, column_size_values): (Vec, Vec) = data_file + .column_sizes() + .iter() + .sorted_by_key(|(k, _)| *k) + .map(|(k, v)| (k, *v as i64)) + .unzip(); + self.append_to_map_field::( + 6, + |key_builder| key_builder.append_slice(column_size_keys.as_slice()), + |value_builder| value_builder.append_slice(column_size_values.as_slice()), + )?; + + // Value counts + let (value_count_keys, value_count_values): (Vec, Vec) = data_file + .value_counts() + .iter() + .sorted_by_key(|(k, _)| *k) + .map(|(k, v)| (k, *v as i64)) + .unzip(); + self.append_to_primitive_map_field::( + 7, + value_count_keys.as_slice(), + value_count_values.as_slice(), + )?; + + // Null value counts + let (null_count_keys, null_count_values): (Vec, Vec) = data_file + .null_value_counts() + .iter() + .sorted_by_key(|(k, _)| *k) + .map(|(k, v)| (k, *v as i64)) + .unzip(); + self.append_to_primitive_map_field::( + 8, + null_count_keys.as_slice(), + null_count_values.as_slice(), + )?; + + // Nan value counts + let (nan_count_keys, nan_count_values): (Vec, Vec) = data_file + .nan_value_counts() + .iter() + .sorted_by_key(|(k, _)| *k) + .map(|(k, v)| (k, *v as i64)) + .unzip(); + self.append_to_primitive_map_field::( + 9, + nan_count_keys.as_slice(), + nan_count_values.as_slice(), + )?; + + // Lower bounds + let (lower_bound_keys, lower_bound_values): (Vec, Vec) = data_file + .lower_bounds() + .iter() + .sorted_by_key(|(k, _)| *k) + .map(|(k, v)| (k, v.clone())) + .unzip(); + self.append_to_map_field::( + 10, + |key_builder| key_builder.append_slice(lower_bound_keys.as_slice()), + |value_builder| { + for v in &lower_bound_values { + value_builder.append_value(v.to_bytes().unwrap()) + } + }, + )?; + + // Upper bounds + let (upper_bound_keys, upper_bound_values): (Vec, Vec) = data_file + .upper_bounds() + .iter() + .sorted_by_key(|(k, _)| *k) + .map(|(k, v)| (k, v.clone())) + .unzip(); + self.append_to_map_field::( + 11, + |key_builder| key_builder.append_slice(upper_bound_keys.as_slice()), + |value_builder| { + for v in &upper_bound_values { + value_builder.append_value(v.to_bytes().unwrap()) + } + }, + )?; + + // Key metadata + self.field_builder::(12)? + .append_option(data_file.key_metadata()); + + // Split offsets + self.append_to_list_field::(13, data_file.split_offsets())?; + + // Equality ids + self.append_to_list_field::(14, data_file.equality_ids())?; + + // Sort order ids + self.field_builder::(15)? + .append_option(data_file.sort_order_id()); + + // Append an element in the struct + self.struct_builder.append(true); + Ok(()) + } + + fn field_builder(&mut self, index: usize) -> Result<&mut T> { + self.struct_builder.field_builder_or_err::(index) + } + + fn append_to_list_field( + &mut self, + index: usize, + values: &[T::Native], + ) -> Result<()> { + let list_builder = self.field_builder::>>(index)?; + list_builder + .values() + .cast_or_err::>()? 
+ .append_slice(values);
+ list_builder.append(true);
+ Ok(())
+ }
+
+ fn append_to_map_field(
+ &mut self,
+ index: usize,
+ key_func: impl Fn(&mut K),
+ value_func: impl Fn(&mut V),
+ ) -> Result<()> {
+ let map_builder =
+ self.field_builder::, Box>>(index)?;
+ key_func(map_builder.keys().cast_or_err::()?);
+ value_func(map_builder.values().cast_or_err::()?);
+ Ok(map_builder.append(true)?)
+ }
+
+ fn append_to_primitive_map_field(
+ &mut self,
+ index: usize,
+ keys: &[K::Native],
+ values: &[V::Native],
+ ) -> Result<()> {
+ let map_builder =
+ self.field_builder::, Box>>(index)?;
+ map_builder
+ .keys()
+ .cast_or_err::>()?
+ .append_slice(keys);
+ map_builder
+ .values()
+ .cast_or_err::>()?
+ .append_slice(values);
+ Ok(map_builder.append(true)?)
+ }
+
+ fn append_partition_values(
+ &mut self,
+ partition_spec_id: i32,
+ partition_values: &Struct,
+ ) -> Result<()> {
+ // Get the partition fields for the partition spec id in the manifest file
+ let partition_spec = self
+ .table_metadata
+ .partition_spec_by_id(partition_spec_id)
+ .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Partition spec not found"))?
+ .fields();
+ // Clone here so we don't hold an immutable reference as we mutably borrow the builder below
+ let combined_partition_fields = self.combined_partition_fields.clone();
+ // Get the partition struct builder
+ let partition_builder = self.field_builder::(3)?;
+ // Iterate the manifest's partition fields with the respective partition values from the data file
+ for (partition_field, partition_value) in
+ partition_spec.iter().zip_eq(partition_values.iter())
+ {
+ let (combined_index, combined_partition_spec_field) = combined_partition_fields
+ .find(&partition_field.name)
+ .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Partition field not found"))?;
+ let partition_type = combined_partition_spec_field.data_type();
+ let partition_value: Option = partition_value
+ .map(|value| -> std::result::Result {
+ value.as_primitive_literal().ok_or({
+ Error::new(
+ ErrorKind::FeatureUnsupported,
+ "Only primitive types are supported in partition struct",
+ )
+ })
+ })
+ .transpose()?;
+
+ // Append a literal to a field builder cast based on the expected partition field type.
+ // We cannot solely rely on the literal type, because it doesn't sufficiently specify
+ // the underlying type. E.g., a `PrimitiveLiteral::Long` could represent either a long
+ // or a timestamp.
+ match (partition_type, partition_value.clone()) {
+ (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => partition_builder
+ .field_builder_or_err::(combined_index)?
+ .append_value(value),
+ (DataType::Boolean, None) => partition_builder
+ .field_builder_or_err::(combined_index)?
+ .append_null(),
+ (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => partition_builder
+ .field_builder_or_err::(combined_index)?
+ .append_value(value),
+ (DataType::Int32, None) => partition_builder
+ .field_builder_or_err::(combined_index)?
+ .append_null(),
+ (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => partition_builder
+ .field_builder_or_err::(combined_index)?
+ .append_value(value),
+ (DataType::Int64, None) => partition_builder
+ .field_builder_or_err::(combined_index)?
+ .append_null(),
+ (DataType::Float32, Some(PrimitiveLiteral::Float(OrderedFloat(value)))) => {
+ partition_builder
+ .field_builder_or_err::(combined_index)?
+ .append_value(value)
+ }
+ (DataType::Float32, None) => partition_builder
+ .field_builder_or_err::(combined_index)?
+ .append_null(), + (DataType::Float64, Some(PrimitiveLiteral::Double(OrderedFloat(value)))) => { + partition_builder + .field_builder_or_err::(combined_index)? + .append_value(value) + } + (DataType::Float64, None) => partition_builder + .field_builder_or_err::(combined_index)? + .append_null(), + (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => partition_builder + .field_builder_or_err::(combined_index)? + .append_value(value), + (DataType::Utf8, None) => partition_builder + .field_builder_or_err::(combined_index)? + .append_null(), + (DataType::FixedSizeBinary(_), Some(PrimitiveLiteral::Binary(value))) => { + partition_builder + .field_builder_or_err::(combined_index)? + .append_value(value)? + } + (DataType::FixedSizeBinary(_), None) => partition_builder + .field_builder_or_err::(combined_index)? + .append_null(), + (DataType::LargeBinary, Some(PrimitiveLiteral::Binary(value))) => partition_builder + .field_builder_or_err::(combined_index)? + .append_value(value), + (DataType::LargeBinary, None) => partition_builder + .field_builder_or_err::(combined_index)? + .append_null(), + (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => partition_builder + .field_builder_or_err::(combined_index)? + .append_value(value), + (DataType::Date32, None) => partition_builder + .field_builder_or_err::(combined_index)? + .append_null(), + ( + DataType::Timestamp(TimeUnit::Microsecond, _), + Some(PrimitiveLiteral::Long(value)), + ) => partition_builder + .field_builder_or_err::(combined_index)? + .append_value(value), + (DataType::Timestamp(TimeUnit::Microsecond, _), None) => partition_builder + .field_builder_or_err::(combined_index)? + .append_null(), + ( + DataType::Timestamp(TimeUnit::Nanosecond, _), + Some(PrimitiveLiteral::Long(value)), + ) => partition_builder + .field_builder_or_err::(combined_index)? + .append_value(value), + (DataType::Timestamp(TimeUnit::Nanosecond, _), None) => partition_builder + .field_builder_or_err::(combined_index)? + .append_null(), + (DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(value))) => { + partition_builder + .field_builder_or_err::(combined_index)? + .append_value(value) + } + (DataType::Decimal128(_, _), None) => partition_builder + .field_builder_or_err::(combined_index)? + .append_null(), + (_, _) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Cannot build partition struct with field type {:?} and partition value {:?}", + partition_type, partition_value + ), + )); + } + } + } + + // Append an element in the struct + partition_builder.append(true); + Ok(()) + } + + fn finish(&mut self) -> StructArray { + self.struct_builder.finish() + } +} + +/// Join two schemas by concatenating fields. Return [`Error`] if the schemas have fields with the +/// same field id but different types. +fn join_schemas(left: &Schema, right: &Schema) -> Result { + let mut joined_fields: Vec = left.as_struct().fields().to_vec(); + + for right_field in right.as_struct().fields() { + match left.field_by_id(right_field.id) { + None => { + joined_fields.push(right_field.clone()); + } + Some(left_field) => { + if left_field != right_field { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Schemas have different columns with the same id: {:?}, {:?}", + left_field, right_field + ), + )); + } + } + } + } + + Schema::builder().with_fields(joined_fields).build() +} + +/// Helper to cast a field builder in a [`StructBuilder`] to a specific builder type or return an +/// [`Error`]. 
+trait StructBuilderExt { + fn field_builder_or_err(&mut self, index: usize) -> Result<&mut T>; +} + +impl StructBuilderExt for StructBuilder { + fn field_builder_or_err(&mut self, index: usize) -> Result<&mut T> { + self.field_builder::(index).ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!( + "Field builder not found for index {index} and type {}", + type_name::() + ), + ) + }) + } +} + +/// Helper to cast a [`Box`] to a specific type or return an [`Error`]. +trait ArrayBuilderExt { + fn cast_or_err(&mut self) -> Result<&mut T>; +} + +impl ArrayBuilderExt for Box { + fn cast_or_err(&mut self) -> Result<&mut T> { + self.as_any_mut().downcast_mut::().ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!("Cannot cast builder to type {}", type_name::()), + ) + }) + } +} + +#[cfg(test)] +mod tests { + use expect_test::expect; + + use crate::inspect::metadata_table::tests::check_record_batches; + use crate::scan::tests::TableTestFixture; + + #[tokio::test] + async fn test_entries_table() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + let table = fixture.table; + let inspect = table.inspect(); + let entries_table = inspect.entries(); + + let batch_stream = entries_table.scan().await.unwrap(); + + check_record_batches( + batch_stream, + expect![[r#" + Field { name: "status", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "0"} }, + Field { name: "snapshot_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, + Field { name: "sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, + Field { name: "file_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} }, + Field { name: "data_file", data_type: Struct([Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "134"} }, Field { name: "file_path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "100"} }, Field { name: "file_format", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "101"} }, Field { name: "partition", data_type: Struct([Field { name: "x", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1000"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "102"} }, Field { name: "record_count", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "103"} }, Field { name: "file_size_in_bytes", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "104"} }, Field { name: "column_sizes", data_type: Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "117"} }, Field { name: "value", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "118"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "108"} }, Field { name: "value_counts", data_type: Map(Field { name: "key_value", data_type: 
Struct([Field { name: "key", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "119"} }, Field { name: "value", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "120"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "109"} }, Field { name: "null_value_counts", data_type: Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "121"} }, Field { name: "value", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "122"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "110"} }, Field { name: "nan_value_counts", data_type: Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "138"} }, Field { name: "value", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "139"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "137"} }, Field { name: "lower_bounds", data_type: Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "126"} }, Field { name: "value", data_type: LargeBinary, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "127"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "125"} }, Field { name: "upper_bounds", data_type: Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "129"} }, Field { name: "value", data_type: LargeBinary, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "130"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "128"} }, Field { name: "key_metadata", data_type: LargeBinary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "131"} }, Field { name: "split_offsets", data_type: List(Field { name: "element", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "133"} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "132"} }, Field { name: "equality_ids", data_type: List(Field { name: "element", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "136"} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "135"} }, Field { name: "sort_order_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "140"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, + Field { name: 
"readable_metrics", data_type: Struct([Field { name: "a", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1001"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1002"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1003"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1004"} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1005"} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1006"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1007"} }, Field { name: "binary", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1008"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1009"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1010"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1011"} }, Field { name: "lower_bound", data_type: LargeBinary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1012"} }, Field { name: "upper_bound", data_type: LargeBinary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1013"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1014"} }, Field { name: "bool", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1015"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1016"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1017"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1018"} }, Field { name: "lower_bound", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1019"} }, Field { name: "upper_bound", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1020"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1021"} }, Field { name: "date", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1022"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1023"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1024"} }, Field { name: "nan_value_count", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1025"} }, Field { name: "lower_bound", data_type: Date32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1026"} }, Field { name: "upper_bound", data_type: Date32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1027"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1028"} }, Field { name: "dbl", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1029"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1030"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1031"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1032"} }, Field { name: "lower_bound", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1033"} }, Field { name: "upper_bound", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1034"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1035"} }, Field { name: "decimal", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1036"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1037"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1038"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1039"} }, Field { name: "lower_bound", data_type: Decimal128(3, 2), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1040"} }, Field { name: "upper_bound", data_type: Decimal128(3, 2), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1041"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1042"} }, Field { name: "float", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1043"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1044"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1045"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1046"} }, Field { name: "lower_bound", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1047"} }, Field { name: "upper_bound", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1048"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1049"} }, Field { name: "i32", data_type: Struct([Field { name: 
"column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1050"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1051"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1052"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1053"} }, Field { name: "lower_bound", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1054"} }, Field { name: "upper_bound", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1055"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1056"} }, Field { name: "i64", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1057"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1058"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1059"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1060"} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1061"} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1062"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1063"} }, Field { name: "timestamp", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1064"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1065"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1066"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1067"} }, Field { name: "lower_bound", data_type: Timestamp(Microsecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1068"} }, Field { name: "upper_bound", data_type: Timestamp(Microsecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1069"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1070"} }, Field { name: "timestampns", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1071"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1072"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1073"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {"PARQUET:field_id": "1074"} }, Field { name: "lower_bound", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1075"} }, Field { name: "upper_bound", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1076"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1077"} }, Field { name: "timestamptz", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1078"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1079"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1080"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1081"} }, Field { name: "lower_bound", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1082"} }, Field { name: "upper_bound", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1083"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1084"} }, Field { name: "timestamptzns", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1085"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1086"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1087"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1088"} }, Field { name: "lower_bound", data_type: Timestamp(Nanosecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1089"} }, Field { name: "upper_bound", data_type: Timestamp(Nanosecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1090"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1091"} }, Field { name: "x", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1092"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1093"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1094"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1095"} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1096"} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1097"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: 
{"PARQUET:field_id": "1098"} }, Field { name: "y", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1099"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1100"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1101"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1102"} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1103"} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1104"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1105"} }, Field { name: "z", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1106"} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1107"} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1108"} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1109"} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1110"} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1111"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1112"} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1113"} }"#]], + expect![[r#" + 
+--------+---------------------+-----------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | status | snapshot_id | sequence_number | file_sequence_number | data_file | readable_metrics | + 
+--------+---------------------+-----------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | 1 | 3055729675574597004 | 1 | 1 | {content: 0, file_format: PARQUET, partition: {x: 100}, record_count: 1, file_size_in_bytes: 100, column_sizes: {1: 1, 2: 1}, value_counts: {1: 2, 2: 2}, null_value_counts: {1: 3, 2: 3}, nan_value_counts: {1: 4, 2: 4}, lower_bounds: {1: 0100000000000000, 2: 0200000000000000, 3: 0300000000000000, 4: 417061636865, 5: 0000000000005940, 6: 64000000, 7: 6400000000000000, 8: 00, 9: 0000c842, 11: 00000000, 12: 0000000000000000, 13: 0000000000000000}, upper_bounds: {1: 0100000000000000, 2: 0500000000000000, 3: 0400000000000000, 4: 49636562657267, 5: 0000000000006940, 6: c8000000, 7: c800000000000000, 8: 01, 9: 00004843, 11: 00000000, 12: 0000000000000000, 13: 0000000000000000}, key_metadata: , split_offsets: [], equality_ids: [], sort_order_id: } | {a: {column_size: , value_count: , 
null_value_count: , nan_value_count: , lower_bound: Apache, upper_bound: Iceberg}, binary: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, bool: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: false, upper_bound: true}, date: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 1970-01-01, upper_bound: 1970-01-01}, dbl: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 100.0, upper_bound: 200.0}, decimal: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, float: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 100.0, upper_bound: 200.0}, i32: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 100, upper_bound: 200}, i64: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 100, upper_bound: 200}, timestamp: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 1970-01-01T00:00:00, upper_bound: 1970-01-01T00:00:00}, timestampns: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamptz: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 1970-01-01T00:00:00Z, upper_bound: 1970-01-01T00:00:00Z}, timestamptzns: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, x: {column_size: 1, value_count: 2, null_value_count: 3, nan_value_count: 4, lower_bound: 1, upper_bound: 1}, y: {column_size: 1, value_count: 2, null_value_count: 3, nan_value_count: 4, lower_bound: 2, upper_bound: 5}, z: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 3, upper_bound: 4}} | + | 2 | 3055729675574597004 | 0 | 0 | {content: 0, file_format: PARQUET, partition: {x: 200}, record_count: 1, file_size_in_bytes: 100, column_sizes: {}, value_counts: {}, null_value_counts: {}, nan_value_counts: {}, lower_bounds: {}, upper_bounds: {}, key_metadata: , split_offsets: [], equality_ids: [], sort_order_id: } | {a: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, binary: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, bool: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, date: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, dbl: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, decimal: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, float: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, i32: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, i64: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamp: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestampns: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamptz: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamptzns: {column_size: , value_count: , 
null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, x: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, y: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, z: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }} | + | 0 | 3051729675574597004 | 0 | 0 | {content: 0, file_format: PARQUET, partition: {x: 300}, record_count: 1, file_size_in_bytes: 100, column_sizes: {}, value_counts: {}, null_value_counts: {}, nan_value_counts: {}, lower_bounds: {}, upper_bounds: {}, key_metadata: , split_offsets: [], equality_ids: [], sort_order_id: } | {a: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, binary: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, bool: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, date: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, dbl: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, decimal: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, float: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, i32: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, i64: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamp: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestampns: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamptz: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamptzns: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, x: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, y: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, z: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }} | + 
+--------+---------------------+-----------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+"#]], + &[], + &["file_path"], + None, + ) + .await; + } +} diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index e94e48a45d..8f5da7f8f1 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -285,75 +285,13 @@ mod tests { Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17"} }, Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12"} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9"} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }"#]], expect![[r#" - content: PrimitiveArray - [ - 0, - ], - path: (skipped), - length: (skipped), - partition_spec_id: PrimitiveArray - [ - 0, - ], - added_snapshot_id: PrimitiveArray - [ - 3055729675574597004, - ], - added_data_files_count: PrimitiveArray - [ - 1, - ], - existing_data_files_count: PrimitiveArray - [ - 1, - ], - deleted_data_files_count: PrimitiveArray - [ - 1, - ], - added_delete_files_count: PrimitiveArray - [ - 1, - ], - existing_delete_files_count: PrimitiveArray - [ - 1, - ], - deleted_delete_files_count: PrimitiveArray - [ - 1, - ], - partition_summaries: ListArray - [ - StructArray - -- validity: - [ - valid, - ] - [ - -- child 0: "contains_null" (Boolean) - BooleanArray - [ - false, - ] - -- child 1: "contains_nan" (Boolean) - BooleanArray - [ - false, - ] - -- child 2: "lower_bound" (Utf8) - StringArray - [ - "100", - ] - -- child 3: "upper_bound" (Utf8) - StringArray - [ - "300", - ] - ], - ]"#]], + +---------+-------------------+---------------------+------------------------+---------------------------+--------------------------+--------------------------+-----------------------------+----------------------------+-----------------------------------------------------------------------------------+ + | content | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | added_delete_files_count | existing_delete_files_count | deleted_delete_files_count | partition_summaries | + +---------+-------------------+---------------------+------------------------+---------------------------+--------------------------+--------------------------+-----------------------------+----------------------------+-----------------------------------------------------------------------------------+ + | 0 | 0 | 3055729675574597004 | 1 | 1 | 1 | 1 | 1 | 1 | [{contains_null: false, contains_nan: false, lower_bound: 100, upper_bound: 300}] | + +---------+-------------------+---------------------+------------------------+---------------------------+--------------------------+--------------------------+-----------------------------+----------------------------+-----------------------------------------------------------------------------------+"#]], &["path", "length"], + &[], Some("path"), ).await; } diff --git a/crates/iceberg/src/inspect/metadata_table.rs b/crates/iceberg/src/inspect/metadata_table.rs index 75dbc74725..20d75caa94 100644 --- a/crates/iceberg/src/inspect/metadata_table.rs +++ b/crates/iceberg/src/inspect/metadata_table.rs @@ -16,6 +16,7 @@ // under the License. use super::{ManifestsTable, SnapshotsTable}; +use crate::inspect::entries::EntriesTable; use crate::table::Table; /// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table. @@ -33,6 +34,11 @@ impl<'a> MetadataTable<'a> { Self(table) } + /// Returns the current manifest file's entries. + pub fn entries(&self) -> EntriesTable { + EntriesTable::new(self.0) + } + /// Get the snapshots table. 
pub fn snapshots(&self) -> SnapshotsTable { SnapshotsTable::new(self.0) @@ -46,6 +52,11 @@ impl<'a> MetadataTable<'a> { #[cfg(test)] pub mod tests { + use std::sync::Arc; + + use arrow_array::{ArrayRef, RecordBatch, StructArray}; + use arrow_cast::pretty::pretty_format_batches; + use arrow_schema::{DataType, Field, FieldRef, Schema as ArrowSchema}; use expect_test::Expect; use futures::TryStreamExt; use itertools::Itertools; @@ -59,12 +70,14 @@ pub mod tests { /// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)). /// Check the doc of [`expect_test`] for more details. /// - `ignore_check_columns`: Some columns are not stable, so we can skip them. + /// - `ignore_check_struct_fields`: Same as `ignore_check_columns` but for (top-level) struct fields. /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column. pub async fn check_record_batches( batch_stream: ArrowRecordBatchStream, expected_schema: Expect, expected_data: Expect, ignore_check_columns: &[&str], + ignore_check_struct_fields: &[&str], sort_column: Option<&str>, ) { let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap(); @@ -85,25 +98,48 @@ pub mod tests { .collect_vec(); } + // Filter columns + let (fields, columns): (Vec<_>, Vec<_>) = record_batch + .schema() + .fields + .iter() + .zip_eq(columns) + // Filter ignored columns + .filter(|(field, _)| !ignore_check_columns.contains(&field.name().as_str())) + // For struct fields, filter ignored struct fields + .map(|(field, column)| match field.data_type() { + DataType::Struct(fields) => { + let struct_array = column.as_any().downcast_ref::<StructArray>().unwrap(); + let filtered: Vec<(FieldRef, ArrayRef)> = fields + .iter() + .zip_eq(struct_array.columns().iter()) + .filter(|(f, _)| !ignore_check_struct_fields.contains(&f.name().as_str())) + .map(|(f, c)| (f.clone(), c.clone())) + .collect_vec(); + let filtered_struct_type: DataType = DataType::Struct( + filtered.iter().map(|(f, _)| f.clone()).collect_vec().into(), + ); + ( + Field::new(field.name(), filtered_struct_type, field.is_nullable()).into(), + Arc::new(StructArray::from(filtered)) as ArrayRef, + ) + } + _ => (field.clone(), column), + }) + .unzip(); + expected_schema.assert_eq(&format!( "{}", record_batch.schema().fields().iter().format(",\n") )); - expected_data.assert_eq(&format!( - "{}", - record_batch - .schema() - .fields() - .iter() - .zip_eq(columns) - .map(|(field, column)| { - if ignore_check_columns.contains(&field.name().as_str()) { - format!("{}: (skipped)", field.name()) - } else { - format!("{}: {:?}", field.name(), column) - } - }) - .format(",\n") - )); + expected_data.assert_eq( + &pretty_format_batches(&[RecordBatch::try_new( + Arc::new(ArrowSchema::new(fields)), + columns, + ) + .unwrap()]) + .unwrap() + .to_string(), + ); } } diff --git a/crates/iceberg/src/inspect/metrics.rs b/crates/iceberg/src/inspect/metrics.rs new file mode 100644 index 0000000000..d7d6d4d765 --- /dev/null +++ b/crates/iceberg/src/inspect/metrics.rs @@ -0,0 +1,428 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::type_name; +use std::sync::Arc; + +use arrow_array::builder::{ + ArrayBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, FixedSizeBinaryBuilder, + Float32Builder, Float64Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, StringBuilder, + StructBuilder, TimestampMicrosecondBuilder, TimestampNanosecondBuilder, +}; +use arrow_array::cast::AsArray; +use arrow_array::types::{ + Date32Type, Decimal128Type, Float32Type, Float64Type, Int32Type, Int64Type, + TimestampMicrosecondType, TimestampNanosecondType, +}; +use arrow_array::{ArrayRef, StructArray}; +use arrow_schema::{DataType, FieldRef, Fields}; +use itertools::Itertools; +use rust_decimal::prelude::ToPrimitive; + +use crate::arrow::{get_arrow_datum, schema_to_arrow_schema, type_to_arrow_type}; +use crate::spec::{ + DataFile, Datum, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, +}; +use crate::{Error, ErrorKind, Result}; + +/// Metrics for a column in a data file. +struct ColumnMetrics { + column_size: Option<i64>, + value_count: Option<i64>, + null_value_count: Option<i64>, + nan_value_count: Option<i64>, + lower_bound: Option<Datum>, + upper_bound: Option<Datum>, +} + +/// Builder for the `readable_metrics` struct in metadata tables. +pub(crate) struct ReadableMetricsStructBuilder { + column_builders: Vec<ReadableColumnMetricsStructBuilder>, + column_fields: Fields, +} + +impl ReadableMetricsStructBuilder { + /// Calculates a dynamic schema for `readable_metrics` to add to metadata tables. The type + /// will be a nested struct containing all primitive columns in the data table. Within the + /// struct's fields are structs that represent [`ColumnMetrics`]. + /// + /// We take the table's schema to get the set of fields in the table. We also take the manifest + /// entry schema to get the highest field ID in the entries metadata table to know which field + /// ID to begin with.
+ pub fn readable_metrics_schema( + data_table_schema: &Schema, + manifest_entry_schema: &Schema, + ) -> Result<Schema> { + let mut field_ids = IncrementingFieldId(manifest_entry_schema.highest_field_id() + 1); + let mut column_metrics_fields: Vec<NestedFieldRef> = Vec::new(); + + let mut primitive_fields: Vec<&NestedFieldRef> = data_table_schema + .as_struct() + .fields() + .iter() + .filter(|field| field.field_type.is_primitive()) + .collect_vec(); + primitive_fields.sort_by_key(|field| field.name.clone()); + + for field in primitive_fields { + // We can expect a primitive type because we filtered for primitive fields above + let primitive_type = field.field_type.as_primitive_type().expect("is primitive"); + let metrics_schema_for_field = + ReadableColumnMetricsStructBuilder::schema(&mut field_ids, primitive_type)?; + + column_metrics_fields.push(Arc::new(NestedField::required( + field_ids.next_id(), + &field.name, + Type::Struct(metrics_schema_for_field.as_struct().clone()), + ))); + } + + Schema::builder() + .with_fields(vec![Arc::new(NestedField::optional( + field_ids.next_id(), + "readable_metrics", + Type::Struct(StructType::new(column_metrics_fields)), + ))]) + .build() + } + + /// Takes a table schema and a readable metrics schema built by + /// [`Self::readable_metrics_schema`]. + pub fn new( + data_table_schema: &Schema, + readable_metrics_schema: &StructType, + ) -> Result<Self> { + let DataType::Struct(column_fields) = + type_to_arrow_type(&Type::Struct(readable_metrics_schema.clone()))? + else { + return Err(Error::new( + ErrorKind::Unexpected, + "Converted Arrow type was not struct", + )); + }; + + let column_builders = readable_metrics_schema + .fields() + .iter() + .map(|column_metrics_field| { + let fields = column_metrics_field + .field_type + .clone() + .to_struct_type() + .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Expected struct type"))? + .fields() + .iter() + .cloned() + .collect_vec(); + let column_metrics_schema = Schema::builder().with_fields(fields).build()?; + let data_field = data_table_schema + .field_by_name(&column_metrics_field.name) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "{} in readable metrics schema does not exist in table", + &column_metrics_field.name + ), + ) + })?; + let primitive_type = data_field + .field_type + .as_primitive_type() + .ok_or_else(|| { + Error::new( + ErrorKind::FeatureUnsupported, + "Readable metrics only supported for primitive types", + ) + })? + .clone(); + + ReadableColumnMetricsStructBuilder::new( + data_field.id, + primitive_type, + column_metrics_schema, + ) + }) + .collect::<Result<Vec<_>>>()?; + + Ok(Self { + column_fields, + column_builders, + }) + } + + pub fn append(&mut self, data_file: &DataFile) -> Result<()> { + for column_builder in &mut self.column_builders { + column_builder.append_data_file(data_file)?; + } + Ok(()) + } + + pub fn finish(&mut self) -> StructArray { + let arrays: Vec<ArrayRef> = self + .column_builders + .iter_mut() + .map::<ArrayRef, _>(|builder| Arc::new(builder.finish())) + .collect(); + + let inner_arrays: Vec<(FieldRef, ArrayRef)> = self + .column_fields + .into_iter() + .cloned() + .zip_eq(arrays) + .collect_vec(); + + StructArray::from(inner_arrays) + } +} + +struct ReadableColumnMetricsStructBuilder { + /// Field id of the column in the data table. + field_id: i32, + /// Type of the column in the data table for which these are the metrics. + primitive_type: PrimitiveType, + /// The struct builder for this column's readable metrics.
+ struct_builder: StructBuilder, +} + +/// Builds a readable metrics struct for a single column. +/// +/// For reference, see [Java][1] and [Python][2] implementations. +/// +/// [1]: https://github.com/apache/iceberg/blob/4a432839233f2343a9eae8255532f911f06358ef/core/src/main/java/org/apache/iceberg/MetricsUtil.java#L337 +/// [2]: https://github.com/apache/iceberg-python/blob/a051584a3684392d2db6556449eb299145d47d15/pyiceberg/table/inspect.py#L101-L110 +impl ReadableColumnMetricsStructBuilder { + /// Return the readable metrics schema for a column of the given data type. + fn schema(field_ids: &mut IncrementingFieldId, data_type: &PrimitiveType) -> Result<Schema> { + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + field_ids.next_id(), + "column_size", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + field_ids.next_id(), + "value_count", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + field_ids.next_id(), + "null_value_count", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + field_ids.next_id(), + "nan_value_count", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + field_ids.next_id(), + "lower_bound", + Type::Primitive(data_type.clone()), + )), + Arc::new(NestedField::optional( + field_ids.next_id(), + "upper_bound", + Type::Primitive(data_type.clone()), + )), + ]) + .build() + } + + fn new(field_id: i32, primitive_type: PrimitiveType, schema: Schema) -> Result<Self> { + Ok(Self { + field_id, + primitive_type, + struct_builder: StructBuilder::from_fields(schema_to_arrow_schema(&schema)?.fields, 0), + }) + } + + fn append_data_file(&mut self, data_file: &DataFile) -> Result<()> { + let column_metrics = Self::get_column_metrics_from_data_file(&self.field_id, data_file); + self.append_column_metrics(column_metrics) + } + + fn get_column_metrics_from_data_file(field_id: &i32, data_file: &DataFile) -> ColumnMetrics { + ColumnMetrics { + column_size: data_file.column_sizes().get(field_id).map(|&v| v as i64), + value_count: data_file.value_counts().get(field_id).map(|&v| v as i64), + null_value_count: data_file + .null_value_counts() + .get(field_id) + .map(|&v| v as i64), + nan_value_count: data_file + .nan_value_counts() + .get(field_id) + .map(|&v| v as i64), + lower_bound: data_file.lower_bounds().get(field_id).cloned(), + upper_bound: data_file.upper_bounds().get(field_id).cloned(), + } + } + + fn append_column_metrics(&mut self, column_metrics: ColumnMetrics) -> Result<()> { + let ColumnMetrics { + column_size, + value_count, + null_value_count, + nan_value_count, + lower_bound, + upper_bound, + } = column_metrics; + + self.field_builder::<Int64Builder>(0) + .append_option(column_size); + self.field_builder::<Int64Builder>(1) + .append_option(value_count); + self.field_builder::<Int64Builder>(2) + .append_option(null_value_count); + self.field_builder::<Int64Builder>(3) + .append_option(nan_value_count); + self.append_bounds(4, lower_bound)?; + self.append_bounds(5, upper_bound)?; + self.struct_builder.append(true); + Ok(()) + } + + fn append_bounds(&mut self, index: usize, datum: Option<Datum>) -> Result<()> { + let datum = datum.map(|datum| get_arrow_datum(&datum)).transpose()?; + let array = if let Some(datum) = &datum { + let (array, is_scalar) = datum.get(); + if is_scalar { + Some(array) + } else { + return Err(Error::new( + ErrorKind::Unexpected, + "Can only append scalar datum", + )); + } + } else { + None + }; + + match self.primitive_type { + PrimitiveType::Boolean => { + self.field_builder::<BooleanBuilder>(index)
+ .append_option(array.map(|array| array.as_boolean().value(0))); + } + PrimitiveType::Int => { + self.field_builder::<Int32Builder>(index) + .append_option(array.map(|array| array.as_primitive::<Int32Type>().value(0))); + } + PrimitiveType::Long => { + self.field_builder::<Int64Builder>(index) + .append_option(array.map(|array| array.as_primitive::<Int64Type>().value(0))); + } + PrimitiveType::Float => { + self.field_builder::<Float32Builder>(index) + .append_option(array.map(|array| array.as_primitive::<Float32Type>().value(0))); + } + PrimitiveType::Double => { + self.field_builder::<Float64Builder>(index) + .append_option(array.map(|array| array.as_primitive::<Float64Type>().value(0))); + } + PrimitiveType::Date => { + self.field_builder::<Date32Builder>(index) + .append_option(array.map(|array| array.as_primitive::<Date32Type>().value(0))); + } + PrimitiveType::Time | PrimitiveType::Timestamp | PrimitiveType::Timestamptz => { + self.field_builder::<TimestampMicrosecondBuilder>(index) + .append_option( + array + .map(|array| array.as_primitive::<TimestampMicrosecondType>().value(0)), + ); + } + PrimitiveType::TimestampNs | PrimitiveType::TimestamptzNs => { + self.field_builder::<TimestampNanosecondBuilder>(index) + .append_option( + array.map(|array| array.as_primitive::<TimestampNanosecondType>().value(0)), + ); + } + PrimitiveType::String => { + self.field_builder::<StringBuilder>(index) + .append_option(array.map(|array| array.as_string::<i32>().value(0))); + } + PrimitiveType::Binary => { + self.field_builder::<LargeBinaryBuilder>(index) + .append_option(array.map(|array| array.as_binary::<i64>().value(0))); + } + PrimitiveType::Decimal { .. } => { + self.field_builder::<Decimal128Builder>(index) + .append_option( + array.map(|array| array.as_primitive::<Decimal128Type>().value(0)), + ); + } + PrimitiveType::Fixed(len) => { + if len.to_i32().is_some() { + let builder = self.field_builder::<FixedSizeBinaryBuilder>(index); + // FixedSizeBinaryBuilder does not have append_option + match array { + Some(array) => { + builder.append_value(array.as_fixed_size_binary().value(0))?; + } + None => { + builder.append_null(); + } + } + } else { + self.field_builder::<LargeBinaryBuilder>(index) + .append_option(array.map(|array| array.as_binary::<i64>().value(0))); + } + } + PrimitiveType::Uuid => { + let builder = self.field_builder::<FixedSizeBinaryBuilder>(index); + // FixedSizeBinaryBuilder does not have append_option + match array { + Some(array) => { + builder.append_value(array.as_fixed_size_binary().value(0))?; + } + None => { + builder.append_null(); + } + } + } + }; + Ok(()) + } + + fn finish(&mut self) -> StructArray { + self.struct_builder.finish() + } + + // Shorthand to select a field builder with a specific type. + fn field_builder<T: ArrayBuilder>(&mut self, index: usize) -> &mut T { + match self.struct_builder.field_builder::<T>(index) { + Some(builder) => builder, + None => panic!( + "Field builder not found for index {index} and type {}", + type_name::<T>(), + ), + } + } +} + +/// Helper to serve incrementing field ids. +struct IncrementingFieldId(i32); + +impl IncrementingFieldId { + fn next_id(&mut self) -> i32 { + let current = self.0; + self.0 += 1; + current + } +} diff --git a/crates/iceberg/src/inspect/mod.rs b/crates/iceberg/src/inspect/mod.rs index b64420ea11..5e3f15d567 100644 --- a/crates/iceberg/src/inspect/mod.rs +++ b/crates/iceberg/src/inspect/mod.rs @@ -17,10 +17,13 @@ //! Metadata table APIs.
+mod entries; mod manifests; mod metadata_table; +mod metrics; mod snapshots; +pub use entries::EntriesTable; pub use manifests::ManifestsTable; pub use metadata_table::*; pub use snapshots::SnapshotsTable; diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs index 1ee89963d6..f5703b0437 100644 --- a/crates/iceberg/src/inspect/snapshots.rs +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -130,59 +130,14 @@ mod tests { Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], expect![[r#" - committed_at: PrimitiveArray - [ - 2018-01-04T21:22:35.770+00:00, - 2019-04-12T20:29:15.770+00:00, - ], - snapshot_id: PrimitiveArray - [ - 3051729675574597004, - 3055729675574597004, - ], - parent_id: PrimitiveArray - [ - null, - 3051729675574597004, - ], - operation: StringArray - [ - "append", - "append", - ], - manifest_list: (skipped), - summary: MapArray - [ - StructArray - -- validity: - [ - ] - [ - -- child 0: "keys" (Utf8) - StringArray - [ - ] - -- child 1: "values" (Utf8) - StringArray - [ - ] - ], - StructArray - -- validity: - [ - ] - [ - -- child 0: "keys" (Utf8) - StringArray - [ - ] - -- child 1: "values" (Utf8) - StringArray - [ - ] - ], - ]"#]], + +--------------------------+---------------------+---------------------+-----------+---------+ + | committed_at | snapshot_id | parent_id | operation | summary | + +--------------------------+---------------------+---------------------+-----------+---------+ + | 2018-01-04T21:22:35.770Z | 3051729675574597004 | | append | {} | + | 2019-04-12T20:29:15.770Z | 3055729675574597004 | 3051729675574597004 | append | {} | + +--------------------------+---------------------+---------------------+-----------+---------+"#]], &["manifest_list"], + &[], Some("committed_at"), ).await; } diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index bfa1266dd0..b2b01eba5f 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -1123,8 +1123,11 @@ pub mod tests { use std::sync::Arc; use arrow_array::{ - ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, + ArrayRef, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int32Array, Int64Array, LargeBinaryArray, RecordBatch, StringArray, + TimestampMicrosecondArray, TimestampNanosecondArray, }; + use arrow_schema::{DataType, TimeUnit}; use futures::{stream, TryStreamExt}; use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; use parquet::basic::Compression; @@ -1284,6 +1287,49 @@ pub mod tests { .record_count(1) .partition(Struct::from_iter([Some(Literal::long(100))])) .key_metadata(None) + // Note: The bounds below need to agree with the test data written + // into the Parquet file below. If not, tests that rely on filter + // scans will fail because of wrong bounds. 
+ .lower_bounds(HashMap::from([ + (1, Datum::long(1)), + (2, Datum::long(2)), + (3, Datum::long(3)), + (4, Datum::string("Apache")), + (5, Datum::double(100)), + (6, Datum::int(100)), + (7, Datum::long(100)), + (8, Datum::bool(false)), + (9, Datum::float(100.0)), + // decimal values are not supported by schema::get_arrow_datum + // (10, Datum::decimal(Decimal(123, 2))), + (11, Datum::date(0)), + (12, Datum::timestamp_micros(0)), + (13, Datum::timestamptz_micros(0)), + // ns timestamps, uuid, fixed, binary are currently not + // supported in schema::get_arrow_datum + ])) + .upper_bounds(HashMap::from([ + (1, Datum::long(1)), + (2, Datum::long(5)), + (3, Datum::long(4)), + (4, Datum::string("Iceberg")), + (5, Datum::double(200)), + (6, Datum::int(200)), + (7, Datum::long(200)), + (8, Datum::bool(true)), + (9, Datum::float(200.0)), + // decimal values are not supported by schema::get_arrow_datum + // (10, Datum::decimal(Decimal(123, 2))), + (11, Datum::date(0)), + (12, Datum::timestamp_micros(0)), + (13, Datum::timestamptz_micros(0)), + // ns timestamps, uuid, fixed, binary are currently not + // supported in schema::get_arrow_datum + ])) + .column_sizes(HashMap::from([(1, 1u64), (2, 1u64)])) + .value_counts(HashMap::from([(1, 2u64), (2, 2u64)])) + .null_value_counts(HashMap::from([(1, 3u64), (2, 3u64)])) + .nan_value_counts(HashMap::from([(1, 4u64), (2, 4u64)])) .build() .unwrap(), ) @@ -1392,6 +1438,69 @@ pub mod tests { PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string(), )])), + arrow_schema::Field::new("float", arrow_schema::DataType::Float32, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "9".to_string(), + )])), + arrow_schema::Field::new( + "decimal", + arrow_schema::DataType::Decimal128(3, 2), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "10".to_string(), + )])), + arrow_schema::Field::new("date", arrow_schema::DataType::Date32, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "11".to_string(), + )])), + arrow_schema::Field::new( + "timestamp", + arrow_schema::DataType::Timestamp(TimeUnit::Microsecond, None), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "12".to_string(), + )])), + arrow_schema::Field::new( + "timestamptz", + arrow_schema::DataType::Timestamp( + TimeUnit::Microsecond, + Some("UTC".into()), + ), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "13".to_string(), + )])), + arrow_schema::Field::new( + "timestampns", + arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "14".to_string(), + )])), + arrow_schema::Field::new( + "timestamptzns", + arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "15".to_string(), + )])), + arrow_schema::Field::new("binary", arrow_schema::DataType::LargeBinary, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "16".to_string(), + )])), ]; Arc::new(arrow_schema::Schema::new(fields)) }; @@ -1441,8 +1550,54 @@ pub mod tests { let values: BooleanArray = values.into(); let col8 = Arc::new(values) as ArrayRef; + // float: + let mut values = vec![100.0f32; 512]; + values.append(vec![150.0f32; 12].as_mut()); + values.append(vec![200.0f32; 500].as_mut()); + let col9 = 
Arc::new(Float32Array::from_iter_values(values)) as ArrayRef; + + // decimal: + let values = vec![123i128; 1024]; + let col10 = Arc::new( + Decimal128Array::from_iter_values(values) + .with_data_type(DataType::Decimal128(3, 2)), + ); + + // date: + let values = vec![0i32; 1024]; + let col11 = Arc::new(Date32Array::from_iter_values(values)); + + // timestamp: + let values = vec![0i64; 1024]; + let col12 = Arc::new(TimestampMicrosecondArray::from_iter_values(values)); + + // timestamptz: + let values = vec![0i64; 1024]; + let col13 = Arc::new( + TimestampMicrosecondArray::from_iter_values(values).with_data_type( + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + ), + ); + + // timestampns: + let values = vec![0i64; 1024]; + let col14 = Arc::new(TimestampNanosecondArray::from_iter_values(values)); + + // timestamptzns: + let values = vec![0i64; 1024]; + let col15 = Arc::new( + TimestampNanosecondArray::from_iter_values(values).with_data_type( + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), + ), + ); + + // binary: + let values = vec![[0u8; 8]; 1024]; + let col16 = Arc::new(LargeBinaryArray::from_iter_values(values)); + let to_write = RecordBatch::try_new(schema.clone(), vec![ - col1, col2, col3, col4, col5, col6, col7, col8, + col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, + col14, col15, col16, ]) .unwrap(); diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index c82f353fe1..4f5f5e2991 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -561,7 +561,7 @@ impl ManifestWriter { } /// This is a helper module that defines the schema field of the manifest list entry. -mod _const_schema { +pub(crate) mod _const_schema { use std::sync::Arc; use apache_avro::Schema as AvroSchema; @@ -862,7 +862,7 @@ mod _const_schema { }) }; - fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> { + pub(crate) fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> { vec![ CONTENT.clone(), FILE_PATH.clone(), @@ -894,8 +894,8 @@ mod _const_schema { schema_to_avro_schema("data_file", &schema) } - pub(super) fn manifest_schema_v2(partition_type: &StructType) -> Result<AvroSchema> { - let fields = vec![ + pub(crate) fn manifest_schema_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> { + vec![ STATUS.clone(), SNAPSHOT_ID_V2.clone(), SEQUENCE_NUMBER.clone(), @@ -905,8 +905,13 @@ mod _const_schema { "data_file", Type::Struct(StructType::new(data_file_fields_v2(partition_type))), )), - ]; - let schema = Schema::builder().with_fields(fields).build()?; + ] + } + + pub(super) fn manifest_schema_v2(partition_type: &StructType) -> Result<AvroSchema> { + let schema = Schema::builder() + .with_fields(manifest_schema_fields_v2(partition_type)) + .build()?; schema_to_avro_schema("manifest_entry", &schema) } @@ -941,8 +946,8 @@ mod _const_schema { schema_to_avro_schema("data_file", &schema) } - pub(super) fn manifest_schema_v1(partition_type: &StructType) -> Result<AvroSchema> { - let fields = vec![ + fn manifest_schema_fields_v1(partition_type: &StructType) -> Vec<NestedFieldRef> { + vec![ STATUS.clone(), SNAPSHOT_ID_V1.clone(), Arc::new(NestedField::required( @@ -950,8 +955,13 @@ mod _const_schema { "data_file", Type::Struct(StructType::new(data_file_fields_v1(partition_type))), )), - ]; - let schema = Schema::builder().with_fields(fields).build()?; + ] + } + + pub(super) fn manifest_schema_v1(partition_type: &StructType) -> Result<AvroSchema> { + let schema = Schema::builder() + .with_fields(manifest_schema_fields_v1(partition_type)) + 
.build()?; schema_to_avro_schema("manifest_entry", &schema) } } @@ -1196,6 +1206,12 @@ impl ManifestEntry { self.sequence_number } + /// File sequence number. + #[inline] + pub fn file_sequence_number(&self) -> Option<i64> { + self.file_sequence_number + } + /// File size in bytes. #[inline] pub fn file_size_in_bytes(&self) -> u64 { diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index e6405be4c0..b2c7fdd500 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -18,13 +18,16 @@ /*! * Partitioning */ + +use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use itertools::Itertools; use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use super::transform::Transform; -use super::{NestedField, Schema, SchemaRef, StructType}; +use super::{NestedField, Schema, SchemaRef, StructType, TableMetadata, Type}; use crate::{Error, ErrorKind, Result}; pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; @@ -616,12 +619,12 @@ trait CorePartitionSpecValidator { if let Some(collision) = collision { Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`", - source_id, transform.dedup_name(), collision.name - ), - )) + ErrorKind::DataInvalid, + format!( - "Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`", + source_id, transform.dedup_name(), collision.name + ), + )) } else { Ok(()) } @@ -657,10 +660,130 @@ impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder { } } +/// Builds a unified partition type considering all specs in the table. +/// +/// Based on Iceberg Java's [`Partitioning#partitionType`][1]. +/// +/// [1]: https://github.com/apache/iceberg/blob/7e0cd3fa1e51d3c80f6c8cff23a03dca86f942fa/core/src/main/java/org/apache/iceberg/Partitioning.java#L240 +pub(crate) fn partition_type(table_metadata: &TableMetadata) -> Result<StructType> { + let partition_spec = table_metadata.partition_specs_iter().cloned().collect_vec(); + let all_field_ids = all_field_ids(&partition_spec); + + build_partition_projection_type( + table_metadata.current_schema(), + partition_spec, + all_field_ids, + ) +} + +// Based on Iceberg Java's [`Partitioning#buildPartitionProjectionType`][1].
+// +// [1]:https://github.com/apache/iceberg/blob/apache-iceberg-1.8.0/core/src/main/java/org/apache/iceberg/Partitioning.java#L255 +fn build_partition_projection_type( + schema: &Schema, + specs: Vec<PartitionSpecRef>, + projected_field_ids: HashSet<i32>, +) -> Result<StructType> { + // Check for unknown transforms because we cannot know the output type + for spec in &specs { + for field in &spec.fields { + if field.transform == Transform::Unknown { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Unknown transform in partition spec: {field:?}",), + )); + } + } + } + + let mut field_map: HashMap<i32, PartitionField> = HashMap::new(); + let mut type_map: HashMap<i32, Type> = HashMap::new(); + let mut name_map: HashMap<i32, String> = HashMap::new(); + + // Sort specs by ID in descending order to get latest field names + let sorted_specs = specs + .iter() + .sorted_by_key(|spec| spec.spec_id()) + .rev() + .collect_vec(); + + for spec in sorted_specs { + for field in spec.fields() { + let field_id = field.field_id; + + if !projected_field_ids.contains(&field_id) { + continue; + } + + let partition_type = spec.partition_type(schema)?; + let struct_field = partition_type.field_by_id(field_id).unwrap(); + let existing_field = field_map.get(&field_id); + + match existing_field { + None => { + field_map.insert(field_id, field.clone()); + type_map.insert(field_id, struct_field.field_type.as_ref().clone()); + name_map.insert(field_id, struct_field.name.clone()); + } + Some(existing_field) => { + // verify the fields are compatible as they may conflict in v1 tables + if !equivalent_ignoring_name(existing_field, field) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Conflicting partition fields: ['{existing_field:?}', '{field:?}']", + ), + )); + } + + // use the correct type for dropped partitions in v1 tables + if is_void_transform(existing_field) && !is_void_transform(field) { + field_map.insert(field_id, field.clone()); + type_map.insert(field_id, struct_field.field_type.as_ref().clone()); + } + } + } + } + } + + let sorted_struct_fields = field_map + .into_keys() + .sorted() + .map(|field_id| { + NestedField::optional(field_id, &name_map[&field_id], type_map[&field_id].clone()) + }) + .map(Arc::new) + .collect_vec(); + + Ok(StructType::new(sorted_struct_fields)) +} + +fn is_void_transform(field: &PartitionField) -> bool { + field.transform == Transform::Void +} + +fn equivalent_ignoring_name(field: &PartitionField, another_field: &PartitionField) -> bool { + field.field_id == another_field.field_id + && field.source_id == another_field.source_id + && compatible_transforms(field.transform, another_field.transform) +} + +fn compatible_transforms(t1: Transform, t2: Transform) -> bool { + t1 == t2 || t1 == Transform::Void || t2 == Transform::Void +} + +// Collects IDs of all partition fields used across specs +fn all_field_ids(vec: &[PartitionSpecRef]) -> HashSet<i32> { + vec.iter() + .flat_map(|partition_spec| partition_spec.fields()) + .map(|partition_field| partition_field.field_id) + .collect() +} + #[cfg(test)] mod tests { use super::*; - use crate::spec::{PrimitiveType, Type}; + use crate::spec::{FormatVersion, PrimitiveType, SortOrder, TableMetadataBuilder, Type}; #[test] fn test_partition_spec() { @@ -1733,4 +1856,172 @@ mod tests { assert_eq!(1002, spec.fields[1].field_id); assert!(!spec.has_sequential_ids()); } + + #[test] + fn test_combine_partition_type() -> Result<()> { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name",
Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "ts", Type::Primitive(PrimitiveType::Timestamp)).into(), + ]) + .build()?; + + let metadata = TableMetadataBuilder::new( + schema, + PartitionSpec::unpartition_spec(), + SortOrder::unsorted_order(), + "my_location".to_string(), + FormatVersion::V2, + HashMap::new(), + )? + .add_partition_spec( + UnboundPartitionSpec::builder() + .with_spec_id(1) + .add_partition_fields(vec![ + UnboundPartitionField { + source_id: 1, + field_id: Some(1001), + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }, + UnboundPartitionField { + source_id: 1, + field_id: Some(1002), + name: "id_truncate".to_string(), + transform: Transform::Truncate(4), + }, + ])? + .build(), + )? + .add_partition_spec( + UnboundPartitionSpec::builder() + .with_spec_id(2) + .add_partition_fields(vec![UnboundPartitionField { + source_id: 3, + field_id: Some(1000), + name: "ts_day".to_string(), + transform: Transform::Day, + }])? + .build(), + )? + // Spec id 3 overrides a partition field name with the same id + // We'll later assert that the new name is used instead of the old one + .add_partition_spec( + UnboundPartitionSpec::builder() + .with_spec_id(3) + .add_partition_fields(vec![UnboundPartitionField { + source_id: 3, + field_id: Some(1000), + name: "ts_day_overridden".to_string(), + transform: Transform::Day, + }])? + .build(), + )? + // Add a void transform + .add_partition_spec( + UnboundPartitionSpec::builder() + .with_spec_id(4) + .add_partition_fields(vec![UnboundPartitionField { + source_id: 2, + field_id: Some(9999), + name: "name_partition".to_string(), + transform: Transform::Void, + }])? + .build(), + )? + // Newer partition fields can override partition void fields + .add_partition_spec( + UnboundPartitionSpec::builder() + .with_spec_id(5) + .add_partition_fields(vec![UnboundPartitionField { + source_id: 2, + field_id: Some(9999), + name: "name_partition".to_string(), + transform: Transform::Identity, + }])? + .build(), + )? + .build()? + .metadata; + + assert_eq!( + partition_type(&metadata)?, + // Assert that fields are sorted + StructType::new(vec![ + NestedField::optional( + 1000, + "ts_day_overridden", + Type::Primitive(PrimitiveType::Date), + ) + .into(), + NestedField::optional(1001, "id_bucket", Type::Primitive(PrimitiveType::Int),) + .into(), + NestedField::optional(1002, "id_truncate", Type::Primitive(PrimitiveType::Int),) + .into(), + NestedField::optional( + 9999, + "name_partition", + Type::Primitive(PrimitiveType::String), + ) + .into(), + ]) + ); + + Ok(()) + } + + #[test] + fn test_combine_partition_type_incompatible_specs() -> Result<()> { + let metadata = TableMetadataBuilder::new( + Schema::builder() + .with_fields(vec![NestedField::required( + 1, + "id", + Type::Primitive(PrimitiveType::Int), + ) + .into()]) + .build()?, + PartitionSpec::unpartition_spec(), + SortOrder::unsorted_order(), + "my_location".to_string(), + FormatVersion::V2, + HashMap::new(), + )? + .add_partition_spec( + UnboundPartitionSpec::builder() + .with_spec_id(1) + .add_partition_fields(vec![UnboundPartitionField { + source_id: 1, + field_id: Some(2), + name: "id_bucket".to_string(), + transform: Transform::Bucket(4), + }])? + .build(), + )? 
+ // Change the partition field incompatibly + .add_partition_spec( + UnboundPartitionSpec::builder() + .with_spec_id(2) + .add_partition_fields(vec![UnboundPartitionField { + source_id: 1, + field_id: Some(2), + name: "id_bucket".to_string(), + // Change bucket[4] to bucket[8] + transform: Transform::Bucket(8), + }])? + .build(), + )? + .build()? + .metadata; + + let result = partition_type(&metadata); + + assert!(result + .unwrap_err() + .to_string() + .contains("Conflicting partition fields")); + + Ok(()) + } } diff --git a/crates/iceberg/testdata/example_table_metadata_v2.json b/crates/iceberg/testdata/example_table_metadata_v2.json index 17bbd7d99a..8ac9377511 100644 --- a/crates/iceberg/testdata/example_table_metadata_v2.json +++ b/crates/iceberg/testdata/example_table_metadata_v2.json @@ -25,7 +25,15 @@ {"id": 5, "name": "dbl", "required": true, "type": "double"}, {"id": 6, "name": "i32", "required": true, "type": "int"}, {"id": 7, "name": "i64", "required": true, "type": "long"}, - {"id": 8, "name": "bool", "required": true, "type": "boolean"} + {"id": 8, "name": "bool", "required": true, "type": "boolean"}, + {"id": 9, "name": "float", "required": true, "type": "float"}, + {"id": 10, "name": "decimal", "required": true, "type": "decimal(3,2)"}, + {"id": 11, "name": "date", "required": true, "type": "date"}, + {"id": 12, "name": "timestamp", "required": true, "type": "timestamp"}, + {"id": 13, "name": "timestamptz", "required": true, "type": "timestamptz"}, + {"id": 14, "name": "timestampns", "required": true, "type": "timestamp_ns"}, + {"id": 15, "name": "timestamptzns", "required": true, "type": "timestamptz_ns"}, + {"id": 16, "name": "binary", "required": true, "type": "binary"} ] } ],
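As an aside to the changes above, a minimal consumer-side sketch of the new entries metadata table is shown below. It is not part of the diff: it assumes a loaded `iceberg::table::Table` with a current snapshot, and it assumes the `MetadataTable` is reached through a `table.inspect()` accessor (name assumed; use whatever accessor the crate actually exposes). It renders the scanned batches with `arrow_cast::pretty::pretty_format_batches`, the same way the updated expect-tests do.

```rust
// Illustrative sketch only, not part of the change set.
// Assumptions: `table.inspect()` (name assumed) returns the MetadataTable, and the
// table has a current snapshot, since EntriesTable::scan() errors without one.
use arrow_cast::pretty::pretty_format_batches;
use futures::TryStreamExt;
use iceberg::Result;

async fn print_entries(table: &iceberg::table::Table) -> Result<()> {
    // `entries()` returns the EntriesTable; `scan()` yields an ArrowRecordBatchStream
    // over the current snapshot's manifest entries, including `readable_metrics`.
    let stream = table.inspect().entries().scan().await?;
    let batches: Vec<_> = stream.try_collect().await?;
    // Pretty-print the batches as the ASCII tables used in the reworked tests.
    println!("{}", pretty_format_batches(&batches).expect("valid record batches"));
    Ok(())
}
```

Columns such as `file_path` and per-run snapshot ids are not stable across runs, which is why the test helper's `ignore_check_columns` and the new `ignore_check_struct_fields` parameters exist; a caller pretty-printing real tables should expect those values to differ from the expectations shown above.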