diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 91dfe85e9c..b590c8bc8c 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -29,6 +29,7 @@ use arrow_array::{ }; use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; use bitvec::macros::internal::funty::Fundamental; +use num_bigint::BigInt; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use parquet::file::statistics::Statistics; use rust_decimal::prelude::ToPrimitive; @@ -739,9 +740,15 @@ macro_rules! get_parquet_stat_as_datum { let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else { return Ok(None); }; + let unscaled_value = BigInt::from_signed_bytes_be(bytes); Some(Datum::new( primitive_type.clone(), - PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)), + PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Can't convert bytes to i128: {:?}", bytes), + ) + })?), )) } ( diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 596228f7c0..5561b19134 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -478,15 +478,18 @@ mod tests { use anyhow::Result; use arrow_array::types::Int64Type; use arrow_array::{ - Array, ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, StructArray, + Array, ArrayRef, BooleanArray, Decimal128Array, Int32Array, Int64Array, ListArray, + RecordBatch, StructArray, }; use arrow_schema::{DataType, SchemaRef as ArrowSchemaRef}; use arrow_select::concat::concat_batches; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use rust_decimal::Decimal; use tempfile::TempDir; use uuid::Uuid; use super::*; + use crate::arrow::schema_to_arrow_schema; use crate::io::FileIOBuilder; use crate::spec::{PrimitiveLiteral, Struct, *}; use crate::writer::file_writer::location_generator::test::MockLocationGenerator; @@ -1169,4 +1172,245 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_decimal_bound() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let loccation_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // test 1.1 and 2.2 + let schema = Arc::new( + Schema::builder() + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + loccation_gen.clone(), + file_name_gen.clone(), + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![Some(22000000000), Some(11000000000)]) + .with_data_type(DataType::Decimal128(28, 10)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(22000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(11000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + + // test -1.1 and -2.2 + let schema = Arc::new( + Schema::builder() + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 28, + scale: 10, + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + loccation_gen.clone(), + file_name_gen.clone(), + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)]) + .with_data_type(DataType::Decimal128(28, 10)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(-11000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::decimal_with_precision(Decimal::new(-22000000000_i64, 10), 28).unwrap()) + .as_ref() + ); + + // test max and min of rust_decimal + let decimal_max = Decimal::MAX; + let decimal_min = Decimal::MIN; + assert_eq!(decimal_max.scale(), decimal_min.scale()); + let schema = Arc::new( + Schema::builder() + .with_fields(vec![NestedField::optional( + 0, + "decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: decimal_max.scale(), + }), + ) + .into()]) + .build() + .unwrap(), + ); + let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema, + file_io.clone(), + loccation_gen, + file_name_gen, + ) + .build() + .await?; + let col0 = Arc::new( + Decimal128Array::from(vec![ + Some(decimal_max.mantissa()), + Some(decimal_min.mantissa()), + ]) + .with_data_type(DataType::Decimal128(38, 0)), + ) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + pw.write(&to_write).await?; + let res = pw.close().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .content(crate::spec::DataContentType::Data) + .partition(Struct::empty()) + .build() + .unwrap(); + assert_eq!( + data_file.upper_bounds().get(&0), + Some(Datum::decimal(decimal_max).unwrap()).as_ref() + ); + assert_eq!( + data_file.lower_bounds().get(&0), + Some(Datum::decimal(decimal_min).unwrap()).as_ref() + ); + + // test max and min for scale 38 + // # TODO + // Readd this case after resolve https://github.com/apache/iceberg-rust/issues/669 + // let schema = Arc::new( + // Schema::builder() + // .with_fields(vec![NestedField::optional( + // 0, + // "decimal", + // Type::Primitive(PrimitiveType::Decimal { + // precision: 38, + // scale: 0, + // }), + // ) + // .into()]) + // .build() + // .unwrap(), + // ); + // let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap()); + // let mut pw = ParquetWriterBuilder::new( + // WriterProperties::builder().build(), + // schema, + // file_io.clone(), + // loccation_gen, + // file_name_gen, + // ) + // .build() + // .await?; + // let col0 = Arc::new( + // Decimal128Array::from(vec![ + // Some(99999999999999999999999999999999999999_i128), + // Some(-99999999999999999999999999999999999999_i128), + // ]) + // .with_data_type(DataType::Decimal128(38, 0)), + // ) as ArrayRef; + // let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col0]).unwrap(); + // pw.write(&to_write).await?; + // let res = pw.close().await?; + // assert_eq!(res.len(), 1); + // let data_file = res + // .into_iter() + // .next() + // .unwrap() + // .content(crate::spec::DataContentType::Data) + // .partition(Struct::empty()) + // .build() + // .unwrap(); + // assert_eq!( + // data_file.upper_bounds().get(&0), + // Some(Datum::new( + // PrimitiveType::Decimal { + // precision: 38, + // scale: 0 + // }, + // PrimitiveLiteral::Int128(99999999999999999999999999999999999999_i128) + // )) + // .as_ref() + // ); + // assert_eq!( + // data_file.lower_bounds().get(&0), + // Some(Datum::new( + // PrimitiveType::Decimal { + // precision: 38, + // scale: 0 + // }, + // PrimitiveLiteral::Int128(-99999999999999999999999999999999999999_i128) + // )) + // .as_ref() + // ); + + Ok(()) + } }