Skip to content

Commit a8260e8

Browse files
Update code as feedback
1 parent f4e243f commit a8260e8

File tree

3 files changed

+24
-30
lines changed

3 files changed

+24
-30
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -424,18 +424,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
424424

425425
val desc = table.copy(schema = schema)
426426

427-
// This is a hack, we only take the RC, ORC and Parquet as specific storage
428-
// otherwise, we will convert it into Parquet2 when hive.convertCTAS specified
429-
val specificStorage = (table.inputFormat.map(format => {
430-
// org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat => Parquet
431-
// org.apache.hadoop.hive.ql.io.orc.OrcInputFormat => Orc
432-
// org.apache.hadoop.hive.ql.io.RCFileInputFormat => RCFile
433-
// parquet.hive.DeprecatedParquetInputFormat => Parquet
434-
// TODO configurable?
435-
format.contains("Orc") || format.contains("Parquet") || format.contains("RCFile")
436-
}).getOrElse(false))
437-
438-
if (hive.convertCTAS && !specificStorage) {
427+
if (hive.convertCTAS && table.serde.isEmpty) {
439428
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
440429
// does not specify any storage format (file format and storage handler).
441430
if (table.specifiedDatabase.isDefined) {
@@ -454,9 +443,18 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
454443
child
455444
)
456445
} else {
446+
val desc = if (table.serde.isEmpty) {
447+
// add default serde
448+
table.copy(
449+
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
450+
} else {
451+
table
452+
}
453+
457454
val (dbName, tblName) =
458455
processDatabaseAndTableName(
459-
table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
456+
desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
457+
460458
execution.CreateTableAsSelect(
461459
desc.copy(
462460
specifiedDatabase = Some(dbName),

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ case class CreateTableAsSelect(
6464

6565
override def output: Seq[Attribute] = Seq.empty[Attribute]
6666
override lazy val resolved: Boolean =
67-
tableDesc.specifiedDatabase.isDefined && tableDesc.schema.size > 0 && childrenResolved
67+
// TODO: add more conditions?
68+
tableDesc.specifiedDatabase.isDefined &&
69+
tableDesc.schema.size > 0 &&
70+
tableDesc.serde.isDefined &&
71+
childrenResolved
6872
}
6973

7074
/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
@@ -607,26 +611,24 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
607611
serde = None,
608612
viewText = None)
609613

610-
// default serde & input/output format
611-
tableDesc = if ("SequenceFile".equalsIgnoreCase(
612-
hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
614+
// default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.)
615+
val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
616+
// handle the default format for the storage type abbreviation
617+
tableDesc = if ("SequenceFile".equalsIgnoreCase(defaultStorageType)) {
613618
tableDesc.copy(
614619
inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
615620
outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
616-
} else if ("RCFile".equalsIgnoreCase(
617-
hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
621+
} else if ("RCFile".equalsIgnoreCase(defaultStorageType)) {
618622
tableDesc.copy(
619623
inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
620624
outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
621625
serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
622-
} else if ("ORC".equalsIgnoreCase(
623-
hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
626+
} else if ("ORC".equalsIgnoreCase(defaultStorageType)) {
624627
tableDesc.copy(
625628
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
626629
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
627630
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
628-
} else if ("PARQUET".equalsIgnoreCase(
629-
hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
631+
} else if ("PARQUET".equalsIgnoreCase(defaultStorageType)) {
630632
tableDesc.copy(
631633
inputFormat =
632634
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
@@ -766,12 +768,6 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
766768
case _ => // Unsupported features
767769
}
768770

769-
if (tableDesc.serde.isEmpty) {
770-
// add default serde
771-
tableDesc = tableDesc.copy(
772-
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
773-
}
774-
775771
CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)
776772

777773
// If its not a "CTAS" like above then take it as a native command

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ class HiveQlSuite extends FunSuite with BeforeAndAfterAll {
137137
assert(desc.serdeProperties == Map())
138138
assert(desc.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
139139
assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
140-
assert(desc.serde == Option("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
140+
assert(desc.serde.isEmpty)
141141
assert(desc.properties == Map())
142142
}
143143

0 commit comments

Comments
 (0)