Skip to content

Commit d6d6fbe

Browse files
committed
Support 'entries' metadata table
1 parent e4ca871 commit d6d6fbe

File tree

11 files changed

+1121
-139
lines changed

11 files changed

+1121
-139
lines changed

Cargo.lock

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ arrow-schema = { workspace = true }
5353
arrow-select = { workspace = true }
5454
arrow-string = { workspace = true }
5555
async-std = { workspace = true, optional = true, features = ["attributes"] }
56+
async-stream = { workspace = true }
5657
async-trait = { workspace = true }
5758
bimap = { workspace = true }
5859
bitvec = { workspace = true }
@@ -86,6 +87,7 @@ uuid = { workspace = true }
8687
zstd = { workspace = true }
8788

8889
[dev-dependencies]
90+
arrow-cast = { workspace = true, features = ["prettyprint"] }
8991
ctor = { workspace = true }
9092
expect-test = { workspace = true }
9193
iceberg-catalog-memory = { workspace = true }

crates/iceberg/src/arrow/schema.rs

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,193 @@ get_parquet_stat_as_datum!(min);
827827

828828
get_parquet_stat_as_datum!(max);
829829

830+
/// Utilities to deal with [arrow_array::builder] types in the Iceberg context.
831+
pub(crate) mod builder {
832+
use arrow_array::builder::*;
833+
use arrow_array::cast::AsArray;
834+
use arrow_array::types::*;
835+
use arrow_array::{ArrayRef, Datum as ArrowDatum};
836+
use arrow_schema::{DataType, TimeUnit};
837+
use ordered_float::OrderedFloat;
838+
839+
use crate::spec::{Literal, PrimitiveLiteral};
840+
use crate::{Error, ErrorKind};
841+
842+
/// A helper wrapping [ArrayBuilder] for building arrays without declaring the inner type at
843+
/// compile-time when types are determined dynamically (e.g. based on some column type).
844+
/// A [DataType] is given at construction time which is used to later downcast the inner array
845+
/// and provided values.
846+
pub(crate) struct AnyArrayBuilder {
847+
data_type: DataType,
848+
inner: Box<dyn ArrayBuilder>,
849+
}
850+
851+
impl AnyArrayBuilder {
852+
pub(crate) fn new(data_type: &DataType) -> Self {
853+
Self {
854+
data_type: data_type.clone(),
855+
inner: make_builder(data_type, 0),
856+
}
857+
}
858+
859+
pub(crate) fn finish(&mut self) -> ArrayRef {
860+
self.inner.finish()
861+
}
862+
863+
/// Append an [[arrow_array::Datum]] value.
864+
pub(crate) fn append_datum(&mut self, value: &dyn ArrowDatum) -> crate::Result<()> {
865+
let (array, is_scalar) = value.get();
866+
assert!(is_scalar, "Can only append scalar datum");
867+
868+
match array.data_type() {
869+
DataType::Boolean => self
870+
.builder::<BooleanBuilder>()?
871+
.append_value(array.as_boolean().value(0)),
872+
DataType::Int32 => self
873+
.builder::<Int32Builder>()?
874+
.append_value(array.as_primitive::<Int32Type>().value(0)),
875+
DataType::Int64 => self
876+
.builder::<Int64Builder>()?
877+
.append_value(array.as_primitive::<Int64Type>().value(0)),
878+
DataType::Float32 => self
879+
.builder::<Float32Builder>()?
880+
.append_value(array.as_primitive::<Float32Type>().value(0)),
881+
DataType::Float64 => self
882+
.builder::<Float64Builder>()?
883+
.append_value(array.as_primitive::<Float64Type>().value(0)),
884+
DataType::Decimal128(_, _) => self
885+
.builder::<Decimal128Builder>()?
886+
.append_value(array.as_primitive::<Decimal128Type>().value(0)),
887+
DataType::Date32 => self
888+
.builder::<Date32Builder>()?
889+
.append_value(array.as_primitive::<Date32Type>().value(0)),
890+
DataType::Time64(TimeUnit::Microsecond) => self
891+
.builder::<Time64MicrosecondBuilder>()?
892+
.append_value(array.as_primitive::<Time64MicrosecondType>().value(0)),
893+
DataType::Timestamp(TimeUnit::Microsecond, _) => self
894+
.builder::<TimestampMicrosecondBuilder>()?
895+
.append_value(array.as_primitive::<TimestampMicrosecondType>().value(0)),
896+
DataType::Timestamp(TimeUnit::Nanosecond, _) => self
897+
.builder::<TimestampNanosecondBuilder>()?
898+
.append_value(array.as_primitive::<TimestampNanosecondType>().value(0)),
899+
DataType::Utf8 => self
900+
.builder::<StringBuilder>()?
901+
.append_value(array.as_string::<i32>().value(0)),
902+
DataType::FixedSizeBinary(_) => self
903+
.builder::<BinaryBuilder>()?
904+
.append_value(array.as_fixed_size_binary().value(0)),
905+
DataType::LargeBinary => self
906+
.builder::<LargeBinaryBuilder>()?
907+
.append_value(array.as_binary::<i64>().value(0)),
908+
_ => {
909+
return Err(Error::new(
910+
ErrorKind::FeatureUnsupported,
911+
format!("Cannot append data type: {:?}", array.data_type(),),
912+
));
913+
}
914+
}
915+
Ok(())
916+
}
917+
918+
/// Append a literal with the provided [DataType]. We're not solely relying on the literal to
919+
/// infer the type because [Literal] values do not specify the expected type of builder. E.g.,
920+
/// a [PrimitiveLiteral::Long] may go into an array builder for longs but also for timestamps.
921+
pub(crate) fn append_literal(&mut self, value: &Literal) -> crate::Result<()> {
922+
let Some(primitive) = value.as_primitive_literal() else {
923+
return Err(Error::new(
924+
ErrorKind::FeatureUnsupported,
925+
"Expected primitive type",
926+
));
927+
};
928+
929+
match (&self.data_type, primitive.clone()) {
930+
(DataType::Boolean, PrimitiveLiteral::Boolean(value)) => {
931+
self.builder::<BooleanBuilder>()?.append_value(value)
932+
}
933+
(DataType::Int32, PrimitiveLiteral::Int(value)) => {
934+
self.builder::<Int32Builder>()?.append_value(value)
935+
}
936+
(DataType::Int64, PrimitiveLiteral::Long(value)) => {
937+
self.builder::<Int64Builder>()?.append_value(value)
938+
}
939+
(DataType::Float32, PrimitiveLiteral::Float(OrderedFloat(value))) => {
940+
self.builder::<Float32Builder>()?.append_value(value)
941+
}
942+
(DataType::Float64, PrimitiveLiteral::Double(OrderedFloat(value))) => {
943+
self.builder::<Float64Builder>()?.append_value(value)
944+
}
945+
(DataType::Utf8, PrimitiveLiteral::String(value)) => {
946+
self.builder::<StringBuilder>()?.append_value(value)
947+
}
948+
(DataType::FixedSizeBinary(_), PrimitiveLiteral::Binary(value)) => self
949+
.builder::<FixedSizeBinaryBuilder>()?
950+
.append_value(value)?,
951+
(DataType::LargeBinary, PrimitiveLiteral::Binary(value)) => {
952+
self.builder::<LargeBinaryBuilder>()?.append_value(value)
953+
}
954+
(_, _) => {
955+
return Err(Error::new(
956+
ErrorKind::FeatureUnsupported,
957+
format!(
958+
"Builder of type {:?} does not accept literal {:?}",
959+
self.data_type, primitive
960+
),
961+
));
962+
}
963+
}
964+
965+
Ok(())
966+
}
967+
968+
/// Append a null value for the provided [DataType].
969+
pub(crate) fn append_null(&mut self) -> crate::Result<()> {
970+
match self.data_type {
971+
DataType::Boolean => self.builder::<BooleanBuilder>()?.append_null(),
972+
DataType::Int32 => self.builder::<Int32Builder>()?.append_null(),
973+
DataType::Int64 => self.builder::<Int64Builder>()?.append_null(),
974+
DataType::Float32 => self.builder::<Float32Builder>()?.append_null(),
975+
DataType::Float64 => self.builder::<Float64Builder>()?.append_null(),
976+
DataType::Decimal128(_, _) => self.builder::<Decimal128Builder>()?.append_null(),
977+
DataType::Date32 => self.builder::<Date32Builder>()?.append_null(),
978+
DataType::Time64(TimeUnit::Microsecond) => {
979+
self.builder::<Time64MicrosecondBuilder>()?.append_null()
980+
}
981+
DataType::Timestamp(TimeUnit::Microsecond, _) => {
982+
self.builder::<TimestampMicrosecondBuilder>()?.append_null()
983+
}
984+
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
985+
self.builder::<TimestampNanosecondBuilder>()?.append_null()
986+
}
987+
DataType::Utf8 => self.builder::<StringBuilder>()?.append_null(),
988+
DataType::FixedSizeBinary(_) => {
989+
self.builder::<FixedSizeBinaryBuilder>()?.append_null()
990+
}
991+
DataType::LargeBinary => self.builder::<LargeBinaryBuilder>()?.append_null(),
992+
_ => {
993+
return Err(Error::new(
994+
ErrorKind::FeatureUnsupported,
995+
format!(
996+
"Cannot append null values for data type: {:?}",
997+
self.data_type
998+
),
999+
))
1000+
}
1001+
}
1002+
Ok(())
1003+
}
1004+
1005+
/// Cast the `inner` builder to a specific type or return [Error].
1006+
fn builder<T: ArrayBuilder>(&mut self) -> crate::Result<&mut T> {
1007+
self.inner.as_any_mut().downcast_mut::<T>().ok_or_else(|| {
1008+
Error::new(
1009+
ErrorKind::Unexpected,
1010+
"Failed to cast builder to expected type",
1011+
)
1012+
})
1013+
}
1014+
}
1015+
}
1016+
8301017
impl TryFrom<&ArrowSchema> for crate::spec::Schema {
8311018
type Error = Error;
8321019

0 commit comments

Comments
 (0)