Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions arrow-avro/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ impl AvroDataType {
Codec::Int64
| Codec::TimeMicros
| Codec::TimestampMillis(_)
| Codec::TimestampMicros(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
| Codec::TimestampMicros(_)
| Codec::TimestampNanos(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
#[cfg(feature = "avro_custom_types")]
Codec::DurationNanos
| Codec::DurationMicros
Expand Down Expand Up @@ -652,6 +653,11 @@ pub(crate) enum Codec {
/// Maps to Arrow's Timestamp(TimeUnit::Microsecond) data type
/// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
TimestampMicros(bool),
/// Represents Avro timestamp-nanos or local-timestamp-nanos logical type
///
/// Maps to Arrow's Timestamp(TimeUnit::Nanosecond) data type
/// The boolean parameter indicates whether the timestamp has a UTC timezone (true) or is local time (false)
TimestampNanos(bool),
/// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
/// The i32 parameter indicates the fixed binary size
Fixed(i32),
Expand Down Expand Up @@ -715,6 +721,9 @@ impl Codec {
Self::TimestampMicros(is_utc) => {
DataType::Timestamp(TimeUnit::Microsecond, is_utc.then(|| "+00:00".into()))
}
Self::TimestampNanos(is_utc) => {
DataType::Timestamp(TimeUnit::Nanosecond, is_utc.then(|| "+00:00".into()))
}
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
Self::Decimal(precision, scale, _size) => {
Expand Down Expand Up @@ -917,6 +926,8 @@ enum UnionFieldKind {
TimestampMillisLocal,
TimestampMicrosUtc,
TimestampMicrosLocal,
TimestampNanosUtc,
TimestampNanosLocal,
Duration,
Fixed,
Decimal,
Expand Down Expand Up @@ -946,6 +957,8 @@ impl From<&Codec> for UnionFieldKind {
Codec::TimestampMillis(false) => Self::TimestampMillisLocal,
Codec::TimestampMicros(true) => Self::TimestampMicrosUtc,
Codec::TimestampMicros(false) => Self::TimestampMicrosLocal,
Codec::TimestampNanos(true) => Self::TimestampNanosUtc,
Codec::TimestampNanos(false) => Self::TimestampNanosLocal,
Codec::Interval => Self::Duration,
Codec::Fixed(_) => Self::Fixed,
Codec::Decimal(..) => Self::Decimal,
Expand Down Expand Up @@ -1399,7 +1412,17 @@ impl<'a> Maker<'a> {
(Some("local-timestamp-micros"), c @ Codec::Int64) => {
*c = Codec::TimestampMicros(false)
}
(Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
(Some("timestamp-nanos"), c @ Codec::Int64) => *c = Codec::TimestampNanos(true),
(Some("local-timestamp-nanos"), c @ Codec::Int64) => {
*c = Codec::TimestampNanos(false)
}
(Some("uuid"), c @ Codec::Utf8) => {
// Map Avro string+logicalType=uuid into the UUID Codec,
// and preserve the logicalType in Arrow field metadata
// so writers can round-trip it correctly.
*c = Codec::Uuid;
field.metadata.insert("logicalType".into(), "uuid".into());
}
#[cfg(feature = "avro_custom_types")]
(Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
#[cfg(feature = "avro_custom_types")]
Expand Down Expand Up @@ -1437,6 +1460,18 @@ impl<'a> Maker<'a> {
}
(None, _) => {}
}
if matches!(field.codec, Codec::Int64) {
if let Some(unit) = t
.attributes
.additional
.get("arrowTimeUnit")
.and_then(|v| v.as_str())
{
if unit == "nanosecond" {
field.codec = Codec::TimestampNanos(false);
}
}
}
if !t.attributes.additional.is_empty() {
for (k, v) in &t.attributes.additional {
field.metadata.insert(k.to_string(), v.to_string());
Expand Down
16 changes: 13 additions & 3 deletions arrow-avro/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7437,7 +7437,6 @@ mod test {
"entire RecordBatch mismatch (schema, all columns, all rows)"
);
}

#[test]
fn comprehensive_e2e_resolution_test() {
use serde_json::Value;
Expand Down Expand Up @@ -7593,14 +7592,20 @@ mod test {
let batch = read_alltypes_with_reader_schema(path, reader_schema.clone());

const UUID_EXT_KEY: &str = "ARROW:extension:name";
const UUID_LOGICAL_KEY: &str = "logicalType";

let uuid_md_top: Option<HashMap<String, String>> = batch
.schema()
.field_with_name("uuid_str")
.ok()
.and_then(|f| {
let md = f.metadata();
if md.get(UUID_EXT_KEY).is_some() {
let has_ext = md.get(UUID_EXT_KEY).is_some();
let is_uuid_logical = md
.get(UUID_LOGICAL_KEY)
.map(|v| v.trim_matches('"') == "uuid")
.unwrap_or(false);
if has_ext || is_uuid_logical {
Some(md.clone())
} else {
None
Expand All @@ -7617,7 +7622,12 @@ mod test {
.find(|(_, child)| child.name() == "uuid")
.and_then(|(_, child)| {
let md = child.metadata();
if md.get(UUID_EXT_KEY).is_some() {
let has_ext = md.get(UUID_EXT_KEY).is_some();
let is_uuid_logical = md
.get(UUID_LOGICAL_KEY)
.map(|v| v.trim_matches('"') == "uuid")
.unwrap_or(false);
if has_ext || is_uuid_logical {
Some(md.clone())
} else {
None
Expand Down
114 changes: 110 additions & 4 deletions arrow-avro/src/reader/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ enum Decoder {
TimeMicros(Vec<i64>),
TimestampMillis(bool, Vec<i64>),
TimestampMicros(bool, Vec<i64>),
TimestampNanos(bool, Vec<i64>),
Int32ToInt64(Vec<i64>),
Int32ToFloat32(Vec<f32>),
Int32ToFloat64(Vec<f64>),
Expand Down Expand Up @@ -324,6 +325,9 @@ impl Decoder {
(Codec::TimestampMicros(is_utc), _) => {
Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
}
(Codec::TimestampNanos(is_utc), _) => {
Self::TimestampNanos(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
}
#[cfg(feature = "avro_custom_types")]
(Codec::DurationNanos, _) => {
Self::DurationNanosecond(Vec::with_capacity(DEFAULT_CAPACITY))
Expand Down Expand Up @@ -530,7 +534,8 @@ impl Decoder {
| Self::Int32ToInt64(v)
| Self::TimeMicros(v)
| Self::TimestampMillis(_, v)
| Self::TimestampMicros(_, v) => v.push(0),
| Self::TimestampMicros(_, v)
| Self::TimestampNanos(_, v) => v.push(0),
#[cfg(feature = "avro_custom_types")]
Self::DurationSecond(v)
| Self::DurationMillisecond(v)
Expand Down Expand Up @@ -643,7 +648,8 @@ impl Decoder {
| Self::Int32ToInt64(v)
| Self::TimeMicros(v)
| Self::TimestampMillis(_, v)
| Self::TimestampMicros(_, v) => match lit {
| Self::TimestampMicros(_, v)
| Self::TimestampNanos(_, v) => match lit {
AvroLiteral::Long(i) => {
v.push(*i);
Ok(())
Expand Down Expand Up @@ -854,7 +860,8 @@ impl Decoder {
Self::Int64(values)
| Self::TimeMicros(values)
| Self::TimestampMillis(_, values)
| Self::TimestampMicros(_, values) => values.push(buf.get_long()?),
| Self::TimestampMicros(_, values)
| Self::TimestampNanos(_, values) => values.push(buf.get_long()?),
#[cfg(feature = "avro_custom_types")]
Self::DurationSecond(values)
| Self::DurationMillisecond(values)
Expand Down Expand Up @@ -1070,6 +1077,10 @@ impl Decoder {
flush_primitive::<TimestampMicrosecondType>(values, nulls)
.with_timezone_opt(is_utc.then(|| "+00:00")),
),
Self::TimestampNanos(is_utc, values) => Arc::new(
flush_primitive::<TimestampNanosecondType>(values, nulls)
.with_timezone_opt(is_utc.then(|| "+00:00")),
),
#[cfg(feature = "avro_custom_types")]
Self::DurationSecond(values) => {
Arc::new(flush_primitive::<DurationSecondType>(values, nulls))
Expand Down Expand Up @@ -1959,6 +1970,7 @@ enum Skipper {
TimeMicros,
TimestampMillis,
TimestampMicros,
TimestampNanos,
Fixed(usize),
Decimal(Option<usize>),
UuidString,
Expand All @@ -1983,6 +1995,7 @@ impl Skipper {
Codec::TimeMicros => Self::TimeMicros,
Codec::TimestampMillis(_) => Self::TimestampMillis,
Codec::TimestampMicros(_) => Self::TimestampMicros,
Codec::TimestampNanos(_) => Self::TimestampNanos,
#[cfg(feature = "avro_custom_types")]
Codec::DurationNanos
| Codec::DurationMicros
Expand Down Expand Up @@ -2044,7 +2057,11 @@ impl Skipper {
buf.get_int()?;
Ok(())
}
Self::Int64 | Self::TimeMicros | Self::TimestampMillis | Self::TimestampMicros => {
Self::Int64
| Self::TimeMicros
| Self::TimestampMillis
| Self::TimestampMicros
| Self::TimestampNanos => {
buf.get_long()?;
Ok(())
}
Expand Down Expand Up @@ -4647,4 +4664,93 @@ mod tests {
.expect("Int32Array");
assert_eq!(a.values(), &[1, 2, 3]);
}

/// Decodes a handful of UTC `TimestampNanos` longs and verifies that the flushed array
/// holds the same values and is typed `Timestamp(Nanosecond, Some("+00:00"))`.
#[test]
fn test_timestamp_nanos_decoding_utc() {
    // Zero, a positive/negative pair around it, and a larger value.
    let expected = [0_i64, 1, -1, 1_234_567_890];

    let utc_nanos = avro_from_codec(Codec::TimestampNanos(true));
    let mut decoder = Decoder::try_new(&utc_nanos).expect("create TimestampNanos decoder");

    // Encode every expected value back-to-back into one input buffer.
    let mut encoded = Vec::new();
    for &value in &expected {
        encoded.extend_from_slice(&encode_avro_long(value));
    }

    // One decode call per encoded value.
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..expected.len() {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nanos ts");
    let ts = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(ts.values(), &expected);

    // The UTC flag must surface as an explicit "+00:00" timezone on the Arrow type.
    let other = ts.data_type();
    if let DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) = other {
        assert_eq!(tz.as_deref(), Some("+00:00"));
    } else {
        panic!("expected Timestamp(Nanosecond, Some(\"+00:00\")), got {other:?}");
    }
}

/// Decodes local (non-UTC) `TimestampNanos` longs and verifies that the flushed array
/// holds the same values and is typed `Timestamp(Nanosecond, None)`.
#[test]
fn test_timestamp_nanos_decoding_local() {
    let expected = [10_i64, 20, -30];

    let local_nanos = avro_from_codec(Codec::TimestampNanos(false));
    let mut decoder = Decoder::try_new(&local_nanos).expect("create TimestampNanos decoder");

    // Encode every expected value back-to-back into one input buffer.
    let mut encoded = Vec::new();
    for &value in &expected {
        encoded.extend_from_slice(&encode_avro_long(value));
    }

    // One decode call per encoded value.
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..expected.len() {
        decoder.decode(&mut cursor).expect("decode nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nanos ts");
    let ts = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");
    assert_eq!(ts.values(), &expected);

    // A local timestamp must not carry any timezone on the Arrow type.
    let other = ts.data_type();
    if let DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) = other {
        assert_eq!(tz.as_deref(), None);
    } else {
        panic!("expected Timestamp(Nanosecond, None), got {other:?}");
    }
}

/// Decodes a nullable (`NullFirst`) local `TimestampNanos` column and verifies the
/// validity bitmap, the non-null values, and the timezone-less Arrow type.
#[test]
fn test_timestamp_nanos_decoding_with_nulls() {
    // Logical rows to round-trip: value, null, value.
    let rows = [Some(42_i64), None, Some(-7)];

    let nullable_nanos = AvroDataType::new(
        Codec::TimestampNanos(false),
        Default::default(),
        Some(Nullability::NullFirst),
    );
    let mut decoder =
        Decoder::try_new(&nullable_nanos).expect("create nullable TimestampNanos");

    // Each row is written as a branch index: 0 for the null row, 1 followed by
    // the long payload for a present value.
    let mut encoded = Vec::new();
    for row in &rows {
        match row {
            Some(value) => {
                encoded.extend_from_slice(&encode_avro_long(1));
                encoded.extend_from_slice(&encode_avro_long(*value));
            }
            None => encoded.extend_from_slice(&encode_avro_long(0)),
        }
    }

    // One decode call per logical row.
    let mut cursor = AvroCursor::new(&encoded);
    for _ in 0..rows.len() {
        decoder.decode(&mut cursor).expect("decode nullable nanos ts");
    }

    let flushed = decoder.flush(None).expect("flush nullable nanos ts");
    let ts = flushed
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("TimestampNanosecondArray");

    assert_eq!(ts.len(), 3);
    for (idx, row) in rows.iter().enumerate() {
        match row {
            Some(value) => {
                assert!(ts.is_valid(idx));
                assert_eq!(ts.value(idx), *value);
            }
            None => assert!(ts.is_null(idx)),
        }
    }

    // Nullability must not change the Arrow type: still nanoseconds, still no timezone.
    let other = ts.data_type();
    if let DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) = other {
        assert_eq!(tz.as_deref(), None);
    } else {
        panic!("expected Timestamp(Nanosecond, None), got {other:?}");
    }
}
}
Loading
Loading