diff --git a/crates/runtime/src/datafusion/functions/mod.rs b/crates/runtime/src/datafusion/functions/mod.rs index 9a25f1f1c..9c46194d1 100644 --- a/crates/runtime/src/datafusion/functions/mod.rs +++ b/crates/runtime/src/datafusion/functions/mod.rs @@ -23,6 +23,7 @@ mod convert_timezone; mod date_add; mod date_diff; mod parse_json; +mod time_from_parts; mod timestamp_from_parts; pub fn register_udfs(registry: &mut dyn FunctionRegistry) -> Result<()> { @@ -32,6 +33,7 @@ pub fn register_udfs(registry: &mut dyn FunctionRegistry) -> Result<()> { parse_json::get_udf(), date_diff::get_udf(), timestamp_from_parts::get_udf(), + time_from_parts::get_udf(), ]; for func in functions { diff --git a/crates/runtime/src/datafusion/functions/time_from_parts.rs b/crates/runtime/src/datafusion/functions/time_from_parts.rs new file mode 100644 index 000000000..ad17fa6a2 --- /dev/null +++ b/crates/runtime/src/datafusion/functions/time_from_parts.rs @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::sync::Arc; + +use crate::datafusion::functions::timestamp_from_parts::{ + make_time, take_function_args, to_primitive_array, +}; +use arrow::array::builder::PrimitiveBuilder; +use arrow::array::{Array, PrimitiveArray}; +use arrow::datatypes::{DataType, Int64Type, Time64NanosecondType}; +use arrow_schema::DataType::{Int64, Time64}; +use arrow_schema::TimeUnit; +use datafusion::logical_expr::TypeSignature::Coercible; +use datafusion::logical_expr::TypeSignatureClass; +use datafusion_common::types::logical_int64; +use datafusion_common::{internal_err, Result, ScalarValue}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use datafusion_macros::user_doc; + +#[user_doc( + doc_section(label = "Time and Date Functions"), + description = "Creates a time of day from individual numeric components.", + syntax_example = "time_from_parts(<hour>, <minute>, <second> [, <nanoseconds>])", + sql_example = "```sql + > select time_from_parts(12, 34, 56, 987654321); + +-----------------------------------------------------------------+ + | time_from_parts(Int64(12),Int64(34),Int64(56),Int64(987654321)) | + +-----------------------------------------------------------------+ + | 12:34:56.987654321 | + +-----------------------------------------------------------------+ +```", + argument( + name = "hour", + description = "An integer expression to use as an hour for building a time, usually in the 0-23 range." + ), + argument( + name = "minute", + description = "An integer expression to use as a minute for building a time, usually in the 0-59 range." + ), + argument( + name = "second", + description = "An integer expression to use as a second for building a time, usually in the 0-59 range." + ), + argument( + name = "nanoseconds", + description = "Optional integer expression to use as a nanosecond for building a time, + usually in the 0-999999999 range." 
+ ) +)] +#[derive(Debug)] +pub struct TimeFromPartsFunc { + signature: Signature, + aliases: Vec, +} + +impl Default for TimeFromPartsFunc { + fn default() -> Self { + Self::new() + } +} + +impl TimeFromPartsFunc { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + Coercible(vec![TypeSignatureClass::Native(logical_int64()); 4]), + Coercible(vec![TypeSignatureClass::Native(logical_int64()); 3]), + ], + Volatility::Immutable, + ), + aliases: vec![String::from("timefromparts")], + } + } +} +impl ScalarUDFImpl for TimeFromPartsFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &'static str { + "time_from_parts" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(Time64(TimeUnit::Nanosecond)) + } + + fn invoke_batch(&self, args: &[ColumnarValue], _number_rows: usize) -> Result { + // first, identify if any of the arguments is an Array. If yes, store its `len`, + // as any scalar will need to be converted to an array of len `len`. 
+ let array_size = args + .iter() + .find_map(|arg| match arg { + ColumnarValue::Array(a) => Some(a.len()), + ColumnarValue::Scalar(_) => None, + }) + .unwrap_or(1); + let is_scalar = array_size == 1; + + let result = time_from_components(args, array_size)?; + if is_scalar { + // If all inputs are scalar, keeps output as scalar + let result = ScalarValue::try_from_array(&result, 0)?; + Ok(ColumnarValue::Scalar(result)) + } else { + Ok(ColumnarValue::Array(Arc::new(result))) + } + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +fn time_from_components( + args: &[ColumnarValue], + array_size: usize, +) -> Result> { + let (hours, minutes, seconds, nanos) = match args.len() { + 4 => { + let [hours, minutes, seconds, nanos] = take_function_args("time_from_parts", args)?; + (hours, minutes, seconds, Some(nanos)) + } + 3 => { + let [hours, minutes, seconds] = take_function_args("time_from_parts", args)?; + (hours, minutes, seconds, None) + } + _ => return internal_err!("Unsupported number of arguments"), + }; + + let hours = to_primitive_array::(&hours.cast_to(&Int64, None)?)?; + let minutes = to_primitive_array::(&minutes.cast_to(&Int64, None)?)?; + let seconds = to_primitive_array::(&seconds.cast_to(&Int64, None)?)?; + let nanoseconds = nanos + .map(|nanoseconds| to_primitive_array::(&nanoseconds.cast_to(&Int64, None)?)) + .transpose()?; + let mut builder: PrimitiveBuilder = PrimitiveArray::builder(array_size); + for i in 0..array_size { + builder.append_value(make_time( + hours.value(i), + minutes.value(i), + seconds.value(i), + nanoseconds.as_ref().map(|ns| ns.value(i)), + )); + } + Ok(builder.finish()) +} + +super::macros::make_udf_function!(TimeFromPartsFunc); + +#[cfg(test)] +mod test { + use crate::datafusion::functions::time_from_parts::TimeFromPartsFunc; + use crate::datafusion::functions::timestamp_from_parts::to_primitive_array; + use arrow::datatypes::Time64NanosecondType; + use chrono::NaiveTime; + use 
datafusion::logical_expr::ColumnarValue; + use datafusion_common::ScalarValue; + use datafusion_expr::ScalarUDFImpl; + + #[allow(clippy::unwrap_used)] + fn columnar_value_fn(is_scalar: bool, v: T) -> ColumnarValue + where + ScalarValue: From, + T: Clone, + { + if is_scalar { + ColumnarValue::Scalar(ScalarValue::from(v)) + } else { + ColumnarValue::Array(ScalarValue::from(v).to_array().unwrap()) + } + } + + #[allow(clippy::type_complexity)] + #[allow(clippy::unwrap_used)] + #[test] + fn test_time_from_parts() { + let args: [(i64, i64, i64, Option, String); 6] = [ + (12, 0, 0, None, "12::00::00".to_string()), + (12, 10, 0, None, "12::10::00".to_string()), + (12, 10, 12, None, "12::10::12".to_string()), + (12, 10, 12, Some(255), "12::10::12.000000255".to_string()), + (12, 10, -12, Some(255), "12::09::48.000000255".to_string()), + (12, -10, -12, Some(255), "12::49::48.000000255".to_string()), + ]; + + let is_scalar_type = [true, false]; + + for is_scalar in is_scalar_type { + for (i, (h, mi, s, n, exp)) in args.iter().enumerate() { + let mut fn_args = vec![ + columnar_value_fn(is_scalar, *h), + columnar_value_fn(is_scalar, *mi), + columnar_value_fn(is_scalar, *s), + ]; + if let Some(nano) = n { + fn_args.push(columnar_value_fn(is_scalar, *nano)); + }; + let result = TimeFromPartsFunc::new().invoke_batch(&fn_args, 1).unwrap(); + let result = to_primitive_array::(&result).unwrap(); + let seconds = result.value(0) / 1_000_000_000; + let nanoseconds = result.value(0) % 1_000_000_000; + + let time = NaiveTime::from_num_seconds_from_midnight_opt( + u32::try_from(seconds).unwrap(), + u32::try_from(nanoseconds).unwrap(), + ) + .unwrap(); + assert_eq!( + time.format("%H:%M:%S.%9f").to_string(), + *exp, + "failed at index {i}" + ); + } + } + } +} diff --git a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs index 16785b10e..c6ce46f69 100644 --- 
a/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs +++ b/crates/runtime/src/datafusion/functions/timestamp_from_parts.rs @@ -275,9 +275,9 @@ fn timestamps_from_components( let years = to_primitive_array::(&years.cast_to(&Int32, None)?)?; let months = to_primitive_array::(&months.cast_to(&Int32, None)?)?; let days = to_primitive_array::(&days.cast_to(&Int32, None)?)?; - let hours = to_primitive_array::(&hours.cast_to(&Int32, None)?)?; - let minutes = to_primitive_array::(&minutes.cast_to(&Int32, None)?)?; - let seconds = to_primitive_array::(&seconds.cast_to(&Int32, None)?)?; + let hours = to_primitive_array::(&hours.cast_to(&Int64, None)?)?; + let minutes = to_primitive_array::(&minutes.cast_to(&Int64, None)?)?; + let seconds = to_primitive_array::(&seconds.cast_to(&Int64, None)?)?; let nanoseconds = nanoseconds .map(|nanoseconds| to_primitive_array::(&nanoseconds.cast_to(&Int64, None)?)) .transpose()?; @@ -339,25 +339,29 @@ fn make_date(year: i32, month: i32, days: i32) -> Result { ) } +pub fn make_time(hour: i64, minute: i64, seconds: i64, nanosecond: Option) -> i64 { + let n_hour = hour * 3_600_000_000_000; + let n_minute = minute * 60_000_000_000; + let n_seconds = seconds * 1_000_000_000; + let n_nano = nanosecond.unwrap_or(0); + n_hour + n_minute + n_seconds + n_nano +} + #[allow(clippy::too_many_arguments)] fn make_timestamp( year: i32, month: i32, day: i32, - hour: i32, - minute: i32, - seconds: i32, + hour: i64, + minute: i64, + seconds: i64, nano: Option, timezone: Option<&str>, ) -> Result { let days = make_date(year, month, day)?; - let n_days = i64::from(days) * 86_400_000_000_000; - let n_hour = i64::from(hour) * 3_600_000_000_000; - let n_minute = i64::from(minute) * 60_000_000_000; - let n_seconds = i64::from(seconds) * 1_000_000_000; - let n_nano = nano.unwrap_or(0); - let total_nanos = n_days + n_hour + n_minute + n_seconds + n_nano; - + let n_date = i64::from(days) * 86_400_000_000_000; + let n_time = make_time(hour, minute, 
seconds, nano); + let total_nanos = n_date + n_time; make_timestamp_from_nanoseconds(total_nanos, timezone) } @@ -392,7 +396,7 @@ pub fn take_function_args( }) } -fn to_primitive_array(col: &ColumnarValue) -> Result> +pub fn to_primitive_array(col: &ColumnarValue) -> Result> where T: arrow::datatypes::ArrowPrimitiveType, { @@ -449,7 +453,7 @@ mod test { } } - #[allow(clippy::type_complexity, clippy::unwrap_used, clippy::too_many_lines)] + #[allow(clippy::type_complexity, clippy::too_many_lines, clippy::unwrap_used)] #[test] fn test_timestamp_from_parts_components() { let args: [(