Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 123 additions & 2 deletions arrow-avro/src/reader/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl Decoder {
Self::Record(arrow_fields.into(), encodings)
}
(Codec::Map(child), _) => {
let val_field = child.field_with_name("value").with_nullable(true);
let val_field = child.field_with_name("value");
let map_field = Arc::new(ArrowField::new(
"entries",
DataType::Struct(Fields::from(vec![
Expand Down Expand Up @@ -590,10 +590,23 @@ impl Decoder {
)));
}
}
// Extract the value field nullability from the schema
let is_value_nullable = match map_field.data_type() {
DataType::Struct(fields) => fields
.iter()
.find(|f| f.name() == "value")
.map(|f| f.is_nullable())
.unwrap_or(false),
_ => true, // default to nullable
};
let entries_struct = StructArray::new(
Fields::from(vec![
Arc::new(ArrowField::new("key", DataType::Utf8, false)),
Arc::new(ArrowField::new("value", val_arr.data_type().clone(), true)),
Arc::new(ArrowField::new(
"value",
val_arr.data_type().clone(),
is_value_nullable,
Comment on lines +593 to +608
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a slightly different approach in #8220 that avoids the field scan and new Field allocation while preserving the existing schema metadata.

It's slightly more performant, especially on smaller batches:

Map/100                 time:   [6.2237 µs 6.2594 µs 6.2899 µs]
                        thrpt:  [507.32 MiB/s 509.79 MiB/s 512.71 MiB/s]
                 change:
                        time:   [−0.3038% +0.5799% +1.4930%] (p = 0.19 > 0.05)
                        thrpt:  [−1.4710% −0.5766% +0.3047%]
                        No change in performance detected.
Map/10000               time:   [250.40 µs 253.75 µs 258.62 µs]
                        thrpt:  [1.2573 GiB/s 1.2814 GiB/s 1.2986 GiB/s]
                 change:
                        time:   [−2.5344% −1.1670% +0.1631%] (p = 0.08 > 0.05)
                        thrpt:  [−0.1628% +1.1808% +2.6003%]
                        No change in performance detected.
Found 6 outliers among 25 measurements (24.00%)
  6 (24.00%) low mild
Map/1000000             time:   [252.99 µs 255.93 µs 260.24 µs]
                        thrpt:  [130.21 GiB/s 132.40 GiB/s 133.94 GiB/s]
                 change:
                        time:   [−1.9418% −0.4373% +1.0680%] (p = 0.60 > 0.05)
                        thrpt:  [−1.0568% +0.4393% +1.9803%]
                        No change in performance detected.

vs

Map/100                 time:   [6.4487 µs 6.4584 µs 6.4687 µs]
                        thrpt:  [493.30 MiB/s 494.09 MiB/s 494.83 MiB/s]
                 change:
                        time:   [+4.2911% +5.0113% +5.7191%] (p = 0.00 < 0.05)
                        thrpt:  [−5.4097% −4.7721% −4.1145%]
                        Performance has regressed.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
Map/10000               time:   [258.90 µs 260.78 µs 263.17 µs]
                        thrpt:  [1.2356 GiB/s 1.2469 GiB/s 1.2559 GiB/s]
                 change:
                        time:   [−0.6514% +0.8318% +2.5189%] (p = 0.34 > 0.05)
                        thrpt:  [−2.4570% −0.8249% +0.6557%]
                        No change in performance detected.
Found 1 outliers among 25 measurements (4.00%)
  1 (4.00%) high severe
Map/1000000             time:   [265.48 µs 268.25 µs 270.56 µs]
                        thrpt:  [125.24 GiB/s 126.33 GiB/s 127.64 GiB/s]
                 change:
                        time:   [+3.0081% +4.1202% +5.2124%] (p = 0.00 < 0.05)
                        thrpt:  [−4.9542% −3.9572% −2.9203%]
                        Performance has regressed.
Found 1 outliers among 10 measurements (10.00%)

If you wanted to check it out:

let entries_fields = match map_field.data_type() {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jecsand838 Thanks for the context. Should I close this PR if #8220 covers this issue?

Copy link
Contributor

@jecsand838 jecsand838 Aug 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I definitely think there's value in the tests you wrote if you wanted to merge those in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I'll close this PR and raise a new PR later with only tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments!

)),
]),
vec![Arc::new(key_arr), val_arr],
None,
Expand Down Expand Up @@ -740,6 +753,7 @@ fn sign_extend_to<const N: usize>(raw: &[u8]) -> Result<[u8; N], ArrowError> {
mod tests {
use super::*;
use crate::codec::AvroField;
use crate::schema::Schema as AvroSchema;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need this import, because `use crate::schema::*;` on line 22 already brings `Schema` into scope.

Suggested change
use crate::schema::Schema as AvroSchema;

use arrow_array::{
cast::AsArray, Array, Decimal128Array, DictionaryArray, FixedSizeBinaryArray,
IntervalMonthDayNanoArray, ListArray, MapArray, StringArray, StructArray,
Expand Down Expand Up @@ -1471,4 +1485,111 @@ mod tests {
assert!(int_array.is_null(0)); // row1 is null
assert_eq!(int_array.value(1), 42); // row3 value is 42
}

#[test]
fn test_map_with_non_nullable_value_type() {
let schema_json = r#"{
"type": "record",
"name": "MapRecord",
"fields": [
{"name": "map_field", "type": { "type": "map", "values": "string" }}
]
}"#;

let schema: AvroSchema = serde_json::from_str(schema_json).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let schema: AvroSchema = serde_json::from_str(schema_json).unwrap();
let schema: Schema = serde_json::from_str(schema_json).unwrap();

let field = AvroField::try_from(&schema).unwrap();
let mut decoder = RecordDecoder::try_new_with_options(field.data_type(), true).unwrap();

let mut data = Vec::new();
data.extend_from_slice(&encode_avro_long(1)); // 1 entry in map
data.extend_from_slice(&encode_avro_bytes(b"key")); // key
data.extend_from_slice(&encode_avro_bytes(b"value")); // value
data.extend_from_slice(&encode_avro_long(0)); // end map

decoder.decode(&data, 1).unwrap();

let batch = decoder.flush().unwrap();
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.num_rows(), 1);

let map_arr = batch.column(0).as_any().downcast_ref::<MapArray>().unwrap();
assert_eq!(map_arr.len(), 1);
assert_eq!(map_arr.value_length(0), 1);

let entries = map_arr.value(0);
let key_arr = entries.column(0).as_string::<i32>();
let val_arr = entries.column(1).as_string::<i32>();
assert_eq!(key_arr.value(0), "key");
assert_eq!(val_arr.value(0), "value");
}

#[test]
fn test_map_with_nullable_value_type() {
let schema_json = r#"{
"type": "record",
"name": "MapRecord",
"fields": [
{"name": "map_field1", "type": { "type": "map", "values": ["null", "string"] }},
{"name": "map_field2", "type": { "type": "map", "values": ["string", "null"] }}
]
}"#;

let schema: AvroSchema = serde_json::from_str(schema_json).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not trying to be too nitpicky with this; it's just that a separate struct named `AvroSchema` is defined in arrow-avro/src/schema.rs, so aliasing `Schema` as `AvroSchema` here could be confusing.

Suggested change
let schema: AvroSchema = serde_json::from_str(schema_json).unwrap();
let schema: Schema = serde_json::from_str(schema_json).unwrap();

let field = AvroField::try_from(&schema).unwrap();
let mut decoder = RecordDecoder::try_new_with_options(field.data_type(), true).unwrap();

let mut data = Vec::new();

// map_field1: ["null", "string"]
data.extend_from_slice(&encode_avro_long(2)); // 2 entries in map
// First entry: key1 -> null value (union branch 0)
data.extend_from_slice(&encode_avro_bytes(b"key1"));
data.extend_from_slice(&encode_avro_long(0)); // union branch 0 (null)
// Second entry: key2 -> "value2" (union branch 1)
data.extend_from_slice(&encode_avro_bytes(b"key2"));
data.extend_from_slice(&encode_avro_long(1)); // union branch 1 (string)
data.extend_from_slice(&encode_avro_bytes(b"value2"));
data.extend_from_slice(&encode_avro_long(0)); // end map

// map_field2: ["string", "null"]
data.extend_from_slice(&encode_avro_long(2)); // 2 entries in map
// First entry: key3 -> null value (union branch 1)
data.extend_from_slice(&encode_avro_bytes(b"key3"));
data.extend_from_slice(&encode_avro_long(1)); // union branch 1 (null)
// Second entry: key4 -> "value4" (union branch 0)
data.extend_from_slice(&encode_avro_bytes(b"key4"));
data.extend_from_slice(&encode_avro_long(0)); // union branch 0 (string)
data.extend_from_slice(&encode_avro_bytes(b"value4"));
data.extend_from_slice(&encode_avro_long(0)); // end map

decoder.decode(&data, 1).unwrap();

let batch = decoder.flush().unwrap();
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 1);

// Check the first map field: ["null", "string"]
let map_arr1 = batch.column(0).as_any().downcast_ref::<MapArray>().unwrap();
assert_eq!(map_arr1.len(), 1);
assert_eq!(map_arr1.value_length(0), 2); // 2 entries
let entries1 = map_arr1.value(0);
let key_arr1 = entries1.column(0).as_string::<i32>();
let val_arr1 = entries1.column(1).as_string::<i32>();
assert_eq!(key_arr1.value(0), "key1");
assert!(val_arr1.is_null(0));
assert_eq!(key_arr1.value(1), "key2");
assert_eq!(val_arr1.value(1), "value2");

// Check second map field: ["string", "null"]
let map_arr2 = batch.column(1).as_any().downcast_ref::<MapArray>().unwrap();
assert_eq!(map_arr2.len(), 1);
assert_eq!(map_arr2.value_length(0), 2); // 2 entries
let entries2 = map_arr2.value(0);
let key_arr2 = entries2.column(0).as_string::<i32>();
let val_arr2 = entries2.column(1).as_string::<i32>();
assert_eq!(key_arr2.value(0), "key3");
assert!(val_arr2.is_null(0));
assert_eq!(key_arr2.value(1), "key4");
assert_eq!(val_arr2.value(1), "value4");
}
}
Loading