From 52f75a79ae70b0c63a250d8049c22dcf98ebb52f Mon Sep 17 00:00:00 2001 From: osipovartem Date: Fri, 21 Feb 2025 20:14:53 +0300 Subject: [PATCH 1/5] Inital version --- .../runtime/src/datafusion/functions/mod.rs | 2 + .../functions/timestamp_from_parts.rs | 367 ++++++++++++++++++ 2 files changed, 369 insertions(+) create mode 100644 crates/runtime/src/datafusion/functions/timestamp_from_parts.rs diff --git a/crates/runtime/src/datafusion/functions/mod.rs b/crates/runtime/src/datafusion/functions/mod.rs index 4e970be81..9a25f1f1c 100644 --- a/crates/runtime/src/datafusion/functions/mod.rs +++ b/crates/runtime/src/datafusion/functions/mod.rs @@ -23,6 +23,7 @@ mod convert_timezone; mod date_add; mod date_diff; mod parse_json; +mod timestamp_from_parts; pub fn register_udfs(registry: &mut dyn FunctionRegistry) -> Result<()> { let functions: Vec> = vec![ @@ -30,6 +31,7 @@ pub fn register_udfs(registry: &mut dyn FunctionRegistry) -> Result<()> { date_add::get_udf(), parse_json::get_udf(), date_diff::get_udf(), + timestamp_from_parts::get_udf(), ]; for func in functions { diff --git a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs new file mode 100644 index 000000000..e707111cb --- /dev/null +++ b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::builder::PrimitiveBuilder; +use arrow::array::timezone::Tz; +use arrow::array::{Array, AsArray, PrimitiveArray, StringArray}; +use arrow::datatypes::{ + DataType, Date32Type, Int32Type, Int64Type, Time64NanosecondType, TimestampNanosecondType, +}; +use arrow_schema::DataType::{Date32, Int32, Int64, Time64}; +use arrow_schema::TimeUnit; +use chrono::prelude::*; +use datafusion::logical_expr::TypeSignature::Coercible; +use datafusion::logical_expr::TypeSignatureClass; +use datafusion_common::types::{logical_int64, logical_string}; +use datafusion_common::{exec_err, internal_err, Result, ScalarValue, _exec_datafusion_err}; +use datafusion_expr::{ + ColumnarValue, ReturnInfo, ReturnTypeArgs, ScalarUDFImpl, Signature, Volatility, +}; + +#[derive(Debug)] +pub struct TimestampFromPartsFunc { + signature: Signature, + aliases: Vec, +} + +impl Default for TimestampFromPartsFunc { + fn default() -> Self { + Self::new() + } +} + +impl TimestampFromPartsFunc { + pub fn new() -> Self { + let basic_signature = vec![TypeSignatureClass::Native(logical_int64()); 6]; + Self { + signature: Signature::one_of( + vec![ + // TIMESTAMP_FROM_PARTS( , , , , , [, ] [, ] ) + Coercible(basic_signature.clone()), + Coercible( + [ + basic_signature.clone(), + vec![TypeSignatureClass::Native(logical_int64())], + ] + .concat(), + ), + Coercible( + [ + basic_signature, + vec![ + TypeSignatureClass::Native(logical_int64()), + TypeSignatureClass::Native(logical_string()), + ], + ] + .concat(), + ), + // TIMESTAMP_FROM_PARTS( , ) + Coercible(vec![TypeSignatureClass::Date, TypeSignatureClass::Time]), + ], + Volatility::Immutable, + ), + aliases: vec![ + String::from("timestamp_ntz_from_parts"), + String::from("timestamp_tz_from_parts"), + String::from("timestamp_ltz_from_parts"), + ], + } + } +} +impl ScalarUDFImpl for TimestampFromPartsFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "timestamp_from_parts" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_type_from_args should be called") + } + + fn return_type_from_args(&self, args: ReturnTypeArgs) -> Result { + if args.arg_types.len() == 8 { + if let Some(ScalarValue::Utf8(Some(tz))) = args.scalar_arguments[7] { + return Ok(ReturnInfo::new_nullable(DataType::Timestamp( + TimeUnit::Nanosecond, + Some(Arc::from(tz.clone())), + ))); + } + } + Ok(ReturnInfo::new_nullable(DataType::Timestamp( + TimeUnit::Nanosecond, + None, + ))) + } + + fn invoke_batch(&self, args: &[ColumnarValue], _number_rows: usize) -> Result { + // first, identify if any of the arguments is an Array. If yes, store its `len`, + // as any scalar will need to be converted to an array of len `len`. + let array_size = args + .iter() + .find_map(|arg| match arg { + ColumnarValue::Array(a) => Some(a.len()), + ColumnarValue::Scalar(_) => None, + }) + .unwrap_or(1); + let is_scalar = array_size == 1; + + // TIMESTAMP_FROM_PARTS( , ) + let result = if args.len() == 2 { + let [date, time] = take_function_args(self.name(), args)?; + let date = to_primitive_array::(&date.cast_to(&Date32, None)?)?; + let time = to_primitive_array::( + &time.cast_to(&Time64(TimeUnit::Nanosecond), None)?, + )?; + let mut builder = PrimitiveArray::builder(array_size); + for i in 0..array_size { + make_timestamp_from_date_time(date.value(i), time.value(i), &mut builder)?; + } + Ok(builder.finish()) + } else if args.len() > 5 && args.len() < 9 { + // TIMESTAMP_FROM_PARTS( , , , , , [, ] [, ] ) + timestamps_from_components(args, array_size) + } else { + internal_err!("Unsupported number of arguments") + }?; + + if is_scalar { + // If all inputs are scalar, keeps output as scalar + let result = ScalarValue::try_from_array(&result, 0)?; + Ok(ColumnarValue::Scalar(result)) + } else { + Ok(ColumnarValue::Array(Arc::new(result))) + } + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +fn timestamps_from_components( + args: &[ColumnarValue], + array_size: usize, +) -> Result> { + let (years, months, days, hours, minutes, seconds, nanoseconds, time_zone) = match args.len() { + 8 => { + let [years, months, days, hours, minutes, seconds, nanoseconds, time_zone] = + take_function_args("timestamp_from_parts", args)?; + ( + years, + months, + days, + hours, + minutes, + seconds, + Some(nanoseconds), + Some(time_zone), + ) + } + 7 => { + let [years, months, days, hours, minutes, seconds, nanoseconds] = + take_function_args("timestamp_from_parts", args)?; + ( + years, + months, + days, + hours, + minutes, + seconds, + Some(nanoseconds), + None, + ) + } + 6 => { + let [years, months, days, hours, minutes, seconds] = + take_function_args("timestamp_from_parts", args)?; + (years, months, days, hours, minutes, seconds, None, None) + } + _ => return internal_err!("Unsupported number of arguments"), + }; + let years = to_primitive_array::(&years.cast_to(&Int32, None)?)?; + let months = to_primitive_array::(&months.cast_to(&Int32, None)?)?; + let days = to_primitive_array::(&days.cast_to(&Int32, None)?)?; + let hours = to_primitive_array::(&hours.cast_to(&Int32, None)?)?; + let minutes = to_primitive_array::(&minutes.cast_to(&Int32, None)?)?; + let seconds = to_primitive_array::(&seconds.cast_to(&Int32, None)?)?; + let nanoseconds = nanoseconds + .map(|nanoseconds| to_primitive_array::(&nanoseconds.cast_to(&Int64, None)?)) + .transpose()?; + let time_zone = time_zone.map(to_string_array).transpose()?; + + let mut builder: PrimitiveBuilder = + PrimitiveArray::builder(array_size); + for i in 0..array_size { + let nanoseconds = nanoseconds.as_ref().map(|ns| ns.value(i)); + let time_zone = time_zone.as_ref().map(|tz| tz.value(i)); + make_timestamp( + years.value(i), + months.value(i), + days.value(i), + hours.value(i), + minutes.value(i), + seconds.value(i), + nanoseconds, + time_zone, + &mut builder, + )?; + } + Ok(builder.finish()) +} + +fn make_timestamp_from_date_time( + date: i32, + time: i64, + builder: &mut PrimitiveBuilder, +) -> Result<()> { + let time = u32::try_from(time) + .map_err(|_| _exec_datafusion_err!("time value '{time:?}' is out of range"))?; + + if let Some(naive_date) = NaiveDate::from_num_days_from_ce_opt(date) { + if let Some(naive_time) = NaiveTime::from_hms_nano_opt(0, 0, 0, time) { + if let Some(timestamp) = naive_date + .and_time(naive_time) + .and_utc() + .timestamp_nanos_opt() + { + builder.append_value(timestamp); + } else { + return exec_err!( + "Unable to parse timestamp from date '{:?}' and time '{:?}'", + date, + time + ); + } + } else { + return exec_err!("Invalid time part '{:?}'", time); + } + } else { + return exec_err!("Invalid date part '{:?}'", date); + } + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +fn make_timestamp( + year: i32, + month: i32, + day: i32, + hour: i32, + minute: i32, + seconds: i32, + nanosecond: Option, + time_zone: Option<&str>, + builder: &mut PrimitiveBuilder, +) -> Result<()> { + let day = u32::try_from(day) + .map_err(|_| _exec_datafusion_err!("day value '{day:?}' is out of range"))?; + let month = u32::try_from(month) + .map_err(|_| _exec_datafusion_err!("month value '{month:?}' is out of range"))?; + let hour = u32::try_from(hour) + .map_err(|_| _exec_datafusion_err!("hour value '{hour:?}' is out of range"))?; + let minute = u32::try_from(minute) + .map_err(|_| _exec_datafusion_err!("minute value '{minute:?}' is out of range"))?; + let seconds = u32::try_from(seconds) + .map_err(|_| _exec_datafusion_err!("seconds value '{seconds:?}' is out of range"))?; + let nano = u32::try_from(nanosecond.unwrap_or(0)) + .map_err(|_| _exec_datafusion_err!("nanosecond value '{nanosecond:?}' is out of range"))?; + + if let Some(date) = NaiveDate::from_ymd_opt(year, month, day) { + if let Some(time) = NaiveTime::from_hms_nano_opt(hour, minute, seconds, nano) { + let date_time = date.and_time(time); + let timestamp = if let Some(time_zone) = time_zone { + Utc.from_utc_datetime(&date_time) + .with_timezone(&time_zone.parse::()?) + .timestamp_nanos_opt() + } else { + date_time.and_utc().timestamp_nanos_opt() + }; + + if let Some(ts) = timestamp { + builder.append_value(ts); + } else { + return exec_err!( + "Unable to parse timestamp from date '{date:?}' and time '{time:?}'" + ); + } + } else { + return exec_err!("Invalid time part '{hour:?}':'{minute:?}':'{seconds:?}'"); + } + } else { + return exec_err!("Invalid date part '{year:?}'-'{month:?}'-'{day:?}'"); + } + Ok(()) +} + +pub fn take_function_args( + function_name: &str, + args: impl IntoIterator, +) -> Result<[T; N]> { + let args = args.into_iter().collect::>(); + args.try_into().map_err(|v: Vec| { + _exec_datafusion_err!( + "{} function requires {} {}, got {}", + function_name, + N, + if N == 1 { "argument" } else { "arguments" }, + v.len() + ) + }) +} + +fn to_primitive_array(col: &ColumnarValue) -> Result> +where + T: arrow::datatypes::ArrowPrimitiveType, +{ + match col { + ColumnarValue::Array(array) => Ok(array.as_primitive::().to_owned()), + ColumnarValue::Scalar(scalar) => { + let value = scalar.to_array()?; + Ok(value.as_primitive::().to_owned()) + } + } +} + +fn to_string_array(col: &ColumnarValue) -> Result { + match col { + ColumnarValue::Array(array) => array + .as_any() + .downcast_ref::() + .map(std::borrow::ToOwned::to_owned) + .ok_or_else(|| _exec_datafusion_err!("Failed to downcast Array to StringArray")), + ColumnarValue::Scalar(scalar) => { + let value = scalar.to_array()?; + value + .as_any() + .downcast_ref::() + .map(std::borrow::ToOwned::to_owned) + .ok_or_else(|| _exec_datafusion_err!("Failed to downcast Scalar to StringArray")) + } + } +} + +super::macros::make_udf_function!(TimestampFromPartsFunc); From ff7192498cb4fe6991da9a74058c89232706a567 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Mon, 24 Feb 2025 14:15:34 +0300 Subject: [PATCH 2/5] snowflake timestamp_from_parts udf --- Cargo.toml | 4 + crates/runtime/Cargo.toml | 2 + .../functions/timestamp_from_parts.rs | 256 +++++++++++++++++- 3 files changed, 252 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 33f243460..4331eaa65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,8 @@ datafusion = { version = "45.0.0" } datafusion-common = { version = "45.0.0" } datafusion-expr = { version = "45.0.0" } datafusion-physical-plan = { version = "45.0.0" } +datafusion-doc = { version = "45.0.0" } +datafusion-macros = { version = "45.0.0" } # do not change iceberg-rust, datafusion-functions-json hash commits unless datafusion version bumped datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "f2b9b88cd9b4282bc0286a970333e8c01cec177b" } @@ -53,6 +55,8 @@ datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72 datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } +datafusion-doc = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } +datafusion-macros = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } [workspace.lints.clippy] all = { level = "deny", priority = -1 } diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 44077a485..d872f74bf 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -19,6 +19,8 @@ datafusion-common = { workspace = true } datafusion-expr = { workspace = true } datafusion-functions-json = { workspace = true } datafusion-physical-plan = { workspace = true } +datafusion-doc = { workspace = true } +datafusion-macros = { workspace = true } datafusion_iceberg = { workspace = true } futures = { workspace = true } diff --git a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs index e707111cb..54963660e 100644 --- a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs +++ b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs @@ -34,7 +34,77 @@ use datafusion_common::{exec_err, internal_err, Result, ScalarValue, _exec_dataf use datafusion_expr::{ ColumnarValue, ReturnInfo, ReturnTypeArgs, ScalarUDFImpl, Signature, Volatility, }; +use datafusion_macros::user_doc; +const UNIX_EPOCH_DAYS: i32 = 719_163; + +#[user_doc( + doc_section(label = "Time and Date Functions"), + description = "Creates a timestamp from individual numeric components.", + syntax_example = "timestamp_from_parts(, , , , , [, ] [, ] )", + sql_example = "```sql + > select timestamp_from_parts(2025, 2, 24, 12, 0, 50); + +-----------------------------------------------------------------------------------+ + | timestamp_from_parts(Int64(2025),Int64(2),Int64(24),Int64(12),Int64(0),Int64(50)) | + +-----------------------------------------------------------------------------------+ + | 1740398450.0 | + +-----------------------------------------------------------------------------------+ + SELECT timestamp_from_parts(2025, 2, 24, 12, 0, 50, 65555555); + +---------------------------------------------------------------------------------------------------+ + | timestamp_from_parts(Int64(2025),Int64(2),Int64(24),Int64(12),Int64(0),Int64(50),Int64(65555555)) | + +---------------------------------------------------------------------------------------------------+ + | 1740398450.65555555 | + +---------------------------------------------------------------------------------------------------+ +```", + standard_argument(name = "str", prefix = "String"), + argument( + name = "year", + description = "An integer expression to use as a year for building a timestamp." + ), + argument( + name = "month", + description = "An integer expression to use as a month for building a timestamp, with January represented as 1, and December as 12." + ), + argument( + name = "day", + description = "An integer expression to use as a day for building a timestamp, usually in the 1-31 range." + ), + argument( + name = "hour", + description = "An integer expression to use as an hour for building a timestamp, usually in the 0-23 range." + ), + argument( + name = "minute", + description = "An integer expression to use as a minute for building a timestamp, usually in the 0-59 range." + ), + argument( + name = "second", + description = "An integer expression to use as a second for building a timestamp, usually in the 0-59 range." + ), + argument( + name = "second", + description = "An integer expression to use as a second for building a timestamp, usually in the 0-59 range." + ), + argument( + name = "date_expr", + description = "Specifies the date expression to use for building a timestamp + where date_expr provides the year, month, and day for the timestamp." + ), + argument( + name = "time_expr", + description = "Specifies the time expression to use for building a timestamp + where time_expr provides the hour, minute, second, and nanoseconds within the day." + ), + argument( + name = "nanoseconds", + description = "Optional integer expression to use as a nanosecond for building a timestamp, + usually in the 0-999999999 range." + ), + argument( + name = "time_zone", + description = "A string expression to use as a time zone for building a timestamp (e.g. America/Los_Angeles)." + ) +)] #[derive(Debug)] pub struct TimestampFromPartsFunc { signature: Signature, @@ -234,15 +304,20 @@ fn timestamps_from_components( } fn make_timestamp_from_date_time( - date: i32, - time: i64, + days: i32, + time_nano: i64, builder: &mut PrimitiveBuilder, ) -> Result<()> { - let time = u32::try_from(time) - .map_err(|_| _exec_datafusion_err!("time value '{time:?}' is out of range"))?; + let seconds = u32::try_from(time_nano / 1_000_000_000) + .map_err(|_| _exec_datafusion_err!("time seconds value '{time_nano:?}' is out of range"))?; + let nanoseconds = u32::try_from(time_nano % 1_000_000_000).map_err(|_| { + _exec_datafusion_err!("time nanoseconds value '{time_nano:?}' is out of range") + })?; - if let Some(naive_date) = NaiveDate::from_num_days_from_ce_opt(date) { - if let Some(naive_time) = NaiveTime::from_hms_nano_opt(0, 0, 0, time) { + if let Some(naive_date) = NaiveDate::from_num_days_from_ce_opt(days + UNIX_EPOCH_DAYS) { + if let Some(naive_time) = + NaiveTime::from_num_seconds_from_midnight_opt(seconds, nanoseconds) + { if let Some(timestamp) = naive_date .and_time(naive_time) .and_utc() @@ -252,15 +327,15 @@ fn make_timestamp_from_date_time( } else { return exec_err!( "Unable to parse timestamp from date '{:?}' and time '{:?}'", - date, - time + days, + time_nano ); } } else { - return exec_err!("Invalid time part '{:?}'", time); + return exec_err!("Invalid time part '{:?}'", time_nano); } } else { - return exec_err!("Invalid date part '{:?}'", date); + return exec_err!("Invalid date part '{:?}'", days); } Ok(()) } @@ -365,3 +440,164 @@ fn to_string_array(col: &ColumnarValue) -> Result { } super::macros::make_udf_function!(TimestampFromPartsFunc); + +#[cfg(test)] +mod test { + use crate::datafusion::functions::timestamp_from_parts::{ + to_primitive_array, TimestampFromPartsFunc, + }; + use arrow::datatypes::TimestampNanosecondType; + use datafusion::logical_expr::ColumnarValue; + use datafusion_common::ScalarValue; + use datafusion_expr::ScalarUDFImpl; + + #[allow(clippy::unwrap_used)] + fn columnar_value_fn(is_scalar: bool, v: T) -> ColumnarValue + where + ScalarValue: From, + T: Clone, + { + if is_scalar { + ColumnarValue::Scalar(ScalarValue::from(v)) + } else { + ColumnarValue::Array(ScalarValue::from(v).to_array().unwrap()) + } + } + + #[allow(clippy::type_complexity)] + #[allow(clippy::unwrap_used)] + #[test] + fn test_timestamp_from_parts_components() { + let args: [( + i64, + i64, + i64, + i64, + i64, + i64, + Option, + Option, + i64, + ); 7] = [ + (2025, 1, 2, 0, 0, 0, None, None, 1_735_776_000_000_000_000), + ( + 2025, + 1, + 2, + 12, + 0, + 0, + Some(0), + Some("UTC".to_string()), + 1_735_819_200_000_000_000, + ), + ( + 2025, + 1, + 2, + 12, + 10, + 0, + Some(0), + Some("America/New_York".to_string()), + 1_735_819_800_000_000_000, + ), + ( + 2025, + 1, + 2, + 12, + 10, + 12, + Some(0), + Some("Asia/Tokyo".to_string()), + 1_735_819_812_000_000_000, + ), + ( + 2025, + 1, + 2, + 12, + 10, + 12, + Some(0), + Some("Europe/London".to_string()), + 1_735_819_812_000_000_000, + ), + ( + 2025, + 1, + 2, + 12, + 10, + 12, + Some(0), + Some("Africa/Cairo".to_string()), + 1_735_819_812_000_000_000, + ), + ( + 2025, + 1, + 2, + 12, + 10, + 12, + Some(10), + Some("Australia/Sydney".to_string()), + 1_735_819_812_000_000_010, + ), + ]; + + let is_scalar_type = [true, false]; + + for is_scalar in is_scalar_type { + for (i, (y, m, d, h, mi, s, n, tz, exp)) in args.iter().enumerate() { + let mut fn_args = vec![ + columnar_value_fn(is_scalar, *y), + columnar_value_fn(is_scalar, *m), + columnar_value_fn(is_scalar, *d), + columnar_value_fn(is_scalar, *h), + columnar_value_fn(is_scalar, *mi), + columnar_value_fn(is_scalar, *s), + ]; + if let Some(nano) = n { + fn_args.push(columnar_value_fn(is_scalar, *nano)); + }; + if let Some(t) = tz { + fn_args.push(columnar_value_fn(is_scalar, t.to_string())); + }; + let result = TimestampFromPartsFunc::new() + .invoke_batch(&fn_args, 1) + .unwrap(); + let result = to_primitive_array::(&result).unwrap(); + assert_eq!(result.value(0), *exp, "failed at index {i}"); + } + } + } + + #[allow(clippy::unwrap_used)] + #[test] + fn test_timestamp_from_parts_exp() { + // TypeSignatureClass::Date, TypeSignatureClass::Time + let args: [(i32, i64, i64); 2] = [ + (20143, 39_075_773_219_000, 1_740_394_275_773_219_000), + (20143, 0, 1_740_355_200_000_000_000), + ]; + + let is_scalar_type = [true, false]; + + for is_scalar in is_scalar_type { + for (i, (date, time, exp)) in args.iter().enumerate() { + let fn_args = vec![ + columnar_value_fn(is_scalar, ScalarValue::Date32(Some(*date))), + columnar_value_fn(is_scalar, ScalarValue::Time64Nanosecond(Some(*time))), + ]; + let result = TimestampFromPartsFunc::new() + .invoke_batch(&fn_args, 1) + .unwrap(); + let result = to_primitive_array::(&result).unwrap(); + assert_eq!(result.value(0), *exp, "failed at index {i}"); + } + } + } +} From ebf236ac80e6e18061f7d6e31e65f5a6f52f13ae Mon Sep 17 00:00:00 2001 From: osipovartem Date: Mon, 24 Feb 2025 17:58:34 +0300 Subject: [PATCH 3/5] Fix clippy --- .../runtime/src/datafusion/functions/timestamp_from_parts.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs index 54963660e..a93bc6fa5 100644 --- a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs +++ b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs @@ -160,7 +160,7 @@ impl ScalarUDFImpl for TimestampFromPartsFunc { self } - fn name(&self) -> &str { + fn name(&self) -> &'static str { "timestamp_from_parts" } @@ -464,8 +464,7 @@ mod test { } } - #[allow(clippy::type_complexity)] - #[allow(clippy::unwrap_used)] + #[allow(clippy::type_complexity, clippy::unwrap_used, clippy::too_many_lines)] #[test] fn test_timestamp_from_parts_components() { let args: [( From cf233d75a5f1ef3b8bbcd31f9645bb103ed4c719 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Tue, 25 Feb 2025 13:16:46 +0300 Subject: [PATCH 4/5] Fix example typo --- .../runtime/src/datafusion/functions/timestamp_from_parts.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs index a93bc6fa5..a77684225 100644 --- a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs +++ b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs @@ -81,10 +81,6 @@ const UNIX_EPOCH_DAYS: i32 = 719_163; name = "second", description = "An integer expression to use as a second for building a timestamp, usually in the 0-59 range." ), - argument( - name = "second", - description = "An integer expression to use as a second for building a timestamp, usually in the 0-59 range." - ), argument( name = "date_expr", description = "Specifies the date expression to use for building a timestamp From c188f486002f87dfa3fc3ad49b36817872438932 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Tue, 25 Feb 2025 13:27:19 +0300 Subject: [PATCH 5/5] Support negative values --- .../functions/timestamp_from_parts.rs | 203 +++++++++--------- 1 file changed, 102 insertions(+), 101 deletions(-) diff --git a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs index a77684225..16785b10e 100644 --- a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs +++ b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs @@ -27,6 +27,7 @@ use arrow::datatypes::{ use arrow_schema::DataType::{Date32, Int32, Int64, Time64}; use arrow_schema::TimeUnit; use chrono::prelude::*; +use chrono::{Duration, Months}; use datafusion::logical_expr::TypeSignature::Coercible; use datafusion::logical_expr::TypeSignatureClass; use datafusion_common::types::{logical_int64, logical_string}; @@ -36,7 +37,7 @@ use datafusion_expr::{ }; use datafusion_macros::user_doc; -const UNIX_EPOCH_DAYS: i32 = 719_163; +const UNIX_DAYS_FROM_CE: i32 = 719_163; #[user_doc( doc_section(label = "Time and Date Functions"), @@ -56,7 +57,6 @@ const UNIX_EPOCH_DAYS: i32 = 719_163; | 1740398450.65555555 | +---------------------------------------------------------------------------------------------------+ ```", - standard_argument(name = "str", prefix = "String"), argument( name = "year", description = "An integer expression to use as a year for building a timestamp." @@ -147,6 +147,9 @@ impl TimestampFromPartsFunc { String::from("timestamp_ntz_from_parts"), String::from("timestamp_tz_from_parts"), String::from("timestamp_ltz_from_parts"), + String::from("timestampntzfromparts"), + String::from("timestamptzfromparts"), + String::from("timestampltzfromparts"), ], } } @@ -204,7 +207,8 @@ impl ScalarUDFImpl for TimestampFromPartsFunc { )?; let mut builder = PrimitiveArray::builder(array_size); for i in 0..array_size { - make_timestamp_from_date_time(date.value(i), time.value(i), &mut builder)?; + let ts = make_timestamp_from_date_time(date.value(i), time.value(i))?; + builder.append_value(ts); } Ok(builder.finish()) } else if args.len() > 5 && args.len() < 9 { @@ -284,7 +288,7 @@ fn timestamps_from_components( for i in 0..array_size { let nanoseconds = nanoseconds.as_ref().map(|ns| ns.value(i)); let time_zone = time_zone.as_ref().map(|tz| tz.value(i)); - make_timestamp( + let ts = make_timestamp( years.value(i), months.value(i), days.value(i), @@ -293,47 +297,46 @@ fn timestamps_from_components( seconds.value(i), nanoseconds, time_zone, - &mut builder, )?; + builder.append_value(ts); } Ok(builder.finish()) } -fn make_timestamp_from_date_time( - days: i32, - time_nano: i64, - builder: &mut PrimitiveBuilder, -) -> Result<()> { - let seconds = u32::try_from(time_nano / 1_000_000_000) - .map_err(|_| _exec_datafusion_err!("time seconds value '{time_nano:?}' is out of range"))?; - let nanoseconds = u32::try_from(time_nano % 1_000_000_000).map_err(|_| { - _exec_datafusion_err!("time nanoseconds value '{time_nano:?}' is out of range") - })?; - - if let Some(naive_date) = NaiveDate::from_num_days_from_ce_opt(days + UNIX_EPOCH_DAYS) { - if let Some(naive_time) = - NaiveTime::from_num_seconds_from_midnight_opt(seconds, nanoseconds) - { - if let Some(timestamp) = naive_date - .and_time(naive_time) - .and_utc() - .timestamp_nanos_opt() - { - builder.append_value(timestamp); +fn make_timestamp_from_date_time(date: i32, time: i64) -> Result { + make_timestamp_from_nanoseconds(i64::from(date) * 86_400_000_000_000 + time, None) +} + +fn make_date(year: i32, month: i32, days: i32) -> Result { + let u_month = match month { + 0 => 1, + _ if month < 0 => 1 - month, + _ => month - 1, + }; + let u_month = u32::try_from(u_month) + .map_err(|_| _exec_datafusion_err!("month value '{month:?}' is out of range"))?; + + NaiveDate::from_ymd_opt(year, 1, 1).map_or_else( + || exec_err!("Invalid date part '{year:?}' '{month:?}' '{days:?}'"), + |date| { + let months = Months::new(u_month); + let days = Duration::days(i64::from(days - 1)); + let result = if month <= 0 { + date.checked_sub_months(months) } else { - return exec_err!( - "Unable to parse timestamp from date '{:?}' and time '{:?}'", - days, - time_nano - ); - } - } else { - return exec_err!("Invalid time part '{:?}'", time_nano); - } - } else { - return exec_err!("Invalid date part '{:?}'", days); - } - Ok(()) + date.checked_add_months(months) + }; + result.map_or_else( + || exec_err!("invalid date part '{year:?}' '{month:?}' '{days:?}'"), + |months_result| { + months_result.checked_add_signed(days).map_or_else( + || exec_err!("invalid date part '{year:?}' '{month:?}' '{days:?}'"), + |days_result| Ok(days_result.num_days_from_ce() - UNIX_DAYS_FROM_CE), + ) + }, + ) + }, + ) } #[allow(clippy::too_many_arguments)] @@ -344,48 +347,33 @@ fn make_timestamp( hour: i32, minute: i32, seconds: i32, - nanosecond: Option, - time_zone: Option<&str>, - builder: &mut PrimitiveBuilder, -) -> Result<()> { - let day = u32::try_from(day) - .map_err(|_| _exec_datafusion_err!("day value '{day:?}' is out of range"))?; - let month = u32::try_from(month) - .map_err(|_| _exec_datafusion_err!("month value '{month:?}' is out of range"))?; - let hour = u32::try_from(hour) - .map_err(|_| _exec_datafusion_err!("hour value '{hour:?}' is out of range"))?; - let minute = u32::try_from(minute) - .map_err(|_| _exec_datafusion_err!("minute value '{minute:?}' is out of range"))?; - let seconds = u32::try_from(seconds) - .map_err(|_| _exec_datafusion_err!("seconds value '{seconds:?}' is out of range"))?; - let nano = u32::try_from(nanosecond.unwrap_or(0)) - .map_err(|_| _exec_datafusion_err!("nanosecond value '{nanosecond:?}' is out of range"))?; - - if let Some(date) = NaiveDate::from_ymd_opt(year, month, day) { - if let Some(time) = NaiveTime::from_hms_nano_opt(hour, minute, seconds, nano) { - let date_time = date.and_time(time); - let timestamp = if let Some(time_zone) = time_zone { - Utc.from_utc_datetime(&date_time) - .with_timezone(&time_zone.parse::()?) - .timestamp_nanos_opt() - } else { - date_time.and_utc().timestamp_nanos_opt() - }; + nano: Option, + timezone: Option<&str>, +) -> Result { + let days = make_date(year, month, day)?; + let n_days = i64::from(days) * 86_400_000_000_000; + let n_hour = i64::from(hour) * 3_600_000_000_000; + let n_minute = i64::from(minute) * 60_000_000_000; + let n_seconds = i64::from(seconds) * 1_000_000_000; + let n_nano = nano.unwrap_or(0); + let total_nanos = n_days + n_hour + n_minute + n_seconds + n_nano; + + make_timestamp_from_nanoseconds(total_nanos, timezone) +} - if let Some(ts) = timestamp { - builder.append_value(ts); - } else { - return exec_err!( - "Unable to parse timestamp from date '{date:?}' and time '{time:?}'" - ); - } - } else { - return exec_err!("Invalid time part '{hour:?}':'{minute:?}':'{seconds:?}'"); - } +fn make_timestamp_from_nanoseconds(nanoseconds: i64, tz: Option<&str>) -> Result { + let date_time = DateTime::from_timestamp_nanos(nanoseconds); + let timestamp = if let Some(timezone) = tz { + date_time + .with_timezone(&timezone.parse::()?) + .timestamp_nanos_opt() } else { - return exec_err!("Invalid date part '{year:?}'-'{month:?}'-'{day:?}'"); - } - Ok(()) + date_time.timestamp_nanos_opt() + }; + timestamp.map_or_else( + || exec_err!("Unable to parse timestamp from '{:?}'", nanoseconds), + Ok, + ) } pub fn take_function_args( @@ -443,6 +431,7 @@ mod test { to_primitive_array, TimestampFromPartsFunc, }; use arrow::datatypes::TimestampNanosecondType; + use chrono::DateTime; use datafusion::logical_expr::ColumnarValue; use datafusion_common::ScalarValue; use datafusion_expr::ScalarUDFImpl; @@ -472,30 +461,29 @@ mod test { i64, Option, Option, - i64, + String, ); 7] = [ - (2025, 1, 2, 0, 0, 0, None, None, 1_735_776_000_000_000_000), ( 2025, 1, 2, - 12, 0, 0, - Some(0), - Some("UTC".to_string()), - 1_735_819_200_000_000_000, + 0, + None, + None, + "2025-01-02 00:00:00.000000000".to_string(), ), ( 2025, 1, 2, 12, - 10, + 0, 0, Some(0), - Some("America/New_York".to_string()), - 1_735_819_800_000_000_000, + None, + "2025-01-02 12:00:00.000000000".to_string(), ), ( 2025, @@ -503,10 +491,10 @@ mod test { 2, 12, 10, - 12, + 0, Some(0), - Some("Asia/Tokyo".to_string()), - 1_735_819_812_000_000_000, + None, + "2025-01-02 12:10:00.000000000".to_string(), ), ( 2025, @@ -516,8 +504,8 @@ mod test { 10, 12, Some(0), - Some("Europe/London".to_string()), - 1_735_819_812_000_000_000, + None, + "2025-01-02 12:10:12.000000000".to_string(), ), ( 2025, @@ -526,9 +514,9 @@ mod test { 12, 10, 12, - Some(0), - Some("Africa/Cairo".to_string()), - 1_735_819_812_000_000_000, + Some(500), + None, + "2025-01-02 12:10:12.000000500".to_string(), ), ( 2025, @@ -537,9 +525,20 @@ mod test { 12, 10, 12, - Some(10), - Some("Australia/Sydney".to_string()), - 1_735_819_812_000_000_010, + Some(0), + Some("America/Los_Angeles".to_string()), + "2025-01-02 12:10:12.000000000".to_string(), + ), + ( + 2025, + -5, + -15, + -12, + 0, + -3600, + None, + None, + "2024-06-14 11:00:00.000000000".to_string(), ), ]; @@ -565,7 +564,10 @@ mod test { .invoke_batch(&fn_args, 1) .unwrap(); let result = to_primitive_array::(&result).unwrap(); - assert_eq!(result.value(0), *exp, "failed at index {i}"); + let ts = DateTime::from_timestamp_nanos(result.value(0)) + .format("%Y-%m-%d %H:%M:%S.%f") + .to_string(); + assert_eq!(ts, *exp, "failed at index {i}"); } } } @@ -573,7 +575,6 @@ mod test { #[allow(clippy::unwrap_used)] #[test] fn test_timestamp_from_parts_exp() { - // TypeSignatureClass::Date, TypeSignatureClass::Time let args: [(i32, i64, i64); 2] = [ (20143, 39_075_773_219_000, 1_740_394_275_773_219_000), (20143, 0, 1_740_355_200_000_000_000),