From bcc4b3cdb04c359b3cc9d887a1a0ebd5c13bf78b Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 12 May 2016 17:25:51 -0700
Subject: [PATCH 1/5] [SPARK-15282][SQL] UDF function is not always
 deterministic and needs to be evaluated once.

---
 .../sql/catalyst/expressions/ScalaUDF.scala   |  2 +-
 .../optimizer/FilterPushdownSuite.scala       | 14 ++++++++++++-
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 21 +++++++++++--------
 .../scala/org/apache/spark/sql/UDFSuite.scala |  6 ++++--
 4 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 0038cf65e2993..e2c75f8015df6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -42,7 +42,7 @@ case class ScalaUDF(
   extends Expression with ImplicitCastInputTypes with NonSQLExpression {
 
   override def nullable: Boolean = true
-
+  override def deterministic: Boolean = false
 
   override def toString: String = s"UDF(${children.mkString(", ")})"
 
 // scalastyle:off line.size.limit
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index fcc14a803beaa..0b4471c59551c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{DoubleType, IntegerType}
 
 class FilterPushdownSuite extends PlanTest {
 
@@ -133,6 +133,18 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, originalQuery)
   }
 
+  test("SPARK-15282: can't push down filter with udf column through project") {
+    // UDF function might be non-deterministic
+    val originalQuery = testRelation
+      .select(ScalaUDF(() => Math.random(), DoubleType, Nil, Nil).as('udf), 'a)
+      .where('a > 5)
+      .analyze
+
+    val optimized = Optimize.execute(originalQuery)
+
+    comparePlans(optimized, originalQuery)
+  }
+
   test("filters: combines filters") {
     val originalQuery = testRelation
       .select('a)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b1f848fdc89a3..777916862ef17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1232,7 +1232,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       "SELECT name, salary FROM personWithMeta JOIN salary ON id = personId"))
   }
 
-  test("SPARK-3371 Renaming a function expression with group by gives error") {
+  // SPARK-15282: If UDF is nondeterministic, we can not use that in GROUP BY.
+ ignore("SPARK-3371 Renaming a function expression with group by gives error") { spark.udf.register("len", (s: String) => s.length) checkAnswer( sql("SELECT len(value) as temp FROM testData WHERE key = 1 group by len(value)"), @@ -2052,6 +2053,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + // SPARK-15282: If UDF is nondeterministic, we need to call every time. test("Common subexpression elimination") { // TODO: support subexpression elimination in whole stage codegen withSQLConf("spark.sql.codegen.wholeStage" -> "false") { @@ -2082,30 +2084,31 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } verifyCallCount(df.selectExpr("testUdf(a)"), Row(1), 1) - verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 1) - verifyCallCount(df.selectExpr("testUdf(a + 1)", "testUdf(a + 1)"), Row(2, 2), 1) + verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 2) + verifyCallCount(df.selectExpr("testUdf(a + 1)", "testUdf(a + 1)"), Row(2, 2), 2) verifyCallCount(df.selectExpr("testUdf(a + 1)", "testUdf(a)"), Row(2, 1), 2) verifyCallCount( - df.selectExpr("testUdf(a + 1) + testUdf(a + 1)", "testUdf(a + 1)"), Row(4, 2), 1) + df.selectExpr("testUdf(a + 1) + testUdf(a + 1)", "testUdf(a + 1)"), Row(4, 2), 3) verifyCallCount( - df.selectExpr("testUdf(a + 1) + testUdf(1 + b)", "testUdf(a + 1)"), Row(4, 2), 2) + df.selectExpr("testUdf(a + 1) + testUdf(1 + b)", "testUdf(a + 1)"), Row(4, 2), 3) val testUdf = functions.udf((x: Int) => { countAcc.++=(1) x }) - verifyCallCount( - df.groupBy().agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), 1) + // SPARK-15282: If UDF is nondeterministic, we can not use that in GROUP BY. + // verifyCallCount( + // df.groupBy().agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), 3) verifyCallCount( - df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1) + df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 3) // Try disabling it via configuration. spark.conf.set("spark.sql.subexpressionElimination.enabled", "false") verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 2) spark.conf.set("spark.sql.subexpressionElimination.enabled", "true") - verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 1) + verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 2) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 547d3c1abe858..6ae1fc8c7c9e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -133,7 +133,8 @@ class UDFSuite extends QueryTest with SharedSQLContext { assert(result.count() === 2) } - test("UDF in a GROUP BY") { + // SPARK-15282: If UDF is nondeterministic, we can not use that in GROUP BY. + ignore("UDF in a GROUP BY") { spark.udf.register("groupFunction", (n: Int) => { n > 10 }) val df = Seq(("red", 1), ("red", 2), ("blue", 10), @@ -150,7 +151,8 @@ class UDFSuite extends QueryTest with SharedSQLContext { assert(result.count() === 2) } - test("UDFs everywhere") { + // SPARK-15282: If UDF is nondeterministic, we can not use that in GROUP BY. 
+ ignore("UDFs everywhere") { spark.udf.register("groupFunction", (n: Int) => { n > 10 }) spark.udf.register("havingFilter", (n: Long) => { n > 2000 }) spark.udf.register("whereFilter", (n: Int) => { n < 150 }) From 3fe691be0bcf19fe1a85b08c4552bf6cb2ea537f Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 20 May 2016 09:55:59 -0700 Subject: [PATCH 2/5] Revert the change on ScalaUDF. Instead change PushDownPredicate optimizer. --- .../sql/catalyst/expressions/ScalaUDF.scala | 2 +- .../sql/catalyst/optimizer/Optimizer.scala | 5 ++++- .../org/apache/spark/sql/SQLQuerySuite.scala | 21 ++++++++----------- .../scala/org/apache/spark/sql/UDFSuite.scala | 6 ++---- 4 files changed, 16 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala index e2c75f8015df6..0038cf65e2993 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala @@ -42,7 +42,7 @@ case class ScalaUDF( extends Expression with ImplicitCastInputTypes with NonSQLExpression { override def nullable: Boolean = true - override def deterministic: Boolean = false + override def toString: String = s"UDF(${children.mkString(", ")})" // scalastyle:off line.size.limit diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 5e998d61880ea..3f83fab9d6274 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1026,7 +1026,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // state and all the input rows processed before. In another word, the order of input rows // matters for non-deterministic expressions, while pushing down predicates changes the order. case filter @ Filter(condition, project @ Project(fields, grandChild)) - if fields.forall(_.deterministic) => + if fields.forall(_.deterministic) && + fields.forall(_.find(_.isInstanceOf[ScalaUDF]).isEmpty) => // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b). @@ -1036,6 +1037,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) + case filter @ Filter(_, Project(_, _)) => filter + // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be // pushed beneath must satisfy the following two conditions: // 1. All the expressions are part of window partitioning key. The expressions can be compound. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 777916862ef17..b1f848fdc89a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1232,8 +1232,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { "SELECT name, salary FROM personWithMeta JOIN salary ON id = personId")) } - // SPARK-15282: If UDF is nondeterministic, we can not use that in GROUP BY. 
- ignore("SPARK-3371 Renaming a function expression with group by gives error") { + test("SPARK-3371 Renaming a function expression with group by gives error") { spark.udf.register("len", (s: String) => s.length) checkAnswer( sql("SELECT len(value) as temp FROM testData WHERE key = 1 group by len(value)"), @@ -2053,7 +2052,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } - // SPARK-15282: If UDF is nondeterministic, we need to call every time. test("Common subexpression elimination") { // TODO: support subexpression elimination in whole stage codegen withSQLConf("spark.sql.codegen.wholeStage" -> "false") { @@ -2084,31 +2082,30 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } verifyCallCount(df.selectExpr("testUdf(a)"), Row(1), 1) - verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 2) - verifyCallCount(df.selectExpr("testUdf(a + 1)", "testUdf(a + 1)"), Row(2, 2), 2) + verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 1) + verifyCallCount(df.selectExpr("testUdf(a + 1)", "testUdf(a + 1)"), Row(2, 2), 1) verifyCallCount(df.selectExpr("testUdf(a + 1)", "testUdf(a)"), Row(2, 1), 2) verifyCallCount( - df.selectExpr("testUdf(a + 1) + testUdf(a + 1)", "testUdf(a + 1)"), Row(4, 2), 3) + df.selectExpr("testUdf(a + 1) + testUdf(a + 1)", "testUdf(a + 1)"), Row(4, 2), 1) verifyCallCount( - df.selectExpr("testUdf(a + 1) + testUdf(1 + b)", "testUdf(a + 1)"), Row(4, 2), 3) + df.selectExpr("testUdf(a + 1) + testUdf(1 + b)", "testUdf(a + 1)"), Row(4, 2), 2) val testUdf = functions.udf((x: Int) => { countAcc.++=(1) x }) - // SPARK-15282: If UDF is nondeterministic, we can not use that in GROUP BY. - // verifyCallCount( - // df.groupBy().agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), 3) + verifyCallCount( + df.groupBy().agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), 1) verifyCallCount( - df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 3) + df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1) // Try disabling it via configuration. spark.conf.set("spark.sql.subexpressionElimination.enabled", "false") verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 2) spark.conf.set("spark.sql.subexpressionElimination.enabled", "true") - verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 2) + verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 1) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 6ae1fc8c7c9e5..547d3c1abe858 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -133,8 +133,7 @@ class UDFSuite extends QueryTest with SharedSQLContext { assert(result.count() === 2) } - // SPARK-15282: If UDF is nondeterministic, we can not use that in GROUP BY. - ignore("UDF in a GROUP BY") { + test("UDF in a GROUP BY") { spark.udf.register("groupFunction", (n: Int) => { n > 10 }) val df = Seq(("red", 1), ("red", 2), ("blue", 10), @@ -151,8 +150,7 @@ class UDFSuite extends QueryTest with SharedSQLContext { assert(result.count() === 2) } - // SPARK-15282: If UDF is nondeterministic, we can not use that in GROUP BY. 
- ignore("UDFs everywhere") { + test("UDFs everywhere") { spark.udf.register("groupFunction", (n: Int) => { n > 10 }) spark.udf.register("havingFilter", (n: Long) => { n > 2000 }) spark.udf.register("whereFilter", (n: Int) => { n < 150 }) From e183fa4a668bfecba3bc48c4947f760559e783a4 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 22 May 2016 13:50:33 -0700 Subject: [PATCH 3/5] Revert code changes. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 5 +---- .../catalyst/optimizer/FilterPushdownSuite.scala | 14 +------------- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3f83fab9d6274..5e998d61880ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1026,8 +1026,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // state and all the input rows processed before. In another word, the order of input rows // matters for non-deterministic expressions, while pushing down predicates changes the order. case filter @ Filter(condition, project @ Project(fields, grandChild)) - if fields.forall(_.deterministic) && - fields.forall(_.find(_.isInstanceOf[ScalaUDF]).isEmpty) => + if fields.forall(_.deterministic) => // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b). @@ -1037,8 +1036,6 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) - case filter @ Filter(_, Project(_, _)) => filter - // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be // pushed beneath must satisfy the following two conditions: // 1. All the expressions are part of window partitioning key. The expressions can be compound. 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0b4471c59551c..fcc14a803beaa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.{DoubleType, IntegerType}
+import org.apache.spark.sql.types.IntegerType
 
 class FilterPushdownSuite extends PlanTest {
 
@@ -133,18 +133,6 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, originalQuery)
   }
 
-  test("SPARK-15282: can't push down filter with udf column through project") {
-    // UDF function might be non-deterministic
-    val originalQuery = testRelation
-      .select(ScalaUDF(() => Math.random(), DoubleType, Nil, Nil).as('udf), 'a)
-      .where('a > 5)
-      .analyze
-
-    val optimized = Optimize.execute(originalQuery)
-
-    comparePlans(optimized, originalQuery)
-  }
-
   test("filters: combines filters") {
     val originalQuery = testRelation
       .select('a)

From 880d80fa2ef08bb412805d60577810eb7bff879a Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 22 May 2016 14:13:03 -0700
Subject: [PATCH 4/5] Add notes on udf functions.

---
 python/pyspark/sql/functions.py                                  | 1 +
 .../org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala     | 1 +
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala    | 1 +
 sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala  | 1 +
 .../src/main/scala/org/apache/spark/sql/UDFRegistration.scala    | 1 +
 .../org/apache/spark/sql/expressions/UserDefinedFunction.scala   | 1 +
 .../main/scala/org/apache/spark/sql/internal/SessionState.scala  | 1 +
 7 files changed, 7 insertions(+)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index dac842c0ce8c0..aedf0a3c78ead 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1756,6 +1756,7 @@ def __call__(self, *cols):
 @since(1.3)
 def udf(f, returnType=StringType()):
     """Creates a :class:`Column` expression representing a user defined function (UDF).
+    Note that the user-defined functions should be deterministic.
 
     >>> from pyspark.sql.types import IntegerType
     >>> slen = udf(lambda s: len(s), IntegerType())
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 0038cf65e2993..13320a6312cf0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.types.DataType
 
 /**
  * User-defined function.
+ * Note that the user-defined functions should be deterministic.
 * @param function  The user defined scala function to run.
 * Note that if you use primitive parameters, you are not able to check if it is
 * null or not, and the UDF will return null for you if the primitive input is
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 14d12d30bc0b3..677e25a92bc40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -199,6 +199,7 @@ class SQLContext private[sql](
 
   /**
    * A collection of methods for registering user-defined functions (UDF).
+   * Note that the user-defined functions should be deterministic.
    *
    * The following example registers a Scala closure as UDF:
    * {{{
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index f697769bdcdb5..d1b6cf428b671 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -145,6 +145,7 @@ class SparkSession private(
 
   /**
    * A collection of methods for registering user-defined functions (UDF).
+   * Note that the user-defined functions should be deterministic.
    *
    * The following example registers a Scala closure as UDF:
    * {{{
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 3a043dcc6af22..e427ef84affa5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.types.DataType
 
 /**
  * Functions for registering user-defined functions. Use [[SQLContext.udf]] to access this.
+ * Note that the user-defined functions should be deterministic.
 *
 * @since 1.3.0
 */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
index bd35d19aa20bb..d3c5d5b8c1035 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.types.DataType
 
 /**
  * A user-defined function. To create one, use the `udf` functions in [[functions]].
+ * Note that the user-defined functions should be deterministic.
 * As an example:
 * {{{
 *   // Defined a UDF that returns true or false based on some numeric score.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 939b9195cae99..a79c23c1d81d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -100,6 +100,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
 
   /**
    * Interface exposed to the user for registering user-defined functions.
+   * Note that the user-defined functions should be deterministic.
    */
   lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)

From c1f92c700b8de8bb9a4fc8116872544ed296d0d6 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 23 May 2016 11:04:45 -0700
Subject: [PATCH 5/5] Address comments.
---
 python/pyspark/sql/functions.py                                  | 4 +++-
 .../org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala     | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala    | 4 +++-
 .../src/main/scala/org/apache/spark/sql/SparkSession.scala       | 4 +++-
 .../src/main/scala/org/apache/spark/sql/UDFRegistration.scala    | 2 +-
 .../apache/spark/sql/expressions/UserDefinedFunction.scala       | 4 +++-
 .../scala/org/apache/spark/sql/internal/SessionState.scala       | 2 +-
 7 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index aedf0a3c78ead..716b16fdc9530 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1756,7 +1756,9 @@ def __call__(self, *cols):
 @since(1.3)
 def udf(f, returnType=StringType()):
     """Creates a :class:`Column` expression representing a user defined function (UDF).
-    Note that the user-defined functions should be deterministic.
+    Note that the user-defined functions must be deterministic. Due to optimization,
+    duplicate invocations may be eliminated or the function may even be invoked more times than
+    it is present in the query.
 
     >>> from pyspark.sql.types import IntegerType
     >>> slen = udf(lambda s: len(s), IntegerType())
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 13320a6312cf0..21390644bc0b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types.DataType
 
 /**
  * User-defined function.
- * Note that the user-defined functions should be deterministic.
+ * Note that the user-defined functions must be deterministic.
 * @param function  The user defined scala function to run.
 * Note that if you use primitive parameters, you are not able to check if it is
 * null or not, and the UDF will return null for you if the primitive input is
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 677e25a92bc40..7013e316ead83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -199,7 +199,9 @@ class SQLContext private[sql](
 
   /**
    * A collection of methods for registering user-defined functions (UDF).
-   * Note that the user-defined functions should be deterministic.
+   * Note that the user-defined functions must be deterministic. Due to optimization,
+   * duplicate invocations may be eliminated or the function may even be invoked more times than
+   * it is present in the query.
    *
    * The following example registers a Scala closure as UDF:
    * {{{
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index d1b6cf428b671..5c87c844185c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -145,7 +145,9 @@ class SparkSession private(
 
   /**
    * A collection of methods for registering user-defined functions (UDF).
-   * Note that the user-defined functions should be deterministic.
+   * Note that the user-defined functions must be deterministic. Due to optimization,
+   * duplicate invocations may be eliminated or the function may even be invoked more times than
+   * it is present in the query.
    *
    * The following example registers a Scala closure as UDF:
    * {{{
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index e427ef84affa5..b006236481a29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.DataType
 
 /**
  * Functions for registering user-defined functions. Use [[SQLContext.udf]] to access this.
- * Note that the user-defined functions should be deterministic.
+ * Note that the user-defined functions must be deterministic.
 *
 * @since 1.3.0
 */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
index d3c5d5b8c1035..49fdec57558e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -25,7 +25,9 @@ import org.apache.spark.sql.types.DataType
 
 /**
  * A user-defined function. To create one, use the `udf` functions in [[functions]].
- * Note that the user-defined functions should be deterministic.
+ * Note that the user-defined functions must be deterministic. Due to optimization,
+ * duplicate invocations may be eliminated or the function may even be invoked more times than
+ * it is present in the query.
 * As an example:
 * {{{
 *   // Defined a UDF that returns true or false based on some numeric score.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index a79c23c1d81d9..c9cc2ba04a413 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -100,7 +100,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
 
   /**
    * Interface exposed to the user for registering user-defined functions.
-   * Note that the user-defined functions should be deterministic.
+   * Note that the user-defined functions must be deterministic.
    */
   lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
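
The caveat documented in PATCH 4/5 and 5/5 is easy to observe with a small standalone program. The sketch below (not part of the patch series; the object, accumulator, and UDF names are illustrative) mirrors the "Common subexpression elimination" test from PATCH 1/5 against the Spark 2.0-era APIs used here: a side-effecting UDF appears twice in one query, is invoked once while subexpression elimination deduplicates it, and once per occurrence when the optimization is disabled. Exact counts may vary with Spark version and configuration.

import org.apache.spark.sql.SparkSession

object UdfCallCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("udf-call-count-sketch")
      .getOrCreate()
    import spark.implicits._

    // Mirror the test above: subexpression elimination is exercised with
    // whole-stage codegen turned off in this Spark version.
    spark.conf.set("spark.sql.codegen.wholeStage", "false")

    // Count invocations with an accumulator, like countAcc in the test suite.
    val countAcc = spark.sparkContext.longAccumulator("udfCalls")
    spark.udf.register("countingId", (x: Int) => { countAcc.add(1); x })

    val df = Seq(1).toDF("a")

    // Two textual occurrences of the UDF, but Spark assumes it is deterministic
    // and may evaluate the shared subexpression only once per row.
    df.selectExpr("countingId(a)", "countingId(a)").collect()
    println(s"with subexpression elimination: ${countAcc.value} call(s)")

    // With the optimization disabled, each occurrence is evaluated separately.
    countAcc.reset()
    spark.conf.set("spark.sql.subexpressionElimination.enabled", "false")
    df.selectExpr("countingId(a)", "countingId(a)").collect()
    println(s"without subexpression elimination: ${countAcc.value} call(s)")

    spark.stop()
  }
}

This behavior is also why the series ends by documenting the contract instead of marking ScalaUDF non-deterministic (PATCH 1/5) or special-casing UDFs in PushDownPredicate (PATCH 2/5): either code change would have imposed the pessimistic call counts seen in the revised test expectations on every UDF user.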