Skip to content

Commit d6d856d

Browse files
committed
refactor: consolidate parquet stat min/max parsing in one place
1 parent 6568945 commit d6d856d

File tree

5 files changed

+148
-219
lines changed

5 files changed

+148
-219
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,11 @@ once_cell = "1"
7272
opendal = "0.49"
7373
ordered-float = "4"
7474
parquet = "52"
75+
paste = "1"
7576
pilota = "0.11.2"
7677
pretty_assertions = "1.4"
7778
port_scanner = "0.1.5"
79+
rand = "0.8"
7880
regex = "1.10.5"
7981
reqwest = { version = "0.12", default-features = false, features = ["json"] }
8082
rust_decimal = "1.31"

crates/iceberg/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ once_cell = { workspace = true }
6565
opendal = { workspace = true }
6666
ordered-float = { workspace = true }
6767
parquet = { workspace = true, features = ["async"] }
68+
paste = { workspace = true }
6869
reqwest = { workspace = true }
6970
rust_decimal = { workspace = true }
7071
serde = { workspace = true }
@@ -83,6 +84,6 @@ ctor = { workspace = true }
8384
iceberg-catalog-memory = { workspace = true }
8485
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
8586
pretty_assertions = { workspace = true }
86-
rand = "0.8"
87+
rand = { workspace = true }
8788
tempfile = { workspace = true }
8889
tera = { workspace = true }

crates/iceberg/src/arrow/schema.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ use arrow_array::{
3030
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
3131
use bitvec::macros::internal::funty::Fundamental;
3232
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
33+
use parquet::file::statistics::Statistics;
3334
use rust_decimal::prelude::ToPrimitive;
35+
use uuid::Uuid;
3436

3537
use crate::error::Result;
3638
use crate::spec::{
@@ -645,6 +647,101 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send
645647
}
646648
}
647649

650+
macro_rules! get_parquet_stat_as_datum {
651+
($limit_type:ident) => {
652+
paste::paste! {
653+
/// Gets the $limit_type value from a parquet Statistics struct, as a Datum
654+
pub(crate) fn [<get_parquet_stat_ $limit_type _as_datum>](
655+
primitive_type: &PrimitiveType, stats: &Statistics
656+
) -> Result<Option<Datum>> {
657+
Ok(Some(match (primitive_type, stats) {
658+
(PrimitiveType::Boolean, Statistics::Boolean(stats)) => Datum::bool(*stats.$limit_type()),
659+
(PrimitiveType::Int, Statistics::Int32(stats)) => Datum::int(*stats.$limit_type()),
660+
(PrimitiveType::Date, Statistics::Int32(stats)) => Datum::date(*stats.$limit_type()),
661+
(PrimitiveType::Long, Statistics::Int64(stats)) => Datum::long(*stats.$limit_type()),
662+
(PrimitiveType::Time, Statistics::Int64(stats)) => Datum::time_micros(*stats.$limit_type())?,
663+
(PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
664+
Datum::timestamp_micros(*stats.$limit_type())
665+
}
666+
(PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
667+
Datum::timestamptz_micros(*stats.$limit_type())
668+
}
669+
(PrimitiveType::Float, Statistics::Float(stats)) => Datum::float(*stats.$limit_type()),
670+
(PrimitiveType::Double, Statistics::Double(stats)) => Datum::double(*stats.$limit_type()),
671+
(PrimitiveType::String, Statistics::ByteArray(stats)) => {
672+
Datum::string(stats.$limit_type().as_utf8()?)
673+
}
674+
(PrimitiveType::Decimal {
675+
precision: _,
676+
scale: _,
677+
}, Statistics::ByteArray(stats)) => {
678+
Datum::new(
679+
primitive_type.clone(),
680+
PrimitiveLiteral::Int128(i128::from_le_bytes(stats.[<$limit_type _bytes>]().try_into()?)),
681+
)
682+
}
683+
(
684+
PrimitiveType::Decimal {
685+
precision: _,
686+
scale: _,
687+
},
688+
Statistics::Int32(stats)) => {
689+
Datum::new(
690+
primitive_type.clone(),
691+
PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())),
692+
)
693+
}
694+
695+
(
696+
PrimitiveType::Decimal {
697+
precision: _,
698+
scale: _,
699+
},
700+
Statistics::Int64(stats),
701+
) => {
702+
Datum::new(
703+
primitive_type.clone(),
704+
PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())),
705+
)
706+
}
707+
(PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
708+
let raw = stats.[<$limit_type _bytes>]();
709+
if raw.len() != 16 {
710+
return Err(Error::new(
711+
ErrorKind::Unexpected,
712+
"Invalid length of uuid bytes.",
713+
));
714+
}
715+
Datum::uuid(Uuid::from_bytes(
716+
raw[..16].try_into().unwrap(),
717+
))
718+
}
719+
(PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
720+
let raw = stat.[<$limit_type _bytes>]();
721+
if raw.len() != *len as usize {
722+
return Err(Error::new(
723+
ErrorKind::Unexpected,
724+
"Invalid length of fixed bytes.",
725+
));
726+
}
727+
Datum::fixed(raw.to_vec())
728+
}
729+
(PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
730+
Datum::binary(stat.[<$limit_type _bytes>]().to_vec())
731+
}
732+
_ => {
733+
return Ok(None);
734+
}
735+
}))
736+
}
737+
}
738+
}
739+
}
740+
741+
get_parquet_stat_as_datum!(min);
742+
743+
get_parquet_stat_as_datum!(max);
744+
648745
impl TryFrom<&ArrowSchema> for crate::spec::Schema {
649746
type Error = Error;
650747

crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs

Lines changed: 3 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use fnv::FnvHashSet;
2323
use parquet::file::metadata::RowGroupMetaData;
2424
use parquet::file::statistics::Statistics;
2525

26+
use crate::arrow::{get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum};
2627
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
2728
use crate::expr::{BoundPredicate, BoundReference};
2829
use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema};
@@ -144,35 +145,7 @@ impl<'a> RowGroupMetricsEvaluator<'a> {
144145
return Ok(None);
145146
}
146147

147-
Ok(Some(match (primitive_type, stats) {
148-
(PrimitiveType::Boolean, Statistics::Boolean(stats)) => Datum::bool(*stats.min()),
149-
(PrimitiveType::Int, Statistics::Int32(stats)) => Datum::int(*stats.min()),
150-
(PrimitiveType::Date, Statistics::Int32(stats)) => Datum::date(*stats.min()),
151-
(PrimitiveType::Long, Statistics::Int64(stats)) => Datum::long(*stats.min()),
152-
(PrimitiveType::Time, Statistics::Int64(stats)) => Datum::time_micros(*stats.min())?,
153-
(PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
154-
Datum::timestamp_micros(*stats.min())
155-
}
156-
(PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
157-
Datum::timestamptz_micros(*stats.min())
158-
}
159-
(PrimitiveType::Float, Statistics::Float(stats)) => Datum::float(*stats.min()),
160-
(PrimitiveType::Double, Statistics::Double(stats)) => Datum::double(*stats.min()),
161-
(PrimitiveType::String, Statistics::ByteArray(stats)) => {
162-
Datum::string(stats.min().as_utf8()?)
163-
}
164-
// TODO:
165-
// * Decimal
166-
// * Uuid
167-
// * Fixed
168-
// * Binary
169-
(primitive_type, _) => {
170-
return Err(Error::new(
171-
ErrorKind::FeatureUnsupported,
172-
format!("Conversion of min value for column of type {} to iceberg type {} is not yet supported", stats.physical_type(), primitive_type)
173-
));
174-
}
175-
}))
148+
get_parquet_stat_min_as_datum(&primitive_type, stats)
176149
}
177150

178151
fn max_value(&self, field_id: i32) -> Result<Option<Datum>> {
@@ -184,35 +157,7 @@ impl<'a> RowGroupMetricsEvaluator<'a> {
184157
return Ok(None);
185158
}
186159

187-
Ok(Some(match (primitive_type, stats) {
188-
(PrimitiveType::Boolean, Statistics::Boolean(stats)) => Datum::bool(*stats.max()),
189-
(PrimitiveType::Int, Statistics::Int32(stats)) => Datum::int(*stats.max()),
190-
(PrimitiveType::Date, Statistics::Int32(stats)) => Datum::date(*stats.max()),
191-
(PrimitiveType::Long, Statistics::Int64(stats)) => Datum::long(*stats.max()),
192-
(PrimitiveType::Time, Statistics::Int64(stats)) => Datum::time_micros(*stats.max())?,
193-
(PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
194-
Datum::timestamp_micros(*stats.max())
195-
}
196-
(PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
197-
Datum::timestamptz_micros(*stats.max())
198-
}
199-
(PrimitiveType::Float, Statistics::Float(stats)) => Datum::float(*stats.max()),
200-
(PrimitiveType::Double, Statistics::Double(stats)) => Datum::double(*stats.max()),
201-
(PrimitiveType::String, Statistics::ByteArray(stats)) => {
202-
Datum::string(stats.max().as_utf8()?)
203-
}
204-
// TODO:
205-
// * Decimal
206-
// * Uuid
207-
// * Fixed
208-
// * Binary
209-
(primitive_type, _) => {
210-
return Err(Error::new(
211-
ErrorKind::FeatureUnsupported,
212-
format!("Conversion of max value for column of type {} to iceberg type {} is not yet supported", stats.physical_type(), primitive_type)
213-
));
214-
}
215-
}))
160+
get_parquet_stat_max_as_datum(&primitive_type, stats)
216161
}
217162

218163
fn visit_inequality(

0 commit comments

Comments
 (0)