Labels
enhancement (New feature or request)
Description
Is your feature request related to a problem or challenge?
Async UDFs cannot currently be used as input to aggregations.
Here is a reproducer using the existing async_udf example:
Apply this diff:
diff --git a/datafusion-examples/examples/async_udf.rs b/datafusion-examples/examples/async_udf.rs
index b52ec68ea..3cd5e03e6 100644
--- a/datafusion-examples/examples/async_udf.rs
+++ b/datafusion-examples/examples/async_udf.rs
@@ -43,8 +43,12 @@ async fn main() -> Result<()> {
     // Use a hard coded parallelism level of 4 so the explain plan
     // is consistent across machines.
     let config = SessionConfig::new().with_target_partitions(4);
-    let ctx =
-        SessionContext::from(SessionStateBuilder::new().with_config(config).build());
+    let ctx = SessionContext::from(
+        SessionStateBuilder::new()
+            .with_default_features()
+            .with_config(config)
+            .build(),
+    );
 
     // Similarly to regular UDFs, you create an AsyncScalarUDF by implementing
     // `AsyncScalarUDFImpl` and creating an instance of `AsyncScalarUDF`.
@@ -62,7 +66,7 @@ async fn main() -> Result<()> {
     //
     // Note: Async UDFs can currently be used in the select list and filter conditions.
     let results = ctx
-        .sql("select * from animal a where ask_llm(a.name, 'Is this animal furry?')")
+        .sql("select min(ask_llm(a.name, 'Is this animal furry?')) from animal a")
         .await?
         .collect()
         .await?;
Then run cargo run --example async_udf.
This fails with an uninformative error: Error: Internal("async functions should not be called directly")
Describe the solution you'd like
Async UDFs should be usable as inputs to aggregations, as in the min(ask_llm(...)) query above.
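One possible direction, sketched under stated assumptions: the example's own comment says async UDFs already work in the select list and in filter conditions, which suggests those paths evaluate async calls in a separate step before the operator that consumes them. Extending that idea to aggregates would mean pulling async calls out of the aggregate's arguments, evaluating them first, and letting the aggregate read the results as plain columns. The Rust below models only that rewrite. The `Expr` type, `extract_async_calls` function, and `__async_fn_N` column names are all hypothetical; this is not DataFusion's API or its actual planner logic.

```rust
// Hypothetical, simplified expression tree for illustration only;
// this is NOT DataFusion's `Expr` type.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(String),
    /// A scalar function call; `is_async` marks async UDFs such as `ask_llm`.
    Call { name: String, args: Vec<Expr>, is_async: bool },
}

/// Replaces every async call inside `expr` with a reference to a new column
/// and records the call in `extracted`, so that a (hypothetical) earlier
/// async-evaluation step can compute it before the aggregate runs.
fn extract_async_calls(expr: Expr, extracted: &mut Vec<(String, Expr)>) -> Expr {
    match expr {
        Expr::Call { name, args, is_async } => {
            // Rewrite arguments first, so nested async calls are recorded
            // (and would be evaluated) before the calls that consume them.
            let args: Vec<Expr> = args
                .into_iter()
                .map(|arg| extract_async_calls(arg, extracted))
                .collect();
            let call = Expr::Call { name, args, is_async };
            if is_async {
                let alias = format!("__async_fn_{}", extracted.len());
                extracted.push((alias.clone(), call));
                Expr::Column(alias)
            } else {
                call
            }
        }
        // Columns and literals contain no function calls.
        other => other,
    }
}

fn main() {
    // Argument of: min(ask_llm(a.name, 'Is this animal furry?'))
    let agg_arg = Expr::Call {
        name: "ask_llm".to_string(),
        args: vec![
            Expr::Column("a.name".to_string()),
            Expr::Literal("Is this animal furry?".to_string()),
        ],
        is_async: true,
    };

    let mut extracted = Vec::new();
    let rewritten = extract_async_calls(agg_arg, &mut extracted);

    // The aggregate now only reads a plain column...
    println!("min input: {rewritten:?}");
    // ...which an earlier async step would have to produce.
    for (alias, call) in &extracted {
        println!("pre-evaluate {alias} = {call:?}");
    }
}
```

With this shape, the aggregate itself never invokes an async function synchronously; the open design question is where in the physical plan the extracted calls would be evaluated.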
(A smaller incremental improvement would be a more descriptive error message stating that async UDFs are not yet supported as inputs to aggregations.)
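A minimal sketch of the smaller improvement, assuming the check runs while an aggregate is being planned: scan the aggregate's arguments for async calls and return a descriptive "not implemented" error up front, instead of failing later with the internal error. The `Expr`, `PlanError`, `find_async_call`, and `check_aggregate_args` names are hypothetical and are not part of DataFusion.

```rust
// Hypothetical, simplified expression tree; NOT DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(String),
    Call { name: String, args: Vec<Expr>, is_async: bool },
}

// Hypothetical planning error type.
#[derive(Debug, PartialEq)]
enum PlanError {
    NotImplemented(String),
}

/// Returns the name of the first (outermost) async call found in `expr`.
fn find_async_call(expr: &Expr) -> Option<&str> {
    match expr {
        Expr::Call { name, args, is_async } => {
            if *is_async {
                return Some(name.as_str());
            }
            args.iter().find_map(find_async_call)
        }
        Expr::Column(_) | Expr::Literal(_) => None,
    }
}

/// Rejects aggregate arguments that contain async UDF calls with a
/// message that names both the UDF and the aggregate function.
fn check_aggregate_args(agg_name: &str, args: &[Expr]) -> Result<(), PlanError> {
    for arg in args {
        if let Some(udf) = find_async_call(arg) {
            return Err(PlanError::NotImplemented(format!(
                "async UDF '{udf}' cannot yet be used as an input to aggregate function '{agg_name}'"
            )));
        }
    }
    Ok(())
}

fn main() {
    // Argument of: min(ask_llm(a.name, 'Is this animal furry?'))
    let arg = Expr::Call {
        name: "ask_llm".to_string(),
        args: vec![
            Expr::Column("a.name".to_string()),
            Expr::Literal("Is this animal furry?".to_string()),
        ],
        is_async: true,
    };
    match check_aggregate_args("min", &[arg]) {
        Ok(()) => println!("ok"),
        Err(PlanError::NotImplemented(msg)) => println!("Error: NotImplemented({msg:?})"),
    }
}
```

Naming both the UDF and the aggregate tells the user exactly which part of the query is unsupported, whereas the current internal error gives no hint that the aggregation is the cause.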
Describe alternatives you've considered
No response
Additional context
No response