Commit 2a6aa8e

HeartSaVioR authored and cloud-fan committed
[SPARK-31312][SQL] Cache Class instance for the UDF instance in HiveFunctionWrapper
### What changes were proposed in this pull request?

This patch caches the `Class` instance for the UDF in `HiveFunctionWrapper`. It fixes the case where a Hive simple UDF is transformed (the expression is copied) and evaluated later under a different classloader, for example because the current thread's context classloader has changed. In that case Spark currently throws a `ClassNotFoundException` (CNFE).

The problem only occurs for Hive simple UDFs: `HiveFunctionWrapper` caches the UDF instance for the other function types, but not for the `UDF` type. The code comment says Spark has to create a new instance every time for `UDF`, so we cannot simply cache the instance there as well. This patch caches the `Class` instance instead, and switches the current thread's context classloader to the one that loaded the `Class` instance.

This patch also extends the test coverage. SPARK-26560 was tested only with `GenericUDTF`, and this patch strictly requires only `UDF`. To avoid regressions for the other types, the tests now cover all available types (`UDF`, `GenericUDF`, `AbstractGenericUDAFResolver`, `UDAF`, `GenericUDTF`).

Credit to cloud-fan, who discovered the problem and proposed the solution.

### Why are the changes needed?

The section above describes why this is a bug and how it is fixed.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

New UTs added.

Closes #28079 from HeartSaVioR/SPARK-31312.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
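
The difference between resolving the class name on every call and caching the `Class` can be illustrated with a small sketch that does not depend on Spark or Hive. Everything in it is hypothetical: `HidingClassLoader`, `ByNameFactory`, `CachedClassFactory`, and `ClassCacheDemo` are illustrative names, and `java.lang.StringBuilder` merely stands in for a UDF class that one classloader can see and another cannot.

```scala
// Hypothetical sketch, not Spark code.

/** A loader that pretends `hidden` is not on its classpath. */
class HidingClassLoader(parent: ClassLoader, hidden: String) extends ClassLoader(parent) {
  override protected def loadClass(name: String, resolve: Boolean): Class[_] = {
    if (name == hidden) throw new ClassNotFoundException(name)
    super.loadClass(name, resolve)
  }
}

/** Resolves the class name through the context classloader on every call. */
class ByNameFactory(className: String) {
  def create(): AnyRef =
    Thread.currentThread().getContextClassLoader
      .loadClass(className).getConstructor().newInstance().asInstanceOf[AnyRef]
}

/** Resolves the class once and reuses the cached Class afterwards. */
class CachedClassFactory(className: String) {
  private var clazz: Class[_ <: AnyRef] = null

  def create(): AnyRef = {
    if (clazz == null) {
      clazz = Thread.currentThread().getContextClassLoader
        .loadClass(className).asInstanceOf[Class[_ <: AnyRef]]
    }
    clazz.getConstructor().newInstance()
  }
}

object ClassCacheDemo {
  // Stand-in for a UDF class name; an assumption made for this sketch only.
  private val udfClassName = "java.lang.StringBuilder"

  /** Returns (by-name lookup failed, cached-class lookup succeeded). */
  def run(): (Boolean, Boolean) = {
    val thread = Thread.currentThread()
    val original = thread.getContextClassLoader
    val visible = ClassLoader.getSystemClassLoader
    val hiding = new HidingClassLoader(visible, udfClassName)
    val byName = new ByNameFactory(udfClassName)
    val cached = new CachedClassFactory(udfClassName)
    try {
      // First use: the class is visible, so both factories work.
      thread.setContextClassLoader(visible)
      byName.create()
      cached.create()

      // The context classloader changes to one that cannot see the class.
      thread.setContextClassLoader(hiding)
      val byNameFailed =
        try { byName.create(); false }
        catch { case _: ClassNotFoundException => true }
      val cachedSucceeded = cached.create().isInstanceOf[java.lang.StringBuilder]
      (byNameFailed, cachedSucceeded)
    } finally {
      // Always restore the caller's context classloader.
      thread.setContextClassLoader(original)
    }
  }
}
```

Calling `ClassCacheDemo.run()` exercises both factories after the context classloader has been swapped. The by-name factory hits a `ClassNotFoundException`, while the factory holding the cached `Class` still creates a fresh instance.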
1 parent 8b01473 commit 2a6aa8e

5 files changed, +203 -52 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala

Lines changed: 13 additions & 5 deletions
@@ -118,9 +118,12 @@ private[hive] object HiveShim {
    *
    * @param functionClassName UDF class name
    * @param instance optional UDF instance which contains additional information (for macro)
+   * @param clazz optional class instance to create UDF instance
    */
-  private[hive] case class HiveFunctionWrapper(var functionClassName: String,
-    private var instance: AnyRef = null) extends java.io.Externalizable {
+  private[hive] case class HiveFunctionWrapper(
+      var functionClassName: String,
+      private var instance: AnyRef = null,
+      private var clazz: Class[_ <: AnyRef] = null) extends java.io.Externalizable {

     // for Serialization
     def this() = this(null)
@@ -232,17 +235,22 @@ private[hive] object HiveShim {
         in.readFully(functionInBytes)

         // deserialize the function object via Hive Utilities
+        clazz = Utils.getContextOrSparkClassLoader.loadClass(functionClassName)
+          .asInstanceOf[Class[_ <: AnyRef]]
         instance = deserializePlan[AnyRef](new java.io.ByteArrayInputStream(functionInBytes),
-          Utils.getContextOrSparkClassLoader.loadClass(functionClassName))
+          clazz)
       }
     }

     def createFunction[UDFType <: AnyRef](): UDFType = {
       if (instance != null) {
         instance.asInstanceOf[UDFType]
       } else {
-        val func = Utils.getContextOrSparkClassLoader
-          .loadClass(functionClassName).getConstructor().newInstance().asInstanceOf[UDFType]
+        if (clazz == null) {
+          clazz = Utils.getContextOrSparkClassLoader.loadClass(functionClassName)
+            .asInstanceOf[Class[_ <: AnyRef]]
+        }
+        val func = clazz.getConstructor().newInstance().asInstanceOf[UDFType]
         if (!func.isInstanceOf[UDF]) {
           // We cache the function if it's not the Simple UDF,
           // as we always have to create new instance for Simple UDF
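
The caching rule described by the comments in `createFunction` can be sketched outside Spark roughly as follows. This is a simplified illustration under stated assumptions, not the code of `HiveFunctionWrapper`: `FunctionHolder`, `mustRecreate`, and `FunctionHolderDemo` are hypothetical names, `mustRecreate` plays the role of the `isInstanceOf[UDF]` check, and plain JDK classes stand in for UDF classes.

```scala
// Hypothetical sketch, not Spark code.
class FunctionHolder(className: String, mustRecreate: AnyRef => Boolean) {
  private var clazz: Class[_ <: AnyRef] = null
  private var instance: AnyRef = null

  def createFunction(): AnyRef = {
    if (instance != null) {
      // A cached instance exists, so reuse it.
      instance
    } else {
      if (clazz == null) {
        // Resolve the class only once; later calls reuse the cached Class.
        clazz = Class.forName(className).asInstanceOf[Class[_ <: AnyRef]]
      }
      val func: AnyRef = clazz.getConstructor().newInstance()
      if (!mustRecreate(func)) {
        // Only functions that are safe to share are cached as instances.
        instance = func
      }
      func
    }
  }
}

object FunctionHolderDemo {
  /** An instance that may be cached is returned again on the second call. */
  def cachedIsReused(): Boolean = {
    val holder = new FunctionHolder("java.util.ArrayList", _ => false)
    holder.createFunction() eq holder.createFunction()
  }

  /** An instance that must be re-created is fresh on every call. */
  def recreatedIsFresh(): Boolean = {
    val holder = new FunctionHolder(
      "java.lang.StringBuilder", _.isInstanceOf[java.lang.StringBuilder])
    holder.createFunction() ne holder.createFunction()
  }
}
```

In this sketch, the class lookup happens at most once per holder even when the instance itself cannot be cached. That is the property the patch relies on for the simple `UDF` type.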
-7.29 KB
Binary file not shown.
34.8 KB
Binary file not shown.
Lines changed: 190 additions & 0 deletions
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.util.Utils
+
+class HiveUDFDynamicLoadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  case class UDFTestInformation(
+      identifier: String,
+      funcName: String,
+      className: String,
+      fnVerifyQuery: () => Unit,
+      fnCreateHiveUDFExpression: () => Expression)
+
+  private val udfTestInfos: Seq[UDFTestInformation] = Array(
+    // UDF
+    // UDFExampleAdd2 is a slightly modified version of UDFExampleAdd in hive/contrib,
+    // which adds two integers or doubles.
+    UDFTestInformation(
+      "UDF",
+      "udf_add2",
+      "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2",
+      () => {
+        checkAnswer(sql("SELECT udf_add2(1, 2)"), Row(3) :: Nil)
+      },
+      () => {
+        HiveSimpleUDF(
+          "default.udf_add2",
+          HiveFunctionWrapper("org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2"),
+          Array(
+            AttributeReference("a", IntegerType, nullable = false)(),
+            AttributeReference("b", IntegerType, nullable = false)()))
+      }),
+
+    // GenericUDF
+    // GenericUDFTrim2 is a cloned version of GenericUDFTrim in hive/contrib.
+    UDFTestInformation(
+      "GENERIC_UDF",
+      "generic_udf_trim2",
+      "org.apache.hadoop.hive.contrib.udf.example.GenericUDFTrim2",
+      () => {
+        checkAnswer(sql("SELECT generic_udf_trim2(' hello ')"), Row("hello") :: Nil)
+      },
+      () => {
+        HiveGenericUDF(
+          "default.generic_udf_trim2",
+          HiveFunctionWrapper("org.apache.hadoop.hive.contrib.udf.example.GenericUDFTrim2"),
+          Array(AttributeReference("a", StringType, nullable = false)())
+        )
+      }
+    ),
+
+    // AbstractGenericUDAFResolver
+    // GenericUDAFSum2 is a cloned version of GenericUDAFSum in hive/exec.
+    UDFTestInformation(
+      "GENERIC_UDAF",
+      "generic_udaf_sum2",
+      "org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum2",
+      () => {
+        import spark.implicits._
+        val df = Seq((0: Integer) -> 0, (1: Integer) -> 1, (2: Integer) -> 2, (3: Integer) -> 3)
+          .toDF("key", "value").createOrReplaceTempView("t")
+        checkAnswer(sql("SELECT generic_udaf_sum2(value) FROM t GROUP BY key % 2"),
+          Row(2) :: Row(4) :: Nil)
+      },
+      () => {
+        HiveUDAFFunction(
+          "default.generic_udaf_sum2",
+          HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum2"),
+          Array(AttributeReference("a", IntegerType, nullable = false)())
+        )
+      }
+    ),
+
+    // UDAF
+    // UDAFExampleMax2 is a cloned version of UDAFExampleMax in hive/contrib.
+    UDFTestInformation(
+      "UDAF",
+      "udaf_max2",
+      "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax2",
+      () => {
+        import spark.implicits._
+        val df = Seq((0: Integer) -> 0, (1: Integer) -> 1, (2: Integer) -> 2, (3: Integer) -> 3)
+          .toDF("key", "value").createOrReplaceTempView("t")
+        checkAnswer(sql("SELECT udaf_max2(value) FROM t GROUP BY key % 2"),
+          Row(2) :: Row(3) :: Nil)
+      },
+      () => {
+        HiveUDAFFunction(
+          "default.udaf_max2",
+          HiveFunctionWrapper("org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax2"),
+          Array(AttributeReference("a", IntegerType, nullable = false)()),
+          isUDAFBridgeRequired = true
+        )
+      }
+    ),
+
+    // GenericUDTF
+    // GenericUDTFCount3 is a slightly modified version of GenericUDTFCount2 in hive/contrib,
+    // which emits the count three times.
+    UDFTestInformation(
+      "GENERIC_UDTF",
+      "udtf_count3",
+      "org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3",
+      () => {
+        checkAnswer(
+          sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+          Row(3) :: Row(3) :: Row(3) :: Nil)
+      },
+      () => {
+        HiveGenericUDTF(
+          "default.udtf_count3",
+          HiveFunctionWrapper("org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3"),
+          Array.empty[Expression]
+        )
+      }
+    )
+  )
+
+  udfTestInfos.foreach { udfInfo =>
+    // The test jars are built from the commit below:
+    // https://github.com/HeartSaVioR/hive/commit/12f3f036b6efd0299cd1d457c0c0a65e0fd7e5f2
+    // which contains new UDF classes to be dynamically loaded and tested via Spark.
+
+    // This jar file should not be placed on the classpath.
+    val jarPath = "src/test/noclasspath/hive-test-udfs.jar"
+    val jarUrl = s"file://${System.getProperty("user.dir")}/$jarPath"
+
+    test("Spark should be able to run Hive UDF using jar regardless of " +
+      s"current thread context classloader (${udfInfo.identifier})") {
+      Utils.withContextClassLoader(Utils.getSparkClassLoader) {
+        withUserDefinedFunction(udfInfo.funcName -> false) {
+          val sparkClassLoader = Thread.currentThread().getContextClassLoader
+
+          sql(s"CREATE FUNCTION ${udfInfo.funcName} AS '${udfInfo.className}' USING JAR '$jarUrl'")
+
+          assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)
+
+          // JAR will be loaded at first usage, and it will change the current thread's
+          // context classloader to jar classloader in sharedState.
+          // See SessionState.addJar for details.
+          udfInfo.fnVerifyQuery()
+
+          assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
+          assert(Thread.currentThread().getContextClassLoader eq
+            spark.sqlContext.sharedState.jarClassLoader)
+
+          val udfExpr = udfInfo.fnCreateHiveUDFExpression()
+          // force initializing - this is what we do in HiveSessionCatalog
+          udfExpr.dataType
+
+          // Roll back to the original classloader and run query again. Without this line, the test
+          // would pass, as thread's context classloader is changed to jar classloader. But thread
+          // context classloader can be changed from others as well which would fail the query; one
+          // example is spark-shell, whose thread context classloader rolls back automatically. This
+          // mimics the behavior of spark-shell.
+          Thread.currentThread().setContextClassLoader(sparkClassLoader)
+
+          udfInfo.fnVerifyQuery()
+
+          val newExpr = udfExpr.makeCopy(udfExpr.productIterator.map(_.asInstanceOf[AnyRef])
+            .toArray)
+          newExpr.dataType
+        }
+      }
+    }
+  }
+}
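
The test above pins the thread context classloader with `Utils.withContextClassLoader` and later resets it by hand to mimic spark-shell. A helper of that general shape might look like the sketch below. It is a standalone illustration written for this page, not Spark's implementation, and `ContextLoaderDemo` is a hypothetical name.

```scala
// Hypothetical sketch, not Spark code.
object ContextLoaderDemo {
  /** Runs `body` with `loader` as the thread context classloader, then restores the old one. */
  def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
    val thread = Thread.currentThread()
    val previous = thread.getContextClassLoader
    thread.setContextClassLoader(loader)
    try body
    finally thread.setContextClassLoader(previous)
  }

  /** Returns (loader was switched inside the block, loader was restored afterwards). */
  def run(): (Boolean, Boolean) = {
    val before = Thread.currentThread().getContextClassLoader
    // An empty URLClassLoader is enough to have a distinct loader to switch to.
    val temporary = new java.net.URLClassLoader(Array.empty[java.net.URL], before)
    val switched = withContextClassLoader(temporary) {
      Thread.currentThread().getContextClassLoader eq temporary
    }
    val restored = Thread.currentThread().getContextClassLoader eq before
    (switched, restored)
  }
}
```

Restoring the previous loader in `finally` keeps one test from leaking its classloader into the next. The removed SPARK-26560 test notes the same hazard in its comments.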

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 0 additions & 47 deletions
@@ -2492,51 +2492,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
-
-  test("SPARK-26560 Spark should be able to run Hive UDF using jar regardless of " +
-    "current thread context classloader") {
-    // force to use Spark classloader as other test (even in other test suites) may change the
-    // current thread's context classloader to jar classloader
-    Utils.withContextClassLoader(Utils.getSparkClassLoader) {
-      withUserDefinedFunction("udtf_count3" -> false) {
-        val sparkClassLoader = Thread.currentThread().getContextClassLoader
-
-        // This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly
-        // modified version of GenericUDTFCount2 in hive/contrib, which emits the count for
-        // three times.
-        val jarPath = "src/test/noclasspath/TestUDTF-spark-26560.jar"
-        val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
-
-        sql(
-          s"""
-             |CREATE FUNCTION udtf_count3
-             |AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3'
-             |USING JAR '$jarURL'
-          """.stripMargin)
-
-        assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)
-
-        // JAR will be loaded at first usage, and it will change the current thread's
-        // context classloader to jar classloader in sharedState.
-        // See SessionState.addJar for details.
-        checkAnswer(
-          sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
-          Row(3) :: Row(3) :: Row(3) :: Nil)
-
-        assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
-        assert(Thread.currentThread().getContextClassLoader eq
-          spark.sqlContext.sharedState.jarClassLoader)
-
-        // Roll back to the original classloader and run query again. Without this line, the test
-        // would pass, as thread's context classloader is changed to jar classloader. But thread
-        // context classloader can be changed from others as well which would fail the query; one
-        // example is spark-shell, which thread context classloader rolls back automatically. This
-        // mimics the behavior of spark-shell.
-        Thread.currentThread().setContextClassLoader(sparkClassLoader)
-        checkAnswer(
-          sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
-          Row(3) :: Row(3) :: Row(3) :: Nil)
-      }
-    }
-  }
 }
