
Commit 08ec4a7

address comments

1 parent: 561b925

File tree

12 files changed: +181 -144 lines changed


docs/sql-programming-guide.md

Lines changed: 9 additions & 9 deletions
```diff
@@ -956,17 +956,17 @@ adds support for finding tables in the MetaStore and writing queries using HiveQ
 When you create a Hive table, you need to define how this table should read/write data from/to file system,
 i.e. the "input format" and "output format". You also need to define how this table should deserialize the data
 to rows, or serialize rows to data, i.e. the "serde". The following options can be used to specify the storage
-format("serde", "input format", "output format"), e.g. `CREATE TABLE src(id int) USING hive OPTIONS(format 'parquet')`.
+format("serde", "input format", "output format"), e.g. `CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')`.
 By default, we will read the table files as plain text. Note that, Hive storage handler is not supported yet when
 creating table, you can create a table using storage handler at Hive side, and use Spark SQL to read it.
 
 <table class="table">
   <tr><th>Property Name</th><th>Meaning</th></tr>
   <tr>
-    <td><code>format</code></td>
+    <td><code>fileFormat</code></td>
     <td>
-      A format is kind of a package of storage format specification, including "serde", "input format" and
-      "output format". Currently we supports 6 formats: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
+      A fileFormat is kind of a package of storage format specifications, including "serde", "input format" and
+      "output format". Currently we support 6 fileFormats: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
     </td>
   </tr>
 
@@ -975,23 +975,23 @@ creating table, you can create a table using storage handler at Hive side, and u
     <td>
       These 2 options specify the name of a corresponding `InputFormat` and `OutputFormat` class as a string literal,
       e.g. `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`. These 2 options must be appeared in pair, and you can not
-      specify them if you already specified the `format` option.
+      specify them if you already specified the `fileFormat` option.
     </td>
   </tr>
 
   <tr>
     <td><code>serde</code></td>
     <td>
-      This option specifies the name of a serde class. When the `format` option is specified, do not specify this option
-      if the given `format` already include the information of serde. Currently "sequencefile", "textfile" and "rcfile"
-      don't include the serde information and you can use this option with these 3 formats.
+      This option specifies the name of a serde class. When the `fileFormat` option is specified, do not specify this option
+      if the given `fileFormat` already include the information of serde. Currently "sequencefile", "textfile" and "rcfile"
+      don't include the serde information and you can use this option with these 3 fileFormats.
     </td>
   </tr>
 
   <tr>
     <td><code>fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim</code></td>
     <td>
-      These options can only be used with "textfile" format. They define how to read delimited files into rows.
+      These options can only be used with "textfile" fileFormat. They define how to read delimited files into rows.
     </td>
   </tr>
 </table>
```
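
The renamed option in use, as a minimal sketch assuming a Hive-enabled `SparkSession` named `spark` (table names are illustrative):

```scala
// `fileFormat` bundles serde, input format and output format in one option.
spark.sql("CREATE TABLE src(id INT) USING hive OPTIONS(fileFormat 'parquet')")

// 'textfile' bundles no serde, so the textfile-only delimiter options still apply.
spark.sql(
  """CREATE TABLE logs(line STRING) USING hive
    |OPTIONS(fileFormat 'textfile', fieldDelim ',')
  """.stripMargin)
```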

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 4 additions & 14 deletions
```diff
@@ -377,7 +377,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val storage = DataSource.buildStorageFormatFromOptions(options)
 
     if (location.isDefined && storage.locationUri.isDefined) {
-      throw new ParseException("Cannot specify LOCATION when there is 'path' in OPTIONS.", ctx)
+      throw new ParseException(
+        "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
+        "you can only specify one of them.", ctx)
     }
     val customLocation = storage.locationUri.orElse(location)
 
@@ -1060,19 +1062,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val schema = StructType(dataCols ++ partitionCols)
 
     // Storage format
-    val defaultStorage: CatalogStorageFormat = {
-      val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
-      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
-      CatalogStorageFormat(
-        locationUri = None,
-        inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
-          .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
-        outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
-          .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-        serde = defaultHiveSerde.flatMap(_.serde),
-        compressed = false,
-        properties = Map())
-    }
+    val defaultStorage = HiveSerDe.getDefaultStorage(conf)
     validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
     val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
       .getOrElse(CatalogStorageFormat.empty)
```
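
Both clauses in the reworded check name a custom table path; a sketch of a hypothetical query that would trip it (paths and table name are illustrative):

```scala
// OPTIONS(path ...) and LOCATION conflict, so parsing fails with the new message.
spark.sql(
  """CREATE TABLE t(i INT) USING parquet
    |OPTIONS(path '/tmp/t') LOCATION '/tmp/other'
  """.stripMargin)
// ParseException: LOCATION and 'path' in OPTIONS are both used to indicate the
// custom table path, you can only specify one of them.
```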

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 0 additions & 13 deletions
```diff
@@ -449,19 +449,6 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")
 
-      case CreateTable(table, _, query) if DDLUtils.isHiveTable(table) =>
-        if (table.bucketSpec.isDefined) {
-          throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.")
-        }
-        if (table.partitionColumnNames.nonEmpty && query.isDefined) {
-          val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
-            "create a partitioned table using Hive's file formats. " +
-            "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
-            "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
-            "CTAS statement."
-          throw new AnalysisException(errorMessage)
-        }
-
       case _ => // OK
     }
   }
```

sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala

Lines changed: 49 additions & 35 deletions
```diff
@@ -17,12 +17,49 @@
 
 package org.apache.spark.sql.internal
 
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+
 case class HiveSerDe(
     inputFormat: Option[String] = None,
     outputFormat: Option[String] = None,
     serde: Option[String] = None)
 
 object HiveSerDe {
+  val serdeMap = Map(
+    "sequencefile" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+        outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),
+
+    "rcfile" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")),
+
+    "orc" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
+
+    "parquet" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
+
+    "textfile" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+
+    "avro" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
+
   /**
    * Get the Hive SerDe information from the data source abbreviation string or classname.
    *
@@ -31,41 +68,6 @@ object HiveSerDe {
    * @return HiveSerDe associated with the specified source
    */
   def sourceToSerDe(source: String): Option[HiveSerDe] = {
-    val serdeMap = Map(
-      "sequencefile" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
-          outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),
-
-      "rcfile" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")),
-
-      "orc" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
-
-      "parquet" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
-
-      "textfile" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-
-      "avro" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
-
     val key = source.toLowerCase match {
       case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
       case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
@@ -77,4 +79,16 @@ object HiveSerDe {
 
     serdeMap.get(key)
   }
+
+  def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = {
+    val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
+    val defaultHiveSerde = sourceToSerDe(defaultStorageType)
+    CatalogStorageFormat.empty.copy(
+      inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
+        .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
+      outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
+        .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+      serde = defaultHiveSerde.flatMap(_.serde)
+        .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
+  }
 }
```
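
With `serdeMap` hoisted out of `sourceToSerDe` and the default-storage logic centralized in `getDefaultStorage`, the parser and the planner now share one lookup path. A sketch of the resulting API, assuming an active Hive-enabled `spark` session:

```scala
import org.apache.spark.sql.internal.HiveSerDe

// Abbreviations (and known classnames) resolve through the shared serdeMap.
val orc = HiveSerDe.sourceToSerDe("orc")
// orc.get.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")

// Without a "hive.default.fileformat" override, getDefaultStorage falls back to
// TextInputFormat / HiveIgnoreKeyTextOutputFormat / LazySimpleSerDe.
val defaultStorage = HiveSerDe.getDefaultStorage(spark.sessionState.conf)
```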

sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -121,7 +121,8 @@ class SparkSqlParserSuite extends PlanTest {
       tableType: CatalogTableType = CatalogTableType.MANAGED,
       storage: CatalogStorageFormat = CatalogStorageFormat.empty.copy(
         inputFormat = HiveSerDe.sourceToSerDe("textfile").get.inputFormat,
-        outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat),
+        outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat,
+        serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")),
       schema: StructType = new StructType,
       provider: Option[String] = Some("hive"),
       partitionColumnNames: Seq[String] = Seq.empty,
```

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala

Lines changed: 8 additions & 4 deletions
```diff
@@ -245,7 +245,8 @@ class DDLCommandSuite extends PlanTest {
       val ct = parseAs[CreateTable](query)
       val hiveSerde = HiveSerDe.sourceToSerDe(s)
       assert(hiveSerde.isDefined)
-      assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+      assert(ct.tableDesc.storage.serde ==
+        hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
       assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
       assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
     }
@@ -262,8 +263,10 @@ class DDLCommandSuite extends PlanTest {
     assert(parsed1.tableDesc.storage.serde == Some("anything"))
     assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt"))
     assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt"))
+
     val parsed2 = parseAs[CreateTable](query2)
-    assert(parsed2.tableDesc.storage.serde.isEmpty)
+    assert(parsed2.tableDesc.storage.serde ==
+      Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
     assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt"))
     assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt"))
   }
@@ -297,7 +300,8 @@ class DDLCommandSuite extends PlanTest {
       val ct = parseAs[CreateTable](query)
       val hiveSerde = HiveSerDe.sourceToSerDe(s)
       assert(hiveSerde.isDefined)
-      assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+      assert(ct.tableDesc.storage.serde ==
+        hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
       assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
       assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
     } else {
@@ -427,7 +431,7 @@ class DDLCommandSuite extends PlanTest {
     val e = intercept[ParseException] {
       parser.parsePlan(v2)
     }
-    assert(e.message.contains("Cannot specify LOCATION when there is 'path' in OPTIONS"))
+    assert(e.message.contains("you can only specify one of them."))
   }
 
   // ALTER TABLE table_name RENAME TO new_table_name;
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 15 additions & 21 deletions
```diff
@@ -36,39 +36,33 @@ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
  */
 class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case c @ CreateTable(t, _, _) if DDLUtils.isHiveTable(t) && t.storage.inputFormat.isEmpty =>
-      if (t.bucketSpec.nonEmpty) {
-        throw new AnalysisException("Cannot create bucketed Hive serde table.")
+    case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) && t.storage.serde.isEmpty =>
+      if (t.bucketSpec.isDefined) {
+        throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.")
       }
-
-      val defaultStorage: CatalogStorageFormat = {
-        val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
-        val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
-        CatalogStorageFormat(
-          locationUri = None,
-          inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
-            .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
-          outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
-            .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-          serde = defaultHiveSerde.flatMap(_.serde)
-            .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")),
-          compressed = false,
-          properties = Map())
+      if (t.partitionColumnNames.nonEmpty && query.isDefined) {
+        val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
+          "create a partitioned table using Hive's file formats. " +
+          "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
+          "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
+          "CTAS statement."
+        throw new AnalysisException(errorMessage)
       }
 
+      val defaultStorage = HiveSerDe.getDefaultStorage(conf)
       val options = new HiveOptions(t.storage.properties)
 
-      val fileStorage = if (options.format.isDefined) {
-        HiveSerDe.sourceToSerDe(options.format.get) match {
+      val fileStorage = if (options.fileFormat.isDefined) {
+        HiveSerDe.sourceToSerDe(options.fileFormat.get) match {
           case Some(s) =>
             CatalogStorageFormat.empty.copy(
               inputFormat = s.inputFormat,
               outputFormat = s.outputFormat,
               serde = s.serde)
           case None =>
-            throw new IllegalArgumentException(s"invalid format: '${options.format.get}'")
+            throw new IllegalArgumentException(s"invalid fileFormat: '${options.fileFormat.get}'")
         }
-      } else if (options.inputFormat.isDefined) {
+      } else if (options.hasInputOutputFormat) {
         CatalogStorageFormat.empty.copy(
           inputFormat = options.inputFormat,
           outputFormat = options.outputFormat)
```
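
The bucketing and partitioned-CTAS checks moved here from `PreWriteCheck` and now fire while the serde is being determined. A sketch of a statement the relocated bucketing check would reject (table name and exact DDL shape are assumptions):

```scala
// Bucketed Hive serde tables remain unsupported, so this fails in DetermineHiveSerde.
spark.sql("CREATE TABLE b(id INT) USING hive CLUSTERED BY (id) INTO 4 BUCKETS")
// AnalysisException: Creating bucketed Hive serde table is not supported yet.
```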

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala

Lines changed: 23 additions & 11 deletions
```diff
@@ -20,14 +20,15 @@ package org.apache.spark.sql.hive.execution
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
 /**
- * Options for the Hive data source.
+ * Options for the Hive data source. Note that rule `DetermineHiveSerde` will extract Hive
+ * serde/format information from these options.
  */
 class HiveOptions(@transient private val parameters: CaseInsensitiveMap) extends Serializable {
   import HiveOptions._
 
   def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
 
-  val format = parameters.get(FORMAT).map(_.toLowerCase)
+  val fileFormat = parameters.get(FILE_FORMAT).map(_.toLowerCase)
   val inputFormat = parameters.get(INPUT_FORMAT)
   val outputFormat = parameters.get(OUTPUT_FORMAT)
 
@@ -36,24 +37,35 @@ class HiveOptions(@transient private val parameters: CaseInsensitiveMap) extends
       "have to specify both of them.")
   }
 
-  if (format.isDefined && inputFormat.isDefined) {
-    throw new IllegalArgumentException("Cannot specify format and inputFormat/outputFormat " +
+  def hasInputOutputFormat: Boolean = inputFormat.isDefined
+
+  if (fileFormat.isDefined && inputFormat.isDefined) {
+    throw new IllegalArgumentException("Cannot specify fileFormat and inputFormat/outputFormat " +
       "together for Hive data source.")
   }
 
   val serde = parameters.get(SERDE)
 
-  for (f <- format if serde.isDefined) {
-    if (!Set("sequencefile", "textfile", "rcfile").contains(f)) {
-      throw new IllegalArgumentException(s"format '$f' already specifies a serde.")
+  if (fileFormat.isDefined && serde.isDefined) {
+    if (!Set("sequencefile", "textfile", "rcfile").contains(fileFormat.get)) {
+      throw new IllegalArgumentException(
+        s"fileFormat '${fileFormat.get}' already specifies a serde.")
     }
   }
 
   val containsDelimiters = delimiterOptions.keys.exists(parameters.contains)
 
-  for (f <- format if f != "textfile" && containsDelimiters) {
-    throw new IllegalArgumentException("Cannot specify delimiters as they are only compatible " +
-      s"with format 'textfile', not $f.")
+  if (containsDelimiters) {
+    if (serde.isDefined) {
+      throw new IllegalArgumentException("Cannot specify delimiters with a custom serde.")
+    }
+    if (fileFormat.isEmpty) {
+      throw new IllegalArgumentException("Cannot specify delimiters without fileFormat.")
+    }
+    if (fileFormat.get != "textfile") {
+      throw new IllegalArgumentException("Cannot specify delimiters as they are only compatible " +
+        s"with fileFormat 'textfile', not ${fileFormat.get}.")
+    }
   }
 
   for (lineDelim <- parameters.get("lineDelim") if lineDelim != "\n") {
@@ -74,7 +86,7 @@ object HiveOptions {
     name
   }
 
-  val FORMAT = newOption("format")
+  val FILE_FORMAT = newOption("fileFormat")
   val INPUT_FORMAT = newOption("inputFormat")
   val OUTPUT_FORMAT = newOption("outputFormat")
   val SERDE = newOption("serde")
```
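
A sketch of how the tightened validation treats option combinations (`HiveOptions` is internal to the hive module; constructed directly here only to illustrate the rules above):

```scala
import org.apache.spark.sql.hive.execution.HiveOptions

new HiveOptions(Map("fileFormat" -> "parquet"))   // ok: bundles serde + formats
new HiveOptions(Map("fileFormat" -> "textfile",
  "fieldDelim" -> ","))                           // ok: delimiters fit textfile
new HiveOptions(Map("fileFormat" -> "parquet",
  "serde" -> "MySerde"))                          // IllegalArgumentException: 'parquet' already specifies a serde
new HiveOptions(Map("fieldDelim" -> ","))         // IllegalArgumentException: delimiters need fileFormat 'textfile'
```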

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types.StructType
 
-private[orc] object OrcFileOperator extends Logging {
+private[hive] object OrcFileOperator extends Logging {
   /**
    * Retrieves an ORC file reader from a given path. The path can point to either a directory or a
    * single ORC file. If it points to a directory, it picks any non-empty ORC file within that
```
