From 3188ea6e19a47f5c24fbd01feed941867c6a47ad Mon Sep 17 00:00:00 2001 From: nathaniel-d-ef Date: Mon, 8 Sep 2025 20:22:03 +0200 Subject: [PATCH 01/14] Add support for advanced Avro types (Map, Enum, Decimal, Interval) to arrow-avro encoder --- arrow-avro/src/writer/encoder.rs | 613 ++++++++++++++++++++++++++++++- 1 file changed, 609 insertions(+), 4 deletions(-) diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs index ccf80fd8d1ac..a397f07edbe6 100644 --- a/arrow-avro/src/writer/encoder.rs +++ b/arrow-avro/src/writer/encoder.rs @@ -21,15 +21,19 @@ use crate::codec::{AvroDataType, AvroField, Codec}; use crate::schema::Nullability; use arrow_array::cast::AsArray; use arrow_array::types::{ - ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type, TimestampMicrosecondType, + ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type, IntervalDayTimeType, + IntervalMonthDayNanoType, IntervalYearMonthType, TimestampMicrosecondType, }; use arrow_array::{ - Array, GenericBinaryArray, GenericListArray, GenericStringArray, LargeListArray, ListArray, - OffsetSizeTrait, PrimitiveArray, RecordBatch, StructArray, + Array, Decimal128Array, Decimal256Array, Decimal32Array, Decimal64Array, DictionaryArray, + FixedSizeBinaryArray, GenericBinaryArray, GenericListArray, GenericStringArray, LargeListArray, + ListArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray, }; use arrow_buffer::NullBuffer; -use arrow_schema::{ArrowError, DataType, Field, Schema as ArrowSchema, TimeUnit}; +use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit}; use std::io::Write; +use std::sync::Arc; +use uuid::Uuid; /// Encode a single Avro-`long` using ZigZag + variable length, buffered. /// @@ -69,6 +73,77 @@ fn write_bool(out: &mut W, v: bool) -> Result<(), ArrowError> .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e)) } +/// Minimal two's-complement big-endian representation helper for Avro decimal (bytes). +/// +/// For positive numbers, trim leading 0x00 while the next byte's MSB is 0. +/// For negative numbers, trim leading 0xFF while the next byte's MSB is 1. +/// The resulting slice still encodes the same signed value. +/// +/// See Avro spec: decimal over `bytes` uses two's-complement big-endian +/// representation of the unscaled integer value. 1.11.1 specification. +#[inline] +fn minimal_twos_complement(be: &[u8]) -> &[u8] { + if be.is_empty() { + return be; + } + let mut i = 0usize; + let sign = (be[0] & 0x80) != 0; + while i + 1 < be.len() { + let b = be[i]; + let next = be[i + 1]; + let trim_pos = !sign && b == 0x00 && (next & 0x80) == 0; + let trim_neg = sign && b == 0xFF && (next & 0x80) != 0; + if trim_pos || trim_neg { + i += 1; + } else { + break; + } + } + &be[i..] +} + +/// Sign-extend (or validate/truncate) big-endian integer bytes to exactly `n` bytes. +/// +/// If `src_be` is longer than `n`, ensure that dropped leading bytes are all sign bytes, +/// and that the MSB of the first kept byte matches the sign; otherwise return an overflow error. +/// If shorter than `n`, left-pad with the sign byte. +/// +/// Used for Avro decimal over `fixed(N)`. 
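+/// Worked example (illustrative, not part of the original sources): the value
+/// 255 is big-endian `[0x00, 0xFF]`; sign-extending to fixed(4) yields
+/// `[0x00, 0x00, 0x00, 0xFF]`, while -255 (`[0xFF, 0x01]`) yields
+/// `[0xFF, 0xFF, 0xFF, 0x01]`. Shrinking `[0x00, 0x00, 0x01]` to fixed(1)
+/// succeeds with `[0x01]`, but shrinking `[0x01, 0x00]` to fixed(1) overflows.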
+#[inline]
+fn sign_extend_to_exact(src_be: &[u8], n: usize) -> Result<Vec<u8>, ArrowError> {
+    let len = src_be.len();
+    let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
+        0xFF
+    } else {
+        0x00
+    };
+    if len == n {
+        return Ok(src_be.to_vec());
+    }
+    if len > n {
+        let extra = len - n;
+        if src_be[..extra].iter().any(|&b| b != sign_byte) {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
+                len, n
+            )));
+        }
+        if n > 0 {
+            let first_kept = src_be[extra];
+            if ((first_kept ^ sign_byte) & 0x80) != 0 {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "Decimal value with {} bytes cannot be represented in {} bytes without overflow",
+                    len, n
+                )));
+            }
+        }
+        return Ok(src_be[extra..].to_vec());
+    }
+    let mut out = vec![sign_byte; n];
+    out[n - len..].copy_from_slice(src_be);
+    Ok(out)
+}
+
 /// Write the union branch index for an optional field.
 ///
 /// Branch index is 0-based per Avro unions:
@@ -174,12 +249,119 @@ impl<'a> FieldEncoder<'a> {
             DataType::Timestamp(TimeUnit::Microsecond, _) => Encoder::Timestamp(LongEncoder(
                 array.as_primitive::<TimestampMicrosecondType>(),
             )),
+            DataType::Interval(unit) => match unit {
+                IntervalUnit::MonthDayNano => {
+                    Encoder::IntervalMonthDayNano(IntervalMonthDayNanoEncoder(
+                        array.as_primitive::<IntervalMonthDayNanoType>(),
+                    ))
+                }
+                IntervalUnit::YearMonth => {
+                    Encoder::IntervalYearMonth(IntervalYearMonthEncoder(
+                        array.as_primitive::<IntervalYearMonthType>(),
+                    ))
+                }
+                IntervalUnit::DayTime => Encoder::IntervalDayTime(IntervalDayTimeEncoder(
+                    array.as_primitive::<IntervalDayTimeType>(),
+                )),
+            }
             other => {
                 return Err(ArrowError::NotYetImplemented(format!(
                     "Avro scalar type not yet supported: {other:?}"
                 )));
             }
         },
+        FieldPlan::Decimal { size } => match array.data_type() {
+            DataType::Decimal32(_, _) => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<Decimal32Array>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
+                Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
+            }
+            DataType::Decimal64(_, _) => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<Decimal64Array>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected Decimal64Array".into()))?;
+                Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
+            }
+            DataType::Decimal128(_, _) => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<Decimal128Array>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected Decimal128Array".into()))?;
+                Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
+            }
+            DataType::Decimal256(_, _) => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<Decimal256Array>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected Decimal256Array".into()))?;
+                Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
+            }
+            other => {
+                return Err(ArrowError::SchemaError(format!(
+                    "Avro decimal site requires Arrow Decimal32, Decimal64, Decimal128, or Decimal256, found: {other:?}"
+                )))
+            }
+        },
+        FieldPlan::Map {
+            values_nullability,
+            value_plan,
+        } => {
+            let arr = array
+                .as_any()
+                .downcast_ref::<MapArray>()
+                .ok_or_else(|| ArrowError::SchemaError("Expected MapArray".into()))?;
+            Encoder::Map(Box::new(MapEncoder::try_new(
+                arr,
+                *values_nullability,
+                value_plan.as_ref(),
+            )?))
+        }
+        FieldPlan::Enum { symbols } => match array.data_type() {
+            DataType::Dictionary(key_dt, value_dt) => {
+                // Enforce the same shape we validated during plan build:
+                // Dictionary<Int32, Utf8>.
+                if **key_dt != DataType::Int32 || **value_dt != DataType::Utf8 {
+                    return Err(ArrowError::SchemaError(
+                        "Avro enum requires Dictionary<Int32, Utf8>".into(),
+                    ));
+                }
+                let dict = array
+                    .as_any()
+                    .downcast_ref::<DictionaryArray<Int32Type>>()
+                    .ok_or_else(|| {
ArrowError::SchemaError("Expected DictionaryArray".into()) + })?; + + // Dictionary values must exactly match schema `symbols` (order & content) + let values = dict + .values() + .as_any() + .downcast_ref::() + .ok_or_else(|| { + ArrowError::SchemaError("Dictionary values must be Utf8".into()) + })?; + if values.len() != symbols.len() { + return Err(ArrowError::SchemaError(format!( + "Enum symbol length {} != dictionary size {}", + symbols.len(), + values.len() + ))); + } + for i in 0..values.len() { + if values.value(i) != symbols[i].as_str() { + return Err(ArrowError::SchemaError(format!( + "Enum symbol mismatch at {i}: schema='{}' dict='{}'", + symbols[i], + values.value(i) + ))); + } + } + // Keys are the Avro enum indices (zero-based position in `symbols`). + let keys = dict.keys(); + Encoder::Enum(EnumEncoder { keys }) + } + other => { + return Err(ArrowError::SchemaError(format!( + "Avro enum site requires DataType::Dictionary, found: {other:?}" + ))) + } + } other => { return Err(ArrowError::NotYetImplemented(format!( "Avro writer: {other:?} not yet supported", @@ -256,6 +438,16 @@ enum FieldPlan { items_nullability: Option, item_plan: Box, }, + /// Avro map with value‑site nullability and nested plan + Map { + values_nullability: Option, + value_plan: Box, + }, + /// Avro decimal logical type (bytes or fixed). `size=None` => bytes(decimal), `Some(n)` => fixed(n) + Decimal { size: Option }, + /// Avro enum; maps to Arrow Dictionary with dictionary values + /// exactly equal and ordered as the Avro enum `symbols`. + Enum { symbols: Arc<[String]> }, } #[derive(Debug, Clone)] @@ -366,6 +558,13 @@ fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option< fields.iter().position(|f| f.name() == name) } +fn find_map_value_field_index(fields: &arrow_schema::Fields) -> Option { + // Prefer common Arrow field names; fall back to second child if exactly two + find_struct_child_index(fields, "value") + .or_else(|| find_struct_child_index(fields, "values")) + .or_else(|| if fields.len() == 2 { Some(1) } else { None }) +} + impl FieldPlan { fn build(avro_dt: &AvroDataType, arrow_field: &Field) -> Result { match avro_dt.codec() { @@ -408,6 +607,88 @@ impl FieldPlan { "Avro array maps to Arrow List/LargeList, found: {other:?}" ))), }, + Codec::Map(values_dt) => { + // Avro map -> Arrow DataType::Map(entries_struct, sorted) + let entries_field = match arrow_field.data_type() { + DataType::Map(entries, _sorted) => entries.as_ref(), + other => { + return Err(ArrowError::SchemaError(format!( + "Avro map maps to Arrow DataType::Map, found: {other:?}" + ))) + } + }; + let entries_struct_fields = match entries_field.data_type() { + DataType::Struct(fs) => fs, + other => { + return Err(ArrowError::SchemaError(format!( + "Arrow Map entries must be Struct, found: {other:?}" + ))) + } + }; + let value_idx = + find_map_value_field_index(entries_struct_fields).ok_or_else(|| { + ArrowError::SchemaError("Map entries struct missing value field".into()) + })?; + let value_field = entries_struct_fields[value_idx].as_ref(); + let value_plan = FieldPlan::build(values_dt.as_ref(), value_field)?; + Ok(FieldPlan::Map { + values_nullability: values_dt.nullability(), + value_plan: Box::new(value_plan), + }) + } + Codec::Enum(symbols) => match arrow_field.data_type() { + DataType::Dictionary(key_dt, value_dt) => { + // Enforce the exact reader-compatible shape: Dictionary + if **key_dt != DataType::Int32 { + return Err(ArrowError::SchemaError( + "Avro enum requires Dictionary".into(), + )); + } + if 
                        return Err(ArrowError::SchemaError(
+                            "Avro enum requires Dictionary<Int32, Utf8>".into(),
+                        ));
+                    }
+                    Ok(FieldPlan::Enum {
+                        symbols: symbols.clone(),
+                    })
+                }
+                other => Err(ArrowError::SchemaError(format!(
+                    "Avro enum maps to Arrow Dictionary<Int32, Utf8>, found: {other:?}"
+                ))),
+            },
+            // decimal site (bytes or fixed(N)) with precision/scale validation
+            Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
+                let (ap, as_) = match arrow_field.data_type() {
+                    DataType::Decimal32(p, s) => (*p as usize, *s as i32),
+                    DataType::Decimal64(p, s) => (*p as usize, *s as i32),
+                    DataType::Decimal128(p, s) => (*p as usize, *s as i32),
+                    DataType::Decimal256(p, s) => (*p as usize, *s as i32),
+                    other => {
+                        return Err(ArrowError::SchemaError(format!(
+                            "Avro decimal requires Arrow decimal, got {other:?} for field '{}'",
+                            arrow_field.name()
+                        )))
+                    }
+                };
+                let sc = scale_opt.unwrap_or(0) as i32; // Avro scale defaults to 0 if absent
+                if ap != *precision || as_ != sc {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Decimal precision/scale mismatch for field '{}': Avro({precision},{sc}) vs Arrow({ap},{as_})",
+                        arrow_field.name()
+                    )));
+                }
+                Ok(FieldPlan::Decimal {
+                    size: *fixed_size_opt,
+                })
+            }
+            Codec::Interval => match arrow_field.data_type() {
+                DataType::Interval(
+                    IntervalUnit::MonthDayNano | IntervalUnit::YearMonth | IntervalUnit::DayTime,
+                ) => Ok(FieldPlan::Scalar),
+                other => Err(ArrowError::SchemaError(format!(
+                    "Avro duration logical type requires Arrow Interval(MonthDayNano | YearMonth | DayTime), found: {other:?}"
+                ))),
+            }
             _ => Ok(FieldPlan::Scalar),
         }
     }
@@ -427,6 +708,23 @@ enum Encoder<'a> {
     List(Box<ListEncoder<'a>>),
     LargeList(Box<LargeListEncoder<'a>>),
     Struct(Box<StructEncoder<'a>>),
+    /// Avro `fixed` encoder (raw bytes, no length)
+    Fixed(FixedEncoder<'a>),
+    /// Avro `uuid` logical type encoder (string with RFC-4122 hyphenated text)
+    Uuid(UuidEncoder<'a>),
+    /// Avro `duration` logical type (Arrow Interval(MonthDayNano)) encoder
+    IntervalMonthDayNano(IntervalMonthDayNanoEncoder<'a>),
+    /// Avro `duration` logical type (Arrow Interval(YearMonth)) encoder
+    IntervalYearMonth(IntervalYearMonthEncoder<'a>),
+    /// Avro `duration` logical type (Arrow Interval(DayTime)) encoder
+    IntervalDayTime(IntervalDayTimeEncoder<'a>),
+    /// Avro `enum` encoder: writes the key (int) as the enum index.
+    Enum(EnumEncoder<'a>),
+    Decimal32(Decimal32Encoder<'a>),
+    Decimal64(Decimal64Encoder<'a>),
+    Decimal128(Decimal128Encoder<'a>),
+    Decimal256(Decimal256Encoder<'a>),
+    Map(Box<MapEncoder<'a>>),
 }
 
 impl<'a> Encoder<'a> {
@@ -446,6 +744,17 @@ impl<'a> Encoder<'a> {
             Encoder::List(e) => e.encode(out, idx),
             Encoder::LargeList(e) => e.encode(out, idx),
             Encoder::Struct(e) => e.encode(out, idx),
+            Encoder::Fixed(e) => e.encode(out, idx),
+            Encoder::Uuid(e) => e.encode(out, idx),
+            Encoder::IntervalMonthDayNano(e) => e.encode(out, idx),
+            Encoder::IntervalYearMonth(e) => e.encode(out, idx),
+            Encoder::IntervalDayTime(e) => e.encode(out, idx),
+            Encoder::Enum(e) => e.encode(out, idx),
+            Encoder::Decimal32(e) => e.encode(out, idx),
+            Encoder::Decimal64(e) => e.encode(out, idx),
+            Encoder::Decimal128(e) => e.encode(out, idx),
+            Encoder::Decimal256(e) => e.encode(out, idx),
+            Encoder::Map(e) => e.encode(out, idx),
         }
     }
 }
@@ -512,6 +821,113 @@ impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
 type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
 type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
 
+/// Internal key array kind used by Map encoder.
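+/// Illustrative note (not from the original patch): Avro maps are written as
+/// a sequence of blocks, each a non-zero item count followed by that many
+/// key/value pairs, terminated by a zero count; keys are Avro strings. For
+/// example, {"a": 1} encodes as count=1, string "a", value 1, then 0.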
+enum KeyKind<'a> { + Utf8(&'a GenericStringArray), + LargeUtf8(&'a GenericStringArray), +} +struct MapEncoder<'a> { + map: &'a MapArray, + keys: KeyKind<'a>, + values: FieldEncoder<'a>, + keys_offset: usize, + values_offset: usize, +} + +fn encode_map_entries( + out: &mut W, + keys: &GenericStringArray, + keys_offset: usize, + start: usize, + end: usize, + mut write_item: impl FnMut(&mut W, usize) -> Result<(), ArrowError>, +) -> Result<(), ArrowError> +where + W: Write + ?Sized, + O: OffsetSizeTrait, +{ + encode_blocked_range(out, start, end, |out, j| { + let j_key = j.saturating_sub(keys_offset); + write_len_prefixed(out, keys.value(j_key).as_bytes())?; + write_item(out, j) + }) +} + +impl<'a> MapEncoder<'a> { + fn try_new( + map: &'a MapArray, + values_nullability: Option, + value_plan: &FieldPlan, + ) -> Result { + let keys_arr = map.keys(); + let keys_kind = match keys_arr.data_type() { + DataType::Utf8 => KeyKind::Utf8(keys_arr.as_string::()), + DataType::LargeUtf8 => KeyKind::LargeUtf8(keys_arr.as_string::()), + other => { + return Err(ArrowError::SchemaError(format!( + "Avro map requires string keys; Arrow key type must be Utf8/LargeUtf8, found: {other:?}" + ))) + } + }; + + let entries_struct_fields = match map.data_type() { + DataType::Map(entries, _) => match entries.data_type() { + DataType::Struct(fs) => fs, + other => { + return Err(ArrowError::SchemaError(format!( + "Arrow Map entries must be Struct, found: {other:?}" + ))) + } + }, + _ => { + return Err(ArrowError::SchemaError( + "Expected MapArray with DataType::Map".into(), + )) + } + }; + + let v_idx = find_map_value_field_index(entries_struct_fields).ok_or_else(|| { + ArrowError::SchemaError("Map entries struct missing value field".into()) + })?; + let value_field = entries_struct_fields[v_idx].as_ref(); + + let values_enc = prepare_value_site_encoder( + map.values().as_ref(), + value_field, + values_nullability, + value_plan, + )?; + + Ok(Self { + map, + keys: keys_kind, + values: values_enc, + keys_offset: keys_arr.offset(), + values_offset: map.values().offset(), + }) + } + + fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + let offsets = self.map.offsets(); + let start = offsets[idx] as usize; + let end = offsets[idx + 1] as usize; + + let mut write_item = |out: &mut W, j: usize| { + let j_val = j.saturating_sub(self.values_offset); + self.values.encode(out, j_val) + }; + + match self.keys { + KeyKind::Utf8(arr) => { + encode_map_entries(out, arr, self.keys_offset, start, end, write_item) + } + KeyKind::LargeUtf8(arr) => { + encode_map_entries(out, arr, self.keys_offset, start, end, write_item) + } + } + } +} + struct StructEncoder<'a> { encoders: Vec>, } @@ -653,6 +1069,195 @@ fn prepare_value_site_encoder<'a>( FieldEncoder::make_encoder(values_array, value_field, plan, nullability) } +/// Avro `fixed` encoder for Arrow `FixedSizeBinaryArray`. +/// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix. +struct FixedEncoder<'a>(&'a FixedSizeBinaryArray); +impl FixedEncoder<'_> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + let v = self.0.value(idx); // &[u8] of fixed width + out.write_all(v) + .map_err(|e| ArrowError::IoError(format!("write fixed bytes: {e}"), e)) + } +} + +/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) → Avro string (UUID). +/// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated form. 
+struct UuidEncoder<'a>(&'a FixedSizeBinaryArray); +impl UuidEncoder<'_> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + let v = self.0.value(idx); + if v.len() != 16 { + return Err(ArrowError::InvalidArgumentError( + "logicalType=uuid requires FixedSizeBinary(16)".into(), + )); + } + let u = Uuid::from_slice(v) + .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid UUID bytes: {e}")))?; + let mut tmp = [0u8; uuid::fmt::Hyphenated::LENGTH]; + let s = u.hyphenated().encode_lower(&mut tmp); + write_len_prefixed(out, s.as_bytes()) + } +} + +/// Avro `duration` encoder for Arrow `Interval(IntervalUnit::MonthDayNano)`. +/// Spec: `duration` annotates Avro fixed(12) with three **little‑endian u32**: +/// months, days, milliseconds (no negatives). +struct IntervalMonthDayNanoEncoder<'a>(&'a PrimitiveArray); +impl IntervalMonthDayNanoEncoder<'_> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + let native = self.0.value(idx); + let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native); + if months < 0 || days < 0 || nanos < 0 { + return Err(ArrowError::InvalidArgumentError( + "Avro 'duration' cannot encode negative months/days/nanoseconds".into(), + )); + } + if nanos % 1_000_000 != 0 { + return Err(ArrowError::InvalidArgumentError( + "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000" + .into(), + )); + } + let millis = nanos / 1_000_000; + if millis > u32::MAX as i64 { + return Err(ArrowError::InvalidArgumentError( + "Avro 'duration' milliseconds exceed u32::MAX".into(), + )); + } + let mut buf = [0u8; 12]; + buf[0..4].copy_from_slice(&(months as u32).to_le_bytes()); + buf[4..8].copy_from_slice(&(days as u32).to_le_bytes()); + buf[8..12].copy_from_slice(&(millis as u32).to_le_bytes()); + out.write_all(&buf) + .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e)) + } +} + +/// Avro `duration` encoder for Arrow `Interval(IntervalUnit::YearMonth)`. +struct IntervalYearMonthEncoder<'a>(&'a PrimitiveArray); +impl IntervalYearMonthEncoder<'_> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + let months_i32 = self.0.value(idx); + + if months_i32 < 0 { + return Err(ArrowError::InvalidArgumentError( + "Avro 'duration' cannot encode negative months".into(), + )); + } + + let mut buf = [0u8; 12]; + buf[0..4].copy_from_slice(&(months_i32 as u32).to_le_bytes()); + // Days and Milliseconds are zero, so their bytes are already 0. + // buf[4..8] is [0, 0, 0, 0] + // buf[8..12] is [0, 0, 0, 0] + + out.write_all(&buf) + .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e)) + } +} + +/// Avro `duration` encoder for Arrow `Interval(IntervalUnit::DayTime)`. +struct IntervalDayTimeEncoder<'a>(&'a PrimitiveArray); +impl IntervalDayTimeEncoder<'_> { + fn encode(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> { + // A DayTime interval is a packed (days: i32, milliseconds: i32). + let native = self.0.value(idx); + let (days, millis) = IntervalDayTimeType::to_parts(native); + + if days < 0 || millis < 0 { + return Err(ArrowError::InvalidArgumentError( + "Avro 'duration' cannot encode negative days or milliseconds".into(), + )); + } + + // (months=0, days, millis) + let mut buf = [0u8; 12]; + // Months is zero. buf[0..4] is already [0, 0, 0, 0]. 
+        buf[4..8].copy_from_slice(&(days as u32).to_le_bytes());
+        buf[8..12].copy_from_slice(&(millis as u32).to_le_bytes());
+
+        out.write_all(&buf)
+            .map_err(|e| ArrowError::IoError(format!("write duration: {e}"), e))
+    }
+}
+
+/// Minimal trait to obtain a big-endian fixed-size byte array for a decimal's
+/// unscaled integer value at `idx`.
+trait DecimalBeBytes<const N: usize> {
+    fn value_be_bytes(&self, idx: usize) -> [u8; N];
+}
+
+impl DecimalBeBytes<4> for Decimal32Array {
+    fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
+        self.value(idx).to_be_bytes()
+    }
+}
+impl DecimalBeBytes<8> for Decimal64Array {
+    fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
+        self.value(idx).to_be_bytes()
+    }
+}
+impl DecimalBeBytes<16> for Decimal128Array {
+    fn value_be_bytes(&self, idx: usize) -> [u8; 16] {
+        self.value(idx).to_be_bytes()
+    }
+}
+impl DecimalBeBytes<32> for Decimal256Array {
+    fn value_be_bytes(&self, idx: usize) -> [u8; 32] {
+        // Arrow i256 → [u8; 32] big-endian
+        self.value(idx).to_be_bytes()
+    }
+}
+
+/// Generic Avro decimal encoder over Arrow decimal arrays.
+/// - When `fixed_size` is `None` → Avro `bytes(decimal)`; writes the minimal
+///   two's-complement representation with a length prefix.
+/// - When `Some(n)` → Avro `fixed(n, decimal)`; sign-extends (or validates)
+///   to exactly `n` bytes and writes them directly.
+struct DecimalEncoder<'a, const N: usize, A: DecimalBeBytes<N>> {
+    arr: &'a A,
+    fixed_size: Option<usize>,
+}
+
+impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
+    fn new(arr: &'a A, fixed_size: Option<usize>) -> Self {
+        Self { arr, fixed_size }
+    }
+
+    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+        let be = self.arr.value_be_bytes(idx);
+        match self.fixed_size {
+            Some(n) => {
+                let bytes = sign_extend_to_exact(&be, n)?;
+                out.write_all(&bytes)
+                    .map_err(|e| ArrowError::IoError(format!("write decimal fixed: {e}"), e))
+            }
+            None => write_len_prefixed(out, minimal_twos_complement(&be)),
+        }
+    }
+}
+
+type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
+type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
+type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
+type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
+
+/// Avro `enum` encoder for Arrow `DictionaryArray<Int32Type>`.
+///
+/// Per the Avro spec, an enum is encoded as an **int** equal to the
+/// zero-based position of the symbol in the schema's `symbols` list.
+/// We validate at construction that the dictionary values equal the symbols,
+/// so we can directly write the key value here.
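+/// For example (illustrative), with symbols ["A", "B", "C"] the value "C" is
+/// written as int 2, which zigzag/varint-encodes to the single byte 0x04.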
+struct EnumEncoder<'a> { + keys: &'a PrimitiveArray, +} +impl EnumEncoder<'_> { + fn encode(&mut self, out: &mut W, row: usize) -> Result<(), ArrowError> { + let idx = self.keys.value(row); + write_int(out, idx) + } +} + #[cfg(test)] mod tests { use super::*; From e74a6c4b3bc2c43a2c7df1d039ca38e15b097d67 Mon Sep 17 00:00:00 2001 From: nathaniel-d-ef Date: Mon, 8 Sep 2025 22:58:49 +0200 Subject: [PATCH 02/14] Add comprehensive benchmarking for arrow-avro, covering additional types including utf8, fixed-size binary, interval, struct, and decimals --- arrow-avro/benches/avro_writer.rs | 392 +++++++++++++++++++++++++++++- arrow-avro/src/schema.rs | 24 +- arrow-avro/src/writer/encoder.rs | 44 ++++ 3 files changed, 450 insertions(+), 10 deletions(-) diff --git a/arrow-avro/benches/avro_writer.rs b/arrow-avro/benches/avro_writer.rs index 924cbbdc84bd..5daf00dfb09a 100644 --- a/arrow-avro/benches/avro_writer.rs +++ b/arrow-avro/benches/avro_writer.rs @@ -15,19 +15,22 @@ // specific language governing permissions and limitations // under the License. -//! Benchmarks for `arrow‑avro` **Writer** (Avro Object Container Files) -//! +//! Benchmarks for `arrow-avro` Writer (Avro Object Container File) extern crate arrow_avro; extern crate criterion; extern crate once_cell; use arrow_array::{ - types::{Int32Type, Int64Type, TimestampMicrosecondType}, - ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, PrimitiveArray, RecordBatch, + builder::{ListBuilder, StringBuilder}, + types::{Int32Type, Int64Type, IntervalMonthDayNanoType, TimestampMicrosecondType}, + ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Decimal256Array, Decimal32Array, + Decimal64Array, FixedSizeBinaryArray, Float32Array, Float64Array, ListArray, PrimitiveArray, + RecordBatch, StringArray, StructArray, }; use arrow_avro::writer::AvroWriter; -use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use arrow_buffer::i256; +use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit}; use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput}; use once_cell::sync::Lazy; use rand::{ @@ -35,6 +38,7 @@ use rand::{ rngs::StdRng, Rng, SeedableRng, }; +use std::collections::HashMap; use std::io::Cursor; use std::sync::Arc; use std::time::Duration; @@ -63,7 +67,9 @@ where #[inline] fn make_bool_array_with_tag(n: usize, tag: u64) -> BooleanArray { let mut rng = rng_for(tag, n); + // Can't use SampleUniform for bool; use the RNG's boolean helper let values = (0..n).map(|_| rng.random_bool(0.5)); + // This repo exposes `from_iter`, not `from_iter_values` for BooleanArray BooleanArray::from_iter(values.map(Some)) } @@ -81,6 +87,21 @@ fn make_i64_array_with_tag(n: usize, tag: u64) -> PrimitiveArray { PrimitiveArray::::from_iter_values(values) } +#[inline] +fn rand_ascii_string(rng: &mut StdRng, min_len: usize, max_len: usize) -> String { + let len = rng.random_range(min_len..=max_len); + (0..len) + .map(|_| (rng.random_range(b'a'..=b'z') as char)) + .collect() +} + +#[inline] +fn make_utf8_array_with_tag(n: usize, tag: u64) -> StringArray { + let mut rng = rng_for(tag, n); + let data: Vec = (0..n).map(|_| rand_ascii_string(&mut rng, 3, 16)).collect(); + StringArray::from_iter_values(data) +} + #[inline] fn make_f32_array_with_tag(n: usize, tag: u64) -> Float32Array { let mut rng = rng_for(tag, n); @@ -98,14 +119,52 @@ fn make_f64_array_with_tag(n: usize, tag: u64) -> Float64Array { #[inline] fn make_binary_array_with_tag(n: usize, tag: u64) -> BinaryArray { let mut rng = rng_for(tag, n); - 
let mut payloads: Vec<[u8; 16]> = vec![[0; 16]; n]; - for p in payloads.iter_mut() { + let mut payloads: Vec> = Vec::with_capacity(n); + for _ in 0..n { + let len = rng.random_range(1..=16); + let mut p = vec![0u8; len]; rng.fill(&mut p[..]); + payloads.push(p); } let views: Vec<&[u8]> = payloads.iter().map(|p| &p[..]).collect(); + // This repo exposes a simple `from_vec` for BinaryArray BinaryArray::from_vec(views) } +#[inline] +fn make_fixed16_array_with_tag(n: usize, tag: u64) -> FixedSizeBinaryArray { + let mut rng = rng_for(tag, n); + let payloads = (0..n) + .map(|_| { + let mut b = [0u8; 16]; + rng.fill(&mut b); + b + }) + .collect::>(); + // Fixed-size constructor available in this repo + FixedSizeBinaryArray::try_from_iter(payloads.into_iter()).expect("build FixedSizeBinaryArray") +} + +/// Make an Arrow `Interval(IntervalUnit::MonthDayNano)` array with **non-negative** +/// (months, days, nanos) values, and nanos as **multiples of 1_000_000** (whole ms), +/// per Avro `duration` constraints used by the writer. +#[inline] +fn make_interval_mdn_array_with_tag( + n: usize, + tag: u64, +) -> PrimitiveArray { + let mut rng = rng_for(tag, n); + let values = (0..n).map(|_| { + let months: i32 = rng.random_range(0..=120); + let days: i32 = rng.random_range(0..=31); + // pick millis within a day (safe within u32::MAX and realistic) + let millis: u32 = rng.random_range(0..=86_400_000); + let nanos: i64 = (millis as i64) * 1_000_000; + IntervalMonthDayNanoType::make_value(months, days, nanos) + }); + PrimitiveArray::::from_iter_values(values) +} + #[inline] fn make_ts_micros_array_with_tag(n: usize, tag: u64) -> PrimitiveArray { let mut rng = rng_for(tag, n); @@ -115,9 +174,80 @@ fn make_ts_micros_array_with_tag(n: usize, tag: u64) -> PrimitiveArray::from_iter_values(values) } +// === Decimal helpers & generators === + +#[inline] +fn pow10_i32(p: u8) -> i32 { + (0..p).fold(1i32, |acc, _| acc.saturating_mul(10)) +} + +#[inline] +fn pow10_i64(p: u8) -> i64 { + (0..p).fold(1i64, |acc, _| acc.saturating_mul(10)) +} + +#[inline] +fn pow10_i128(p: u8) -> i128 { + (0..p).fold(1i128, |acc, _| acc.saturating_mul(10)) +} + +#[inline] +fn make_decimal32_array_with_tag(n: usize, tag: u64, precision: u8, scale: i8) -> Decimal32Array { + let mut rng = rng_for(tag, n); + let max = pow10_i32(precision).saturating_sub(1); + let values = (0..n).map(|_| rng.random_range(-max..=max)); + Decimal32Array::from_iter_values(values) + .with_precision_and_scale(precision, scale) + .expect("set precision/scale on Decimal32Array") +} + +#[inline] +fn make_decimal64_array_with_tag(n: usize, tag: u64, precision: u8, scale: i8) -> Decimal64Array { + let mut rng = rng_for(tag, n); + let max = pow10_i64(precision).saturating_sub(1); + let values = (0..n).map(|_| rng.random_range(-max..=max)); + Decimal64Array::from_iter_values(values) + .with_precision_and_scale(precision, scale) + .expect("set precision/scale on Decimal64Array") +} + +#[inline] +fn make_decimal128_array_with_tag(n: usize, tag: u64, precision: u8, scale: i8) -> Decimal128Array { + let mut rng = rng_for(tag, n); + let max = pow10_i128(precision).saturating_sub(1); + let values = (0..n).map(|_| rng.random_range(-max..=max)); + Decimal128Array::from_iter_values(values) + .with_precision_and_scale(precision, scale) + .expect("set precision/scale on Decimal128Array") +} + +#[inline] +fn make_decimal256_array_with_tag(n: usize, tag: u64, precision: u8, scale: i8) -> Decimal256Array { + // Generate within i128 range and widen to i256 to keep generation cheap and 
portable + let mut rng = rng_for(tag, n); + let max128 = pow10_i128(30).saturating_sub(1); + let values = (0..n).map(|_| { + let v: i128 = rng.random_range(-max128..=max128); + i256::from_i128(v) + }); + Decimal256Array::from_iter_values(values) + .with_precision_and_scale(precision, scale) + .expect("set precision/scale on Decimal256Array") +} + +#[inline] +fn make_fixed16_array(n: usize) -> FixedSizeBinaryArray { + make_fixed16_array_with_tag(n, 0xF15E_D016) +} + +#[inline] +fn make_interval_mdn_array(n: usize) -> PrimitiveArray { + make_interval_mdn_array_with_tag(n, 0xD0_1E_AD) +} + #[inline] fn make_bool_array(n: usize) -> BooleanArray { - make_bool_array_with_tag(n, 0xB001) + make_bool_array_with_tag(n, 0xB00_1) } #[inline] fn make_i32_array(n: usize) -> PrimitiveArray { @@ -143,6 +273,57 @@ fn make_binary_array(n: usize) -> BinaryArray { fn make_ts_micros_array(n: usize) -> PrimitiveArray { make_ts_micros_array_with_tag(n, 0x7157_0001) } +#[inline] +fn make_utf8_array(n: usize) -> StringArray { + make_utf8_array_with_tag(n, 0x5712_07F8) +} +#[inline] +fn make_list_utf8_array(n: usize) -> ListArray { + make_list_utf8_array_with_tag(n, 0x0A11_57ED) +} +#[inline] +fn make_struct_array(n: usize) -> StructArray { + make_struct_array_with_tag(n, 0x57_AB_C7) +} + +#[inline] +fn make_list_utf8_array_with_tag(n: usize, tag: u64) -> ListArray { + let mut rng = rng_for(tag, n); + let mut builder = ListBuilder::new(StringBuilder::new()); + for _ in 0..n { + let items = rng.random_range(0..=5); + for _ in 0..items { + let s = rand_ascii_string(&mut rng, 1, 12); + builder.values().append_value(s.as_str()); + } + builder.append(true); + } + builder.finish() +} + +#[inline] +fn make_struct_array_with_tag(n: usize, tag: u64) -> StructArray { + let s_tag = tag ^ 0x5u64; + let i_tag = tag ^ 0x6u64; + let f_tag = tag ^ 0x7u64; + let s_col: ArrayRef = Arc::new(make_utf8_array_with_tag(n, s_tag)); + let i_col: ArrayRef = Arc::new(make_i32_array_with_tag(n, i_tag)); + let f_col: ArrayRef = Arc::new(make_f64_array_with_tag(n, f_tag)); + StructArray::from(vec![ + ( + Arc::new(Field::new("s1", DataType::Utf8, false)), + s_col.clone(), + ), + ( + Arc::new(Field::new("s2", DataType::Int32, false)), + i_col.clone(), + ), + ( + Arc::new(Field::new("s3", DataType::Float64, false)), + f_col.clone(), + ), + ]) +} #[inline] fn schema_single(name: &str, dt: DataType) -> Arc { @@ -159,6 +340,36 @@ fn schema_mixed() -> Arc { ])) } +#[inline] +fn schema_fixed16() -> Arc { + schema_single("field1", DataType::FixedSizeBinary(16)) +} + +#[inline] +fn schema_uuid16() -> Arc { + let mut md = HashMap::new(); + md.insert("logicalType".to_string(), "uuid".to_string()); + let field = Field::new("uuid", DataType::FixedSizeBinary(16), false).with_metadata(md); + Arc::new(Schema::new(vec![field])) +} + +#[inline] +fn schema_interval_mdn() -> Arc { + schema_single("duration", DataType::Interval(IntervalUnit::MonthDayNano)) +} + +#[inline] +fn schema_decimal_with_size(name: &str, dt: DataType, size_meta: Option) -> Arc { + let field = if let Some(size) = size_meta { + let mut md = HashMap::new(); + md.insert("size".to_string(), size.to_string()); + Field::new(name, dt, false).with_metadata(md) + } else { + Field::new(name, dt, false) + }; + Arc::new(Schema::new(vec![field])) +} + static BOOLEAN_DATA: Lazy> = Lazy::new(|| { let schema = schema_single("field1", DataType::Boolean); SIZES @@ -225,6 +436,40 @@ static BINARY_DATA: Lazy> = Lazy::new(|| { .collect() }); +static FIXED16_DATA: Lazy> = Lazy::new(|| { + let schema = 
schema_fixed16(); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = Arc::new(make_fixed16_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static UUID16_DATA: Lazy> = Lazy::new(|| { + let schema = schema_uuid16(); + SIZES + .iter() + .map(|&n| { + // Same values as Fixed16; writer path differs because of field metadata + let col: ArrayRef = Arc::new(make_fixed16_array_with_tag(n, 0x7575_6964_7575_6964)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static INTERVAL_MDN_DATA: Lazy> = Lazy::new(|| { + let schema = schema_interval_mdn(); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = Arc::new(make_interval_mdn_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + static TIMESTAMP_US_DATA: Lazy> = Lazy::new(|| { let schema = schema_single("field1", DataType::Timestamp(TimeUnit::Microsecond, None)); SIZES @@ -250,6 +495,126 @@ static MIXED_DATA: Lazy> = Lazy::new(|| { .collect() }); +static UTF8_DATA: Lazy> = Lazy::new(|| { + let schema = schema_single("field1", DataType::Utf8); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = Arc::new(make_utf8_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static LIST_UTF8_DATA: Lazy> = Lazy::new(|| { + // IMPORTANT: ListBuilder creates a child field named "item" that is nullable by default. + // Make the schema's list item nullable to match the array we construct. + let item_field = Arc::new(Field::new("item", DataType::Utf8, true)); + let schema = schema_single("field1", DataType::List(item_field)); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = Arc::new(make_list_utf8_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static STRUCT_DATA: Lazy> = Lazy::new(|| { + let struct_dt = DataType::Struct( + vec![ + Field::new("s1", DataType::Utf8, false), + Field::new("s2", DataType::Int32, false), + Field::new("s3", DataType::Float64, false), + ] + .into(), + ); + let schema = schema_single("field1", struct_dt); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = Arc::new(make_struct_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +// === NEW: Decimal datasets === + +static DECIMAL32_DATA: Lazy> = Lazy::new(|| { + // Choose a representative precision/scale within Decimal32 limits + let precision: u8 = 7; + let scale: i8 = 2; + let schema = schema_single("amount", DataType::Decimal32(precision, scale)); + SIZES + .iter() + .map(|&n| { + let arr = make_decimal32_array_with_tag(n, 0xDEC_0032, precision, scale); + let col: ArrayRef = Arc::new(arr); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static DECIMAL64_DATA: Lazy> = Lazy::new(|| { + let precision: u8 = 13; + let scale: i8 = 3; + let schema = schema_single("amount", DataType::Decimal64(precision, scale)); + SIZES + .iter() + .map(|&n| { + let arr = make_decimal64_array_with_tag(n, 0xDEC_0064, precision, scale); + let col: ArrayRef = Arc::new(arr); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static DECIMAL128_BYTES_DATA: Lazy> = Lazy::new(|| { + let precision: u8 = 25; + let scale: i8 = 6; + let schema = schema_single("amount", DataType::Decimal128(precision, scale)); + SIZES + .iter() + .map(|&n| { + let arr = make_decimal128_array_with_tag(n, 0xDEC_0128, precision, scale); + let col: ArrayRef = Arc::new(arr); + 
RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static DECIMAL128_FIXED16_DATA: Lazy> = Lazy::new(|| { + // Same logical type as above but force Avro fixed(16) via metadata "size": "16" + let precision: u8 = 25; + let scale: i8 = 6; + let schema = + schema_decimal_with_size("amount", DataType::Decimal128(precision, scale), Some(16)); + SIZES + .iter() + .map(|&n| { + let arr = make_decimal128_array_with_tag(n, 0xDEC_F128, precision, scale); + let col: ArrayRef = Arc::new(arr); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static DECIMAL256_DATA: Lazy> = Lazy::new(|| { + // Use a higher precision typical of 256-bit decimals + let precision: u8 = 50; + let scale: i8 = 10; + let schema = schema_single("amount", DataType::Decimal256(precision, scale)); + SIZES + .iter() + .map(|&n| { + let arr = make_decimal256_array_with_tag(n, 0xDEC_0256, precision, scale); + let col: ArrayRef = Arc::new(arr); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + fn ocf_size_for_batch(batch: &RecordBatch) -> usize { let schema_owned: Schema = (*batch.schema()).clone(); let cursor = Cursor::new(Vec::::with_capacity(1024)); @@ -314,6 +679,17 @@ fn criterion_benches(c: &mut Criterion) { bench_writer_scenario(c, "write-Binary(Bytes)", &BINARY_DATA); bench_writer_scenario(c, "write-TimestampMicros", &TIMESTAMP_US_DATA); bench_writer_scenario(c, "write-Mixed", &MIXED_DATA); + bench_writer_scenario(c, "write-Utf8", &UTF8_DATA); + bench_writer_scenario(c, "write-List", &LIST_UTF8_DATA); + bench_writer_scenario(c, "write-Struct", &STRUCT_DATA); + bench_writer_scenario(c, "write-FixedSizeBinary16", &FIXED16_DATA); + bench_writer_scenario(c, "write-UUID(logicalType)", &UUID16_DATA); + bench_writer_scenario(c, "write-IntervalMonthDayNanoDuration", &INTERVAL_MDN_DATA); + bench_writer_scenario(c, "write-Decimal32(bytes)", &DECIMAL32_DATA); + bench_writer_scenario(c, "write-Decimal64(bytes)", &DECIMAL64_DATA); + bench_writer_scenario(c, "write-Decimal128(bytes)", &DECIMAL128_BYTES_DATA); + bench_writer_scenario(c, "write-Decimal128(fixed16)", &DECIMAL128_FIXED16_DATA); + bench_writer_scenario(c, "write-Decimal256(bytes)", &DECIMAL256_DATA); } criterion_group! { diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs index 6e343736c1e9..adfee4bcebec 100644 --- a/arrow-avro/src/schema.rs +++ b/arrow-avro/src/schema.rs @@ -1013,8 +1013,28 @@ fn datatype_to_avro( }) } } - DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => { - // Prefer fixed if original size info present + DataType::Decimal32(precision, scale) + | DataType::Decimal64(precision, scale) + | DataType::Decimal128(precision, scale) + | DataType::Decimal256(precision, scale) => { + // Scale must be >= 0 and <= precision; otherwise the logical + // type is invalid. We surface a schema error at generation time + // to avoid silently downgrading to the base type later. + // Ref: Specification §Logical Types / Decimal. + if *scale < 0 { + return Err(ArrowError::SchemaError(format!( + "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0" + ))); + } + let s = *scale as usize; + if s > *precision as usize { + return Err(ArrowError::SchemaError(format!( + "Invalid Avro decimal for field '{field_name}': scale ({scale}) \ + must be <= precision ({precision})" + ))); + } + // Prefer fixed if an original size hint is present in field metadata. + // Otherwise, emit bytes-backed decimal (reader/writer compatible). 
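+        // Illustrative shape of the emitted logical type (an assumption for
+        // this note, with precision=10, scale=2 and no "size" hint):
+        //   {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}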
let mut meta = JsonMap::from_iter([ ("logicalType".into(), json!("decimal")), ("precision".into(), json!(*precision)), diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs index a397f07edbe6..0e79bd5a79ed 100644 --- a/arrow-avro/src/writer/encoder.rs +++ b/arrow-avro/src/writer/encoder.rs @@ -246,6 +246,30 @@ impl<'a> FieldEncoder<'a> { DataType::LargeBinary => { Encoder::LargeBinary(BinaryEncoder(array.as_binary::())) } + DataType::FixedSizeBinary(len) => { + // Decide between Avro `fixed` (raw bytes) and `uuid` logical string + // based on Field metadata, mirroring schema generation rules. + let arr = array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + ArrowError::SchemaError("Expected FixedSizeBinaryArray".into()) + })?; + let md = field.metadata(); + let is_uuid = md.get("logicalType").is_some_and(|v| v == "uuid") + || (*len == 16 + && md.get("ARROW:extension:name").is_some_and(|v| v == "uuid")); + if is_uuid { + if *len != 16 { + return Err(ArrowError::InvalidArgumentError( + "logicalType=uuid requires FixedSizeBinary(16)".into(), + )); + } + Encoder::Uuid(UuidEncoder(arr)) + } else { + Encoder::Fixed(FixedEncoder(arr)) + } + } DataType::Timestamp(TimeUnit::Microsecond, _) => Encoder::Timestamp(LongEncoder( array.as_primitive::(), )), @@ -264,6 +288,26 @@ impl<'a> FieldEncoder<'a> { array.as_primitive::(), )), } + DataType::Duration(_) => { + return Err(ArrowError::NotYetImplemented( + "Avro writer: Arrow Duration(TimeUnit) has no standard Avro mapping; cast to Interval(MonthDayNano) to use Avro 'duration'".into(), + )); + } + // Composite or mismatched types under scalar plan + DataType::List(_) + | DataType::LargeList(_) + | DataType::Map(_, _) + | DataType::Struct(_) + | DataType::Dictionary(_, _) + | DataType::Decimal32(_, _) + | DataType::Decimal64(_, _) + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) => { + return Err(ArrowError::SchemaError(format!( + "Avro scalar site incompatible with Arrow type: {:?}", + array.data_type() + ))) + } other => { return Err(ArrowError::NotYetImplemented(format!( "Avro scalar type not yet supported: {other:?}" From c140ed6b52f33ea2397c10f3cb3f6000e795408c Mon Sep 17 00:00:00 2001 From: nathaniel-d-ef Date: Mon, 8 Sep 2025 23:04:44 +0200 Subject: [PATCH 03/14] reverting changes to benchmarks, to be included on a follow up PR --- arrow-avro/benches/avro_writer.rs | 394 +----------------------------- 1 file changed, 9 insertions(+), 385 deletions(-) diff --git a/arrow-avro/benches/avro_writer.rs b/arrow-avro/benches/avro_writer.rs index 5daf00dfb09a..b3d623027bbe 100644 --- a/arrow-avro/benches/avro_writer.rs +++ b/arrow-avro/benches/avro_writer.rs @@ -15,22 +15,19 @@ // specific language governing permissions and limitations // under the License. -//! Benchmarks for `arrow-avro` Writer (Avro Object Container File) +//! Benchmarks for `arrow‑avro` **Writer** (Avro Object Container Files) +//! 
extern crate arrow_avro; extern crate criterion; extern crate once_cell; use arrow_array::{ - builder::{ListBuilder, StringBuilder}, - types::{Int32Type, Int64Type, IntervalMonthDayNanoType, TimestampMicrosecondType}, - ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Decimal256Array, Decimal32Array, - Decimal64Array, FixedSizeBinaryArray, Float32Array, Float64Array, ListArray, PrimitiveArray, - RecordBatch, StringArray, StructArray, + types::{Int32Type, Int64Type, TimestampMicrosecondType}, + ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, PrimitiveArray, RecordBatch, }; use arrow_avro::writer::AvroWriter; -use arrow_buffer::i256; -use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit}; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput}; use once_cell::sync::Lazy; use rand::{ @@ -38,7 +35,6 @@ use rand::{ rngs::StdRng, Rng, SeedableRng, }; -use std::collections::HashMap; use std::io::Cursor; use std::sync::Arc; use std::time::Duration; @@ -67,9 +63,7 @@ where #[inline] fn make_bool_array_with_tag(n: usize, tag: u64) -> BooleanArray { let mut rng = rng_for(tag, n); - // Can't use SampleUniform for bool; use the RNG's boolean helper let values = (0..n).map(|_| rng.random_bool(0.5)); - // This repo exposes `from_iter`, not `from_iter_values` for BooleanArray BooleanArray::from_iter(values.map(Some)) } @@ -87,21 +81,6 @@ fn make_i64_array_with_tag(n: usize, tag: u64) -> PrimitiveArray { PrimitiveArray::::from_iter_values(values) } -#[inline] -fn rand_ascii_string(rng: &mut StdRng, min_len: usize, max_len: usize) -> String { - let len = rng.random_range(min_len..=max_len); - (0..len) - .map(|_| (rng.random_range(b'a'..=b'z') as char)) - .collect() -} - -#[inline] -fn make_utf8_array_with_tag(n: usize, tag: u64) -> StringArray { - let mut rng = rng_for(tag, n); - let data: Vec = (0..n).map(|_| rand_ascii_string(&mut rng, 3, 16)).collect(); - StringArray::from_iter_values(data) -} - #[inline] fn make_f32_array_with_tag(n: usize, tag: u64) -> Float32Array { let mut rng = rng_for(tag, n); @@ -119,52 +98,14 @@ fn make_f64_array_with_tag(n: usize, tag: u64) -> Float64Array { #[inline] fn make_binary_array_with_tag(n: usize, tag: u64) -> BinaryArray { let mut rng = rng_for(tag, n); - let mut payloads: Vec> = Vec::with_capacity(n); - for _ in 0..n { - let len = rng.random_range(1..=16); - let mut p = vec![0u8; len]; + let mut payloads: Vec<[u8; 16]> = vec![[0; 16]; n]; + for p in payloads.iter_mut() { rng.fill(&mut p[..]); - payloads.push(p); } let views: Vec<&[u8]> = payloads.iter().map(|p| &p[..]).collect(); - // This repo exposes a simple `from_vec` for BinaryArray BinaryArray::from_vec(views) } -#[inline] -fn make_fixed16_array_with_tag(n: usize, tag: u64) -> FixedSizeBinaryArray { - let mut rng = rng_for(tag, n); - let payloads = (0..n) - .map(|_| { - let mut b = [0u8; 16]; - rng.fill(&mut b); - b - }) - .collect::>(); - // Fixed-size constructor available in this repo - FixedSizeBinaryArray::try_from_iter(payloads.into_iter()).expect("build FixedSizeBinaryArray") -} - -/// Make an Arrow `Interval(IntervalUnit::MonthDayNano)` array with **non-negative** -/// (months, days, nanos) values, and nanos as **multiples of 1_000_000** (whole ms), -/// per Avro `duration` constraints used by the writer. 
-#[inline] -fn make_interval_mdn_array_with_tag( - n: usize, - tag: u64, -) -> PrimitiveArray { - let mut rng = rng_for(tag, n); - let values = (0..n).map(|_| { - let months: i32 = rng.random_range(0..=120); - let days: i32 = rng.random_range(0..=31); - // pick millis within a day (safe within u32::MAX and realistic) - let millis: u32 = rng.random_range(0..=86_400_000); - let nanos: i64 = (millis as i64) * 1_000_000; - IntervalMonthDayNanoType::make_value(months, days, nanos) - }); - PrimitiveArray::::from_iter_values(values) -} - #[inline] fn make_ts_micros_array_with_tag(n: usize, tag: u64) -> PrimitiveArray { let mut rng = rng_for(tag, n); @@ -174,80 +115,9 @@ fn make_ts_micros_array_with_tag(n: usize, tag: u64) -> PrimitiveArray::from_iter_values(values) } -// === Decimal helpers & generators === - -#[inline] -fn pow10_i32(p: u8) -> i32 { - (0..p).fold(1i32, |acc, _| acc.saturating_mul(10)) -} - -#[inline] -fn pow10_i64(p: u8) -> i64 { - (0..p).fold(1i64, |acc, _| acc.saturating_mul(10)) -} - -#[inline] -fn pow10_i128(p: u8) -> i128 { - (0..p).fold(1i128, |acc, _| acc.saturating_mul(10)) -} - -#[inline] -fn make_decimal32_array_with_tag(n: usize, tag: u64, precision: u8, scale: i8) -> Decimal32Array { - let mut rng = rng_for(tag, n); - let max = pow10_i32(precision).saturating_sub(1); - let values = (0..n).map(|_| rng.random_range(-max..=max)); - Decimal32Array::from_iter_values(values) - .with_precision_and_scale(precision, scale) - .expect("set precision/scale on Decimal32Array") -} - -#[inline] -fn make_decimal64_array_with_tag(n: usize, tag: u64, precision: u8, scale: i8) -> Decimal64Array { - let mut rng = rng_for(tag, n); - let max = pow10_i64(precision).saturating_sub(1); - let values = (0..n).map(|_| rng.random_range(-max..=max)); - Decimal64Array::from_iter_values(values) - .with_precision_and_scale(precision, scale) - .expect("set precision/scale on Decimal64Array") -} - -#[inline] -fn make_decimal128_array_with_tag(n: usize, tag: u64, precision: u8, scale: i8) -> Decimal128Array { - let mut rng = rng_for(tag, n); - let max = pow10_i128(precision).saturating_sub(1); - let values = (0..n).map(|_| rng.random_range(-max..=max)); - Decimal128Array::from_iter_values(values) - .with_precision_and_scale(precision, scale) - .expect("set precision/scale on Decimal128Array") -} - -#[inline] -fn make_decimal256_array_with_tag(n: usize, tag: u64, precision: u8, scale: i8) -> Decimal256Array { - // Generate within i128 range and widen to i256 to keep generation cheap and portable - let mut rng = rng_for(tag, n); - let max128 = pow10_i128(30).saturating_sub(1); - let values = (0..n).map(|_| { - let v: i128 = rng.random_range(-max128..=max128); - i256::from_i128(v) - }); - Decimal256Array::from_iter_values(values) - .with_precision_and_scale(precision, scale) - .expect("set precision/scale on Decimal256Array") -} - -#[inline] -fn make_fixed16_array(n: usize) -> FixedSizeBinaryArray { - make_fixed16_array_with_tag(n, 0xF15E_D016) -} - -#[inline] -fn make_interval_mdn_array(n: usize) -> PrimitiveArray { - make_interval_mdn_array_with_tag(n, 0xD0_1E_AD) -} - #[inline] fn make_bool_array(n: usize) -> BooleanArray { - make_bool_array_with_tag(n, 0xB00_1) + make_bool_array_with_tag(n, 0xB001) } #[inline] fn make_i32_array(n: usize) -> PrimitiveArray { @@ -273,57 +143,6 @@ fn make_binary_array(n: usize) -> BinaryArray { fn make_ts_micros_array(n: usize) -> PrimitiveArray { make_ts_micros_array_with_tag(n, 0x7157_0001) } -#[inline] -fn make_utf8_array(n: usize) -> StringArray { - 
make_utf8_array_with_tag(n, 0x5712_07F8) -} -#[inline] -fn make_list_utf8_array(n: usize) -> ListArray { - make_list_utf8_array_with_tag(n, 0x0A11_57ED) -} -#[inline] -fn make_struct_array(n: usize) -> StructArray { - make_struct_array_with_tag(n, 0x57_AB_C7) -} - -#[inline] -fn make_list_utf8_array_with_tag(n: usize, tag: u64) -> ListArray { - let mut rng = rng_for(tag, n); - let mut builder = ListBuilder::new(StringBuilder::new()); - for _ in 0..n { - let items = rng.random_range(0..=5); - for _ in 0..items { - let s = rand_ascii_string(&mut rng, 1, 12); - builder.values().append_value(s.as_str()); - } - builder.append(true); - } - builder.finish() -} - -#[inline] -fn make_struct_array_with_tag(n: usize, tag: u64) -> StructArray { - let s_tag = tag ^ 0x5u64; - let i_tag = tag ^ 0x6u64; - let f_tag = tag ^ 0x7u64; - let s_col: ArrayRef = Arc::new(make_utf8_array_with_tag(n, s_tag)); - let i_col: ArrayRef = Arc::new(make_i32_array_with_tag(n, i_tag)); - let f_col: ArrayRef = Arc::new(make_f64_array_with_tag(n, f_tag)); - StructArray::from(vec![ - ( - Arc::new(Field::new("s1", DataType::Utf8, false)), - s_col.clone(), - ), - ( - Arc::new(Field::new("s2", DataType::Int32, false)), - i_col.clone(), - ), - ( - Arc::new(Field::new("s3", DataType::Float64, false)), - f_col.clone(), - ), - ]) -} #[inline] fn schema_single(name: &str, dt: DataType) -> Arc { @@ -340,36 +159,6 @@ fn schema_mixed() -> Arc { ])) } -#[inline] -fn schema_fixed16() -> Arc { - schema_single("field1", DataType::FixedSizeBinary(16)) -} - -#[inline] -fn schema_uuid16() -> Arc { - let mut md = HashMap::new(); - md.insert("logicalType".to_string(), "uuid".to_string()); - let field = Field::new("uuid", DataType::FixedSizeBinary(16), false).with_metadata(md); - Arc::new(Schema::new(vec![field])) -} - -#[inline] -fn schema_interval_mdn() -> Arc { - schema_single("duration", DataType::Interval(IntervalUnit::MonthDayNano)) -} - -#[inline] -fn schema_decimal_with_size(name: &str, dt: DataType, size_meta: Option) -> Arc { - let field = if let Some(size) = size_meta { - let mut md = HashMap::new(); - md.insert("size".to_string(), size.to_string()); - Field::new(name, dt, false).with_metadata(md) - } else { - Field::new(name, dt, false) - }; - Arc::new(Schema::new(vec![field])) -} - static BOOLEAN_DATA: Lazy> = Lazy::new(|| { let schema = schema_single("field1", DataType::Boolean); SIZES @@ -436,40 +225,6 @@ static BINARY_DATA: Lazy> = Lazy::new(|| { .collect() }); -static FIXED16_DATA: Lazy> = Lazy::new(|| { - let schema = schema_fixed16(); - SIZES - .iter() - .map(|&n| { - let col: ArrayRef = Arc::new(make_fixed16_array(n)); - RecordBatch::try_new(schema.clone(), vec![col]).unwrap() - }) - .collect() -}); - -static UUID16_DATA: Lazy> = Lazy::new(|| { - let schema = schema_uuid16(); - SIZES - .iter() - .map(|&n| { - // Same values as Fixed16; writer path differs because of field metadata - let col: ArrayRef = Arc::new(make_fixed16_array_with_tag(n, 0x7575_6964_7575_6964)); - RecordBatch::try_new(schema.clone(), vec![col]).unwrap() - }) - .collect() -}); - -static INTERVAL_MDN_DATA: Lazy> = Lazy::new(|| { - let schema = schema_interval_mdn(); - SIZES - .iter() - .map(|&n| { - let col: ArrayRef = Arc::new(make_interval_mdn_array(n)); - RecordBatch::try_new(schema.clone(), vec![col]).unwrap() - }) - .collect() -}); - static TIMESTAMP_US_DATA: Lazy> = Lazy::new(|| { let schema = schema_single("field1", DataType::Timestamp(TimeUnit::Microsecond, None)); SIZES @@ -495,126 +250,6 @@ static MIXED_DATA: Lazy> = Lazy::new(|| { .collect() }); 
-static UTF8_DATA: Lazy> = Lazy::new(|| { - let schema = schema_single("field1", DataType::Utf8); - SIZES - .iter() - .map(|&n| { - let col: ArrayRef = Arc::new(make_utf8_array(n)); - RecordBatch::try_new(schema.clone(), vec![col]).unwrap() - }) - .collect() -}); - -static LIST_UTF8_DATA: Lazy> = Lazy::new(|| { - // IMPORTANT: ListBuilder creates a child field named "item" that is nullable by default. - // Make the schema's list item nullable to match the array we construct. - let item_field = Arc::new(Field::new("item", DataType::Utf8, true)); - let schema = schema_single("field1", DataType::List(item_field)); - SIZES - .iter() - .map(|&n| { - let col: ArrayRef = Arc::new(make_list_utf8_array(n)); - RecordBatch::try_new(schema.clone(), vec![col]).unwrap() - }) - .collect() -}); - -static STRUCT_DATA: Lazy> = Lazy::new(|| { - let struct_dt = DataType::Struct( - vec![ - Field::new("s1", DataType::Utf8, false), - Field::new("s2", DataType::Int32, false), - Field::new("s3", DataType::Float64, false), - ] - .into(), - ); - let schema = schema_single("field1", struct_dt); - SIZES - .iter() - .map(|&n| { - let col: ArrayRef = Arc::new(make_struct_array(n)); - RecordBatch::try_new(schema.clone(), vec![col]).unwrap() - }) - .collect() -}); - -// === NEW: Decimal datasets === - -static DECIMAL32_DATA: Lazy> = Lazy::new(|| { - // Choose a representative precision/scale within Decimal32 limits - let precision: u8 = 7; - let scale: i8 = 2; - let schema = schema_single("amount", DataType::Decimal32(precision, scale)); - SIZES - .iter() - .map(|&n| { - let arr = make_decimal32_array_with_tag(n, 0xDEC_0032, precision, scale); - let col: ArrayRef = Arc::new(arr); - RecordBatch::try_new(schema.clone(), vec![col]).unwrap() - }) - .collect() -}); - -static DECIMAL64_DATA: Lazy> = Lazy::new(|| { - let precision: u8 = 13; - let scale: i8 = 3; - let schema = schema_single("amount", DataType::Decimal64(precision, scale)); - SIZES - .iter() - .map(|&n| { - let arr = make_decimal64_array_with_tag(n, 0xDEC_0064, precision, scale); - let col: ArrayRef = Arc::new(arr); - RecordBatch::try_new(schema.clone(), vec![col]).unwrap() - }) - .collect() -}); - -static DECIMAL128_BYTES_DATA: Lazy> = Lazy::new(|| { - let precision: u8 = 25; - let scale: i8 = 6; - let schema = schema_single("amount", DataType::Decimal128(precision, scale)); - SIZES - .iter() - .map(|&n| { - let arr = make_decimal128_array_with_tag(n, 0xDEC_0128, precision, scale); - let col: ArrayRef = Arc::new(arr); - RecordBatch::try_new(schema.clone(), vec![col]).unwrap() - }) - .collect() -}); - -static DECIMAL128_FIXED16_DATA: Lazy> = Lazy::new(|| { - // Same logical type as above but force Avro fixed(16) via metadata "size": "16" - let precision: u8 = 25; - let scale: i8 = 6; - let schema = - schema_decimal_with_size("amount", DataType::Decimal128(precision, scale), Some(16)); - SIZES - .iter() - .map(|&n| { - let arr = make_decimal128_array_with_tag(n, 0xDEC_F128, precision, scale); - let col: ArrayRef = Arc::new(arr); - RecordBatch::try_new(schema.clone(), vec![col]).unwrap() - }) - .collect() -}); - -static DECIMAL256_DATA: Lazy> = Lazy::new(|| { - // Use a higher precision typical of 256-bit decimals - let precision: u8 = 50; - let scale: i8 = 10; - let schema = schema_single("amount", DataType::Decimal256(precision, scale)); - SIZES - .iter() - .map(|&n| { - let arr = make_decimal256_array_with_tag(n, 0xDEC_0256, precision, scale); - let col: ArrayRef = Arc::new(arr); - RecordBatch::try_new(schema.clone(), vec![col]).unwrap() - }) - .collect() -}); - 
 fn ocf_size_for_batch(batch: &RecordBatch) -> usize {
     let schema_owned: Schema = (*batch.schema()).clone();
     let cursor = Cursor::new(Vec::<u8>::with_capacity(1024));
@@ -679,17 +314,6 @@ fn criterion_benches(c: &mut Criterion) {
     bench_writer_scenario(c, "write-Binary(Bytes)", &BINARY_DATA);
     bench_writer_scenario(c, "write-TimestampMicros", &TIMESTAMP_US_DATA);
     bench_writer_scenario(c, "write-Mixed", &MIXED_DATA);
-    bench_writer_scenario(c, "write-Utf8", &UTF8_DATA);
-    bench_writer_scenario(c, "write-List", &LIST_UTF8_DATA);
-    bench_writer_scenario(c, "write-Struct", &STRUCT_DATA);
-    bench_writer_scenario(c, "write-FixedSizeBinary16", &FIXED16_DATA);
-    bench_writer_scenario(c, "write-UUID(logicalType)", &UUID16_DATA);
-    bench_writer_scenario(c, "write-IntervalMonthDayNanoDuration", &INTERVAL_MDN_DATA);
-    bench_writer_scenario(c, "write-Decimal32(bytes)", &DECIMAL32_DATA);
-    bench_writer_scenario(c, "write-Decimal64(bytes)", &DECIMAL64_DATA);
-    bench_writer_scenario(c, "write-Decimal128(bytes)", &DECIMAL128_BYTES_DATA);
-    bench_writer_scenario(c, "write-Decimal128(fixed16)", &DECIMAL128_FIXED16_DATA);
-    bench_writer_scenario(c, "write-Decimal256(bytes)", &DECIMAL256_DATA);
 }

@@ -697,4 +321,4 @@ criterion_group! {
     config = Criterion::default().configure_from_args();
     targets = criterion_benches
 }
-criterion_main!(avro_writer);
+criterion_main!(avro_writer);
\ No newline at end of file
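A note on the `ListBuilder` caveat flagged in the removed benchmark code above: the builder's child field really is named "item" and nullable by default, so a schema declaring a non-nullable list item will fail `RecordBatch::try_new`. A minimal sketch of the matching construction, using only the public `arrow_array`/`arrow_schema` APIs (the `field1` name mirrors the benchmarks; nothing below is part of the patch itself):

    use std::sync::Arc;
    use arrow_array::builder::{ListBuilder, StringBuilder};
    use arrow_array::{ArrayRef, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};

    fn main() {
        let mut builder = ListBuilder::new(StringBuilder::new());
        builder.values().append_value("a");
        builder.append(true);
        let col: ArrayRef = Arc::new(builder.finish());
        // ListBuilder's child field is Field::new("item", Utf8, true);
        // declare the list item nullable so the schema matches the array.
        let item = Arc::new(Field::new("item", DataType::Utf8, true));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "field1",
            DataType::List(item),
            false,
        )]));
        assert!(RecordBatch::try_new(schema, vec![col]).is_ok());
    }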
From c01df2bd9cf4d777aad5df896b134e45d76d64ea Mon Sep 17 00:00:00 2001
From: nathaniel-d-ef
Date: Mon, 8 Sep 2025 23:15:04 +0200
Subject: [PATCH 04/14] Restore the missing newline at EOF in avro_writer.rs
 benchmark file

---
 arrow-avro/benches/avro_writer.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/arrow-avro/benches/avro_writer.rs b/arrow-avro/benches/avro_writer.rs
index b3d623027bbe..924cbbdc84bd 100644
--- a/arrow-avro/benches/avro_writer.rs
+++ b/arrow-avro/benches/avro_writer.rs
@@ -321,4 +321,4 @@ criterion_group! {
     config = Criterion::default().configure_from_args();
     targets = criterion_benches
 }
-criterion_main!(avro_writer);
\ No newline at end of file
+criterion_main!(avro_writer);

From 10fe4357c28cd52bf0b2f54f9b6f52a0cae99a2f Mon Sep 17 00:00:00 2001
From: nathaniel-d-ef
Date: Tue, 9 Sep 2025 00:04:22 +0200
Subject: [PATCH 05/14] Add unit tests for complex Avro encoder types (List,
 Struct, Enum, Decimal, Map) in arrow-avro

---
 arrow-avro/src/writer/encoder.rs | 126 +++++++++++++++++++++++++++++++
 1 file changed, 126 insertions(+)

diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index 0e79bd5a79ed..3874d0d6a22a 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -1412,4 +1412,130 @@ mod tests {
         let got = encode_all(&arr, &FieldPlan::Scalar, None);
         assert_bytes_eq(&got, &expected);
     }
+
+    #[test]
+    fn list_encoder_int32() {
+        // Build ListArray [[1,2], [], [3]]
+        let values = Int32Array::from(vec![1, 2, 3]);
+        let offsets = vec![0, 2, 2, 3];
+        let list = ListArray::new(
+            Field::new("item", DataType::Int32, true).into(),
+            arrow_buffer::OffsetBuffer::new(offsets.into()),
+            Arc::new(values) as ArrayRef,
+            None,
+        );
+        // Avro array encoding per row
+        let mut expected = Vec::new();
+        // row 0: block len 2, items 1,2 then 0
+        expected.extend(avro_long_bytes(2));
+        expected.extend(avro_long_bytes(1));
+        expected.extend(avro_long_bytes(2));
+        expected.extend(avro_long_bytes(0));
+        // row 1: empty
+        expected.extend(avro_long_bytes(0));
+        // row 2: one item 3
+        expected.extend(avro_long_bytes(1));
+        expected.extend(avro_long_bytes(3));
+        expected.extend(avro_long_bytes(0));
+
+        let plan = FieldPlan::List { items_nullability: None, item_plan: Box::new(FieldPlan::Scalar) };
+        let got = encode_all(&list, &plan, None);
+        assert_bytes_eq(&got, &expected);
+    }
+
+    #[test]
+    fn struct_encoder_two_fields() {
+        // Struct { a: Int32, b: Utf8 }
+        let a = Int32Array::from(vec![1, 2]);
+        let b = StringArray::from(vec!["x", "y"]);
+        let fields = Fields::from(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]);
+        let struct_arr = StructArray::new(fields.clone(), vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef], None);
+        let plan = FieldPlan::Struct { encoders: vec![
+            FieldBinding { arrow_index: 0, nullability: None, plan: FieldPlan::Scalar },
+            FieldBinding { arrow_index: 1, nullability: None, plan: FieldPlan::Scalar },
+        ]};
+        let got = encode_all(&struct_arr, &plan, None);
+        // Expected: rows concatenated: a then b
+        let mut expected = Vec::new();
+        expected.extend(avro_long_bytes(1)); // a=1
+        expected.extend(avro_len_prefixed_bytes(b"x")); // b="x"
+        expected.extend(avro_long_bytes(2)); // a=2
+        expected.extend(avro_len_prefixed_bytes(b"y")); // b="y"
+        assert_bytes_eq(&got, &expected);
+    }
+
+    #[test]
+    fn enum_encoder_dictionary() {
+        // symbols: ["A","B","C"], keys [2,0,1]
+        let dict_values = StringArray::from(vec!["A","B","C"]);
+        let keys = Int32Array::from(vec![2,0,1]);
+        let dict = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values) as ArrayRef).unwrap();
+        let symbols = Arc::<[String]>::from(vec!["A".to_string(),"B".to_string(),"C".to_string()].into_boxed_slice());
+        let plan = FieldPlan::Enum { symbols };
+        let got = encode_all(&dict, &plan, None);
+        let mut expected = Vec::new();
+        expected.extend(avro_long_bytes(2));
+        expected.extend(avro_long_bytes(0));
+        expected.extend(avro_long_bytes(1));
+        assert_bytes_eq(&got, &expected);
+    }
+
+    #[test]
+    fn decimal_bytes_and_fixed() {
+        // Decimal64 with small positives and negatives
+        let dec = Decimal64Array::from(vec![1i64, -1i64, 0i64])
+            .with_precision_and_scale(10, 0)
+            .unwrap();
+        // bytes(decimal): minimal two's complement length-prefixed
+        let plan_bytes = FieldPlan::Decimal { size: None };
+        let got_bytes = encode_all(&dec, &plan_bytes, None);
+        // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
+        let mut expected_bytes = Vec::new();
+        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01][7..])); // 0x01
+        expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
+        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
+        assert_bytes_eq(&got_bytes, &expected_bytes);
+
+        // fixed(8): sign-extend to 8 bytes as-is
+        let plan_fixed = FieldPlan::Decimal { size: Some(8) };
+        let got_fixed = encode_all(&dec, &plan_fixed, None);
+        let mut expected_fixed = Vec::new();
+        expected_fixed.extend_from_slice(&1i64.to_be_bytes());
+        expected_fixed.extend_from_slice(&(-1i64).to_be_bytes());
+        expected_fixed.extend_from_slice(&0i64.to_be_bytes());
+        assert_bytes_eq(&got_fixed, &expected_fixed);
+    }
+
+    #[test]
+    fn map_encoder_string_keys_int_values() {
+        // Build MapArray with two rows
+        // Row0: {"k1":1, "k2":2}
+        // Row1: {}
+        let keys = StringArray::from(vec!["k1","k2"]);
+        let values = Int32Array::from(vec![1,2]);
+        let entries_fields = Fields::from(vec![
+            Field::new("key", DataType::Utf8, false),
+            Field::new("value", DataType::Int32, true),
+        ]);
+        let entries = StructArray::new(entries_fields, vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef], None);
+        let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into());
+        let map = MapArray::new(Field::new("entries", entries.data_type().clone(), false).into(), offsets, entries, None, false);
+        let plan = FieldPlan::Map { values_nullability: None, value_plan: Box::new(FieldPlan::Scalar) };
+        let got = encode_all(&map, &plan, None);
+        // Expected Avro per row: arrays of key,value
+        let mut expected = Vec::new();
+        // Row0: block 2 then pairs
+        expected.extend(avro_long_bytes(2));
+        expected.extend(avro_len_prefixed_bytes(b"k1"));
+        expected.extend(avro_long_bytes(1));
+        expected.extend(avro_len_prefixed_bytes(b"k2"));
+        expected.extend(avro_long_bytes(2));
+        expected.extend(avro_long_bytes(0));
+        // Row1: empty
+        expected.extend(avro_long_bytes(0));
+        assert_bytes_eq(&got, &expected);
+    }
 }
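The expected byte strings in these tests are easier to audit once the long encoding is spelled out. Below is a standalone sketch of Avro's zigzag plus variable-length long encoding and of the block framing the list and map tests assert (block count, then items or pairs, then a `0` terminator). `avro_long` here is an illustrative stand-in for the tests' `avro_long_bytes` helper, not the patch's code:

    fn avro_long(value: i64) -> Vec<u8> {
        // Zigzag: small magnitudes, positive or negative, become small codes.
        let mut z = ((value as u64) << 1) ^ ((value >> 63) as u64);
        let mut out = Vec::new();
        // Base-128 varint, low 7 bits first; MSB set while more bytes follow.
        loop {
            let byte = (z & 0x7F) as u8;
            z >>= 7;
            if z == 0 {
                out.push(byte);
                break;
            }
            out.push(byte | 0x80);
        }
        out
    }

    fn main() {
        assert_eq!(avro_long(0), vec![0x00]);
        assert_eq!(avro_long(1), vec![0x02]);
        assert_eq!(avro_long(-1), vec![0x01]);
        // One-element array/map block: count 1, one item (int 3), terminator 0.
        let mut row = avro_long(1);
        row.extend(avro_long(3));
        row.extend(avro_long(0));
        assert_eq!(row, vec![0x02, 0x06, 0x00]);
    }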
From 1ce31cec53c741da0bcfb83968147bf9f8b2c3bf Mon Sep 17 00:00:00 2001
From: nathaniel-d-ef
Date: Tue, 9 Sep 2025 00:31:45 +0200
Subject: [PATCH 06/14] Refactor: format FieldPlan and related structs in
 tests for improved readability

---
 arrow-avro/src/writer/encoder.rs | 67 ++++++++++++++++++++++++--------
 1 file changed, 51 insertions(+), 16 deletions(-)

diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index 3874d0d6a22a..ff6222952f7a 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -1438,7 +1438,10 @@ mod tests {
         expected.extend(avro_long_bytes(3));
         expected.extend(avro_long_bytes(0));
 
-        let plan = FieldPlan::List { items_nullability: None, item_plan: Box::new(FieldPlan::Scalar) };
+        let plan = FieldPlan::List {
+            items_nullability: None,
+            item_plan: Box::new(FieldPlan::Scalar),
+        };
         let got = encode_all(&list, &plan, None);
         assert_bytes_eq(&got, &expected);
     }
@@ -1452,11 +1455,25 @@ mod tests {
             Field::new("a", DataType::Int32, true),
             Field::new("b", DataType::Utf8, true),
         ]);
-        let struct_arr = StructArray::new(fields.clone(), vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef], None);
-        let plan = FieldPlan::Struct { encoders: vec![
-            FieldBinding { arrow_index: 0, nullability: None, plan: FieldPlan::Scalar },
-            FieldBinding { arrow_index: 1, nullability: None, plan: FieldPlan::Scalar },
-        ]};
+        let struct_arr = StructArray::new(
+            fields.clone(),
+            vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
+            None,
+        );
+        let plan = FieldPlan::Struct {
+            encoders: vec![
+                FieldBinding {
+                    arrow_index: 0,
+                    nullability: None,
+                    plan: FieldPlan::Scalar,
+                },
+                FieldBinding {
+                    arrow_index: 1,
+                    nullability: None,
+                    plan: FieldPlan::Scalar,
+                },
+            ],
+        };
         let got = encode_all(&struct_arr, &plan, None);
         // Expected: rows concatenated: a then b
         let mut expected = Vec::new();
@@ -1470,10 +1487,13 @@ mod tests {
     #[test]
     fn enum_encoder_dictionary() {
         // symbols: ["A","B","C"], keys [2,0,1]
-        let dict_values = StringArray::from(vec!["A","B","C"]);
-        let keys = Int32Array::from(vec![2,0,1]);
-        let dict = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values) as ArrayRef).unwrap();
-        let symbols = Arc::<[String]>::from(vec!["A".to_string(),"B".to_string(),"C".to_string()].into_boxed_slice());
+        let dict_values = StringArray::from(vec!["A", "B", "C"]);
+        let keys = Int32Array::from(vec![2, 0, 1]);
+        let dict =
+            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values) as ArrayRef).unwrap();
+        let symbols = Arc::<[String]>::from(
+            vec!["A".to_string(), "B".to_string(), "C".to_string()].into_boxed_slice(),
+        );
         let plan = FieldPlan::Enum { symbols };
         let got = encode_all(&dict, &plan, None);
         let mut expected = Vec::new();
@@ -1494,7 +1514,9 @@ mod tests {
         let got_bytes = encode_all(&dec, &plan_bytes, None);
         // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
         let mut expected_bytes = Vec::new();
-        expected_bytes.extend(avro_len_prefixed_bytes(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01][7..])); // 0x01
+        expected_bytes.extend(avro_len_prefixed_bytes(
+            &[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01][7..],
+        )); // 0x01
         expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
         expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
         assert_bytes_eq(&got_bytes, &expected_bytes);
@@ -1536,16 +1558,29 @@ mod tests {
         // Build MapArray with two rows
         // Row0: {"k1":1, "k2":2}
         // Row1: {}
-        let keys = StringArray::from(vec!["k1","k2"]);
-        let values = Int32Array::from(vec![1,2]);
+        let keys = StringArray::from(vec!["k1", "k2"]);
+        let values = Int32Array::from(vec![1, 2]);
         let entries_fields = Fields::from(vec![
             Field::new("key", DataType::Utf8, false),
             Field::new("value", DataType::Int32, true),
         ]);
-        let entries = StructArray::new(entries_fields, vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef], None);
+        let entries = StructArray::new(
+            entries_fields,
+            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
+            None,
+        );
         let offsets = arrow_buffer::OffsetBuffer::new(vec![0i32, 2, 2].into());
-        let map = MapArray::new(Field::new("entries", entries.data_type().clone(), false).into(), offsets, entries, None, false);
-        let plan = FieldPlan::Map { values_nullability: None, value_plan: Box::new(FieldPlan::Scalar) };
+        let map = MapArray::new(
+            Field::new("entries", entries.data_type().clone(), false).into(),
+            offsets,
+            entries,
+            None,
+            false,
+        );
+        let plan = FieldPlan::Map {
+            values_nullability: None,
+            value_plan: Box::new(FieldPlan::Scalar),
+        };
         let got = encode_all(&map, &plan, None);
From 5c5ea8f32becdcea576b7844c3edd1539e7fac65 Mon Sep 17 00:00:00 2001
From: nathaniel-d-ef
Date: Tue, 9 Sep 2025 15:51:43 +0200
Subject: [PATCH 07/14] Add support for `small_decimals` feature in arrow-avro
 encoder

---
 arrow-avro/src/writer/encoder.rs | 38 +++++++++++++++++++-------------
 1 file changed, 23 insertions(+), 15 deletions(-)

diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index ff6222952f7a..0c3886e54c42 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -315,6 +315,7 @@ impl<'a> FieldEncoder<'a> {
             }
         },
         FieldPlan::Decimal {size} => match array.data_type() {
+            #[cfg(feature = "small_decimals")]
             DataType::Decimal32(_,_) => {
                 let arr = array
                     .as_any()
                     .downcast_ref::<Decimal32Array>()
                     .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
                 Encoder::Decimal32(DecimalEncoder::<4, Decimal32Array>::new(arr, *size))
             }
+            #[cfg(feature = "small_decimals")]
             DataType::Decimal64(_,_) => {
                 let arr = array
                     .as_any()
                     .downcast_ref::<Decimal64Array>()
-                    .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
+                    .ok_or_else(|| ArrowError::SchemaError("Expected Decimal64Array".into()))?;
                 Encoder::Decimal64(DecimalEncoder::<8, Decimal64Array>::new(arr, *size))
             }
             DataType::Decimal128(_,_) => {
                 let arr = array
                     .as_any()
                     .downcast_ref::<Decimal128Array>()
-                    .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
+                    .ok_or_else(|| ArrowError::SchemaError("Expected Decimal128Array".into()))?;
                 Encoder::Decimal128(DecimalEncoder::<16, Decimal128Array>::new(arr, *size))
             }
             DataType::Decimal256(_,_) => {
                 let arr = array
                     .as_any()
                     .downcast_ref::<Decimal256Array>()
-                    .ok_or_else(|| ArrowError::SchemaError("Expected Decimal32Array".into()))?;
+                    .ok_or_else(|| ArrowError::SchemaError("Expected Decimal256Array".into()))?;
                 Encoder::Decimal256(DecimalEncoder::<32, Decimal256Array>::new(arr, *size))
             }
             other => {
@@ -704,7 +706,9 @@ impl FieldPlan {
             // decimal site (bytes or fixed(N)) with precision/scale validation
             Codec::Decimal(precision, scale_opt, fixed_size_opt) => {
                 let (ap, as_) = match arrow_field.data_type() {
+                    #[cfg(feature = "small_decimals")]
                     DataType::Decimal32(p, s) => (*p as usize, *s as i32),
+                    #[cfg(feature = "small_decimals")]
                     DataType::Decimal64(p, s) => (*p as usize, *s as i32),
                     DataType::Decimal128(p, s) => (*p as usize, *s as i32),
                     DataType::Decimal256(p, s) => (*p as usize, *s as i32),
@@ -764,7 +768,9 @@ enum Encoder<'a> {
     IntervalDayTime(IntervalDayTimeEncoder<'a>),
     /// Avro `enum` encoder: writes the key (int) as the enum index.
     Enum(EnumEncoder<'a>),
+    #[cfg(feature = "small_decimals")]
     Decimal32(Decimal32Encoder<'a>),
+    #[cfg(feature = "small_decimals")]
     Decimal64(Decimal64Encoder<'a>),
     Decimal128(Decimal128Encoder<'a>),
     Decimal256(Decimal256Encoder<'a>),
@@ -794,7 +800,9 @@ impl<'a> Encoder<'a> {
             Encoder::IntervalYearMonth(e) => (e).encode(out, idx),
             Encoder::IntervalDayTime(e) => (e).encode(out, idx),
             Encoder::Enum(e) => (e).encode(out, idx),
+            #[cfg(feature = "small_decimals")]
             Encoder::Decimal32(e) => (e).encode(out, idx),
+            #[cfg(feature = "small_decimals")]
             Encoder::Decimal64(e) => (e).encode(out, idx),
             Encoder::Decimal128(e) => (e).encode(out, idx),
             Encoder::Decimal256(e) => (e).encode(out, idx),
@@ -1230,12 +1238,13 @@ impl IntervalDayTimeEncoder<'_> {
 trait DecimalBeBytes<const N: usize> {
     fn value_be_bytes(&self, idx: usize) -> [u8; N];
 }
-
+#[cfg(feature = "small_decimals")]
 impl DecimalBeBytes<4> for Decimal32Array {
     fn value_be_bytes(&self, idx: usize) -> [u8; 4] {
         self.value(idx).to_be_bytes()
     }
 }
+#[cfg(feature = "small_decimals")]
 impl DecimalBeBytes<8> for Decimal64Array {
     fn value_be_bytes(&self, idx: usize) -> [u8; 8] {
         self.value(idx).to_be_bytes()
@@ -1281,7 +1290,9 @@ impl<'a, const N: usize, A: DecimalBeBytes<N>> DecimalEncoder<'a, N, A> {
     }
 }
 
+#[cfg(feature = "small_decimals")]
 type Decimal32Encoder<'a> = DecimalEncoder<'a, 4, Decimal32Array>;
+#[cfg(feature = "small_decimals")]
 type Decimal64Encoder<'a> = DecimalEncoder<'a, 8, Decimal64Array>;
 type Decimal128Encoder<'a> = DecimalEncoder<'a, 16, Decimal128Array>;
 type Decimal256Encoder<'a> = DecimalEncoder<'a, 32, Decimal256Array>;
@@ -1505,29 +1516,26 @@ mod tests {
 
     #[test]
     fn decimal_bytes_and_fixed() {
-        // Decimal64 with small positives and negatives
-        let dec = Decimal64Array::from(vec![1i64, -1i64, 0i64])
-            .with_precision_and_scale(10, 0)
+        // Use Decimal128 with small positives and negatives
+        let dec = Decimal128Array::from(vec![1i128, -1i128, 0i128])
+            .with_precision_and_scale(20, 0)
             .unwrap();
         // bytes(decimal): minimal two's complement length-prefixed
         let plan_bytes = FieldPlan::Decimal { size: None };
         let got_bytes = encode_all(&dec, &plan_bytes, None);
         // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
         let mut expected_bytes = Vec::new();
-        expected_bytes.extend(avro_len_prefixed_bytes(
-            &[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01][7..],
-        )); // 0x01
+        expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
         expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
         expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
         assert_bytes_eq(&got_bytes, &expected_bytes);
 
-        // fixed(8): sign-extend to 8 bytes as-is
-        let plan_fixed = FieldPlan::Decimal { size: Some(8) };
+        let plan_fixed = FieldPlan::Decimal { size: Some(16) };
         let got_fixed = encode_all(&dec, &plan_fixed, None);
         let mut expected_fixed = Vec::new();
-        expected_fixed.extend_from_slice(&1i64.to_be_bytes());
-        expected_fixed.extend_from_slice(&(-1i64).to_be_bytes());
-        expected_fixed.extend_from_slice(&0i64.to_be_bytes());
+        expected_fixed.extend_from_slice(&1i128.to_be_bytes());
+        expected_fixed.extend_from_slice(&(-1i128).to_be_bytes());
+        expected_fixed.extend_from_slice(&0i128.to_be_bytes());
         assert_bytes_eq(&got_fixed, &expected_fixed);
     }
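The updated decimal expectations (1 -> 0x01, -1 -> 0xFF, 0 -> 0x00 for bytes; all sixteen big-endian bytes for fixed(16)) follow directly from the minimal two's-complement trimming introduced in patch 01. A free-standing sketch of the same rule, not the encoder's exact code:

    /// Trim a big-endian two's-complement integer to its minimal byte form.
    fn minimal_be(bytes: &[u8]) -> &[u8] {
        let negative = !bytes.is_empty() && bytes[0] & 0x80 != 0;
        let mut i = 0;
        while i + 1 < bytes.len() {
            let (b, next) = (bytes[i], bytes[i + 1]);
            let redundant = if negative {
                b == 0xFF && next & 0x80 != 0 // drop leading 0xFF, sign preserved
            } else {
                b == 0x00 && next & 0x80 == 0 // drop leading 0x00, sign preserved
            };
            if !redundant {
                break;
            }
            i += 1;
        }
        &bytes[i..]
    }

    fn main() {
        assert_eq!(minimal_be(&1i128.to_be_bytes()), &[0x01]);
        assert_eq!(minimal_be(&(-1i128).to_be_bytes()), &[0xFF]);
        assert_eq!(minimal_be(&0i128.to_be_bytes()), &[0x00]);
        // fixed(16) instead writes all 16 big-endian bytes unchanged,
        // which is what the `size: Some(16)` expectations assert.
        assert_eq!((-1i128).to_be_bytes(), [0xFF; 16]);
    }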
From a39386b7923634e21368323faa335dfb42e658f4 Mon Sep 17 00:00:00 2001
From: nathaniel-d-ef
Date: Tue, 9 Sep 2025 16:20:55 +0200
Subject: [PATCH 08/14] Additional conditional support for `small_decimals`
 feature.

---
 arrow-avro/src/schema.rs | 74 +++++++++++++++++++---------------------
 1 file changed, 36 insertions(+), 38 deletions(-)

diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index adfee4bcebec..e73b1050c797 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -984,6 +984,36 @@ fn datatype_to_avro(
     null_order: Nullability,
 ) -> Result<(Value, JsonMap), ArrowError> {
     let mut extras = JsonMap::new();
+    let mut handle_decimal = |precision: &u8, scale: &i8| -> Result<Value, ArrowError> {
+        if *scale < 0 {
+            return Err(ArrowError::SchemaError(format!(
+                "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
+            )));
+        }
+        if (*scale as usize) > (*precision as usize) {
+            return Err(ArrowError::SchemaError(format!(
+                "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
+                 must be <= precision ({precision})"
+            )));
+        }
+
+        let mut meta = JsonMap::from_iter([
+            ("logicalType".into(), json!("decimal")),
+            ("precision".into(), json!(*precision)),
+            ("scale".into(), json!(*scale)),
+        ]);
+        if let Some(size) = metadata
+            .get("size")
+            .and_then(|val| val.parse::<usize>().ok())
+        {
+            meta.insert("type".into(), json!("fixed"));
+            meta.insert("size".into(), json!(size));
+            meta.insert("name".into(), json!(name_gen.make_unique(field_name)));
+        } else {
+            meta.insert("type".into(), json!("bytes"));
+        }
+        Ok(Value::Object(meta))
+    };
     let val = match dt {
         DataType::Null => Value::String("null".into()),
         DataType::Boolean => Value::String("boolean".into()),
@@ -1013,44 +1043,12 @@ fn datatype_to_avro(
             })
         }
     }
-        DataType::Decimal32(precision, scale)
-        | DataType::Decimal64(precision, scale)
-        | DataType::Decimal128(precision, scale)
-        | DataType::Decimal256(precision, scale) => {
-            // Scale must be >= 0 and <= precision; otherwise the logical
-            // type is invalid. We surface a schema error at generation time
-            // to avoid silently downgrading to the base type later.
-            // Ref: Specification §Logical Types / Decimal.
-            if *scale < 0 {
-                return Err(ArrowError::SchemaError(format!(
-                    "Invalid Avro decimal for field '{field_name}': scale ({scale}) must be >= 0"
-                )));
-            }
-            let s = *scale as usize;
-            if s > *precision as usize {
-                return Err(ArrowError::SchemaError(format!(
-                    "Invalid Avro decimal for field '{field_name}': scale ({scale}) \
-                     must be <= precision ({precision})"
-                )));
-            }
-            // Prefer fixed if an original size hint is present in field metadata.
-            // Otherwise, emit bytes-backed decimal (reader/writer compatible).
-            let mut meta = JsonMap::from_iter([
-                ("logicalType".into(), json!("decimal")),
-                ("precision".into(), json!(*precision)),
-                ("scale".into(), json!(*scale)),
-            ]);
-            if let Some(size) = metadata
-                .get("size")
-                .and_then(|val| val.parse::<usize>().ok())
-            {
-                meta.insert("type".into(), json!("fixed"));
-                meta.insert("size".into(), json!(size));
-                meta.insert("name".into(), json!(name_gen.make_unique(field_name)));
-            } else {
-                meta.insert("type".into(), json!("bytes"));
-            }
-            Value::Object(meta)
+        #[cfg(feature = "small_decimals")]
+        DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) => {
+            handle_decimal(precision, scale)?
+        }
+        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
+            handle_decimal(precision, scale)?
         }
         DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
         DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
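For reference, the two schema shapes `handle_decimal` can emit, written out as the `json!` values the closure assembles. The `fixed` name below is invented for illustration; the real one comes from `name_gen.make_unique`:

    use serde_json::json;

    fn main() {
        // bytes-backed decimal: no "size" hint in the field metadata.
        let bytes_backed = json!({
            "type": "bytes",
            "logicalType": "decimal",
            "precision": 25,
            "scale": 6
        });
        // fixed-backed decimal: metadata "size" parses as usize, e.g. "16".
        let fixed_backed = json!({
            "type": "fixed",
            "name": "amount_fixed", // illustrative only
            "size": 16,
            "logicalType": "decimal",
            "precision": 25,
            "scale": 6
        });
        println!("{bytes_backed}\n{fixed_backed}");
    }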
From 620715002f8cb4e199f44e1dd949d4f7b324691f Mon Sep 17 00:00:00 2001
From: nathaniel-d-ef
Date: Tue, 9 Sep 2025 16:50:41 +0200
Subject: [PATCH 09/14] Add unit tests for round-trip validation of Avro
 encoder, covering Fixed, Enum, Map, Decimal, and Interval types

---
 arrow-avro/src/writer/mod.rs | 217 +++++++++++++++++++++++++++++++++++
 1 file changed, 217 insertions(+)

diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index a5b2691bb816..f8aaaea54746 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -415,4 +415,221 @@ mod tests {
         );
         Ok(())
     }
+
+    #[test]
+    fn test_round_trip_simple_fixed_ocf() -> Result<(), ArrowError> {
+        let path = arrow_test_data("avro/simple_fixed.avro");
+        let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
+        let mut reader = ReaderBuilder::new()
+            .build(BufReader::new(rdr_file))
+            .expect("build avro reader");
+        let schema = reader.schema();
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
+        let tmp = NamedTempFile::new().expect("create temp file");
+        let out_file = File::create(tmp.path()).expect("create temp avro");
+        let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        drop(writer);
+        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
+        let mut rt_reader = ReaderBuilder::new()
+            .build(BufReader::new(rt_file))
+            .expect("build round_trip reader");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let round_trip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+        assert_eq!(round_trip, original);
+        Ok(())
+    }
+
+    #[test]
+    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
+        let in_file =
+            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
+        let mut reader = ReaderBuilder::new()
+            .build(BufReader::new(in_file))
+            .expect("build reader for duration_uuid.avro");
+        let in_schema = reader.schema();
+        let has_mdn = in_schema.fields().iter().any(|f| {
+            matches!(
+                f.data_type(),
+                DataType::Interval(IntervalUnit::MonthDayNano)
+            )
+        });
+        assert!(
+            has_mdn,
+            "expected at least one Interval(MonthDayNano) field in duration_uuid.avro"
+        );
+        let has_uuid_fixed = in_schema
+            .fields()
+            .iter()
+            .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16)));
+        assert!(
+            has_uuid_fixed,
+            "expected at least one FixedSizeBinary(16) (uuid) field in duration_uuid.avro"
+        );
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let input =
+            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+        let tmp = NamedTempFile::new().expect("create temp file");
+        {
+            let out_file = File::create(tmp.path()).expect("create temp avro");
+            let mut writer = AvroWriter::new(out_file, in_schema.as_ref().clone())?;
+            writer.write(&input)?;
+            writer.finish()?;
+        }
+        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
+        let mut rt_reader = ReaderBuilder::new()
+            .build(BufReader::new(rt_file))
+            .expect("build round_trip reader");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let round_trip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+        assert_eq!(round_trip, input);
+        Ok(())
+    }
+
+    // This test reads the same 'nonnullable.impala.avro' used by the reader tests,
+    // writes it back out with the writer (hitting Map encoding paths), then reads it
+    // again and asserts exact Arrow equivalence.
+    #[test]
+    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), ArrowError> {
+        // Load source Avro with Map fields
+        let path = arrow_test_data("avro/nonnullable.impala.avro");
+        let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
+        let mut reader = ReaderBuilder::new()
+            .build(BufReader::new(rdr_file))
+            .expect("build reader for nonnullable.impala.avro");
+        // Collect all input batches and concatenate to a single RecordBatch
+        let in_schema = reader.schema();
+        // Sanity: ensure the file actually contains at least one Map field
+        let has_map = in_schema
+            .fields()
+            .iter()
+            .any(|f| matches!(f.data_type(), DataType::Map(_, _)));
+        assert!(
+            has_map,
+            "expected at least one Map field in avro/nonnullable.impala.avro"
+        );
+
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+        // Write out using the OCF writer into an in-memory Vec<u8>
+        let buffer = Vec::<u8>::new();
+        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        let out_bytes = writer.into_inner();
+        // Read the produced bytes back with the Reader
+        let mut rt_reader = ReaderBuilder::new()
+            .build(Cursor::new(out_bytes))
+            .expect("build reader for round-tripped in-memory OCF");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let roundtrip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+        // Exact value fidelity (schema + data)
+        assert_eq!(
+            roundtrip, original,
+            "Round-trip Avro map data mismatch for nonnullable.impala.avro"
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_roundtrip_decimals_via_writer() -> Result<(), ArrowError> {
+        // (file, resolve via ARROW_TEST_DATA?)
+        let files: [(&str, bool); 8] = [
+            ("avro/fixed_length_decimal.avro", true), // fixed-backed -> Decimal128(25,2)
+            ("avro/fixed_length_decimal_legacy.avro", true), // legacy fixed[8] -> Decimal64(13,2)
+            ("avro/int32_decimal.avro", true),        // bytes-backed -> Decimal32(4,2)
+            ("avro/int64_decimal.avro", true),        // bytes-backed -> Decimal64(10,2)
+            ("test/data/int256_decimal.avro", false), // bytes-backed -> Decimal256(76,2)
+            ("test/data/fixed256_decimal.avro", false), // fixed[32]-backed -> Decimal256(76,10)
+            ("test/data/fixed_length_decimal_legacy_32.avro", false), // legacy fixed[4] -> Decimal32(9,2)
+            ("test/data/int128_decimal.avro", false), // bytes-backed -> Decimal128(38,2)
+        ];
+        for (rel, in_test_data_dir) in files {
+            // Resolve path the same way as reader::test_decimal
+            let path: String = if in_test_data_dir {
+                arrow_test_data(rel)
+            } else {
+                PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+                    .join(rel)
+                    .to_string_lossy()
+                    .into_owned()
+            };
+            // Read original file into a single RecordBatch for comparison
+            let f_in = File::open(&path).expect("open input avro");
+            let mut rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
+            let in_schema = rdr.schema();
+            let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
+            let original =
+                arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
+            // Write it out with the OCF writer (no special compression)
+            let tmp = NamedTempFile::new().expect("create temp file");
+            let out_path = tmp.into_temp_path();
+            let out_file = File::create(&out_path).expect("create temp avro");
+            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
+            writer.write(&original)?;
+            writer.finish()?;
+            // Read back the file we just wrote and compare equality (schema + data)
+            let f_rt = File::open(&out_path).expect("open roundtrip avro");
+            let mut rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
+            let rt_schema = rt_rdr.schema();
+            let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
+            let roundtrip =
+                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat rt");
+            assert_eq!(roundtrip, original, "decimal round-trip mismatch for {rel}");
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), ArrowError> {
+        // Read the known-good enum file (same as reader::test_simple)
+        let path = arrow_test_data("avro/simple_enum.avro");
+        let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
+        let mut reader = ReaderBuilder::new()
+            .build(BufReader::new(rdr_file))
+            .expect("build reader for simple_enum.avro");
+        // Concatenate all batches to one RecordBatch for a clean equality check
+        let in_schema = reader.schema();
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+        // Sanity: expect at least one Dictionary(Int32, Utf8) column (enum)
+        let has_enum_dict = in_schema.fields().iter().any(|f| {
+            matches!(
+                f.data_type(),
+                DataType::Dictionary(k, v) if **k == DataType::Int32 && **v == DataType::Utf8
+            )
+        });
+        assert!(
+            has_enum_dict,
+            "Expected at least one enum-mapped Dictionary field"
+        );
+        // Write with OCF writer into memory using the reader-provided Arrow schema.
+        // The writer will embed the Avro JSON from `avro.schema` metadata if present.
+        let buffer: Vec<u8> = Vec::new();
+        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        // Read back and compare for exact equality (schema + data)
+        let mut rt_reader = ReaderBuilder::new()
+            .build(Cursor::new(bytes))
+            .expect("reader for round-trip");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let roundtrip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+        assert_eq!(roundtrip, original, "Avro enum round-trip mismatch");
+        Ok(())
+    }
 }
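One detail behind `test_round_trip_duration_and_uuid_ocf`: Avro's `duration` logical type annotates a `fixed(12)` holding three little-endian u32s (months, days, milliseconds). A sketch of that packing for an Arrow MonthDayNano value, assuming nanoseconds convert to whole milliseconds; the encoder's exact conversion and overflow handling are not shown in this patch excerpt:

    fn to_avro_duration(months: u32, days: u32, nanos: u64) -> [u8; 12] {
        let millis = (nanos / 1_000_000) as u32; // assumes the value fits in u32
        let mut out = [0u8; 12];
        out[0..4].copy_from_slice(&months.to_le_bytes());
        out[4..8].copy_from_slice(&days.to_le_bytes());
        out[8..12].copy_from_slice(&millis.to_le_bytes());
        out
    }

    fn main() {
        let buf = to_avro_duration(1, 2, 3_000_000); // 1 month, 2 days, 3 ms
        assert_eq!(&buf[0..4], &1u32.to_le_bytes());
        assert_eq!(&buf[4..8], &2u32.to_le_bytes());
        assert_eq!(&buf[8..12], &3u32.to_le_bytes());
    }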
From 2752bbcc303959615acb3130f34131ff23c0e4b5 Mon Sep 17 00:00:00 2001
From: nathaniel-d-ef
Date: Tue, 9 Sep 2025 17:35:36 +0200
Subject: [PATCH 10/14] Update UUID logical type handling to match
 "arrow.uuid" naming convention

---
 arrow-avro/src/schema.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index e73b1050c797..b18b2f6129b1 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -1032,7 +1032,7 @@ fn datatype_to_avro(
                 || (*len == 16
                     && metadata
                         .get("ARROW:extension:name")
-                        .is_some_and(|value| value == "uuid"));
+                        .is_some_and(|value| value == "arrow.uuid"));
             if is_uuid {
                 json!({ "type": "string", "logicalType": "uuid" })
             } else {

From 17e83825f8057f1856735714a0cd57f947fb1e1a Mon Sep 17 00:00:00 2001
From: nathaniel-d-ef
Date: Tue, 9 Sep 2025 17:44:29 +0200
Subject: [PATCH 11/14] Revert name change

---
 arrow-avro/src/schema.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index b18b2f6129b1..e73b1050c797 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -1032,7 +1032,7 @@ fn datatype_to_avro(
                 || (*len == 16
                     && metadata
                         .get("ARROW:extension:name")
-                        .is_some_and(|value| value == "arrow.uuid"));
+                        .is_some_and(|value| value == "uuid"));
             if is_uuid {
                 json!({ "type": "string", "logicalType": "uuid" })
            } else {
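Patches 10 and 11 circle the question of which extension name marks a uuid column. As the code stands after the revert, a FixedSizeBinary(16) field is emitted as Avro `{"type": "string", "logicalType": "uuid"}` when its metadata carries `ARROW:extension:name` set to `"uuid"`. A sketch of setting that up from the Arrow side, using only the public `arrow_schema` API:

    use std::collections::HashMap;
    use arrow_schema::{DataType, Field};

    fn main() {
        let field = Field::new("id", DataType::FixedSizeBinary(16), false).with_metadata(
            HashMap::from([(
                "ARROW:extension:name".to_string(),
                "uuid".to_string(), // the value patch 11 settles on
            )]),
        );
        assert_eq!(
            field.metadata().get("ARROW:extension:name").map(String::as_str),
            Some("uuid")
        );
    }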
From f9435dc8aa493a021e1faad1e84f147a59c6ec3d Mon Sep 17 00:00:00 2001
From: nathaniel-d-ef
Date: Tue, 9 Sep 2025 18:31:09 +0200
Subject: [PATCH 12/14] Remove Avro writer unit tests to be included later

---
 arrow-avro/src/writer/mod.rs | 217 -----------------------------------
 1 file changed, 217 deletions(-)

diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index f8aaaea54746..a5b2691bb816 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -415,221 +415,4 @@ mod tests {
         );
         Ok(())
     }
-
-    #[test]
-    fn test_round_trip_simple_fixed_ocf() -> Result<(), ArrowError> {
-        let path = arrow_test_data("avro/simple_fixed.avro");
-        let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
-        let mut reader = ReaderBuilder::new()
-            .build(BufReader::new(rdr_file))
-            .expect("build avro reader");
-        let schema = reader.schema();
-        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
-        let original =
-            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
-        let tmp = NamedTempFile::new().expect("create temp file");
-        let out_file = File::create(tmp.path()).expect("create temp avro");
-        let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
-        writer.write(&original)?;
-        writer.finish()?;
-        drop(writer);
-        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
-        let mut rt_reader = ReaderBuilder::new()
-            .build(BufReader::new(rt_file))
-            .expect("build round_trip reader");
-        let rt_schema = rt_reader.schema();
-        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
-        let round_trip =
-            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
-        assert_eq!(round_trip, original);
-        Ok(())
-    }
-
-    #[test]
-    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
-        let in_file =
-            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
-        let mut reader = ReaderBuilder::new()
-            .build(BufReader::new(in_file))
-            .expect("build reader for duration_uuid.avro");
-        let in_schema = reader.schema();
-        let has_mdn = in_schema.fields().iter().any(|f| {
-            matches!(
-                f.data_type(),
-                DataType::Interval(IntervalUnit::MonthDayNano)
-            )
-        });
-        assert!(
-            has_mdn,
-            "expected at least one Interval(MonthDayNano) field in duration_uuid.avro"
-        );
-        let has_uuid_fixed = in_schema
-            .fields()
-            .iter()
-            .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16)));
-        assert!(
-            has_uuid_fixed,
-            "expected at least one FixedSizeBinary(16) (uuid) field in duration_uuid.avro"
-        );
-        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
-        let input =
-            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
-        let tmp = NamedTempFile::new().expect("create temp file");
-        {
-            let out_file = File::create(tmp.path()).expect("create temp avro");
-            let mut writer = AvroWriter::new(out_file, in_schema.as_ref().clone())?;
-            writer.write(&input)?;
-            writer.finish()?;
-        }
-        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
-        let mut rt_reader = ReaderBuilder::new()
-            .build(BufReader::new(rt_file))
-            .expect("build round_trip reader");
-        let rt_schema = rt_reader.schema();
-        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
-        let round_trip =
-            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
-        assert_eq!(round_trip, input);
-        Ok(())
-    }
-
-    // This test reads the same 'nonnullable.impala.avro' used by the reader tests,
-    // writes it back out with the writer (hitting Map encoding paths), then reads it
-    // again and asserts exact Arrow equivalence.
-    #[test]
-    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), ArrowError> {
-        // Load source Avro with Map fields
-        let path = arrow_test_data("avro/nonnullable.impala.avro");
-        let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
-        let mut reader = ReaderBuilder::new()
-            .build(BufReader::new(rdr_file))
-            .expect("build reader for nonnullable.impala.avro");
-        // Collect all input batches and concatenate to a single RecordBatch
-        let in_schema = reader.schema();
-        // Sanity: ensure the file actually contains at least one Map field
-        let has_map = in_schema
-            .fields()
-            .iter()
-            .any(|f| matches!(f.data_type(), DataType::Map(_, _)));
-        assert!(
-            has_map,
-            "expected at least one Map field in avro/nonnullable.impala.avro"
-        );
-
-        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
-        let original =
-            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
-        // Write out using the OCF writer into an in-memory Vec<u8>
-        let buffer = Vec::<u8>::new();
-        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
-        writer.write(&original)?;
-        writer.finish()?;
-        let out_bytes = writer.into_inner();
-        // Read the produced bytes back with the Reader
-        let mut rt_reader = ReaderBuilder::new()
-            .build(Cursor::new(out_bytes))
-            .expect("build reader for round-tripped in-memory OCF");
-        let rt_schema = rt_reader.schema();
-        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
-        let roundtrip =
-            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
-        // Exact value fidelity (schema + data)
-        assert_eq!(
-            roundtrip, original,
-            "Round-trip Avro map data mismatch for nonnullable.impala.avro"
-        );
-        Ok(())
-    }
-
-    #[test]
-    fn test_roundtrip_decimals_via_writer() -> Result<(), ArrowError> {
-        // (file, resolve via ARROW_TEST_DATA?)
-        let files: [(&str, bool); 8] = [
-            ("avro/fixed_length_decimal.avro", true), // fixed-backed -> Decimal128(25,2)
-            ("avro/fixed_length_decimal_legacy.avro", true), // legacy fixed[8] -> Decimal64(13,2)
-            ("avro/int32_decimal.avro", true),        // bytes-backed -> Decimal32(4,2)
-            ("avro/int64_decimal.avro", true),        // bytes-backed -> Decimal64(10,2)
-            ("test/data/int256_decimal.avro", false), // bytes-backed -> Decimal256(76,2)
-            ("test/data/fixed256_decimal.avro", false), // fixed[32]-backed -> Decimal256(76,10)
-            ("test/data/fixed_length_decimal_legacy_32.avro", false), // legacy fixed[4] -> Decimal32(9,2)
-            ("test/data/int128_decimal.avro", false), // bytes-backed -> Decimal128(38,2)
-        ];
-        for (rel, in_test_data_dir) in files {
-            // Resolve path the same way as reader::test_decimal
-            let path: String = if in_test_data_dir {
-                arrow_test_data(rel)
-            } else {
-                PathBuf::from(env!("CARGO_MANIFEST_DIR"))
-                    .join(rel)
-                    .to_string_lossy()
-                    .into_owned()
-            };
-            // Read original file into a single RecordBatch for comparison
-            let f_in = File::open(&path).expect("open input avro");
-            let mut rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
-            let in_schema = rdr.schema();
-            let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
-            let original =
-                arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
-            // Write it out with the OCF writer (no special compression)
-            let tmp = NamedTempFile::new().expect("create temp file");
-            let out_path = tmp.into_temp_path();
-            let out_file = File::create(&out_path).expect("create temp avro");
-            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
-            writer.write(&original)?;
-            writer.finish()?;
-            // Read back the file we just wrote and compare equality (schema + data)
-            let f_rt = File::open(&out_path).expect("open roundtrip avro");
-            let mut rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
-            let rt_schema = rt_rdr.schema();
-            let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
-            let roundtrip =
-                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat rt");
-            assert_eq!(roundtrip, original, "decimal round-trip mismatch for {rel}");
-        }
-        Ok(())
-    }
-
-    #[test]
-    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), ArrowError> {
-        // Read the known-good enum file (same as reader::test_simple)
-        let path = arrow_test_data("avro/simple_enum.avro");
-        let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
-        let mut reader = ReaderBuilder::new()
-            .build(BufReader::new(rdr_file))
-            .expect("build reader for simple_enum.avro");
-        // Concatenate all batches to one RecordBatch for a clean equality check
-        let in_schema = reader.schema();
-        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
-        let original =
-            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
-        // Sanity: expect at least one Dictionary(Int32, Utf8) column (enum)
-        let has_enum_dict = in_schema.fields().iter().any(|f| {
-            matches!(
-                f.data_type(),
-                DataType::Dictionary(k, v) if **k == DataType::Int32 && **v == DataType::Utf8
-            )
-        });
-        assert!(
-            has_enum_dict,
-            "Expected at least one enum-mapped Dictionary field"
-        );
-        // Write with OCF writer into memory using the reader-provided Arrow schema.
-        // The writer will embed the Avro JSON from `avro.schema` metadata if present.
-        let buffer: Vec<u8> = Vec::new();
-        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
-        writer.write(&original)?;
-        writer.finish()?;
-        let bytes = writer.into_inner();
-        // Read back and compare for exact equality (schema + data)
-        let mut rt_reader = ReaderBuilder::new()
-            .build(Cursor::new(bytes))
-            .expect("reader for round-trip");
-        let rt_schema = rt_reader.schema();
-        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
-        let roundtrip =
-            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
-        assert_eq!(roundtrip, original, "Avro enum round-trip mismatch");
-        Ok(())
-    }
 }

From 14cf1f58702ca16e56a54dd558450b913c91b540 Mon Sep 17 00:00:00 2001
From: nathaniel-d-ef
Date: Tue, 9 Sep 2025 19:27:48 +0200
Subject: [PATCH 13/14] Avro writer unit tests for round-trip validation
 covering Fixed, Enum, Map, Decimal, Interval, and other types

---
 arrow-avro/src/writer/mod.rs | 217 +++++++++++++++++++++++++++++++++++
 1 file changed, 217 insertions(+)

diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index a5b2691bb816..f8aaaea54746 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -415,4 +415,221 @@ mod tests {
         );
         Ok(())
     }
+
+    #[test]
+    fn test_round_trip_simple_fixed_ocf() -> Result<(), ArrowError> {
+        let path = arrow_test_data("avro/simple_fixed.avro");
+        let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
+        let mut reader = ReaderBuilder::new()
+            .build(BufReader::new(rdr_file))
+            .expect("build avro reader");
+        let schema = reader.schema();
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
+        let tmp = NamedTempFile::new().expect("create temp file");
+        let out_file = File::create(tmp.path()).expect("create temp avro");
+        let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        drop(writer);
+        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
+        let mut rt_reader = ReaderBuilder::new()
+            .build(BufReader::new(rt_file))
+            .expect("build round_trip reader");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let round_trip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+        assert_eq!(round_trip, original);
+        Ok(())
+    }
+
+    #[test]
+    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
+        let in_file =
+            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
+        let mut reader = ReaderBuilder::new()
+            .build(BufReader::new(in_file))
+            .expect("build reader for duration_uuid.avro");
+        let in_schema = reader.schema();
+        let has_mdn = in_schema.fields().iter().any(|f| {
+            matches!(
+                f.data_type(),
+                DataType::Interval(IntervalUnit::MonthDayNano)
+            )
+        });
+        assert!(
+            has_mdn,
+            "expected at least one Interval(MonthDayNano) field in duration_uuid.avro"
+        );
+        let has_uuid_fixed = in_schema
+            .fields()
+            .iter()
+            .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16)));
+        assert!(
+            has_uuid_fixed,
+            "expected at least one FixedSizeBinary(16) (uuid) field in duration_uuid.avro"
+        );
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let input =
+            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+        let tmp = NamedTempFile::new().expect("create temp file");
+        {
+            let out_file = File::create(tmp.path()).expect("create temp avro");
+            let mut writer = AvroWriter::new(out_file, in_schema.as_ref().clone())?;
+            writer.write(&input)?;
+            writer.finish()?;
+        }
+        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
+        let mut rt_reader = ReaderBuilder::new()
+            .build(BufReader::new(rt_file))
+            .expect("build round_trip reader");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let round_trip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
+        assert_eq!(round_trip, input);
+        Ok(())
+    }
+
+    // This test reads the same 'nonnullable.impala.avro' used by the reader tests,
+    // writes it back out with the writer (hitting Map encoding paths), then reads it
+    // again and asserts exact Arrow equivalence.
+    #[test]
+    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), ArrowError> {
+        // Load source Avro with Map fields
+        let path = arrow_test_data("avro/nonnullable.impala.avro");
+        let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
+        let mut reader = ReaderBuilder::new()
+            .build(BufReader::new(rdr_file))
+            .expect("build reader for nonnullable.impala.avro");
+        // Collect all input batches and concatenate to a single RecordBatch
+        let in_schema = reader.schema();
+        // Sanity: ensure the file actually contains at least one Map field
+        let has_map = in_schema
+            .fields()
+            .iter()
+            .any(|f| matches!(f.data_type(), DataType::Map(_, _)));
+        assert!(
+            has_map,
+            "expected at least one Map field in avro/nonnullable.impala.avro"
+        );
+
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+        // Write out using the OCF writer into an in-memory Vec<u8>
+        let buffer = Vec::<u8>::new();
+        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        let out_bytes = writer.into_inner();
+        // Read the produced bytes back with the Reader
+        let mut rt_reader = ReaderBuilder::new()
+            .build(Cursor::new(out_bytes))
+            .expect("build reader for round-tripped in-memory OCF");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let roundtrip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+        // Exact value fidelity (schema + data)
+        assert_eq!(
+            roundtrip, original,
+            "Round-trip Avro map data mismatch for nonnullable.impala.avro"
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_roundtrip_decimals_via_writer() -> Result<(), ArrowError> {
+        // (file, resolve via ARROW_TEST_DATA?)
+        let files: [(&str, bool); 8] = [
+            ("avro/fixed_length_decimal.avro", true), // fixed-backed -> Decimal128(25,2)
+            ("avro/fixed_length_decimal_legacy.avro", true), // legacy fixed[8] -> Decimal64(13,2)
+            ("avro/int32_decimal.avro", true),        // bytes-backed -> Decimal32(4,2)
+            ("avro/int64_decimal.avro", true),        // bytes-backed -> Decimal64(10,2)
+            ("test/data/int256_decimal.avro", false), // bytes-backed -> Decimal256(76,2)
+            ("test/data/fixed256_decimal.avro", false), // fixed[32]-backed -> Decimal256(76,10)
+            ("test/data/fixed_length_decimal_legacy_32.avro", false), // legacy fixed[4] -> Decimal32(9,2)
+            ("test/data/int128_decimal.avro", false), // bytes-backed -> Decimal128(38,2)
+        ];
+        for (rel, in_test_data_dir) in files {
+            // Resolve path the same way as reader::test_decimal
+            let path: String = if in_test_data_dir {
+                arrow_test_data(rel)
+            } else {
+                PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+                    .join(rel)
+                    .to_string_lossy()
+                    .into_owned()
+            };
+            // Read original file into a single RecordBatch for comparison
+            let f_in = File::open(&path).expect("open input avro");
+            let mut rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
+            let in_schema = rdr.schema();
+            let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
+            let original =
+                arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
+            // Write it out with the OCF writer (no special compression)
+            let tmp = NamedTempFile::new().expect("create temp file");
+            let out_path = tmp.into_temp_path();
+            let out_file = File::create(&out_path).expect("create temp avro");
+            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
+            writer.write(&original)?;
+            writer.finish()?;
+            // Read back the file we just wrote and compare equality (schema + data)
+            let f_rt = File::open(&out_path).expect("open roundtrip avro");
+            let mut rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
+            let rt_schema = rt_rdr.schema();
+            let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
+            let roundtrip =
+                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat rt");
+            assert_eq!(roundtrip, original, "decimal round-trip mismatch for {rel}");
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), ArrowError> {
+        // Read the known-good enum file (same as reader::test_simple)
+        let path = arrow_test_data("avro/simple_enum.avro");
+        let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
+        let mut reader = ReaderBuilder::new()
+            .build(BufReader::new(rdr_file))
+            .expect("build reader for simple_enum.avro");
+        // Concatenate all batches to one RecordBatch for a clean equality check
+        let in_schema = reader.schema();
+        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
+        let original =
+            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
+        // Sanity: expect at least one Dictionary(Int32, Utf8) column (enum)
+        let has_enum_dict = in_schema.fields().iter().any(|f| {
+            matches!(
+                f.data_type(),
+                DataType::Dictionary(k, v) if **k == DataType::Int32 && **v == DataType::Utf8
+            )
+        });
+        assert!(
+            has_enum_dict,
+            "Expected at least one enum-mapped Dictionary field"
+        );
+        // Write with OCF writer into memory using the reader-provided Arrow schema.
+        // The writer will embed the Avro JSON from `avro.schema` metadata if present.
+        let buffer: Vec<u8> = Vec::new();
+        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
+        writer.write(&original)?;
+        writer.finish()?;
+        let bytes = writer.into_inner();
+        // Read back and compare for exact equality (schema + data)
+        let mut rt_reader = ReaderBuilder::new()
+            .build(Cursor::new(bytes))
+            .expect("reader for round-trip");
+        let rt_schema = rt_reader.schema();
+        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
+        let roundtrip =
+            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
+        assert_eq!(roundtrip, original, "Avro enum round-trip mismatch");
+        Ok(())
+    }
 }

From f8e63488781fa3d41fed7840c2cb0069ecf4a536 Mon Sep 17 00:00:00 2001
From: nathaniel-d-ef
Date: Wed, 10 Sep 2025 14:29:20 +0200
Subject: [PATCH 14/14] Feature-flag round-trip test with
 canonical_extension_types

---
 arrow-avro/src/writer/mod.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index f8aaaea54746..f5e84eeb50bb 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -445,6 +445,7 @@ mod tests {
         Ok(())
     }
 
+    #[cfg(not(feature = "canonical_extension_types"))]
     #[test]
     fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
         let in_file =