@@ -20,15 +20,15 @@ package org.apache.spark.sql.parquet
 
 import java.io.File
 
-import org.apache.spark.sql.catalyst.expressions.Row
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-
+import org.apache.spark.sql.sources.LogicalRelation
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -122,52 +122,71 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    sql(s"""
-      create table test_parquet
-      (
-        intField INT,
-        stringField STRING
-      )
-      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
-      STORED AS
-      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
-      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-    """)
-
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    jsonRDD(rdd).registerTempTable("jt")
-    sql("""
-      create table test_parquet_jt ROW FORMAT
-        | SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
-        | STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
-        | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-        | AS select * from jt""".stripMargin)
+    sql(
+      """
+        |create table test_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
 
     conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    sql("DROP TABLE test_parquet")
-    sql("DROP TABLE jt")
-    sql("DROP TABLE test_parquet_jt")
+    sql("DROP TABLE IF EXISTS test_parquet")
 
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
 
-  test("scan from an empty parquet table") {
+  test("scan an empty parquet table") {
     checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
   }
 
-  test("scan from an empty parquet table with upper case") {
+  test("scan an empty parquet table with upper case") {
     checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
   }
 
-  test("scan from an non empty parquet table #1") {
+  test("scan a parquet table created through a CTAS statement") {
+    val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true")
+    val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+    setConf("spark.sql.hive.convertMetastoreParquet", "true")
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd).registerTempTable("jt")
+    sql(
+      """
+        |create table test_parquet_ctas ROW FORMAT
+        |SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+        |AS select * from jt
+      """.stripMargin)
+
     checkAnswer(
-      sql(s"SELECT a, b FROM test_parquet_jt WHERE a = '1'"),
+      sql(s"SELECT a, b FROM test_parquet_ctas WHERE a = 1"),
       Seq(Row(1, "str1"))
     )
+
+    table("test_parquet_ctas").queryExecution.analyzed match {
+      case LogicalRelation(p: ParquetRelation2) => // OK
+      case _ =>
+        fail(
+          s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+    }
+
+    sql("DROP TABLE IF EXISTS jt")
+    sql("DROP TABLE IF EXISTS test_parquet_ctas")
+    setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore)
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
   }
 }
 