 
 package org.apache.spark.sql.hive
 
+import java.sql.Timestamp
+import java.util.{Locale, TimeZone}
+
 import org.apache.hadoop.hive.conf.HiveConf
+import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
 
-class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
-  import ParquetCompatibilityTest.makeNullable
-
+class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with BeforeAndAfterAll {
   override def _sqlContext: SQLContext = TestHive
   private val sqlContext = _sqlContext
 
@@ -35,69 +37,121 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
    */
   private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
 
-  test("Read Parquet file generated by parquet-hive") {
+  private val originalTimeZone = TimeZone.getDefault
+  private val originalLocale = Locale.getDefault
+
+  override protected def beforeAll(): Unit = {
+    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+    Locale.setDefault(Locale.US)
+  }
+
+  override protected def afterAll(): Unit = {
+    TimeZone.setDefault(originalTimeZone)
+    Locale.setDefault(originalLocale)
+  }
+
+  override protected def logParquetSchema(path: String): Unit = {
+    val schema = readParquetSchema(path, { path =>
+      !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
+    })
+
+    logInfo(
+      s"""Schema of the Parquet file written by parquet-hive:
+         |$schema
+       """.stripMargin)
+  }
+
+  private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = {
     withTable("parquet_compat") {
       withTempPath { dir =>
         val path = dir.getCanonicalPath
 
+        // Hive columns are always nullable, so here we append an all-null row.
+        val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil
+
+        // Don't convert Hive metastore Parquet tables, so that Hive itself writes these Parquet files.
         withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
           withTempTable("data") {
-            sqlContext.sql(
+            val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s"col_$index $typ" }
+
+            val ddl =
               s"""CREATE TABLE parquet_compat(
-                 |bool_column BOOLEAN,
-                 |byte_column TINYINT,
-                 |short_column SMALLINT,
-                 |int_column INT,
-                 |long_column BIGINT,
-                 |float_column FLOAT,
-                 |double_column DOUBLE,
-                 |
-                 |strings_column ARRAY<STRING>,
-                 |int_to_string_column MAP<INT, STRING>
+                 |${fields.mkString(",\n")}
                  |)
                  |STORED AS PARQUET
                  |LOCATION '$path'
+               """.stripMargin
+
+            logInfo(
+              s"""Creating a test Parquet table with the following DDL:
+                 |$ddl
               """.stripMargin)
 
+            sqlContext.sql(ddl)
+
             val schema = sqlContext.table("parquet_compat").schema
-            val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
+            val rowRDD = sqlContext.sparkContext.parallelize(rows).coalesce(1)
             sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
             sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
           }
         }
 
-        val schema = readParquetSchema(path, { path =>
-          !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
-        })
-
-        logInfo(
-          s"""Schema of the Parquet file written by parquet-hive:
-             |$schema
-           """.stripMargin)
+        logParquetSchema(path)
 
         // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
         // Have to assume all BINARY values are strings here.
         withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
-          checkAnswer(sqlContext.read.parquet(path), makeRows)
+          checkAnswer(sqlContext.read.parquet(path), rows)
         }
       }
     }
   }
 
-  def makeRows: Seq[Row] = {
-    (0 until 10).map { i =>
-      def nullable[T <: AnyRef]: (=> T) => T = makeNullable[T](i)
+  test("simple primitives") {
+    testParquetHiveCompatibility(
+      Row(true, 1.toByte, 2.toShort, 3, 4.toLong, 5.1f, 6.1d, "foo"),
+      "BOOLEAN", "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE", "STRING")
+  }
 
+  ignore("SPARK-10177 timestamp") {
+    testParquetHiveCompatibility(Row(Timestamp.valueOf("2015-08-24 00:31:00")), "TIMESTAMP")
+  }
+
+  test("array") {
+    testParquetHiveCompatibility(
       Row(
-        nullable(i % 2 == 0: java.lang.Boolean),
-        nullable(i.toByte: java.lang.Byte),
-        nullable((i + 1).toShort: java.lang.Short),
-        nullable(i + 2: Integer),
-        nullable(i.toLong * 10: java.lang.Long),
-        nullable(i.toFloat + 0.1f: java.lang.Float),
-        nullable(i.toDouble + 0.2d: java.lang.Double),
-        nullable(Seq.tabulate(3)(n => s"arr_${i + n}")),
-        nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap))
-    }
+        Seq[Integer](1: Integer, null, 2: Integer, null),
+        Seq[String]("foo", null, "bar", null),
+        Seq[Seq[Integer]](
+          Seq[Integer](1: Integer, null),
+          Seq[Integer](2: Integer, null))),
+      "ARRAY<INT>",
+      "ARRAY<STRING>",
+      "ARRAY<ARRAY<INT>>")
+  }
+
+  test("map") {
+    testParquetHiveCompatibility(
+      Row(
+        Map[Integer, String](
+          (1: Integer) -> "foo",
+          (2: Integer) -> null)),
+      "MAP<INT, STRING>")
+  }
+
+  // HIVE-11625: Parquet map entries with null keys are dropped by Hive.
+  ignore("map entries with null keys") {
+    testParquetHiveCompatibility(
+      Row(
+        Map[Integer, String](
+          null.asInstanceOf[Integer] -> "bar",
+          null.asInstanceOf[Integer] -> null)),
+      "MAP<INT, STRING>")
+  }
+
+  test("struct") {
+    testParquetHiveCompatibility(
+      Row(Row(1, Seq("foo", "bar", null))),
+      "STRUCT<f0: INT, f1: ARRAY<STRING>>")
   }
 }
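For reference, a minimal sketch (not part of the commit) of what the new testParquetHiveCompatibility helper does with its arguments, mirroring the code in the diff above. The LOCATION path shown in the comment is a placeholder for the temporary directory the test creates:

    import org.apache.spark.sql.Row

    // The helper always appends an all-null row, because Hive columns are nullable:
    val row = Row(true, "foo")
    val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil
    // rows == List(Row(true, "foo"), Row(null, null))

    // For test("simple primitives"), `fields` becomes
    // Seq("col_0 BOOLEAN", "col_1 TINYINT", ..., "col_7 STRING"), so the
    // generated DDL (with a placeholder location) reads:
    //
    //   CREATE TABLE parquet_compat(
    //   col_0 BOOLEAN,
    //   col_1 TINYINT,
    //   ...
    //   col_7 STRING
    //   )
    //   STORED AS PARQUET
    //   LOCATION '/tmp/parquet_compat'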
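To run only this suite, something like the following should work, assuming the standard sbt launcher shipped in the Spark source tree of this era (the exact invocation is an assumption, not part of the commit):

    build/sbt "hive/test-only *ParquetHiveCompatibilitySuite"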