From 177a01a5056e43ac26b8241fb14c3d35b1344049 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Thu, 2 Apr 2020 14:31:14 -0700 Subject: [PATCH 1/6] [SPARK-31331][SQL][DOCS] Document Spark integration with Hive UDFs/UDAFs/UDTFs --- docs/sql-ref-functions-udf-hive.md | 44 +++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/docs/sql-ref-functions-udf-hive.md b/docs/sql-ref-functions-udf-hive.md index 8698be707ea43..24a3972fbcb6e 100644 --- a/docs/sql-ref-functions-udf-hive.md +++ b/docs/sql-ref-functions-udf-hive.md @@ -19,4 +19,46 @@ license: | limitations under the License. --- -Integration with Hive UDFs/UDAFs/UDTFs \ No newline at end of file +### Description + +Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark UDFs and UDAFs, Hive UDFs work on a single row as input and generate a single row as output, while Hive UDAFs operate on multiple rows and return a single aggregated row as a result. In addition, Hive also supports UDTFs (User Defined Tabular Functions) that act on one row as input and return multiple rows as output. To use Hive UDFs/UDAFs/UDTFs, the user should register them in Spark, and then use them in Spark SQL queries. + +### Examples + + +

+// Register a Hive UDF and use it in Spark SQL
+// Scala
+sql(s"CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.spark.sql.hive.execution.PairUDF'")
+sql("SELECT testUDF(pair) FROM hiveUDFTestTable")
+
+// Register a Hive UDAF and use it in Spark SQL
+// Scala
+sql(
+    """
+    |CREATE TEMPORARY FUNCTION test_avg
+    |AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage'
+    """.stripMargin)
+sql("SELECT test_avg(1), test_avg(substr(value,5)) FROM src")
+
+// Register a Hive UDTF and use it in Spark SQL
+// Scala
+// GenericUDTFCount2 outputs the number of rows seen, twice.
+// The function source code can be found at:
+// https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
+sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath}")
+sql(
+    """
+    |CREATE TEMPORARY FUNCTION udtf_count2
+    |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+    """.stripMargin)
+sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a)").show
+
++----+
+|col1|
++----+
+|   1|
+|   1|
++----+
+
+
\ No newline at end of file From 3901cf66dc47a33d91ff4b8f25ea82b00046f8ad Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Thu, 2 Apr 2020 22:50:02 -0700 Subject: [PATCH 2/6] address comments --- docs/sql-ref-functions-udf-hive.md | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/docs/sql-ref-functions-udf-hive.md b/docs/sql-ref-functions-udf-hive.md index 24a3972fbcb6e..6d237eed7cae4 100644 --- a/docs/sql-ref-functions-udf-hive.md +++ b/docs/sql-ref-functions-udf-hive.md @@ -29,28 +29,31 @@ Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark U

 // Register a Hive UDF and use it in Spark SQL
 // Scala
-sql(s"CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.spark.sql.hive.execution.PairUDF'")
-sql("SELECT testUDF(pair) FROM hiveUDFTestTable")
+// include the JAR file containing mytest.hiveUDF implementation
+sql("CREATE TEMPORARY FUNCTION testUDF AS 'mytest.hiveUDF'")
+sql("SELECT testUDF(value) FROM hiveUDFTestTable")
 
 // Register a Hive UDAF and use it in Spark SQL
 // Scala
+// include the JAR file containing
+// org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax
 sql(
     """
-    |CREATE TEMPORARY FUNCTION test_avg
-    |AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage'
+      |CREATE TEMPORARY FUNCTION hive_max
+      |AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax'
     """.stripMargin)
-sql("SELECT test_avg(1), test_avg(substr(value,5)) FROM src")
+sql("SELECT key % 2, hive_max(key) FROM t GROUP BY key % 2")
 
 // Register a Hive UDTF and use it in Spark SQL
 // Scala
 // GenericUDTFCount2 outputs the number of rows seen, twice.
 // The function source code can be found at:
 // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
-sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath}")
+// include the JAR file containing GenericUDTFCount2 implementation
 sql(
     """
-    |CREATE TEMPORARY FUNCTION udtf_count2
-    |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+      |CREATE TEMPORARY FUNCTION udtf_count2
+      |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
     """.stripMargin)
 sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a)").show
 
@@ -61,4 +64,4 @@ sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a)").show
 |   1|
 +----+
 
-
\ No newline at end of file + From 100cea41b285757dc2b9228975ddbe8475d07b5a Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 6 Apr 2020 14:13:07 -0700 Subject: [PATCH 3/6] add jar --- docs/sql-ref-functions-udf-hive.md | 31 ++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/docs/sql-ref-functions-udf-hive.md b/docs/sql-ref-functions-udf-hive.md index 6d237eed7cae4..ff23174295240 100644 --- a/docs/sql-ref-functions-udf-hive.md +++ b/docs/sql-ref-functions-udf-hive.md @@ -30,19 +30,21 @@ Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark U // Register a Hive UDF and use it in Spark SQL // Scala // include the JAR file containing mytest.hiveUDF implementation -sql("CREATE TEMPORARY FUNCTION testUDF AS 'mytest.hiveUDF'") -sql("SELECT testUDF(value) FROM hiveUDFTestTable") +spark.sql("ADD JAR myHiveUDF.jar") +spark.sql("CREATE TEMPORARY FUNCTION testUDF AS 'mytest.hiveUDF'") +spark.sql("SELECT testUDF(value) FROM hiveUDFTestTable") // Register a Hive UDAF and use it in Spark SQL // Scala // include the JAR file containing // org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax -sql( - """ - |CREATE TEMPORARY FUNCTION hive_max - |AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax' - """.stripMargin) -sql("SELECT key % 2, hive_max(key) FROM t GROUP BY key % 2") +spark.sql("ADD JAR myHiveUDAF.jar") +spark.sql( + """ + |CREATE TEMPORARY FUNCTION hive_max + |AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax' + """.stripMargin) +spark.sql("SELECT key % 2, hive_max(key) FROM t GROUP BY key % 2") // Register a Hive UDTF and use it in Spark SQL // Scala @@ -50,12 +52,13 @@ sql("SELECT key % 2, hive_max(key) FROM t GROUP BY key % 2") // The function source code can be found at: // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF // include the JAR file containing GenericUDTFCount2 implementation -sql( - """ - |CREATE TEMPORARY FUNCTION udtf_count2 - |AS 
'org.apache.spark.sql.hive.execution.GenericUDTFCount2' - """.stripMargin) -sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a)").show +spark.sql("ADD JAR myHiveUDTF.jar") +spark.sql( + """ + |CREATE TEMPORARY FUNCTION udtf_count2 + |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' + """.stripMargin) +spark.sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a)").show +----+ |col1| From 52269f258391f442fb805459e7a5e8c268f2b0cd Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 6 Apr 2020 14:24:26 -0700 Subject: [PATCH 4/6] remove extra blanlk line --- docs/sql-ref-functions-udf-hive.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/sql-ref-functions-udf-hive.md b/docs/sql-ref-functions-udf-hive.md index ff23174295240..3beb13ba18e01 100644 --- a/docs/sql-ref-functions-udf-hive.md +++ b/docs/sql-ref-functions-udf-hive.md @@ -25,7 +25,6 @@ Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark U ### Examples -

 // Register a Hive UDF and use it in Spark SQL
 // Scala

From bc87b372e2d359b6e2d102ab0f5eebfb65f54310 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro 
Date: Tue, 7 Apr 2020 13:01:25 +0900
Subject: [PATCH 5/6] Fix

---
 docs/sql-ref-functions-udf-hive.md | 117 ++++++++++++++++++++---------
 1 file changed, 80 insertions(+), 37 deletions(-)

diff --git a/docs/sql-ref-functions-udf-hive.md b/docs/sql-ref-functions-udf-hive.md
index 3beb13ba18e01..a3c21f78a6ce9 100644
--- a/docs/sql-ref-functions-udf-hive.md
+++ b/docs/sql-ref-functions-udf-hive.md
@@ -25,45 +25,88 @@ Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark U
 
 ### Examples
 
+Hive has two UDF interfaces: [UDF](https://github.com/apache/hive/blob/master/udf/src/java/org/apache/hadoop/hive/ql/exec/UDF.java) and [GenericUDF](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.java).
+An example below uses [GenericUDFAbs](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java) derived from `GenericUDF`.
+
+

+// Register `GenericUDFAbs` and use it in Spark SQL.
+// Note that, if you use your own programmed one, you need to add a JAR containig it into a classpath,
+// e.g., `spark.sql("ADD JAR yourHiveUDF.jar")`.
+spark.sql("CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs'")
+
+spark.sql("SELECT * FROM hiveUDFTestTable").show()
+// +-----+
+// |value|
+// +-----+
+// | -1.0|
+// |  2.0|
+// | -3.0|
+// +-----+
+
+spark.sql("SELECT testUDF(value) FROM t").show()
+// +--------------+
+// |testUDF(value)|
+// +--------------+
+// |           1.0|
+// |           2.0|
+// |           3.0|
+// +--------------+
+
+ +An example below uses [GenericUDTFExplode](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java) derived from [GenericUDTF](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.java). +

-// Register a Hive UDF and use it in Spark SQL
-// Scala
-// include the JAR file containing mytest.hiveUDF implementation
-spark.sql("ADD JAR myHiveUDF.jar")
-spark.sql("CREATE TEMPORARY FUNCTION testUDF AS 'mytest.hiveUDF'")
-spark.sql("SELECT testUDF(value) FROM hiveUDFTestTable")
-
-// Register a Hive UDAF and use it in Spark SQL
-// Scala
-// include the JAR file containing
-// org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax
-spark.sql("ADD JAR myHiveUDAF.jar")
+// Register `GenericUDTFExplode` and use it in Spark SQL
 spark.sql(
-          """
-            |CREATE TEMPORARY FUNCTION hive_max
-            |AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax'
-          """.stripMargin)
-spark.sql("SELECT key % 2, hive_max(key) FROM t GROUP BY key % 2")
-
-// Register a Hive UDTF and use it in Spark SQL
-// Scala
-// GenericUDTFCount2 outputs the number of rows seen, twice.
-// The function source code can be found at:
-// https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
-// include the JAR file containing GenericUDTFCount2 implementation
-spark.sql("ADD JAR myHiveUDTF.jar")
+  """
+    |CREATE TEMPORARY FUNCTION hiveUDTF
+    |    AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'
+  """.stripMargin)
+
+spark.sql("SELECT * FROM t").show()
+// +------+
+// | value|
+// +------+
+// |[1, 2]|
+// |[3, 4]|
+// +------+
+
+spark.sql("SELECT hiveUDTF(value) FROM t").show()
+// +---+
+// |col|
+// +---+
+// |  1|
+// |  2|
+// |  3|
+// |  4|
+// +---+
+
+ +Hive has two UDAF interfaces: [UDAF](https://github.com/apache/hive/blob/master/udf/src/java/org/apache/hadoop/hive/ql/exec/UDAF.java) and [GenericUDAFResolver](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFResolver.java). +An example below uses [GenericUDAFSum](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java) derived from `GenericUDAFResolver`. + +

+// Register `GenericUDAFSum` and use it in Spark SQL
 spark.sql(
-          """
-            |CREATE TEMPORARY FUNCTION udtf_count2
-            |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
-          """.stripMargin)
-spark.sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a)").show
-
-+----+
-|col1|
-+----+
-|   1|
-|   1|
-+----+
+  """
+    |CREATE TEMPORARY FUNCTION hiveUDAF
+    |    AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum'
+  """.stripMargin)
+
+spark.sql("SELECT * FROM t").show()
+// +---+-----+
+// |key|value|
+// +---+-----+
+// |  a|    1|
+// |  a|    2|
+// |  b|    3|
+// +---+-----+
 
+spark.sql("SELECT key, hiveUDAF(value) FROM t GROUP BY key").show()
+// +---+---------------+
+// |key|hiveUDAF(value)|
+// +---+---------------+
+// |  b|              3|
+// |  a|              3|
+// +---+---------------+
 
From 946e41734a324692a7b4006b0fbd612bd8d08603 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Thu, 9 Apr 2020 00:50:12 -0700 Subject: [PATCH 6/6] change scala example to sql example --- docs/sql-ref-functions-udf-hive.md | 146 ++++++++++++++--------------- 1 file changed, 71 insertions(+), 75 deletions(-) diff --git a/docs/sql-ref-functions-udf-hive.md b/docs/sql-ref-functions-udf-hive.md index a3c21f78a6ce9..a87266dec2ea1 100644 --- a/docs/sql-ref-functions-udf-hive.md +++ b/docs/sql-ref-functions-udf-hive.md @@ -28,85 +28,81 @@ Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark U Hive has two UDF interfaces: [UDF](https://github.com/apache/hive/blob/master/udf/src/java/org/apache/hadoop/hive/ql/exec/UDF.java) and [GenericUDF](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.java). An example below uses [GenericUDFAbs](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFAbs.java) derived from `GenericUDF`. -

-// Register `GenericUDFAbs` and use it in Spark SQL.
-// Note that, if you use your own programmed one, you need to add a JAR containig it into a classpath,
-// e.g., `spark.sql("ADD JAR yourHiveUDF.jar")`.
-spark.sql("CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs'")
-
-spark.sql("SELECT * FROM hiveUDFTestTable").show()
-// +-----+
-// |value|
-// +-----+
-// | -1.0|
-// |  2.0|
-// | -3.0|
-// +-----+
-
-spark.sql("SELECT testUDF(value) FROM t").show()
-// +--------------+
-// |testUDF(value)|
-// +--------------+
-// |           1.0|
-// |           2.0|
-// |           3.0|
-// +--------------+
-
+{% highlight sql %} +-- Register `GenericUDFAbs` and use it in Spark SQL. +-- Note that, if you use your own programmed one, you need to add a JAR containig it +-- into a classpath, +-- e.g., ADD JAR yourHiveUDF.jar; +CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs'; + +SELECT * FROM t; + +-----+ + |value| + +-----+ + | -1.0| + | 2.0| + | -3.0| + +-----+ + +SELECT testUDF(value) FROM t; + +--------------+ + |testUDF(value)| + +--------------+ + | 1.0| + | 2.0| + | 3.0| + +--------------+ +{% endhighlight %} + An example below uses [GenericUDTFExplode](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java) derived from [GenericUDTF](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.java). -

-// Register `GenericUDTFExplode` and use it in Spark SQL
-spark.sql(
-  """
-    |CREATE TEMPORARY FUNCTION hiveUDTF
-    |    AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'
-  """.stripMargin)
-
-spark.sql("SELECT * FROM t").show()
-// +------+
-// | value|
-// +------+
-// |[1, 2]|
-// |[3, 4]|
-// +------+
-
-spark.sql("SELECT hiveUDTF(value) FROM t").show()
-// +---+
-// |col|
-// +---+
-// |  1|
-// |  2|
-// |  3|
-// |  4|
-// +---+
-
+{% highlight sql %} +-- Register `GenericUDTFExplode` and use it in Spark SQL +CREATE TEMPORARY FUNCTION hiveUDTF + AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'; + +SELECT * FROM t; + +------+ + | value| + +------+ + |[1, 2]| + |[3, 4]| + +------+ + +SELECT hiveUDTF(value) FROM t; + +---+ + |col| + +---+ + | 1| + | 2| + | 3| + | 4| + +---+ +{% endhighlight %} Hive has two UDAF interfaces: [UDAF](https://github.com/apache/hive/blob/master/udf/src/java/org/apache/hadoop/hive/ql/exec/UDAF.java) and [GenericUDAFResolver](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFResolver.java). An example below uses [GenericUDAFSum](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java) derived from `GenericUDAFResolver`. -

-// Register `GenericUDAFSum` and use it in Spark SQL
-spark.sql(
-  """
-    |CREATE TEMPORARY FUNCTION hiveUDAF
-    |    AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum'
-  """.stripMargin)
-
-spark.sql("SELECT * FROM t").show()
-// +---+-----+
-// |key|value|
-// +---+-----+
-// |  a|    1|
-// |  a|    2|
-// |  b|    3|
-// +---+-----+
-
-spark.sql("SELECT key, hiveUDAF(value) FROM t GROUP BY key").show()
-// +---+---------------+
-// |key|hiveUDAF(value)|
-// +---+---------------+
-// |  b|              3|
-// |  a|              3|
-// +---+---------------+
-
+{% highlight sql %} +-- Register `GenericUDAFSum` and use it in Spark SQL +CREATE TEMPORARY FUNCTION hiveUDAF + AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum'; + +SELECT * FROM t; + +---+-----+ + |key|value| + +---+-----+ + | a| 1| + | a| 2| + | b| 3| + +---+-----+ + +SELECT key, hiveUDAF(value) FROM t GROUP BY key; + +---+---------------+ + |key|hiveUDAF(value)| + +---+---------------+ + | b| 3| + | a| 3| + +---+---------------+ +{% endhighlight %} \ No newline at end of file