diff --git a/Cargo.toml b/Cargo.toml index 33f243460..4331eaa65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,8 @@ datafusion = { version = "45.0.0" } datafusion-common = { version = "45.0.0" } datafusion-expr = { version = "45.0.0" } datafusion-physical-plan = { version = "45.0.0" } +datafusion-doc = { version = "45.0.0" } +datafusion-macros = { version = "45.0.0" } # do not change iceberg-rust, datafusion-functions-json hash commits unless datafusion version bumped datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "f2b9b88cd9b4282bc0286a970333e8c01cec177b" } @@ -53,6 +55,8 @@ datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72 datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } +datafusion-doc = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } +datafusion-macros = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } [workspace.lints.clippy] all = { level = "deny", priority = -1 } diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 44077a485..d872f74bf 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -19,6 +19,8 @@ datafusion-common = { workspace = true } datafusion-expr = { workspace = true } datafusion-functions-json = { workspace = true } datafusion-physical-plan = { workspace = true } +datafusion-doc = { workspace = true } +datafusion-macros = { workspace = true } datafusion_iceberg = { workspace = true } futures = { workspace = true } diff --git a/crates/runtime/src/datafusion/functions/mod.rs b/crates/runtime/src/datafusion/functions/mod.rs index 4e970be81..9a25f1f1c 100644 --- a/crates/runtime/src/datafusion/functions/mod.rs +++ b/crates/runtime/src/datafusion/functions/mod.rs @@ -23,6 +23,7 @@ mod convert_timezone; mod date_add; mod date_diff; mod parse_json; +mod timestamp_from_parts; pub fn register_udfs(registry: &mut dyn FunctionRegistry) -> Result<()> { let functions: Vec> = vec![ @@ -30,6 +31,7 @@ pub fn register_udfs(registry: &mut dyn FunctionRegistry) -> Result<()> { date_add::get_udf(), parse_json::get_udf(), date_diff::get_udf(), + timestamp_from_parts::get_udf(), ]; for func in functions { diff --git a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs new file mode 100644 index 000000000..16785b10e --- /dev/null +++ b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs @@ -0,0 +1,599 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::builder::PrimitiveBuilder; +use arrow::array::timezone::Tz; +use arrow::array::{Array, AsArray, PrimitiveArray, StringArray}; +use arrow::datatypes::{ + DataType, Date32Type, Int32Type, Int64Type, Time64NanosecondType, TimestampNanosecondType, +}; +use arrow_schema::DataType::{Date32, Int32, Int64, Time64}; +use arrow_schema::TimeUnit; +use chrono::prelude::*; +use chrono::{Duration, Months}; +use datafusion::logical_expr::TypeSignature::Coercible; +use datafusion::logical_expr::TypeSignatureClass; +use datafusion_common::types::{logical_int64, logical_string}; +use datafusion_common::{exec_err, internal_err, Result, ScalarValue, _exec_datafusion_err}; +use datafusion_expr::{ + ColumnarValue, ReturnInfo, ReturnTypeArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion_macros::user_doc; + +const UNIX_DAYS_FROM_CE: i32 = 719_163; + +#[user_doc( + doc_section(label = "Time and Date Functions"), + description = "Creates a timestamp from individual numeric components.", + syntax_example = "timestamp_from_parts(, , , , , [, ] [, ] )", + sql_example = "```sql + > select timestamp_from_parts(2025, 2, 24, 12, 0, 50); + +-----------------------------------------------------------------------------------+ + | timestamp_from_parts(Int64(2025),Int64(2),Int64(24),Int64(12),Int64(0),Int64(50)) | + +-----------------------------------------------------------------------------------+ + | 1740398450.0 | + +-----------------------------------------------------------------------------------+ + SELECT timestamp_from_parts(2025, 2, 24, 12, 0, 50, 65555555); + +---------------------------------------------------------------------------------------------------+ + | timestamp_from_parts(Int64(2025),Int64(2),Int64(24),Int64(12),Int64(0),Int64(50),Int64(65555555)) | + +---------------------------------------------------------------------------------------------------+ + | 1740398450.65555555 | + +---------------------------------------------------------------------------------------------------+ +```", + argument( + name = "year", + description = "An integer expression to use as a year for building a timestamp." + ), + argument( + name = "month", + description = "An integer expression to use as a month for building a timestamp, with January represented as 1, and December as 12." + ), + argument( + name = "day", + description = "An integer expression to use as a day for building a timestamp, usually in the 1-31 range." + ), + argument( + name = "hour", + description = "An integer expression to use as an hour for building a timestamp, usually in the 0-23 range." + ), + argument( + name = "minute", + description = "An integer expression to use as a minute for building a timestamp, usually in the 0-59 range." + ), + argument( + name = "second", + description = "An integer expression to use as a second for building a timestamp, usually in the 0-59 range." + ), + argument( + name = "date_expr", + description = "Specifies the date expression to use for building a timestamp + where date_expr provides the year, month, and day for the timestamp." + ), + argument( + name = "time_expr", + description = "Specifies the time expression to use for building a timestamp + where time_expr provides the hour, minute, second, and nanoseconds within the day." + ), + argument( + name = "nanoseconds", + description = "Optional integer expression to use as a nanosecond for building a timestamp, + usually in the 0-999999999 range." + ), + argument( + name = "time_zone", + description = "A string expression to use as a time zone for building a timestamp (e.g. America/Los_Angeles)." + ) +)] +#[derive(Debug)] +pub struct TimestampFromPartsFunc { + signature: Signature, + aliases: Vec, +} + +impl Default for TimestampFromPartsFunc { + fn default() -> Self { + Self::new() + } +} + +impl TimestampFromPartsFunc { + pub fn new() -> Self { + let basic_signature = vec![TypeSignatureClass::Native(logical_int64()); 6]; + Self { + signature: Signature::one_of( + vec![ + // TIMESTAMP_FROM_PARTS( , , , , , [, ] [, ] ) + Coercible(basic_signature.clone()), + Coercible( + [ + basic_signature.clone(), + vec![TypeSignatureClass::Native(logical_int64())], + ] + .concat(), + ), + Coercible( + [ + basic_signature, + vec![ + TypeSignatureClass::Native(logical_int64()), + TypeSignatureClass::Native(logical_string()), + ], + ] + .concat(), + ), + // TIMESTAMP_FROM_PARTS( , ) + Coercible(vec![TypeSignatureClass::Date, TypeSignatureClass::Time]), + ], + Volatility::Immutable, + ), + aliases: vec![ + String::from("timestamp_ntz_from_parts"), + String::from("timestamp_tz_from_parts"), + String::from("timestamp_ltz_from_parts"), + String::from("timestampntzfromparts"), + String::from("timestamptzfromparts"), + String::from("timestampltzfromparts"), + ], + } + } +} +impl ScalarUDFImpl for TimestampFromPartsFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &'static str { + "timestamp_from_parts" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_type_from_args should be called") + } + + fn return_type_from_args(&self, args: ReturnTypeArgs) -> Result { + if args.arg_types.len() == 8 { + if let Some(ScalarValue::Utf8(Some(tz))) = args.scalar_arguments[7] { + return Ok(ReturnInfo::new_nullable(DataType::Timestamp( + TimeUnit::Nanosecond, + Some(Arc::from(tz.clone())), + ))); + } + } + Ok(ReturnInfo::new_nullable(DataType::Timestamp( + TimeUnit::Nanosecond, + None, + ))) + } + + fn invoke_batch(&self, args: &[ColumnarValue], _number_rows: usize) -> Result { + // first, identify if any of the arguments is an Array. If yes, store its `len`, + // as any scalar will need to be converted to an array of len `len`. + let array_size = args + .iter() + .find_map(|arg| match arg { + ColumnarValue::Array(a) => Some(a.len()), + ColumnarValue::Scalar(_) => None, + }) + .unwrap_or(1); + let is_scalar = array_size == 1; + + // TIMESTAMP_FROM_PARTS( , ) + let result = if args.len() == 2 { + let [date, time] = take_function_args(self.name(), args)?; + let date = to_primitive_array::(&date.cast_to(&Date32, None)?)?; + let time = to_primitive_array::( + &time.cast_to(&Time64(TimeUnit::Nanosecond), None)?, + )?; + let mut builder = PrimitiveArray::builder(array_size); + for i in 0..array_size { + let ts = make_timestamp_from_date_time(date.value(i), time.value(i))?; + builder.append_value(ts); + } + Ok(builder.finish()) + } else if args.len() > 5 && args.len() < 9 { + // TIMESTAMP_FROM_PARTS( , , , , , [, ] [, ] ) + timestamps_from_components(args, array_size) + } else { + internal_err!("Unsupported number of arguments") + }?; + + if is_scalar { + // If all inputs are scalar, keeps output as scalar + let result = ScalarValue::try_from_array(&result, 0)?; + Ok(ColumnarValue::Scalar(result)) + } else { + Ok(ColumnarValue::Array(Arc::new(result))) + } + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +fn timestamps_from_components( + args: &[ColumnarValue], + array_size: usize, +) -> Result> { + let (years, months, days, hours, minutes, seconds, nanoseconds, time_zone) = match args.len() { + 8 => { + let [years, months, days, hours, minutes, seconds, nanoseconds, time_zone] = + take_function_args("timestamp_from_parts", args)?; + ( + years, + months, + days, + hours, + minutes, + seconds, + Some(nanoseconds), + Some(time_zone), + ) + } + 7 => { + let [years, months, days, hours, minutes, seconds, nanoseconds] = + take_function_args("timestamp_from_parts", args)?; + ( + years, + months, + days, + hours, + minutes, + seconds, + Some(nanoseconds), + None, + ) + } + 6 => { + let [years, months, days, hours, minutes, seconds] = + take_function_args("timestamp_from_parts", args)?; + (years, months, days, hours, minutes, seconds, None, None) + } + _ => return internal_err!("Unsupported number of arguments"), + }; + let years = to_primitive_array::(&years.cast_to(&Int32, None)?)?; + let months = to_primitive_array::(&months.cast_to(&Int32, None)?)?; + let days = to_primitive_array::(&days.cast_to(&Int32, None)?)?; + let hours = to_primitive_array::(&hours.cast_to(&Int32, None)?)?; + let minutes = to_primitive_array::(&minutes.cast_to(&Int32, None)?)?; + let seconds = to_primitive_array::(&seconds.cast_to(&Int32, None)?)?; + let nanoseconds = nanoseconds + .map(|nanoseconds| to_primitive_array::(&nanoseconds.cast_to(&Int64, None)?)) + .transpose()?; + let time_zone = time_zone.map(to_string_array).transpose()?; + + let mut builder: PrimitiveBuilder = + PrimitiveArray::builder(array_size); + for i in 0..array_size { + let nanoseconds = nanoseconds.as_ref().map(|ns| ns.value(i)); + let time_zone = time_zone.as_ref().map(|tz| tz.value(i)); + let ts = make_timestamp( + years.value(i), + months.value(i), + days.value(i), + hours.value(i), + minutes.value(i), + seconds.value(i), + nanoseconds, + time_zone, + )?; + builder.append_value(ts); + } + Ok(builder.finish()) +} + +fn make_timestamp_from_date_time(date: i32, time: i64) -> Result { + make_timestamp_from_nanoseconds(i64::from(date) * 86_400_000_000_000 + time, None) +} + +fn make_date(year: i32, month: i32, days: i32) -> Result { + let u_month = match month { + 0 => 1, + _ if month < 0 => 1 - month, + _ => month - 1, + }; + let u_month = u32::try_from(u_month) + .map_err(|_| _exec_datafusion_err!("month value '{month:?}' is out of range"))?; + + NaiveDate::from_ymd_opt(year, 1, 1).map_or_else( + || exec_err!("Invalid date part '{year:?}' '{month:?}' '{days:?}'"), + |date| { + let months = Months::new(u_month); + let days = Duration::days(i64::from(days - 1)); + let result = if month <= 0 { + date.checked_sub_months(months) + } else { + date.checked_add_months(months) + }; + result.map_or_else( + || exec_err!("invalid date part '{year:?}' '{month:?}' '{days:?}'"), + |months_result| { + months_result.checked_add_signed(days).map_or_else( + || exec_err!("invalid date part '{year:?}' '{month:?}' '{days:?}'"), + |days_result| Ok(days_result.num_days_from_ce() - UNIX_DAYS_FROM_CE), + ) + }, + ) + }, + ) +} + +#[allow(clippy::too_many_arguments)] +fn make_timestamp( + year: i32, + month: i32, + day: i32, + hour: i32, + minute: i32, + seconds: i32, + nano: Option, + timezone: Option<&str>, +) -> Result { + let days = make_date(year, month, day)?; + let n_days = i64::from(days) * 86_400_000_000_000; + let n_hour = i64::from(hour) * 3_600_000_000_000; + let n_minute = i64::from(minute) * 60_000_000_000; + let n_seconds = i64::from(seconds) * 1_000_000_000; + let n_nano = nano.unwrap_or(0); + let total_nanos = n_days + n_hour + n_minute + n_seconds + n_nano; + + make_timestamp_from_nanoseconds(total_nanos, timezone) +} + +fn make_timestamp_from_nanoseconds(nanoseconds: i64, tz: Option<&str>) -> Result { + let date_time = DateTime::from_timestamp_nanos(nanoseconds); + let timestamp = if let Some(timezone) = tz { + date_time + .with_timezone(&timezone.parse::()?) + .timestamp_nanos_opt() + } else { + date_time.timestamp_nanos_opt() + }; + timestamp.map_or_else( + || exec_err!("Unable to parse timestamp from '{:?}'", nanoseconds), + Ok, + ) +} + +pub fn take_function_args( + function_name: &str, + args: impl IntoIterator, +) -> Result<[T; N]> { + let args = args.into_iter().collect::>(); + args.try_into().map_err(|v: Vec| { + _exec_datafusion_err!( + "{} function requires {} {}, got {}", + function_name, + N, + if N == 1 { "argument" } else { "arguments" }, + v.len() + ) + }) +} + +fn to_primitive_array(col: &ColumnarValue) -> Result> +where + T: arrow::datatypes::ArrowPrimitiveType, +{ + match col { + ColumnarValue::Array(array) => Ok(array.as_primitive::().to_owned()), + ColumnarValue::Scalar(scalar) => { + let value = scalar.to_array()?; + Ok(value.as_primitive::().to_owned()) + } + } +} + +fn to_string_array(col: &ColumnarValue) -> Result { + match col { + ColumnarValue::Array(array) => array + .as_any() + .downcast_ref::() + .map(std::borrow::ToOwned::to_owned) + .ok_or_else(|| _exec_datafusion_err!("Failed to downcast Array to StringArray")), + ColumnarValue::Scalar(scalar) => { + let value = scalar.to_array()?; + value + .as_any() + .downcast_ref::() + .map(std::borrow::ToOwned::to_owned) + .ok_or_else(|| _exec_datafusion_err!("Failed to downcast Scalar to StringArray")) + } + } +} + +super::macros::make_udf_function!(TimestampFromPartsFunc); + +#[cfg(test)] +mod test { + use crate::datafusion::functions::timestamp_from_parts::{ + to_primitive_array, TimestampFromPartsFunc, + }; + use arrow::datatypes::TimestampNanosecondType; + use chrono::DateTime; + use datafusion::logical_expr::ColumnarValue; + use datafusion_common::ScalarValue; + use datafusion_expr::ScalarUDFImpl; + + #[allow(clippy::unwrap_used)] + fn columnar_value_fn(is_scalar: bool, v: T) -> ColumnarValue + where + ScalarValue: From, + T: Clone, + { + if is_scalar { + ColumnarValue::Scalar(ScalarValue::from(v)) + } else { + ColumnarValue::Array(ScalarValue::from(v).to_array().unwrap()) + } + } + + #[allow(clippy::type_complexity, clippy::unwrap_used, clippy::too_many_lines)] + #[test] + fn test_timestamp_from_parts_components() { + let args: [( + i64, + i64, + i64, + i64, + i64, + i64, + Option, + Option, + String, + ); 7] = [ + ( + 2025, + 1, + 2, + 0, + 0, + 0, + None, + None, + "2025-01-02 00:00:00.000000000".to_string(), + ), + ( + 2025, + 1, + 2, + 12, + 0, + 0, + Some(0), + None, + "2025-01-02 12:00:00.000000000".to_string(), + ), + ( + 2025, + 1, + 2, + 12, + 10, + 0, + Some(0), + None, + "2025-01-02 12:10:00.000000000".to_string(), + ), + ( + 2025, + 1, + 2, + 12, + 10, + 12, + Some(0), + None, + "2025-01-02 12:10:12.000000000".to_string(), + ), + ( + 2025, + 1, + 2, + 12, + 10, + 12, + Some(500), + None, + "2025-01-02 12:10:12.000000500".to_string(), + ), + ( + 2025, + 1, + 2, + 12, + 10, + 12, + Some(0), + Some("America/Los_Angeles".to_string()), + "2025-01-02 12:10:12.000000000".to_string(), + ), + ( + 2025, + -5, + -15, + -12, + 0, + -3600, + None, + None, + "2024-06-14 11:00:00.000000000".to_string(), + ), + ]; + + let is_scalar_type = [true, false]; + + for is_scalar in is_scalar_type { + for (i, (y, m, d, h, mi, s, n, tz, exp)) in args.iter().enumerate() { + let mut fn_args = vec![ + columnar_value_fn(is_scalar, *y), + columnar_value_fn(is_scalar, *m), + columnar_value_fn(is_scalar, *d), + columnar_value_fn(is_scalar, *h), + columnar_value_fn(is_scalar, *mi), + columnar_value_fn(is_scalar, *s), + ]; + if let Some(nano) = n { + fn_args.push(columnar_value_fn(is_scalar, *nano)); + }; + if let Some(t) = tz { + fn_args.push(columnar_value_fn(is_scalar, t.to_string())); + }; + let result = TimestampFromPartsFunc::new() + .invoke_batch(&fn_args, 1) + .unwrap(); + let result = to_primitive_array::(&result).unwrap(); + let ts = DateTime::from_timestamp_nanos(result.value(0)) + .format("%Y-%m-%d %H:%M:%S.%f") + .to_string(); + assert_eq!(ts, *exp, "failed at index {i}"); + } + } + } + + #[allow(clippy::unwrap_used)] + #[test] + fn test_timestamp_from_parts_exp() { + let args: [(i32, i64, i64); 2] = [ + (20143, 39_075_773_219_000, 1_740_394_275_773_219_000), + (20143, 0, 1_740_355_200_000_000_000), + ]; + + let is_scalar_type = [true, false]; + + for is_scalar in is_scalar_type { + for (i, (date, time, exp)) in args.iter().enumerate() { + let fn_args = vec![ + columnar_value_fn(is_scalar, ScalarValue::Date32(Some(*date))), + columnar_value_fn(is_scalar, ScalarValue::Time64Nanosecond(Some(*time))), + ]; + let result = TimestampFromPartsFunc::new() + .invoke_batch(&fn_args, 1) + .unwrap(); + let result = to_primitive_array::(&result).unwrap(); + assert_eq!(result.value(0), *exp, "failed at index {i}"); + } + } + } +}