From ae057565c01c13054ab12fc4db1dd4bef04de138 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 25 Jul 2017 22:31:27 +0800 Subject: [PATCH] revert java UDF changes --- python/pyspark/sql/context.py | 24 +- .../apache/spark/sql/UDFRegistration.scala | 388 +++++------------- .../org/apache/spark/sql/JavaRandUDF.java | 30 -- .../org/apache/spark/sql/JavaUDFSuite.java | 19 - .../scala/org/apache/spark/sql/UDFSuite.scala | 9 +- 5 files changed, 104 insertions(+), 366 deletions(-) delete mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/JavaRandUDF.java diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 19f10e98e052..c44ab247fd3d 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -28,7 +28,7 @@ from pyspark.sql.dataframe import DataFrame from pyspark.sql.readwriter import DataFrameReader from pyspark.sql.streaming import DataStreamReader -from pyspark.sql.types import DoubleType, IntegerType, Row, StringType +from pyspark.sql.types import IntegerType, Row, StringType from pyspark.sql.utils import install_exception_handler __all__ = ["SQLContext", "HiveContext", "UDFRegistration"] @@ -208,37 +208,29 @@ def registerFunction(self, name, f, returnType=StringType()): @ignore_unicode_prefix @since(2.1) - def registerJavaFunction(self, name, javaClassName, returnType=None, deterministic=True): + def registerJavaFunction(self, name, javaClassName, returnType=None): """Register a java UDF so it can be used in SQL statements. In addition to a name and the function itself, the return type can be optionally specified. When the return type is not specified we would infer it via reflection. - - :param name: name of the UDF. - :param javaClassName: fully qualified name of java class. - :param returnType: a :class:`pyspark.sql.types.DataType` object. - :param deterministic: a flag indicating if the UDF is deterministic. Deterministic UDF - returns same result each time it is invoked with a particular input. + :param name: name of the UDF + :param javaClassName: fully qualified name of java class + :param returnType: a :class:`pyspark.sql.types.DataType` object >>> sqlContext.registerJavaFunction("javaStringLength", ... "test.org.apache.spark.sql.JavaStringLength", IntegerType()) >>> sqlContext.sql("SELECT javaStringLength('test')").collect() - [Row(UDF:javaStringLength(test)=4)] + [Row(UDF(test)=4)] >>> sqlContext.registerJavaFunction("javaStringLength2", ... "test.org.apache.spark.sql.JavaStringLength") >>> sqlContext.sql("SELECT javaStringLength2('test')").collect() - [Row(UDF:javaStringLength2(test)=4)] - >>> sqlContext.registerJavaFunction("javaRand", - ... "test.org.apache.spark.sql.JavaRandUDF", DoubleType(), deterministic=False) - >>> sqlContext.sql("SELECT javaRand(3)").collect() # doctest: +SKIP - [Row(UDF:javaRand(3)=3.12345)] + [Row(UDF(test)=4)] """ jdt = None if returnType is not None: jdt = self.sparkSession._jsparkSession.parseDataType(returnType.json()) - self.sparkSession._jsparkSession.udf().registerJava( - name, javaClassName, jdt, deterministic) + self.sparkSession._jsparkSession.udf().registerJava(name, javaClassName, jdt) @ignore_unicode_prefix @since(2.3) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index 99494d4e6781..52dba6ebbb2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -64,7 +64,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Register a user-defined aggregate function (UDAF). + * Registers a user-defined aggregate function (UDAF). * * @param name the name of the UDAF. * @param udaf the UDAF needs to be registered. @@ -81,12 +81,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends /** * Registers a user-defined function (UDF), for a UDF that's already defined using the Dataset * API (i.e. of type UserDefinedFunction). To change a UDF to nondeterministic, call the API - * `UserDefinedFunction.asNondeterministic()`. + * `UserDefinedFunction.asNondeterministic()`. To change a UDF to nonNullable, call the API + * `UserDefinedFunction.asNonNullabe()`. * * Example: * {{{ - * val foo = udf(() => { Math.random() }) + * val foo = udf(() => Math.random()) * spark.udf.register("random", foo.asNondeterministic()) + * + * val bar = udf(() => "bar") + * spark.udf.register("stringLit", bar.asNonNullabe()) * }}} * * @param name the name of the UDF. @@ -138,28 +142,20 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends val version = if (i == 0) "2.3.0" else "1.3.0" val funcCall = if (i == 0) "() => func" else "func" println(s""" - |/** - | * Registers a deterministic user-defined function with ${i} arguments. - | * @since $version - | */ - |def register(name: String, f: UDF$i[$extTypeArgs], returnType: DataType): Unit = { - | register(name, f, returnType, deterministic = true) - |} - | - |/** - | * Registers a user-defined function with ${i} arguments. - | * @since 2.3.0 - | */ - |def register(name: String, f: UDF$i[$extTypeArgs], returnType: DataType, deterministic: Boolean): Unit = { - | val func = f$anyCast.call($anyParams) - | def builder(e: Seq[Expression]) = if (e.length == $i) { - | ScalaUDF($funcCall, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) - | } else { - | throw new AnalysisException("Invalid number of arguments for function " + name + - | ". Expected: $i; Found: " + e.length) - | } - | functionRegistry.createOrReplaceTempFunction(name, builder) - |}""".stripMargin) + |/** + | * Register a user-defined function with ${i} arguments. + | * @since $version + | */ + |def register(name: String, f: UDF$i[$extTypeArgs], returnType: DataType): Unit = { + | val func = f$anyCast.call($anyParams) + | def builder(e: Seq[Expression]) = if (e.length == $i) { + | ScalaUDF($funcCall, returnType, e) + | } else { + | throw new AnalysisException("Invalid number of arguments for function " + name + + | ". Expected: $i; Found: " + e.length) + | } + | functionRegistry.createOrReplaceTempFunction(name, builder) + |}""".stripMargin) } */ @@ -604,7 +600,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends ////////////////////////////////////////////////////////////////////////////////////////////// /** - * Registers a Java UDF class using reflection, for use from pyspark + * Register a Java UDF class using reflection, for use from pyspark * * @param name udf name * @param className fully qualified class name of udf @@ -612,23 +608,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends * via reflection. */ private[sql] def registerJava(name: String, className: String, returnDataType: DataType): Unit = { - registerJava(name, className, returnDataType, deterministic = true) - } - /** - * Registers a Java UDF class using reflection, for use from pyspark - * - * @param name udf name - * @param className fully qualified class name of udf - * @param returnDataType return type of udf. If it is null, spark would try to infer - * via reflection. - * @param deterministic True if the UDF is deterministic. Deterministic UDF returns same result - * each time it is invoked with a particular input. - */ - private[sql] def registerJava( - name: String, - className: String, - returnDataType: DataType, - deterministic: Boolean): Unit = { + try { val clazz = Utils.classForName(className) val udfInterfaces = clazz.getGenericInterfaces @@ -649,29 +629,29 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } udfInterfaces(0).getActualTypeArguments.length match { - case 1 => register(name, udf.asInstanceOf[UDF0[_]], returnType, deterministic) - case 2 => register(name, udf.asInstanceOf[UDF1[_, _]], returnType, deterministic) - case 3 => register(name, udf.asInstanceOf[UDF2[_, _, _]], returnType, deterministic) - case 4 => register(name, udf.asInstanceOf[UDF3[_, _, _, _]], returnType, deterministic) - case 5 => register(name, udf.asInstanceOf[UDF4[_, _, _, _, _]], returnType, deterministic) - case 6 => register(name, udf.asInstanceOf[UDF5[_, _, _, _, _, _]], returnType, deterministic) - case 7 => register(name, udf.asInstanceOf[UDF6[_, _, _, _, _, _, _]], returnType, deterministic) - case 8 => register(name, udf.asInstanceOf[UDF7[_, _, _, _, _, _, _, _]], returnType, deterministic) - case 9 => register(name, udf.asInstanceOf[UDF8[_, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 10 => register(name, udf.asInstanceOf[UDF9[_, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 11 => register(name, udf.asInstanceOf[UDF10[_, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 12 => register(name, udf.asInstanceOf[UDF11[_, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 13 => register(name, udf.asInstanceOf[UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 14 => register(name, udf.asInstanceOf[UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 15 => register(name, udf.asInstanceOf[UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 16 => register(name, udf.asInstanceOf[UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 17 => register(name, udf.asInstanceOf[UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 18 => register(name, udf.asInstanceOf[UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 19 => register(name, udf.asInstanceOf[UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 20 => register(name, udf.asInstanceOf[UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 21 => register(name, udf.asInstanceOf[UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 22 => register(name, udf.asInstanceOf[UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) - case 23 => register(name, udf.asInstanceOf[UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType, deterministic) + case 1 => register(name, udf.asInstanceOf[UDF0[_]], returnType) + case 2 => register(name, udf.asInstanceOf[UDF1[_, _]], returnType) + case 3 => register(name, udf.asInstanceOf[UDF2[_, _, _]], returnType) + case 4 => register(name, udf.asInstanceOf[UDF3[_, _, _, _]], returnType) + case 5 => register(name, udf.asInstanceOf[UDF4[_, _, _, _, _]], returnType) + case 6 => register(name, udf.asInstanceOf[UDF5[_, _, _, _, _, _]], returnType) + case 7 => register(name, udf.asInstanceOf[UDF6[_, _, _, _, _, _, _]], returnType) + case 8 => register(name, udf.asInstanceOf[UDF7[_, _, _, _, _, _, _, _]], returnType) + case 9 => register(name, udf.asInstanceOf[UDF8[_, _, _, _, _, _, _, _, _]], returnType) + case 10 => register(name, udf.asInstanceOf[UDF9[_, _, _, _, _, _, _, _, _, _]], returnType) + case 11 => register(name, udf.asInstanceOf[UDF10[_, _, _, _, _, _, _, _, _, _, _]], returnType) + case 12 => register(name, udf.asInstanceOf[UDF11[_, _, _, _, _, _, _, _, _, _, _, _]], returnType) + case 13 => register(name, udf.asInstanceOf[UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _]], returnType) + case 14 => register(name, udf.asInstanceOf[UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType) + case 15 => register(name, udf.asInstanceOf[UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType) + case 16 => register(name, udf.asInstanceOf[UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType) + case 17 => register(name, udf.asInstanceOf[UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType) + case 18 => register(name, udf.asInstanceOf[UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType) + case 19 => register(name, udf.asInstanceOf[UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType) + case 20 => register(name, udf.asInstanceOf[UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType) + case 21 => register(name, udf.asInstanceOf[UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType) + case 22 => register(name, udf.asInstanceOf[UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType) + case 23 => register(name, udf.asInstanceOf[UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType) case n => throw new AnalysisException(s"UDF class with $n type arguments is not supported.") } @@ -708,21 +688,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 0 arguments. + * Register a user-defined function with 0 arguments. * @since 2.3.0 */ def register(name: String, f: UDF0[_], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 0 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF0[_], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF0[Any]].call() def builder(e: Seq[Expression]) = if (e.length == 0) { - ScalaUDF(() => func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(() => func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 0; Found: " + e.length) @@ -731,21 +703,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 1 arguments. + * Register a user-defined function with 1 arguments. * @since 1.3.0 */ def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 1 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF1[_, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any) def builder(e: Seq[Expression]) = if (e.length == 1) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 1; Found: " + e.length) @@ -754,21 +718,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 2 arguments. + * Register a user-defined function with 2 arguments. * @since 1.3.0 */ def register(name: String, f: UDF2[_, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 2 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF2[_, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 2) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 2; Found: " + e.length) @@ -777,21 +733,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 3 arguments. + * Register a user-defined function with 3 arguments. * @since 1.3.0 */ def register(name: String, f: UDF3[_, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 3 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF3[_, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 3) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 3; Found: " + e.length) @@ -800,21 +748,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 4 arguments. + * Register a user-defined function with 4 arguments. * @since 1.3.0 */ def register(name: String, f: UDF4[_, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 4 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF4[_, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 4) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 4; Found: " + e.length) @@ -823,21 +763,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 5 arguments. + * Register a user-defined function with 5 arguments. * @since 1.3.0 */ def register(name: String, f: UDF5[_, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 5 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF5[_, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 5) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 5; Found: " + e.length) @@ -846,21 +778,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 6 arguments. + * Register a user-defined function with 6 arguments. * @since 1.3.0 */ def register(name: String, f: UDF6[_, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 6 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF6[_, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 6) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 6; Found: " + e.length) @@ -869,21 +793,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 7 arguments. + * Register a user-defined function with 7 arguments. * @since 1.3.0 */ def register(name: String, f: UDF7[_, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 7 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF7[_, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 7) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 7; Found: " + e.length) @@ -892,21 +808,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 8 arguments. + * Register a user-defined function with 8 arguments. * @since 1.3.0 */ def register(name: String, f: UDF8[_, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 8 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF8[_, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 8) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 8; Found: " + e.length) @@ -915,21 +823,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 9 arguments. + * Register a user-defined function with 9 arguments. * @since 1.3.0 */ def register(name: String, f: UDF9[_, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 9 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF9[_, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 9) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 9; Found: " + e.length) @@ -938,21 +838,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 10 arguments. + * Register a user-defined function with 10 arguments. * @since 1.3.0 */ def register(name: String, f: UDF10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 10 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 10) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 10; Found: " + e.length) @@ -961,21 +853,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 11 arguments. + * Register a user-defined function with 11 arguments. * @since 1.3.0 */ def register(name: String, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 11 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 11) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 11; Found: " + e.length) @@ -984,21 +868,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 12 arguments. + * Register a user-defined function with 12 arguments. * @since 1.3.0 */ def register(name: String, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 12 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 12) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 12; Found: " + e.length) @@ -1007,21 +883,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 13 arguments. + * Register a user-defined function with 13 arguments. * @since 1.3.0 */ def register(name: String, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 13 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 13) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 13; Found: " + e.length) @@ -1030,21 +898,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 14 arguments. + * Register a user-defined function with 14 arguments. * @since 1.3.0 */ def register(name: String, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 14 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 14) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 14; Found: " + e.length) @@ -1053,21 +913,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 15 arguments. + * Register a user-defined function with 15 arguments. * @since 1.3.0 */ def register(name: String, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 15 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 15) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 15; Found: " + e.length) @@ -1076,21 +928,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 16 arguments. + * Register a user-defined function with 16 arguments. * @since 1.3.0 */ def register(name: String, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 16 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 16) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 16; Found: " + e.length) @@ -1099,21 +943,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 17 arguments. + * Register a user-defined function with 17 arguments. * @since 1.3.0 */ def register(name: String, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 17 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 17) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 17; Found: " + e.length) @@ -1122,21 +958,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 18 arguments. + * Register a user-defined function with 18 arguments. * @since 1.3.0 */ def register(name: String, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 18 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 18) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 18; Found: " + e.length) @@ -1145,21 +973,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 19 arguments. + * Register a user-defined function with 19 arguments. * @since 1.3.0 */ def register(name: String, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 19 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 19) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 19; Found: " + e.length) @@ -1168,21 +988,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 20 arguments. + * Register a user-defined function with 20 arguments. * @since 1.3.0 */ def register(name: String, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 20 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 20) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 20; Found: " + e.length) @@ -1191,21 +1003,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 21 arguments. + * Register a user-defined function with 21 arguments. * @since 1.3.0 */ def register(name: String, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 21 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 21) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 21; Found: " + e.length) @@ -1214,21 +1018,13 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } /** - * Registers a deterministic user-defined function with 22 arguments. + * Register a user-defined function with 22 arguments. * @since 1.3.0 */ def register(name: String, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = { - register(name, f, returnType, deterministic = true) - } - - /** - * Registers a user-defined function with 22 arguments. - * @since 2.3.0 - */ - def register(name: String, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType, deterministic: Boolean): Unit = { val func = f.asInstanceOf[UDF22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any) def builder(e: Seq[Expression]) = if (e.length == 22) { - ScalaUDF(func, returnType, e, inputTypes = Nil, udfName = Some(name), nullable = true, deterministic) + ScalaUDF(func, returnType, e) } else { throw new AnalysisException("Invalid number of arguments for function " + name + ". Expected: 22; Found: " + e.length) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaRandUDF.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaRandUDF.java deleted file mode 100644 index df806bc18288..000000000000 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaRandUDF.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package test.org.apache.spark.sql; - -import org.apache.spark.sql.api.java.UDF1; - -/** - * It is used for register Java UDF from PySpark - */ -public class JavaRandUDF implements UDF1 { - @Override - public Double call(Integer i) { - return i + Math.random(); - } -} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java index 2521007dcda2..5bf188882618 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java @@ -28,7 +28,6 @@ import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.api.java.UDF1; import org.apache.spark.sql.api.java.UDF2; import org.apache.spark.sql.types.DataTypes; @@ -122,22 +121,4 @@ public void udf6Test() { Row result = spark.sql("SELECT returnOne()").head(); Assert.assertEquals(1, result.getInt(0)); } - - @SuppressWarnings("unchecked") - @Test - public void udf7Test() { - spark.udf().registerJava("randUDF", JavaRandUDF.class.getName(), DataTypes.DoubleType, false); - Row result = spark.sql("SELECT randUDF(1)").head(); - Assert.assertTrue(result.getDouble(0) >= 0.0); - } - - @SuppressWarnings("unchecked") - @Test - public void udf8Test() { - spark.udf().register("randUDF", (Integer i) -> i + Math.random(), DataTypes.DoubleType, false); - Row result = spark.sql("SELECT randUDF(1)").head(); - Assert.assertTrue(result.getDouble(0) >= 0.0); - Assert.assertTrue("EXPLAIN outputs are expected to contain the UDF name.", - spark.sql("EXPLAIN SELECT randUDF(1) AS r").collectAsList().toString().contains("randUDF")); - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index bf688704836d..52f3fe2b1866 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -113,19 +113,18 @@ class UDFSuite extends QueryTest with SharedSQLContext { } test("ZeroArgument non-deterministic UDF") { - val foo = udf(() => { Math.random() }) + val foo = udf(() => Math.random()) spark.udf.register("random0", foo.asNondeterministic()) val df = sql("SELECT random0()") assert(df.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic)) assert(df.head().getDouble(0) >= 0.0) - val foo1 = udf(() => { Math.random() }).asNondeterministic() - val df1 = testData.select(foo1()) + val df1 = testData.select(foo()) assert(df1.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic)) assert(df1.head().getDouble(0) >= 0.0) - val foo2 = udf(() => { Math.random() }, DataTypes.DoubleType).asNondeterministic() - val df2 = testData.select(foo2()) + val bar = udf(() => Math.random(), DataTypes.DoubleType).asNondeterministic() + val df2 = testData.select(bar()) assert(df2.logicalPlan.asInstanceOf[Project].projectList.forall(!_.deterministic)) assert(df2.head().getDouble(0) >= 0.0) }