Design doc
Is your feature request related to a problem or challenge?
Verbosity
Currently implementing a scalar function is a pretty involved process. For example a simple function calculating greatest common divisor looks like this (a lot of detail omitted for brevity):
impl ScalarUDFImpl for GcdFunc {
    fn name(&self) -> &str {
        "gcd"
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(Int64)
    }

    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        make_scalar_function(gcd, vec![])(args)
    }
}

/// Gcd SQL function
fn gcd(args: &[ArrayRef]) -> Result<ArrayRef> {
    match args[0].data_type() {
        Int64 => {
            let arg1 = downcast_arg!(&args[0], "x", Int64Array);
            let arg2 = downcast_arg!(&args[1], "y", Int64Array);
            Ok(arg1
                .iter()
                .zip(arg2.iter())
                .map(|(a1, a2)| match (a1, a2) {
                    (Some(a1), Some(a2)) => Ok(Some(compute_gcd(a1, a2)?)),
                    _ => Ok(None),
                })
                .collect::<Result<Int64Array>>()
                .map(Arc::new)? as ArrayRef)
        }
        other => exec_err!("Unsupported data type {other:?} for function gcd"),
    }
}

/// Computes greatest common divisor using Binary GCD algorithm.
pub fn compute_gcd(x: i64, y: i64) -> Result<i64> {
    ...
}
This function is still "relatively simple" because:
- it handles only the int64 type (has no overloads)
  - a more complicated function would calculate the return type based on input types in return_type and again in invoke
- it doesn't support constant x or constant y parameters
  - the make_scalar_function helper expands a scalar constant argument into an array, thus gcd(x, 5) is less efficient than gcd(x, y) because it first needs to allocate a temporary buffer holding 5 repeated once per row of the batch
- it doesn't handle dictionaries or run-end (REE / RLE) encoded values
  - I doubt people frequently calculate gcd on partitioning keys, but this is just an example
- it doesn't handle the all-non-null case, so it probably misses an optimization/vectorization opportunity
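The cost of expanding a constant argument can be illustrated with a minimal sketch. Everything in it is an assumption made for illustration: `Arg` is a hypothetical stand-in for `ColumnarValue` restricted to Int64, `Vec<Option<i64>>` stands in for an Arrow array, and `gcd_row` uses Euclid's algorithm rather than the Binary GCD mentioned above. The point is only that a scalar can be read in place for every row instead of being copied into a batch-sized buffer.

```rust
/// Hypothetical stand-in for `ColumnarValue`, restricted to Int64.
pub enum Arg {
    /// One value shared by every row of the batch.
    Scalar(Option<i64>),
    /// One nullable value per row.
    Array(Vec<Option<i64>>),
}

impl Arg {
    /// Reads the value for `row` without materializing a scalar into a buffer.
    fn get(&self, row: usize) -> Option<i64> {
        match self {
            Arg::Scalar(v) => *v,
            Arg::Array(values) => values[row],
        }
    }
}

/// Placeholder row-level logic: Euclid's algorithm on absolute values.
fn gcd_row(x: i64, y: i64) -> Result<i64, String> {
    let (mut a, mut b) = (x.unsigned_abs(), y.unsigned_abs());
    while b != 0 {
        let r = a % b;
        a = b;
        b = r;
    }
    if a > i64::MAX as u64 {
        // Only reachable for inputs such as gcd(i64::MIN, 0).
        Err(format!("gcd({x}, {y}) does not fit in i64"))
    } else {
        Ok(a as i64)
    }
}

/// Evaluates gcd over `len` rows; a scalar argument is re-read on each row.
pub fn gcd_batch(x: &Arg, y: &Arg, len: usize) -> Result<Vec<Option<i64>>, String> {
    (0..len)
        .map(|row| match (x.get(row), y.get(row)) {
            (Some(a), Some(b)) => gcd_row(a, b).map(Some),
            _ => Ok(None),
        })
        .collect()
}
```

With this shape, `gcd(x, 5)` would be called as `gcd_batch(&Arg::Array(x), &Arg::Scalar(Some(5)), len)` and allocates only the output.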
It should be possible to express functions using simple row-level operations, because in query engines most function logic is structured like that anyway (compute_gcd in the example).
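As a rough sketch of that idea, a generic adapter could lift any fallible row-level function to whole columns, so the function author writes only the row logic. The names `lift_binary` and `gcd_row` are hypothetical, `Vec<Option<T>>` stands in for a nullable Arrow array, and the GCD body uses Euclid's algorithm as a placeholder; none of this is an existing DataFusion API.

```rust
/// Lifts a fallible row-level binary function to whole columns.
/// A null in either input produces a null output, without calling `f`.
pub fn lift_binary<A, B, R, E, F>(
    f: F,
    left: &[Option<A>],
    right: &[Option<B>],
) -> Result<Vec<Option<R>>, E>
where
    A: Copy,
    B: Copy,
    F: Fn(A, B) -> Result<R, E>,
{
    left.iter()
        .zip(right.iter())
        .map(|(l, r)| match (l, r) {
            (Some(l), Some(r)) => f(*l, *r).map(Some),
            _ => Ok(None),
        })
        .collect()
}

/// The only part a function author would write (placeholder: Euclid's algorithm).
fn gcd_row(x: i64, y: i64) -> Result<i64, String> {
    let (mut a, mut b) = (x.unsigned_abs(), y.unsigned_abs());
    while b != 0 {
        let r = a % b;
        a = b;
        b = r;
    }
    if a > i64::MAX as u64 {
        Err(format!("gcd({x}, {y}) does not fit in i64"))
    } else {
        Ok(a as i64)
    }
}
```

The column-level function is then just `lift_binary(gcd_row, &xs, &ys)`; the iteration, null propagation, and error plumbing live in one place.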
Local data structures
Currently DataFusion functions are singletons plugged into the execution engine. They have no way to store and reuse buffers or compiled regular expressions, etc.
Thread local storage is not an option because DataFusion, when embedded, does not control creation of application threads.
Describe the solution you'd like
Simple
It should be possible to separate function logic from the mechanics.
The example GCD function should only need to provide fn compute_gcd(x: i64, y: i64) -> Result<i64>; the rest of the boilerplate should not be hand-written for every function separately.
It should be possible to implement a function that accepts string values, without having to deal with the 5 different runtime representations that can carry string values: StringArray, LargeStringArray, StringViewArray, DictionaryArray<Key>, RunArray<Key> (maybe more than 5 because they are recursive in theory: can DictionaryArray contain a DictionaryArray? can it contain RunArray?)
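One possible shape for this, sketched under stated assumptions: a hypothetical `StringColumn` trait gives row-level access to `&str` regardless of the physical encoding. `PlainStrings` and `DictStrings` are simplified stand-ins for a plain string array and a dictionary-encoded array, not the Arrow types themselves, and `char_count` is an arbitrary example function.

```rust
/// Hypothetical row-level view over any string-carrying column.
pub trait StringColumn {
    fn len(&self) -> usize;
    /// Returns `None` for a null row.
    fn value(&self, row: usize) -> Option<&str>;
}

/// Stand-in for a plain string array.
pub struct PlainStrings(pub Vec<Option<String>>);

/// Stand-in for a dictionary-encoded array: `keys[row]` indexes into `values`.
pub struct DictStrings {
    pub keys: Vec<Option<usize>>,
    pub values: Vec<String>,
}

impl StringColumn for PlainStrings {
    fn len(&self) -> usize {
        self.0.len()
    }
    fn value(&self, row: usize) -> Option<&str> {
        self.0[row].as_deref()
    }
}

impl StringColumn for DictStrings {
    fn len(&self) -> usize {
        self.keys.len()
    }
    fn value(&self, row: usize) -> Option<&str> {
        self.keys[row].map(|k| self.values[k].as_str())
    }
}

/// The function author writes only this, against `&str`.
fn char_count(s: &str) -> i64 {
    s.chars().count() as i64
}

/// Framework side: applies a row-level function to any representation.
pub fn apply_to_strings<C: StringColumn>(col: &C, f: fn(&str) -> i64) -> Vec<Option<i64>> {
    (0..col.len()).map(|row| col.value(row).map(f)).collect()
}
```

This sketch decodes the dictionary on every row. A framework that owns the adapter could instead evaluate the function once per distinct dictionary value, without the function author changing anything.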
Focused
Because SQL is statically typed, the function overload must be selected at planning time, so that the return type is also known (the return_type function). The invoke function then needs to implement the same logic again. This process should be less error-prone: once the function is bound to the query call site, its signature is known and the implementation should not need to do type checks.
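A minimal sketch of such binding, with all names hypothetical (`DataType`, `Column`, `Kernel`, `BoundFunction`, and `bind_negate` are simplified illustrations, not DataFusion types): overload resolution happens once in `bind_negate`, which returns both the return type and a typed kernel. The row-level kernels never inspect types; the single remaining match belongs to the framework.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DataType {
    Int32,
    Int64,
    Utf8,
}

/// Hypothetical column representation, one variant per supported type.
#[derive(Debug, PartialEq)]
pub enum Column {
    Int32(Vec<i32>),
    Int64(Vec<i64>),
}

/// A row-level kernel already specialized to one overload.
#[derive(Clone, Copy)]
pub enum Kernel {
    Int32(fn(i32) -> i32),
    Int64(fn(i64) -> i64),
}

/// Result of binding: the return type plus the kernel chosen for this call site.
pub struct BoundFunction {
    pub return_type: DataType,
    kernel: Kernel,
}

// Row-level logic written by the function author; no type checks inside.
fn negate_i32(x: i32) -> i32 {
    x.wrapping_neg()
}
fn negate_i64(x: i64) -> i64 {
    x.wrapping_neg()
}

/// Planning time: pick the overload once; unsupported types fail here.
pub fn bind_negate(arg_type: DataType) -> Result<BoundFunction, String> {
    match arg_type {
        DataType::Int32 => Ok(BoundFunction {
            return_type: DataType::Int32,
            kernel: Kernel::Int32(negate_i32),
        }),
        DataType::Int64 => Ok(BoundFunction {
            return_type: DataType::Int64,
            kernel: Kernel::Int64(negate_i64),
        }),
        other => Err(format!("negate is not defined for {other:?}")),
    }
}

impl BoundFunction {
    /// Execution time: the framework unwraps the column and runs the typed kernel.
    pub fn invoke(&self, arg: &Column) -> Result<Column, String> {
        match (self.kernel, arg) {
            (Kernel::Int32(f), Column::Int32(v)) => {
                Ok(Column::Int32(v.iter().map(|x| f(*x)).collect()))
            }
            (Kernel::Int64(f), Column::Int64(v)) => {
                Ok(Column::Int64(v.iter().map(|x| f(*x)).collect()))
            }
            _ => Err("internal error: column does not match the bound signature".to_string()),
        }
    }
}
```

Because the return type and the kernel come from the same match arm, they cannot drift apart the way separate return_type and invoke implementations can.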
Performant / Efficient
It should be the compiler's / framework's job to provide vectorization, without hand-writing the same logic in every function separately.
It should be the compiler's / framework's job to do null checks, providing efficient tight loops for the all-non-null case without having to write such logic in every function separately.
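A sketch of how a framework-owned adapter might separate the two cases, assuming a hypothetical `Int64Column` whose validity mask is absent when every row is non-null (a simplification of an Arrow null bitmap). Whether the fast loop actually vectorizes depends on the compiler and on the function being applied.

```rust
/// Hypothetical nullable column: values plus an optional validity mask.
/// `validity == None` means every row is non-null.
pub struct Int64Column {
    pub values: Vec<i64>,
    pub validity: Option<Vec<bool>>,
}

/// Applies a row-level function, choosing the loop shape once per batch.
pub fn map_unary(input: &Int64Column, f: impl Fn(i64) -> i64) -> Int64Column {
    match &input.validity {
        // Fast path: no per-row null branch, which gives the compiler
        // a chance to auto-vectorize the loop.
        None => Int64Column {
            values: input.values.iter().map(|&v| f(v)).collect(),
            validity: None,
        },
        // General path: evaluate only valid rows; null slots hold a placeholder 0.
        Some(valid) => Int64Column {
            values: input
                .values
                .iter()
                .zip(valid)
                .map(|(&v, &ok)| if ok { f(v) } else { 0 })
                .collect(),
            validity: Some(valid.clone()),
        },
    }
}
```

The function author passes only the row-level closure, for example `map_unary(&col, |v| v * 2)`, and gets both paths.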
It should be possible to write efficient functions that need thread local data structures, for example for regular expressions, without having to use thread locals and/or shared pools which introduce contention.
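One way this could look, as a sketch only: the engine asks a factory for a separate function instance per partition or task, and each instance owns its mutable state, so no thread locals or shared pools are involved. The traits `ScalarFunctionFactory` and `ScalarFunctionInstance` are hypothetical, and a reusable character buffer stands in for more expensive state such as a compiled regular expression.

```rust
/// Hypothetical trait: an instance owns mutable state, so `invoke` takes `&mut self`.
pub trait ScalarFunctionInstance {
    fn invoke(&mut self, input: &[&str]) -> Vec<bool>;
}

/// Hypothetical factory: the engine would call this once per partition or task.
pub trait ScalarFunctionFactory {
    fn create_instance(&self) -> Box<dyn ScalarFunctionInstance>;
}

/// Example instance: checks for palindromes using a scratch buffer
/// that is reused across rows and across batches.
pub struct IsPalindrome {
    scratch: Vec<char>,
}

impl ScalarFunctionInstance for IsPalindrome {
    fn invoke(&mut self, input: &[&str]) -> Vec<bool> {
        let mut out = Vec::with_capacity(input.len());
        for s in input {
            // `clear` drops the contents but keeps the allocation.
            self.scratch.clear();
            self.scratch.extend(s.chars());
            out.push(self.scratch.iter().eq(self.scratch.iter().rev()));
        }
        out
    }
}

pub struct IsPalindromeFactory;

impl ScalarFunctionFactory for IsPalindromeFactory {
    fn create_instance(&self) -> Box<dyn ScalarFunctionInstance> {
        Box::new(IsPalindrome { scratch: Vec::new() })
    }
}
```

Since each instance is used by one task at a time, its state needs no synchronization, and the design does not depend on who creates the application threads.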
Describe alternatives you've considered
However, direct use of the library is not possible because
- it operates on Arrow arrays directly, doesn't fit DataFusion execution model
- it operates on Arrow data types, but DataFusion needs its own type system: [Proposal] Decouple logical from physical types #11513
- The problem space for a query engine is much more complicated
The library could still be part of the solution, but doesn't have to be and it's a non-goal to use a particular library.
Additional context
- [Proposal] Decouple logical from physical types #11513
- Change DataFusion to use LogicalTypes and hide "encodings" (like Dictionary and REE) lower down #7421
- What function definitions look like in Trino, for example a function operating on an Int64 equivalent: https://github.com/trinodb/trino/blob/e6aabfec55fcf5e4dd3c283b9a29da97c1301f6c/core/trino-main/src/main/java/io/trino/operator/scalar/MathFunctions.java#L128-L135
- Velox "Simple Functions" https://vldb.org/pvldb/vol15/p3372-pedreira.pdf