diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala index 3beef6b1df45..04a6a8f8aa9a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala @@ -118,9 +118,12 @@ private[hive] object HiveShim { * * @param functionClassName UDF class name * @param instance optional UDF instance which contains additional information (for macro) + * @param clazz optional class instance to create UDF instance */ - private[hive] case class HiveFunctionWrapper(var functionClassName: String, - private var instance: AnyRef = null) extends java.io.Externalizable { + private[hive] case class HiveFunctionWrapper( + var functionClassName: String, + private var instance: AnyRef = null, + private var clazz: Class[_ <: AnyRef] = null) extends java.io.Externalizable { // for Serialization def this() = this(null) @@ -232,8 +235,10 @@ private[hive] object HiveShim { in.readFully(functionInBytes) // deserialize the function object via Hive Utilities + clazz = Utils.getContextOrSparkClassLoader.loadClass(functionClassName) + .asInstanceOf[Class[_ <: AnyRef]] instance = deserializePlan[AnyRef](new java.io.ByteArrayInputStream(functionInBytes), - Utils.getContextOrSparkClassLoader.loadClass(functionClassName)) + clazz) } } @@ -241,8 +246,11 @@ private[hive] object HiveShim { if (instance != null) { instance.asInstanceOf[UDFType] } else { - val func = Utils.getContextOrSparkClassLoader - .loadClass(functionClassName).getConstructor().newInstance().asInstanceOf[UDFType] + if (clazz == null) { + clazz = Utils.getContextOrSparkClassLoader.loadClass(functionClassName) + .asInstanceOf[Class[_ <: AnyRef]] + } + val func = clazz.getConstructor().newInstance().asInstanceOf[UDFType] if (!func.isInstanceOf[UDF]) { // We cache the function if it's no the Simple UDF, // as we always have to create new instance for Simple UDF diff 
--git a/sql/hive/src/test/noclasspath/TestUDTF-spark-26560.jar b/sql/hive/src/test/noclasspath/TestUDTF-spark-26560.jar
deleted file mode 100644
index b73b17d5c788..000000000000
Binary files a/sql/hive/src/test/noclasspath/TestUDTF-spark-26560.jar and /dev/null differ
diff --git a/sql/hive/src/test/noclasspath/hive-test-udfs.jar b/sql/hive/src/test/noclasspath/hive-test-udfs.jar
new file mode 100644
index 000000000000..a5bfa456f668
Binary files /dev/null and b/sql/hive/src/test/noclasspath/hive-test-udfs.jar differ
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
new file mode 100644
index 000000000000..ee8e6f4f78be
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.util.Utils
+
+/**
+ * Checks that Hive functions registered with `CREATE FUNCTION ... USING JAR` stay usable after
+ * the thread context classloader is rolled back to the Spark classloader.
+ *
+ * One test is generated per entry of `udfTestInfos`; each entry names a class that is shipped
+ * only in the test jar, which is deliberately kept off the test classpath.
+ */
+class HiveUDFDynamicLoadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  /**
+   * Describes a single Hive function under test.
+   *
+   * @param identifier short label appended to the generated test name
+   * @param funcName name under which the function is registered
+   * @param className fully qualified name of the implementing class inside the test jar
+   * @param fnVerifyQuery runs a query that uses the function and checks its answer
+   * @param fnCreateHiveUDFExpression builds the Hive UDF expression for the function
+   */
+  case class UDFTestInformation(
+      identifier: String,
+      funcName: String,
+      className: String,
+      fnVerifyQuery: () => Unit,
+      fnCreateHiveUDFExpression: () => Expression)
+
+  // The test jar is built from the commit below:
+  // https://github.com/HeartSaVioR/hive/commit/12f3f036b6efd0299cd1d457c0c0a65e0fd7e5f2
+  // It contains the new UDF classes to be dynamically loaded and tested via Spark.
+  // This jar file must not be placed on the classpath.
+  private val jarPath = "src/test/noclasspath/hive-test-udfs.jar"
+  private val jarUrl = s"file://${System.getProperty("user.dir")}/$jarPath"
+
+  /**
+   * Registers temp view `t` with columns `key` and `value` holding rows (0, 0) through (3, 3),
+   * runs `body`, and finally drops the view so that it does not leak into other tests.
+   */
+  private def withKeyValueView(body: => Unit): Unit = {
+    import spark.implicits._
+    withTempView("t") {
+      Seq((0: Integer) -> 0, (1: Integer) -> 1, (2: Integer) -> 2, (3: Integer) -> 3)
+        .toDF("key", "value").createOrReplaceTempView("t")
+      body
+    }
+  }
+
+  private val udfTestInfos: Seq[UDFTestInformation] = Seq(
+    // UDF
+    // UDFExampleAdd2 is a slightly modified version of UDFExampleAdd in hive/contrib,
+    // which adds two integers or doubles.
+    UDFTestInformation(
+      "UDF",
+      "udf_add2",
+      "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2",
+      () => {
+        checkAnswer(sql("SELECT udf_add2(1, 2)"), Row(3) :: Nil)
+      },
+      () => {
+        HiveSimpleUDF(
+          "default.udf_add2",
+          HiveFunctionWrapper("org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2"),
+          Seq(
+            AttributeReference("a", IntegerType, nullable = false)(),
+            AttributeReference("b", IntegerType, nullable = false)()))
+      }),
+
+    // GenericUDF
+    // GenericUDFTrim2 is a cloned version of GenericUDFTrim in hive/contrib.
+    UDFTestInformation(
+      "GENERIC_UDF",
+      "generic_udf_trim2",
+      "org.apache.hadoop.hive.contrib.udf.example.GenericUDFTrim2",
+      () => {
+        checkAnswer(sql("SELECT generic_udf_trim2(' hello ')"), Row("hello") :: Nil)
+      },
+      () => {
+        HiveGenericUDF(
+          "default.generic_udf_trim2",
+          HiveFunctionWrapper("org.apache.hadoop.hive.contrib.udf.example.GenericUDFTrim2"),
+          Seq(AttributeReference("a", StringType, nullable = false)())
+        )
+      }
+    ),
+
+    // AbstractGenericUDAFResolver
+    // GenericUDAFSum2 is a cloned version of GenericUDAFSum in hive/exec.
+    UDFTestInformation(
+      "GENERIC_UDAF",
+      "generic_udaf_sum2",
+      "org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum2",
+      () => {
+        withKeyValueView {
+          checkAnswer(sql("SELECT generic_udaf_sum2(value) FROM t GROUP BY key % 2"),
+            Row(2) :: Row(4) :: Nil)
+        }
+      },
+      () => {
+        HiveUDAFFunction(
+          "default.generic_udaf_sum2",
+          HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum2"),
+          Seq(AttributeReference("a", IntegerType, nullable = false)())
+        )
+      }
+    ),
+
+    // UDAF
+    // UDAFExampleMax2 is a cloned version of UDAFExampleMax in hive/contrib.
+    UDFTestInformation(
+      "UDAF",
+      "udaf_max2",
+      "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax2",
+      () => {
+        withKeyValueView {
+          checkAnswer(sql("SELECT udaf_max2(value) FROM t GROUP BY key % 2"),
+            Row(2) :: Row(3) :: Nil)
+        }
+      },
+      () => {
+        HiveUDAFFunction(
+          "default.udaf_max2",
+          HiveFunctionWrapper("org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax2"),
+          Seq(AttributeReference("a", IntegerType, nullable = false)()),
+          isUDAFBridgeRequired = true
+        )
+      }
+    ),
+
+    // GenericUDTF
+    // GenericUDTFCount3 is a slightly modified version of GenericUDTFCount2 in hive/contrib,
+    // which emits the count three times.
+    UDFTestInformation(
+      "GENERIC_UDTF",
+      "udtf_count3",
+      "org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3",
+      () => {
+        checkAnswer(
+          sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+          Row(3) :: Row(3) :: Row(3) :: Nil)
+      },
+      () => {
+        HiveGenericUDTF(
+          "default.udtf_count3",
+          HiveFunctionWrapper("org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3"),
+          Seq.empty[Expression]
+        )
+      }
+    )
+  )
+
+  udfTestInfos.foreach { udfInfo =>
+    test("Spark should be able to run Hive UDF using jar regardless of " +
+      s"current thread context classloader (${udfInfo.identifier})") {
+      // Force the Spark classloader first, as other tests (even in other test suites) may have
+      // changed the current thread's context classloader to the jar classloader.
+      Utils.withContextClassLoader(Utils.getSparkClassLoader) {
+        withUserDefinedFunction(udfInfo.funcName -> false) {
+          val sparkClassLoader = Thread.currentThread().getContextClassLoader
+
+          sql(s"CREATE FUNCTION ${udfInfo.funcName} AS '${udfInfo.className}' USING JAR '$jarUrl'")
+
+          assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)
+
+          // JAR will be loaded at first usage, and it will change the current thread's
+          // context classloader to jar classloader in sharedState.
+          // See SessionState.addJar for details.
+          udfInfo.fnVerifyQuery()
+
+          assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
+          assert(Thread.currentThread().getContextClassLoader eq
+            spark.sqlContext.sharedState.jarClassLoader)
+
+          val udfExpr = udfInfo.fnCreateHiveUDFExpression()
+          // force initializing - this is what we do in HiveSessionCatalog
+          udfExpr.dataType
+
+          // Roll back to the original classloader and run query again. Without this line, the test
+          // would pass, as thread's context classloader is changed to jar classloader. But thread
+          // context classloader can be changed from others as well which would fail the query; one
+          // example is spark-shell, whose thread context classloader rolls back automatically. This
+          // mimics the behavior of spark-shell.
+          Thread.currentThread().setContextClassLoader(sparkClassLoader)
+
+          udfInfo.fnVerifyQuery()
+
+          // The copy is built from the same constructor arguments; initializing it must also
+          // work now that the context classloader has been rolled back.
+          val newExpr = udfExpr.makeCopy(udfExpr.productIterator.map(_.asInstanceOf[AnyRef])
+            .toArray)
+          newExpr.dataType
+        }
+      }
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 539b46474346..65c1db553e0d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2492,51 +2492,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
-
-  test("SPARK-26560 Spark should be able to run Hive UDF using jar regardless of " +
-    "current thread context classloader") {
-    // force to use Spark classloader as other test (even in other test suites) may change the
-    // current thread's context classloader to jar classloader
-    Utils.withContextClassLoader(Utils.getSparkClassLoader) {
-      withUserDefinedFunction("udtf_count3" -> false) {
-        val sparkClassLoader = Thread.currentThread().getContextClassLoader
-
-        // This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly
-        // modified version of GenericUDTFCount2 in hive/contrib, which emits the count for
-        // three times.
- val jarPath = "src/test/noclasspath/TestUDTF-spark-26560.jar" - val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" - - sql( - s""" - |CREATE FUNCTION udtf_count3 - |AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3' - |USING JAR '$jarURL' - """.stripMargin) - - assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader) - - // JAR will be loaded at first usage, and it will change the current thread's - // context classloader to jar classloader in sharedState. - // See SessionState.addJar for details. - checkAnswer( - sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), - Row(3) :: Row(3) :: Row(3) :: Nil) - - assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader) - assert(Thread.currentThread().getContextClassLoader eq - spark.sqlContext.sharedState.jarClassLoader) - - // Roll back to the original classloader and run query again. Without this line, the test - // would pass, as thread's context classloader is changed to jar classloader. But thread - // context classloader can be changed from others as well which would fail the query; one - // example is spark-shell, which thread context classloader rolls back automatically. This - // mimics the behavior of spark-shell. - Thread.currentThread().setContextClassLoader(sparkClassLoader) - checkAnswer( - sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), - Row(3) :: Row(3) :: Row(3) :: Nil) - } - } - } }