
Commit f7fd11b

Enable arrow-avro to handle writer-only fields during schema resolution.
- Added skipping logic for writer-only fields in `RecordDecoder`.
- Introduced `ResolvedRuntime` for runtime decoding adjustments.
- Updated tests to validate skipping functionality.
- Refactored block-wise processing for optimized performance.
1 parent: a620957

File tree: 3 files changed (+597, −31 lines)
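
For context on what "skipping" means here: when a reader schema omits fields that the writer wrote, Avro's binary encoding still contains those fields' bytes, so the decoder must parse past them using the writer's type. Below is a minimal, self-contained sketch of that idea for a few primitive types; `AvroType`, `read_long`, and `skip_value` are illustrative names rather than the crate's API, and the real `RecordDecoder` handles the full type system (unions, nested records, array/map blocks).

```rust
// Sketch: skipping writer-only fields in an Avro record body.
#[allow(dead_code)]
enum AvroType {
    Boolean, // 1 byte
    Int,     // zig-zag varint
    Long,    // zig-zag varint
    Float,   // 4 bytes, little-endian
    Double,  // 8 bytes, little-endian
    Bytes,   // long length prefix, then payload
    String,  // same wire shape as Bytes
}

/// Read an Avro zig-zag varint long, advancing `pos`.
fn read_long(buf: &[u8], pos: &mut usize) -> Option<i64> {
    let mut value: u64 = 0;
    let mut shift = 0;
    loop {
        let byte = *buf.get(*pos)?;
        *pos += 1;
        value |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            break;
        }
        shift += 7;
        if shift > 63 {
            return None; // malformed varint
        }
    }
    // Zig-zag decode.
    Some(((value >> 1) as i64) ^ -((value & 1) as i64))
}

/// Advance `pos` past one value of `ty` without materializing it.
fn skip_value(ty: &AvroType, buf: &[u8], pos: &mut usize) -> Option<()> {
    match ty {
        AvroType::Boolean => *pos += 1,
        AvroType::Int | AvroType::Long => {
            read_long(buf, pos)?;
        }
        AvroType::Float => *pos += 4,
        AvroType::Double => *pos += 8,
        AvroType::Bytes | AvroType::String => {
            let len = read_long(buf, pos)?;
            *pos += usize::try_from(len).ok()?;
        }
    }
    (*pos <= buf.len()).then_some(())
}

fn main() {
    // Writer wrote: id: long = 3, name: string = "ab", score: double = 1.0.
    // The reader only wants `score`, so `id` and `name` are writer-only.
    let buf: Vec<u8> = [
        vec![0x06],                    // long 3 (zig-zag)
        vec![0x04, b'a', b'b'],        // string "ab" (zig-zag length 2)
        1.0f64.to_le_bytes().to_vec(), // double 1.0
    ]
    .concat();
    let mut pos = 0;
    skip_value(&AvroType::Long, &buf, &mut pos).unwrap();
    skip_value(&AvroType::String, &buf, &mut pos).unwrap();
    let score = f64::from_le_bytes(buf[pos..pos + 8].try_into().unwrap());
    assert_eq!(score, 1.0);
    println!("skipped to score = {score}");
}
```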

arrow-avro/src/codec.rs

Lines changed: 10 additions & 2 deletions
```diff
@@ -955,7 +955,7 @@ impl<'a> Maker<'a> {
         // Prepare outputs
         let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
         let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
-        //let mut skip_fields: Vec<Option<AvroDataType>> = vec![None; writer_record.fields.len()];
+        let mut skip_fields: Vec<Option<AvroDataType>> = vec![None; writer_record.fields.len()];
         //let mut default_fields: Vec<usize> = Vec::new();
         // Build reader fields and mapping
         for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
@@ -975,6 +975,14 @@ impl<'a> Maker<'a> {
                 ));
             }
         }
+        // Any writer fields not mapped should be skipped
+        for (writer_idx, writer_field) in writer_record.fields.iter().enumerate() {
+            if writer_to_reader[writer_idx].is_none() {
+                // Parse writer field type to know how to skip data
+                let w_dt = self.parse_type(&writer_field.r#type, writer_ns)?;
+                skip_fields[writer_idx] = Some(w_dt);
+            }
+        }
         // Implement writer-only fields to skip in Follow-up PR here
         // Build resolved record AvroDataType
         let resolved = AvroDataType::new_with_resolution(
@@ -984,7 +992,7 @@ impl<'a> Maker<'a> {
             Some(ResolutionInfo::Record(ResolvedRecord {
                 writer_to_reader: Arc::from(writer_to_reader),
                 default_fields: Arc::default(),
-                skip_fields: Arc::default(),
+                skip_fields: Arc::from(skip_fields),
             })),
         );
         // Register a resolved record by reader name+namespace for potential named type refs
```
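
The resolved `skip_fields` vector lines up index-for-index with the writer's fields: `writer_to_reader[i]` is `None` exactly when `skip_fields[i]` carries the writer's parsed type. A hypothetical decode loop over that mapping could look like the sketch below; the Vec shapes mirror `ResolvedRecord`, but the decode/skip actions are printed stand-ins for arrow-avro internals.

```rust
fn main() {
    // Writer wrote 3 fields; the reader keeps only writer field 1,
    // which becomes reader column 0.
    let writer_to_reader: Vec<Option<usize>> = vec![None, Some(0), None];
    let skip_fields: Vec<Option<&str>> = vec![Some("long"), None, Some("string")];

    for (writer_idx, target) in writer_to_reader.iter().enumerate() {
        match target {
            // Mapped field: decode its bytes into the reader column.
            Some(reader_idx) => {
                println!("decode writer field {writer_idx} -> reader column {reader_idx}");
            }
            // Writer-only field: consume its bytes using the parsed writer type.
            None => {
                let ty = skip_fields[writer_idx].expect("unmapped field must have a skip type");
                println!("skip writer field {writer_idx} ({ty})");
            }
        }
    }
}
```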

arrow-avro/src/reader/mod.rs

Lines changed: 79 additions & 1 deletion
```diff
@@ -863,12 +863,39 @@ mod test {
             .with_reader_schema(reader_schema)
             .build(BufReader::new(file))
             .unwrap();
-
         let schema = reader.schema();
         let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
         arrow::compute::concat_batches(&schema, &batches).unwrap()
     }
 
+    fn make_reader_schema_with_selected_fields_in_order(
+        path: &str,
+        selected: &[&str],
+    ) -> AvroSchema {
+        let mut root = load_writer_schema_json(path);
+        assert_eq!(root["type"], "record", "writer schema must be a record");
+        let writer_fields = root
+            .get("fields")
+            .and_then(|f| f.as_array())
+            .expect("record has fields");
+        let mut field_map: HashMap<String, Value> = HashMap::with_capacity(writer_fields.len());
+        for f in writer_fields {
+            if let Some(name) = f.get("name").and_then(|n| n.as_str()) {
+                field_map.insert(name.to_string(), f.clone());
+            }
+        }
+        let mut new_fields = Vec::with_capacity(selected.len());
+        for name in selected {
+            let f = field_map
+                .get(*name)
+                .unwrap_or_else(|| panic!("field '{name}' not found in writer schema"))
+                .clone();
+            new_fields.push(f);
+        }
+        root["fields"] = Value::Array(new_fields);
+        AvroSchema::new(root.to_string())
+    }
+
     #[test]
     fn test_alltypes_schema_promotion_mixed() {
         let files = [
```
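
To make the helper concrete: it rewrites only the top-level `fields` array of the writer schema's JSON, keeping the selected fields in the requested order. Selecting `["timestamp_col", "id"]` therefore yields a reader schema shaped roughly like the sketch below; the record name and exact field types here are assumed placeholders for illustration, since the helper copies whatever the writer schema actually declares.

```rust
use serde_json::json;

fn main() {
    // Roughly the reader schema produced for &["timestamp_col", "id"].
    // "topLevelRecord" and the field types are assumptions, not taken
    // from the actual alltypes_plain.avro writer schema.
    let reader_schema = json!({
        "type": "record",
        "name": "topLevelRecord",
        "fields": [
            {
                "name": "timestamp_col",
                "type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]
            },
            {"name": "id", "type": ["null", "int"]}
        ]
    });
    println!("{}", serde_json::to_string_pretty(&reader_schema).unwrap());
}
```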
```diff
@@ -1537,6 +1564,57 @@ mod test {
         assert!(batch.column(0).as_any().is::<StringViewArray>());
     }
 
+    #[test]
+    fn test_alltypes_skip_writer_fields_keep_double_only() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let reader_schema =
+            make_reader_schema_with_selected_fields_in_order(&file, &["double_col"]);
+        let batch = read_alltypes_with_reader_schema(&file, reader_schema);
+        let expected = RecordBatch::try_from_iter_with_nullable([(
+            "double_col",
+            Arc::new(Float64Array::from_iter_values(
+                (0..8).map(|x| (x % 2) as f64 * 10.1),
+            )) as _,
+            true,
+        )])
+        .unwrap();
+        assert_eq!(batch, expected);
+    }
+
+    #[test]
+    fn test_alltypes_skip_writer_fields_reorder_and_skip_many() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let reader_schema =
+            make_reader_schema_with_selected_fields_in_order(&file, &["timestamp_col", "id"]);
+        let batch = read_alltypes_with_reader_schema(&file, reader_schema);
+        let expected = RecordBatch::try_from_iter_with_nullable([
+            (
+                "timestamp_col",
+                Arc::new(
+                    TimestampMicrosecondArray::from_iter_values([
+                        1235865600000000, // 2009-03-01T00:00:00.000
+                        1235865660000000, // 2009-03-01T00:01:00.000
+                        1238544000000000, // 2009-04-01T00:00:00.000
+                        1238544060000000, // 2009-04-01T00:01:00.000
+                        1233446400000000, // 2009-02-01T00:00:00.000
+                        1233446460000000, // 2009-02-01T00:01:00.000
+                        1230768000000000, // 2009-01-01T00:00:00.000
+                        1230768060000000, // 2009-01-01T00:01:00.000
+                    ])
+                    .with_timezone("+00:00"),
+                ) as _,
+                true,
+            ),
+            (
+                "id",
+                Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as _,
+                true,
+            ),
+        ])
+        .unwrap();
+        assert_eq!(batch, expected);
+    }
+
     #[test]
     fn test_read_zero_byte_avro_file() {
         let batch = read_file("test/data/zero_byte.avro", 3, false);
```
