[SPARK-20586] [SQL] Add deterministic to ScalaUDF #17848
Conversation
Even our own test cases do not follow this assumption. We did not expect users to define non-deterministic UDFs before this PR.
It sounds like the Scala compiler is not smart enough...
This sounds like a source-code compatibility issue; can we look into it?
Test build #76428 has finished for PR 17848 at commit
Disabling optimizations aside, to what extent can we actually support non-deterministic functions? Right now a common user mistake is to run an RNG inside a UDF.
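To illustrate the recomputation hazard this comment raises, here is a plain-Scala sketch (no Spark; `RecomputeSketch` and `statefulUdf` are made-up names). Hidden state stands in for an RNG call inside a user function: if the engine re-evaluates the plan (cache eviction, stage retry, a second action on the same DataFrame), each pass observes different values.

```scala
// Plain-Scala sketch (no Spark): a "UDF" with hidden state, standing in for
// an RNG call inside a user function. Re-running the same "plan" over the
// same data produces different results, which is how users get tripped up.
object RecomputeSketch {
  private var calls = 0                        // hidden state, like an RNG advancing
  def statefulUdf(x: Int): Int = { calls += 1; x + calls }

  // One evaluation of the "plan" over a partition of data.
  def run(data: Seq[Int]): Seq[Int] = data.map(statefulUdf)
}
```

Calling `run` twice on identical input yields different outputs, mirroring what happens when Spark recomputes a partition containing an RNG-backed UDF that was not flagged as non-deterministic.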
Since this breaks binary compatibility (I know you are familiar with the point), is the target of this PR Spark 3.0.0?
Hive UDFs ignore distinctLike, and AFAIK there are no optimisation rules for distinctLike, so do we need this param now?
It is being used by Hive optimizer.
I basically think these parameters are useful for users, but do we always need to set deterministic and distinctLike when registering UDFs? ISTM this is a little annoying for users, so we'd better use default values for these parameters?
After discussing with others, we decided to use the approach you proposed. The two related PRs have been merged. Will make the change in this PR too. Thanks!
@zero323 Which caches? Could you give an example?
Sorry for the late update. Taking care of two kids alone is really a challenging task. Will update the PR now.
My concern is that people trying non-deterministic UDFs get tripped up by repeated computations at least as often as by internal optimizations. In particular, let's say we have this fan-out / fan-in workflow depending on a non-deterministic UDF, where dotted edges represent an arbitrary chain of transformations. Can we ensure that the state of each
Not sure whether it answers your concern?
}
Assert.assertEquals(55, sum);
Assert.assertTrue("EXPLAIN outputs are expected to contain the UDF name.",
  spark.sql("EXPLAIN SELECT inc(1) AS f").collectAsList().toString().contains("inc"));
This is to fix the name-loss issue for Java UDFs in the EXPLAIN command.
Test build #76915 has finished for PR 17848 at commit
Test build #76918 has finished for PR 17848 at commit
Test build #76919 has finished for PR 17848 at commit
Test build #76922 has finished for PR 17848 at commit
val inputTypes = Try($inputTypes).toOption
def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable)
functionRegistry.registerFunction(name, builder)
UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
Can't we directly call register(name, func, deterministic = true, distinctLike = false) here?
It would break Java applications that call our Scala APIs relying on default arguments.
I'm probably missing your point, but I was suggesting the code below:
/**
 * Registers a Scala closure of 0 arguments as user-defined function (UDF).
 * @tparam RT return type of UDF.
 * @since 1.3.0
 */
def register[RT: TypeTag](name: String, func: Function0[RT]): UserDefinedFunction = {
  register(name, func, deterministic = true, distinctLike = false)
}

/**
 * Registers a Scala closure of 0 arguments as user-defined function (UDF).
 * @tparam RT return type of UDF.
 * @since 2.3.0
 */
def register[RT: TypeTag](name: String, func: Function0[RT], deterministic: Boolean, distinctLike: Boolean): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputTypes = Try(Nil).toOption
  def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, deterministic, distinctLike)
  functionRegistry.registerFunction(name, builder)
  val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
  val withDeterminism = if (!deterministic) udf.nonDeterministic() else udf
  val withDistinctLike = if (distinctLike) withDeterminism.withDistinctLike() else withDeterminism
  withDistinctLike
}
this._nameOption = Option(name)
this
val udf = copyAll()
udf._nameOption = Option(name)
cc @maropu
I know your intention here (you probably mean we should not mutate values even in var fields), but is it okay that the code below allocates up to four objects in the worst case? I'm a bit worried about this point;
val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name).withNullability(nullable)
val withDeterminism = if (!deterministic) udf.nonDeterministic() else udf
val withDistinctLike = if (distinctLike) withDeterminism.withDistinctLike() else withDeterminism
@maropu We should make a copy when calling withName, instead of returning this object.
yea, I know. I just meant we could add an interface newInstance(name, nullable, determinism) there.
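The copy-vs-mutate trade-off debated in this thread can be sketched in plain Scala. `SketchUDF` below is a made-up stand-in for UserDefinedFunction, not Spark's actual implementation:

```scala
// Illustrative sketch of the copy-on-modify pattern discussed above.
// SketchUDF is a hypothetical stand-in for UserDefinedFunction.
case class SketchUDF(
    f: AnyRef,
    name: Option[String] = None,
    nullable: Boolean = true,
    deterministic: Boolean = true) {

  def withName(n: String): SketchUDF = copy(name = Some(n)) // a copy, never mutates this
  def asNondeterministic(): SketchUDF = copy(deterministic = false)
  def asNonNullable(): SketchUDF = copy(nullable = false)

  // The single-copy alternative suggested above (a newInstance-style method):
  // one allocation instead of one per chained call.
  def newInstance(n: String, nullable: Boolean, deterministic: Boolean): SketchUDF =
    copy(name = Some(n), nullable = nullable, deterministic = deterministic)
}
```

Returning a copy keeps a shared UDF value from being renamed behind the caller's back; the worst-case extra allocations come from chaining `withName`, `withNullability`, and the determinism setters, which a `newInstance`-style method collapses into one copy.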
Test build #76986 has finished for PR 17848 at commit
Test build #79130 has finished for PR 17848 at commit
Test build #79228 has finished for PR 17848 at commit
Test build #79594 has finished for PR 17848 at commit
registerUDF(name, func, deterministic = false)
}

private def registerUDF[$typeTags](name: String, func: Function$x[$types], deterministic: Boolean): UserDefinedFunction = {
can we make this public?
My only concern is that we would have many public functions with different names doing similar things.
 */
def nullable: Boolean = _nullable

/**
Returns true iff the UDF is deterministic, i.e. the UDF produces the same output given the same input.
Test build #79776 has finished for PR 17848 at commit
Test build #79779 has finished for PR 17848 at commit
Test build #79780 has finished for PR 17848 at commit
 */
def withNullability(nullable: Boolean): UserDefinedFunction = {
def asNonNullable(): UserDefinedFunction = {
if (nullable == _nullable) {
nit: if (!nullable)
 * API (i.e. of type UserDefinedFunction).
 * Registers a user-defined function (UDF), for a UDF that's already defined using the Dataset
 * API (i.e. of type UserDefinedFunction). To change a UDF to nondeterministic, call the API
 * `UserDefinedFunction.asNondeterministic()`.
let's also mention how to turn the UDF to be non-nullable.
A good example would be:
val foo = udf(() => { "hello" })
spark.udf.register("stringConstant", foo.asNonNullable())
Although the return type of the UDF is String, which is nullable, we know that this UDF will never return null.
Sure. Will do
 * Registers a user-defined function with ${i} arguments.
 * @since 2.3.0
 */
def register(name: String, f: UDF$i[$extTypeArgs], returnType: DataType, deterministic: Boolean): Unit = {
Do we need this? I think for Java UDFs we can also build a UserDefinedFunction first and call asNondeterministic or asNonNullable.
BTW, although we don't have def udf(f: UDF1[_, _]): UserDefinedFunction APIs, we do have def udf(f: AnyRef, dataType: DataType): UserDefinedFunction, which can be used for Java UDFs.
So far, the impl of def udf(f: AnyRef, dataType: DataType) does not support Java UDFs.
Since we have to add new APIs anyway, why not add a bunch of def udf(f: UDF1[_, _]): UserDefinedFunction instead of a bunch of def register(name: String, f: UDF$i[$extTypeArgs], returnType: DataType, deterministic: Boolean)?
 * @param deterministic True if the UDF is deterministic. Deterministic UDF returns same result
 *                      each time it is invoked with a particular input.
 */
private[sql] def registerJava(
do we need a new method? it's private.
This is for PySpark.
uh... We can feel free to make the change on the interface. : )
Actually, our Java API can use it directly. To Java APIs, they are not private at all.
Then let's remove the private[sql] and add a since tag.
BTW, can we use default parameters? Will it break Java compatibility?
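On the default-parameter question: Scala defaults are compiled into synthetic `name$default$N` methods that javac knows nothing about, so Java source still has to pass every argument explicitly. A small sketch (class and method names below are made up for illustration) makes the mechanism visible via reflection:

```scala
// Sketch: why Scala default parameters don't carry over to Java callers.
// scalac emits a synthetic register$default$N method per defaulted parameter;
// Java sources cannot write register("inc") and must supply all arguments.
class RegistrySketch {
  def register(name: String,
               deterministic: Boolean = true,
               distinctLike: Boolean = false): String =
    s"$name(deterministic=$deterministic, distinctLike=$distinctLike)"
}

object RegistrySketch {
  // The synthetic default-getters scalac generated, visible to reflection.
  def syntheticDefaults(): Seq[String] =
    classOf[RegistrySketch].getMethods.toSeq
      .map(_.getName)
      .filter(_.contains("$default$"))
      .sorted
}
```

This is presumably why the thread leans toward explicit overloads (or chaining asNondeterministic() on the returned UserDefinedFunction) rather than default arguments in the register APIs.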
/**
 * Defines a user-defined function (UDF) using a Scala closure. For this variant, the caller must
 * specify the output data type, and there is no automatic input type coercion.
 * Defines a deterministic user-defined function (UDF) using a Scala closure. For this variant,
Not only Scala closures; I think Java UDF classes are also supported here.
Unfortunately nope, although we accept AnyRef.
java.lang.ClassCastException: java.lang.Class cannot be cast to scala.Function1
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:92)
  at org.apache.spark.sql.expressions.UserDefinedFunction.apply(UserDefinedFunction.scala:70)
  at org.apache.spark.sql.UDFRegistration.org$apache$spark$sql$UDFRegistration$$builder$2(UDFRegistration.scala:99)
damn...
Test build #79810 has started for PR 17848 at commit
Let's leave the Java UDF API unchanged and think about whether we should add a Java UDF API in
LGTM BTW.
Thanks! @cloud-fan
Test build #79935 has finished for PR 17848 at commit
Test build #79936 has finished for PR 17848 at commit
Test build #79942 has finished for PR 17848 at commit
Thanks! Merging to master.

What changes were proposed in this pull request?
Like Hive UDFType, we should allow users to add extra flags for ScalaUDF and JavaUDF too. stateful/impliesOrder are not applicable to our Scala UDFs. Thus, we only add the following two flags.
When the deterministic flag is not correctly set, the results could be wrong.
For ScalaUDF in Dataset APIs, users can call the following extra APIs for UserDefinedFunction to make the corresponding changes.
nonDeterministic: Updates UserDefinedFunction to non-deterministic.
Also fixed the Java UDF name loss issue.
Will submit a separate PR for distinctLike for UDAF.
How was this patch tested?
Added test cases for both ScalaUDF