@@ -22,7 +22,7 @@ import java.util.{ArrayList, Arrays, Properties}
2222
2323import org .apache .hadoop .conf .Configuration
2424import org .apache .hadoop .hive .ql .udf .UDAFPercentile
25- import org .apache .hadoop .hive .ql .udf .generic .{ GenericUDAFAverage , GenericUDF , GenericUDFOPAnd , GenericUDTFExplode }
25+ import org .apache .hadoop .hive .ql .udf .generic ._
2626import org .apache .hadoop .hive .ql .udf .generic .GenericUDF .DeferredObject
2727import org .apache .hadoop .hive .serde2 .{AbstractSerDe , SerDeStats }
2828import org .apache .hadoop .hive .serde2 .objectinspector .{ObjectInspector , ObjectInspectorFactory }
@@ -151,210 +151,220 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
151151 }
152152
153153 test(" UDFIntegerToString" ) {
154- val testData = hiveContext.sparkContext.parallelize(
155- IntegerCaseClass ( 1 ) :: IntegerCaseClass ( 2 ) :: Nil ).toDF()
156- testData.registerTempTable( " integerTable " )
157-
158- val udfName = classOf [ UDFIntegerToString ].getName
159- sql( s " CREATE TEMPORARY FUNCTION testUDFIntegerToString AS ' $udfName ' " )
160- checkAnswer(
161- sql( " SELECT testUDFIntegerToString(i) FROM integerTable " ),
162- Seq ( Row ( " 1 " ), Row ( " 2 " )))
163- sql( " DROP TEMPORARY FUNCTION IF EXISTS testUDFIntegerToString " )
164-
165- hiveContext.reset()
154+ withTempTable( " integerTable " ) {
155+ val testData = hiveContext.sparkContext.parallelize(
156+ IntegerCaseClass ( 1 ) :: IntegerCaseClass ( 2 ) :: Nil ).toDF( )
157+ testData.registerTempTable( " integerTable " )
158+
159+ val udfName = classOf [ UDFIntegerToString ].getName
160+ sql( s " CREATE TEMPORARY FUNCTION testUDFIntegerToString AS ' $udfName ' " )
161+ checkAnswer(
162+ sql( " SELECT testUDFIntegerToString(i) FROM integerTable " ),
163+ Seq ( Row ( " 1 " ), Row ( " 2 " )) )
164+ sql( " DROP TEMPORARY FUNCTION IF EXISTS testUDFIntegerToString " )
165+ }
166166 }
167167
168168 test(" UDFToListString" ) {
169- val testData = hiveContext.sparkContext.parallelize(StringCaseClass (" " ) :: Nil ).toDF()
170- testData.registerTempTable(" inputTable" )
171-
172- sql(s " CREATE TEMPORARY FUNCTION testUDFToListString AS ' ${classOf [UDFToListString ].getName}' " )
173- val errMsg = intercept[AnalysisException ] {
174- sql(" SELECT testUDFToListString(s) FROM inputTable" )
169+ withTempTable(" inputTable" ) {
170+ val testData = hiveContext.sparkContext.parallelize(StringCaseClass (" " ) :: Nil ).toDF()
171+ testData.registerTempTable(" inputTable" )
172+
173+ sql(s " CREATE TEMPORARY FUNCTION testUDFToListString AS ' ${classOf [UDFToListString ].getName}' " )
174+ val errMsg = intercept[AnalysisException ] {
175+ sql(" SELECT testUDFToListString(s) FROM inputTable" )
176+ }
177+ assert(errMsg.getMessage contains " List type in java is unsupported because " +
178+ " JVM type erasure makes spark fail to catch a component type in List<>;" )
179+
180+ sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString" )
175181 }
176- assert(errMsg.getMessage contains " List type in java is unsupported because " +
177- " JVM type erasure makes spark fail to catch a component type in List<>;" )
178-
179- sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString" )
180- hiveContext.reset()
181182 }
182183
183184 test(" UDFToListInt" ) {
184- val testData = hiveContext.sparkContext.parallelize(StringCaseClass (" " ) :: Nil ).toDF()
185- testData.registerTempTable(" inputTable" )
186-
187- sql(s " CREATE TEMPORARY FUNCTION testUDFToListInt AS ' ${classOf [UDFToListInt ].getName}' " )
188- val errMsg = intercept[AnalysisException ] {
189- sql(" SELECT testUDFToListInt(s) FROM inputTable" )
185+ withTempTable(" inputTable" ) {
186+ val testData = hiveContext.sparkContext.parallelize(StringCaseClass (" " ) :: Nil ).toDF()
187+ testData.registerTempTable(" inputTable" )
188+
189+ sql(s " CREATE TEMPORARY FUNCTION testUDFToListInt AS ' ${classOf [UDFToListInt ].getName}' " )
190+ val errMsg = intercept[AnalysisException ] {
191+ sql(" SELECT testUDFToListInt(s) FROM inputTable" )
192+ }
193+ assert(errMsg.getMessage contains " List type in java is unsupported because " +
194+ " JVM type erasure makes spark fail to catch a component type in List<>;" )
195+
196+ sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt" )
190197 }
191- assert(errMsg.getMessage contains " List type in java is unsupported because " +
192- " JVM type erasure makes spark fail to catch a component type in List<>;" )
193-
194- sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt" )
195- hiveContext.reset()
196198 }
197199
198200 test(" UDFToStringIntMap" ) {
199- val testData = hiveContext.sparkContext.parallelize(StringCaseClass (" " ) :: Nil ).toDF()
200- testData.registerTempTable(" inputTable" )
201-
202- sql(s " CREATE TEMPORARY FUNCTION testUDFToStringIntMap " +
203- s " AS ' ${classOf [UDFToStringIntMap ].getName}' " )
204- val errMsg = intercept[AnalysisException ] {
205- sql(" SELECT testUDFToStringIntMap(s) FROM inputTable" )
201+ withTempTable(" inputTable" ) {
202+ val testData = hiveContext.sparkContext.parallelize(StringCaseClass (" " ) :: Nil ).toDF()
203+ testData.registerTempTable(" inputTable" )
204+
205+ sql(s " CREATE TEMPORARY FUNCTION testUDFToStringIntMap " +
206+ s " AS ' ${classOf [UDFToStringIntMap ].getName}' " )
207+ val errMsg = intercept[AnalysisException ] {
208+ sql(" SELECT testUDFToStringIntMap(s) FROM inputTable" )
209+ }
210+ assert(errMsg.getMessage contains " Map type in java is unsupported because " +
211+ " JVM type erasure makes spark fail to catch key and value types in Map<>;" )
212+
213+ sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFToStringIntMap" )
206214 }
207- assert(errMsg.getMessage contains " Map type in java is unsupported because " +
208- " JVM type erasure makes spark fail to catch key and value types in Map<>;" )
209-
210- sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFToStringIntMap" )
211- hiveContext.reset()
212215 }
213216
214217 test(" UDFToIntIntMap" ) {
215- val testData = hiveContext.sparkContext.parallelize(StringCaseClass (" " ) :: Nil ).toDF()
216- testData.registerTempTable(" inputTable" )
217-
218- sql(s " CREATE TEMPORARY FUNCTION testUDFToIntIntMap " +
219- s " AS ' ${classOf [UDFToIntIntMap ].getName}' " )
220- val errMsg = intercept[AnalysisException ] {
221- sql(" SELECT testUDFToIntIntMap(s) FROM inputTable" )
218+ withTempTable(" inputTable" ) {
219+ val testData = hiveContext.sparkContext.parallelize(StringCaseClass (" " ) :: Nil ).toDF()
220+ testData.registerTempTable(" inputTable" )
221+
222+ sql(s " CREATE TEMPORARY FUNCTION testUDFToIntIntMap " +
223+ s " AS ' ${classOf [UDFToIntIntMap ].getName}' " )
224+ val errMsg = intercept[AnalysisException ] {
225+ sql(" SELECT testUDFToIntIntMap(s) FROM inputTable" )
226+ }
227+ assert(errMsg.getMessage contains " Map type in java is unsupported because " +
228+ " JVM type erasure makes spark fail to catch key and value types in Map<>;" )
229+
230+ sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFToIntIntMap" )
222231 }
223- assert(errMsg.getMessage contains " Map type in java is unsupported because " +
224- " JVM type erasure makes spark fail to catch key and value types in Map<>;" )
225-
226- sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFToIntIntMap" )
227- hiveContext.reset()
228232 }
229233
230234 test(" UDFListListInt" ) {
231- val testData = hiveContext.sparkContext.parallelize(
232- ListListIntCaseClass ( Nil ) ::
233- ListListIntCaseClass ( Seq (( 1 , 2 , 3 )) ) ::
234- ListListIntCaseClass (Seq ((4 , 5 , 6 ), ( 7 , 8 , 9 ))) :: Nil ).toDF()
235- testData.registerTempTable( " listListIntTable " )
236-
237- sql( s " CREATE TEMPORARY FUNCTION testUDFListListInt AS ' ${ classOf [ UDFListListInt ].getName} ' " )
238- checkAnswer(
239- sql( " SELECT testUDFListListInt(lli) FROM listListIntTable " ),
240- Seq ( Row ( 0 ), Row ( 2 ), Row ( 13 )))
241- sql( " DROP TEMPORARY FUNCTION IF EXISTS testUDFListListInt " )
242-
243- hiveContext.reset()
235+ withTempTable( " listListIntTable " ) {
236+ val testData = hiveContext.sparkContext.parallelize(
237+ ListListIntCaseClass ( Nil ) ::
238+ ListListIntCaseClass (Seq ((1 , 2 , 3 ))) ::
239+ ListListIntCaseClass ( Seq (( 4 , 5 , 6 ), ( 7 , 8 , 9 ))) :: Nil ).toDF( )
240+ testData.registerTempTable( " listListIntTable " )
241+
242+ sql( s " CREATE TEMPORARY FUNCTION testUDFListListInt AS ' ${ classOf [ UDFListListInt ].getName} ' " )
243+ checkAnswer(
244+ sql( " SELECT testUDFListListInt(lli) FROM listListIntTable " ),
245+ Seq ( Row ( 0 ), Row ( 2 ), Row ( 13 )) )
246+ sql( " DROP TEMPORARY FUNCTION IF EXISTS testUDFListListInt " )
247+ }
244248 }
245249
246250 test(" UDFListString" ) {
247- val testData = hiveContext.sparkContext.parallelize(
248- ListStringCaseClass ( Seq ( " a " , " b " , " c " )) ::
249- ListStringCaseClass (Seq (" d " , " e " )) :: Nil ).toDF()
250- testData.registerTempTable( " listStringTable " )
251-
252- sql( s " CREATE TEMPORARY FUNCTION testUDFListString AS ' ${ classOf [ UDFListString ].getName} ' " )
253- checkAnswer(
254- sql( " SELECT testUDFListString(l) FROM listStringTable " ),
255- Seq ( Row ( " a,b,c " ), Row ( " d,e " )))
256- sql( " DROP TEMPORARY FUNCTION IF EXISTS testUDFListString " )
257-
258- hiveContext.reset()
251+ withTempTable( " listStringTable " ) {
252+ val testData = hiveContext.sparkContext.parallelize(
253+ ListStringCaseClass (Seq (" a " , " b " , " c " )) ::
254+ ListStringCaseClass ( Seq ( " d " , " e " )) :: Nil ).toDF( )
255+ testData.registerTempTable( " listStringTable " )
256+
257+ sql( s " CREATE TEMPORARY FUNCTION testUDFListString AS ' ${ classOf [ UDFListString ].getName} ' " )
258+ checkAnswer(
259+ sql( " SELECT testUDFListString(l) FROM listStringTable " ),
260+ Seq ( Row ( " a,b,c " ), Row ( " d,e " )) )
261+ sql( " DROP TEMPORARY FUNCTION IF EXISTS testUDFListString " )
262+ }
259263 }
260264
261265 test(" UDFStringString" ) {
262- val testData = hiveContext.sparkContext.parallelize(
263- StringCaseClass (" world" ) :: StringCaseClass (" goodbye" ) :: Nil ).toDF()
264- testData.registerTempTable(" stringTable" )
266+ withTempTable(" stringTable" ) {
267+ val testData = hiveContext.sparkContext.parallelize(
268+ StringCaseClass (" world" ) :: StringCaseClass (" goodbye" ) :: Nil ).toDF()
269+ testData.registerTempTable(" stringTable" )
265270
266- sql(s " CREATE TEMPORARY FUNCTION testStringStringUDF AS ' ${classOf [UDFStringString ].getName}' " )
267- checkAnswer(
268- sql(" SELECT testStringStringUDF(\" hello\" , s) FROM stringTable" ),
269- Seq (Row (" hello world" ), Row (" hello goodbye" )))
270-
271- checkAnswer(
272- sql(" SELECT testStringStringUDF(\"\" , testStringStringUDF(\" hello\" , s)) FROM stringTable" ),
273- Seq (Row (" hello world" ), Row (" hello goodbye" )))
271+ sql(s " CREATE TEMPORARY FUNCTION testStringStringUDF AS ' ${classOf [UDFStringString ].getName}' " )
272+ checkAnswer(
273+ sql(" SELECT testStringStringUDF(\" hello\" , s) FROM stringTable" ),
274+ Seq (Row (" hello world" ), Row (" hello goodbye" )))
274275
275- sql(" DROP TEMPORARY FUNCTION IF EXISTS testStringStringUDF" )
276+ checkAnswer(
277+ sql(" SELECT testStringStringUDF(\"\" , testStringStringUDF(\" hello\" , s)) FROM stringTable" ),
278+ Seq (Row (" hello world" ), Row (" hello goodbye" )))
276279
277- hiveContext.reset()
280+ sql(" DROP TEMPORARY FUNCTION IF EXISTS testStringStringUDF" )
281+ }
278282 }
279283
280284 test(" UDFTwoListList" ) {
281- val testData = hiveContext.sparkContext.parallelize(
282- ListListIntCaseClass (Nil ) ::
283- ListListIntCaseClass (Seq ((1 , 2 , 3 ))) ::
284- ListListIntCaseClass (Seq ((4 , 5 , 6 ), (7 , 8 , 9 ))) ::
285- Nil ).toDF()
286- testData.registerTempTable(" TwoListTable" )
287-
288- sql(s " CREATE TEMPORARY FUNCTION testUDFTwoListList AS ' ${classOf [UDFTwoListList ].getName}' " )
289- checkAnswer(
290- sql(" SELECT testUDFTwoListList(lli, lli) FROM TwoListTable" ),
291- Seq (Row (" 0, 0" ), Row (" 2, 2" ), Row (" 13, 13" )))
292- sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList" )
293-
294- hiveContext.reset()
295- }
285+ withTempTable(" TwoListTable" ) {
286+ val testData = hiveContext.sparkContext.parallelize(
287+ ListListIntCaseClass (Nil ) ::
288+ ListListIntCaseClass (Seq ((1 , 2 , 3 ))) ::
289+ ListListIntCaseClass (Seq ((4 , 5 , 6 ), (7 , 8 , 9 ))) ::
290+ Nil ).toDF()
291+ testData.registerTempTable(" TwoListTable" )
296292
297- test(" Hive UDFs with insufficient number of input arguments should trigger an analysis error" ) {
298- Seq ((1 , 2 )).toDF(" a" , " b" ).registerTempTable(" testUDF" )
299-
300- {
301- // HiveSimpleUDF
302293 sql(s " CREATE TEMPORARY FUNCTION testUDFTwoListList AS ' ${classOf [UDFTwoListList ].getName}' " )
303- val message = intercept[AnalysisException ] {
304- sql(" SELECT testUDFTwoListList() FROM testUDF" )
305- }.getMessage
306- assert(message.contains(" No handler for Hive udf" ))
294+ checkAnswer(
295+ sql(" SELECT testUDFTwoListList(lli, lli) FROM TwoListTable" ),
296+ Seq (Row (" 0, 0" ), Row (" 2, 2" ), Row (" 13, 13" )))
307297 sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList" )
308298 }
299+ }
309300
310- {
311- // HiveGenericUDF
312- sql(s " CREATE TEMPORARY FUNCTION testUDFAnd AS ' ${classOf [GenericUDFOPAnd ].getName}' " )
313- val message = intercept[AnalysisException ] {
314- sql(" SELECT testUDFAnd() FROM testUDF" )
315- }.getMessage
316- assert(message.contains(" No handler for Hive udf" ))
317- sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd" )
318- }
319-
320- {
321- // Hive UDAF
322- sql(s " CREATE TEMPORARY FUNCTION testUDAFPercentile AS ' ${classOf [UDAFPercentile ].getName}' " )
323- val message = intercept[AnalysisException ] {
324- sql(" SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b" )
325- }.getMessage
326- assert(message.contains(" No handler for Hive udf" ))
327- sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile" )
328- }
329-
330- {
331- // AbstractGenericUDAFResolver
332- sql(s " CREATE TEMPORARY FUNCTION testUDAFAverage AS ' ${classOf [GenericUDAFAverage ].getName}' " )
333- val message = intercept[AnalysisException ] {
334- sql(" SELECT testUDAFAverage() FROM testUDF GROUP BY b" )
335- }.getMessage
336- assert(message.contains(" No handler for Hive udf" ))
337- sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage" )
338- }
339-
340- {
341- // Hive UDTF
342- sql(s " CREATE TEMPORARY FUNCTION testUDTFExplode AS ' ${classOf [GenericUDTFExplode ].getName}' " )
343- val message = intercept[AnalysisException ] {
344- sql(" SELECT testUDTFExplode() FROM testUDF" )
345- }.getMessage
346- assert(message.contains(" No handler for Hive udf" ))
347- sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode" )
301+ test(" Hive UDFs with insufficient number of input arguments should trigger an analysis error" ) {
302+ withTempTable(" testUDF" ) {
303+ Seq ((1 , 2 )).toDF(" a" , " b" ).registerTempTable(" testUDF" )
304+
305+ {
306+ // HiveSimpleUDF
307+ sql(s " CREATE TEMPORARY FUNCTION testUDFTwoListList AS ' ${classOf [UDFTwoListList ].getName}' " )
308+ val message = intercept[AnalysisException ] {
309+ sql(" SELECT testUDFTwoListList() FROM testUDF" )
310+ }.getMessage
311+ assert(message.contains(" No handler for Hive udf" ))
312+ sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList" )
313+ }
314+
315+ {
316+ // HiveGenericUDF
317+ sql(s " CREATE TEMPORARY FUNCTION testUDFAnd AS ' ${classOf [GenericUDFOPAnd ].getName}' " )
318+ val message = intercept[AnalysisException ] {
319+ sql(" SELECT testUDFAnd() FROM testUDF" )
320+ }.getMessage
321+ assert(message.contains(" No handler for Hive udf" ))
322+ sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd" )
323+ }
324+
325+ {
326+ // Hive UDAF
327+ sql(s " CREATE TEMPORARY FUNCTION testUDAFPercentile AS ' ${classOf [UDAFPercentile ].getName}' " )
328+ val message = intercept[AnalysisException ] {
329+ sql(" SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b" )
330+ }.getMessage
331+ assert(message.contains(" No handler for Hive udf" ))
332+ sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile" )
333+ }
334+
335+ {
336+ // AbstractGenericUDAFResolver
337+ sql(s " CREATE TEMPORARY " +
338+ s " FUNCTION testUDAFAverage AS ' ${classOf [GenericUDAFAverage ].getName}' " )
339+ val message = intercept[AnalysisException ] {
340+ sql(" SELECT testUDAFAverage() FROM testUDF GROUP BY b" )
341+ }.getMessage
342+ assert(message.contains(" No handler for Hive udf" ))
343+ sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage" )
344+ }
345+
346+ {
347+ // Hive UDTF
348+ sql(s " CREATE TEMPORARY " +
349+ s " FUNCTION testUDTFExplode AS ' ${classOf [GenericUDTFExplode ].getName}' " )
350+ val message = intercept[AnalysisException ] {
351+ sql(" SELECT testUDTFExplode() FROM testUDF" )
352+ }.getMessage
353+ assert(message.contains(" No handler for Hive udf" ))
354+ sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode" )
355+ }
348356 }
349-
350- sqlContext.dropTempTable(" testUDF" )
351357 }
352358
353359 test(" Hive UDF in group by" ) {
354- Seq (Tuple1 (1451400761 )).toDF(" test_date" ).registerTempTable(" tab1" )
355- val count = sql(" select date(cast(test_date as timestamp))" +
356- " from tab1 group by date(cast(test_date as timestamp))" ).count()
357- assert(count == 1 )
360+ withTempTable(" tab1" ) {
361+ Seq (Tuple1 (1451400761 )).toDF(" test_date" ).registerTempTable(" tab1" )
362+ sql(s " CREATE TEMPORARY FUNCTION testUDFToDate AS ' ${classOf [GenericUDFToDate ].getName}' " )
363+ val count = sql(" select testUDFToDate(cast(test_date as timestamp))" +
364+ " from tab1 group by testUDFToDate(cast(test_date as timestamp))" ).count()
365+ sql(" DROP TEMPORARY FUNCTION IF EXISTS testUDFToDate" )
366+ assert(count == 1 )
367+ }
358368 }
359369
360370 test(" SPARK-11522 select input_file_name from non-parquet table" ){
0 commit comments