Skip to content

Commit 160cb61

Browse files
committed
add java UDF APIs in the functions object
1 parent a6fc300 commit 160cb61

File tree

5 files changed

+315
-112
lines changed

5 files changed

+315
-112
lines changed

sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql
1919

20-
import java.lang.reflect.{ParameterizedType, Type}
20+
import java.lang.reflect.ParameterizedType
2121

2222
import scala.reflect.runtime.universe.TypeTag
2323
import scala.util.Try
@@ -110,29 +110,29 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
110110

111111
/* register 0-22 were generated by this script
112112
113-
(0 to 22).map { x =>
113+
(0 to 22).foreach { x =>
114114
val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
115-
val typeTags = (1 to x).map(i => s"A${i}: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _)
115+
val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _)
116116
val inputTypes = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor[A$i].dataType :: $s"})
117117
println(s"""
118-
/**
119-
* Registers a deterministic Scala closure of ${x} arguments as user-defined function (UDF).
120-
* @tparam RT return type of UDF.
121-
* @since 1.3.0
122-
*/
123-
def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = {
124-
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
125-
val inputTypes = Try($inputTypes).toOption
126-
def builder(e: Seq[Expression]) = if (e.length == $x) {
127-
ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
128-
} else {
129-
throw new AnalysisException("Invalid number of arguments for function " + name +
130-
". Expected: $x; Found: " + e.length)
131-
}
132-
functionRegistry.createOrReplaceTempFunction(name, builder)
133-
val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
134-
if (nullable) udf else udf.asNonNullable()
135-
}""")
118+
|/**
119+
| * Registers a deterministic Scala closure of $x arguments as user-defined function (UDF).
120+
| * @tparam RT return type of UDF.
121+
| * @since 1.3.0
122+
| */
123+
|def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = {
124+
| val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
125+
| val inputTypes = Try($inputTypes).toOption
126+
| def builder(e: Seq[Expression]) = if (e.length == $x) {
127+
| ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
128+
| } else {
129+
| throw new AnalysisException("Invalid number of arguments for function " + name +
130+
| ". Expected: $x; Found: " + e.length)
131+
| }
132+
| functionRegistry.createOrReplaceTempFunction(name, builder)
133+
| val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
134+
| if (nullable) udf else udf.asNonNullable()
135+
|}""".stripMargin)
136136
}
137137
138138
(0 to 22).foreach { i =>
@@ -144,7 +144,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
144144
val funcCall = if (i == 0) "() => func" else "func"
145145
println(s"""
146146
|/**
147-
| * Register a user-defined function with ${i} arguments.
147+
| * Register a deterministic Java UDF$i instance as user-defined function (UDF).
148148
| * @since $version
149149
| */
150150
|def register(name: String, f: UDF$i[$extTypeArgs], returnType: DataType): Unit = {
@@ -689,7 +689,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
689689
}
690690

691691
/**
692-
* Register a user-defined function with 0 arguments.
692+
* Register a deterministic Java UDF0 instance as user-defined function (UDF).
693693
* @since 2.3.0
694694
*/
695695
def register(name: String, f: UDF0[_], returnType: DataType): Unit = {
@@ -704,7 +704,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
704704
}
705705

706706
/**
707-
* Register a user-defined function with 1 arguments.
707+
* Register a deterministic Java UDF1 instance as user-defined function (UDF).
708708
* @since 1.3.0
709709
*/
710710
def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = {
@@ -719,7 +719,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
719719
}
720720

721721
/**
722-
* Register a user-defined function with 2 arguments.
722+
* Register a deterministic Java UDF2 instance as user-defined function (UDF).
723723
* @since 1.3.0
724724
*/
725725
def register(name: String, f: UDF2[_, _, _], returnType: DataType): Unit = {
@@ -734,7 +734,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
734734
}
735735

736736
/**
737-
* Register a user-defined function with 3 arguments.
737+
* Register a deterministic Java UDF3 instance as user-defined function (UDF).
738738
* @since 1.3.0
739739
*/
740740
def register(name: String, f: UDF3[_, _, _, _], returnType: DataType): Unit = {
@@ -749,7 +749,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
749749
}
750750

751751
/**
752-
* Register a user-defined function with 4 arguments.
752+
* Register a deterministic Java UDF4 instance as user-defined function (UDF).
753753
* @since 1.3.0
754754
*/
755755
def register(name: String, f: UDF4[_, _, _, _, _], returnType: DataType): Unit = {
@@ -764,7 +764,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
764764
}
765765

766766
/**
767-
* Register a user-defined function with 5 arguments.
767+
* Register a deterministic Java UDF5 instance as user-defined function (UDF).
768768
* @since 1.3.0
769769
*/
770770
def register(name: String, f: UDF5[_, _, _, _, _, _], returnType: DataType): Unit = {
@@ -779,7 +779,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
779779
}
780780

781781
/**
782-
* Register a user-defined function with 6 arguments.
782+
* Register a deterministic Java UDF6 instance as user-defined function (UDF).
783783
* @since 1.3.0
784784
*/
785785
def register(name: String, f: UDF6[_, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -794,7 +794,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
794794
}
795795

796796
/**
797-
* Register a user-defined function with 7 arguments.
797+
* Register a deterministic Java UDF7 instance as user-defined function (UDF).
798798
* @since 1.3.0
799799
*/
800800
def register(name: String, f: UDF7[_, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -809,7 +809,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
809809
}
810810

811811
/**
812-
* Register a user-defined function with 8 arguments.
812+
* Register a deterministic Java UDF8 instance as user-defined function (UDF).
813813
* @since 1.3.0
814814
*/
815815
def register(name: String, f: UDF8[_, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -824,7 +824,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
824824
}
825825

826826
/**
827-
* Register a user-defined function with 9 arguments.
827+
* Register a deterministic Java UDF9 instance as user-defined function (UDF).
828828
* @since 1.3.0
829829
*/
830830
def register(name: String, f: UDF9[_, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -839,7 +839,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
839839
}
840840

841841
/**
842-
* Register a user-defined function with 10 arguments.
842+
* Register a deterministic Java UDF10 instance as user-defined function (UDF).
843843
* @since 1.3.0
844844
*/
845845
def register(name: String, f: UDF10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -854,7 +854,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
854854
}
855855

856856
/**
857-
* Register a user-defined function with 11 arguments.
857+
* Register a deterministic Java UDF11 instance as user-defined function (UDF).
858858
* @since 1.3.0
859859
*/
860860
def register(name: String, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -869,7 +869,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
869869
}
870870

871871
/**
872-
* Register a user-defined function with 12 arguments.
872+
* Register a deterministic Java UDF12 instance as user-defined function (UDF).
873873
* @since 1.3.0
874874
*/
875875
def register(name: String, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -884,7 +884,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
884884
}
885885

886886
/**
887-
* Register a user-defined function with 13 arguments.
887+
* Register a deterministic Java UDF13 instance as user-defined function (UDF).
888888
* @since 1.3.0
889889
*/
890890
def register(name: String, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -899,7 +899,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
899899
}
900900

901901
/**
902-
* Register a user-defined function with 14 arguments.
902+
* Register a deterministic Java UDF14 instance as user-defined function (UDF).
903903
* @since 1.3.0
904904
*/
905905
def register(name: String, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -914,7 +914,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
914914
}
915915

916916
/**
917-
* Register a user-defined function with 15 arguments.
917+
* Register a deterministic Java UDF15 instance as user-defined function (UDF).
918918
* @since 1.3.0
919919
*/
920920
def register(name: String, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -929,7 +929,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
929929
}
930930

931931
/**
932-
* Register a user-defined function with 16 arguments.
932+
* Register a deterministic Java UDF16 instance as user-defined function (UDF).
933933
* @since 1.3.0
934934
*/
935935
def register(name: String, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -944,7 +944,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
944944
}
945945

946946
/**
947-
* Register a user-defined function with 17 arguments.
947+
* Register a deterministic Java UDF17 instance as user-defined function (UDF).
948948
* @since 1.3.0
949949
*/
950950
def register(name: String, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -959,7 +959,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
959959
}
960960

961961
/**
962-
* Register a user-defined function with 18 arguments.
962+
* Register a deterministic Java UDF18 instance as user-defined function (UDF).
963963
* @since 1.3.0
964964
*/
965965
def register(name: String, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -974,7 +974,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
974974
}
975975

976976
/**
977-
* Register a user-defined function with 19 arguments.
977+
* Register a deterministic Java UDF19 instance as user-defined function (UDF).
978978
* @since 1.3.0
979979
*/
980980
def register(name: String, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -989,7 +989,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
989989
}
990990

991991
/**
992-
* Register a user-defined function with 20 arguments.
992+
* Register a deterministic Java UDF20 instance as user-defined function (UDF).
993993
* @since 1.3.0
994994
*/
995995
def register(name: String, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -1004,7 +1004,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
10041004
}
10051005

10061006
/**
1007-
* Register a user-defined function with 21 arguments.
1007+
* Register a deterministic Java UDF21 instance as user-defined function (UDF).
10081008
* @since 1.3.0
10091009
*/
10101010
def register(name: String, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {
@@ -1019,7 +1019,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
10191019
}
10201020

10211021
/**
1022-
* Register a user-defined function with 22 arguments.
1022+
* Register a deterministic Java UDF22 instance as user-defined function (UDF).
10231023
* @since 1.3.0
10241024
*/
10251025
def register(name: String, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType: DataType): Unit = {

sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ case class UserDefinedFunction protected[sql] (
6666
*
6767
* @since 1.3.0
6868
*/
69+
@scala.annotation.varargs
6970
def apply(exprs: Column*): Column = {
7071
Column(ScalaUDF(
7172
f,

0 commit comments

Comments
 (0)