From a1dbf6115b081360a1f565ea4daa02e2c63ef112 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 16 Dec 2016 00:57:22 +0800 Subject: [PATCH 1/4] unify CREATE TABLE syntax for data source and hive serde tables --- docs/sql-programming-guide.md | 56 ++++++++++-- .../sql/hive/JavaSparkHiveExample.java | 2 +- examples/src/main/python/sql/hive.py | 2 +- examples/src/main/r/RSparkSQLExample.R | 2 +- .../examples/sql/hive/SparkHiveExample.scala | 2 +- .../spark/sql/catalyst/parser/SqlBase.g4 | 10 ++- .../catalyst/util/CaseInsensitiveMap.scala | 2 + .../spark/sql/execution/SparkSqlParser.scala | 41 +++++---- .../spark/sql/execution/SparkStrategies.scala | 6 +- .../spark/sql/execution/command/ddl.scala | 6 +- .../sql/execution/datasources/rules.scala | 7 +- .../execution/command/DDLCommandSuite.scala | 69 ++++++++++++-- .../spark/sql/hive/HiveExternalCatalog.scala | 4 +- .../spark/sql/hive/HiveSessionState.scala | 1 + .../spark/sql/hive/HiveStrategies.scala | 79 +++++++++++++--- .../sql/hive/execution/HiveOptions.scala | 90 +++++++++++++++++++ .../spark/sql/hive/HiveDDLCommandSuite.scala | 84 ++++++++++++++++- .../sql/hive/HiveExternalCatalogSuite.scala | 2 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 15 ---- .../sql/hive/execution/HiveDDLSuite.scala | 19 ++++ 20 files changed, 414 insertions(+), 85 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 6287e2be95d4..ff433ce352cc 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -522,14 +522,10 @@ Hive metastore. Persistent tables will still exist even after your Spark program long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the `table` method on a `SparkSession` with the name of the table. -By default `saveAsTable` will create a "managed table", meaning that the location of the data will -be controlled by the metastore. Managed tables will also have their data deleted automatically -when a table is dropped. - -Currently, `saveAsTable` does not expose an API supporting the creation of an "External table" from a `DataFrame`, -however, this functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key -and location of the external table as its value (String) when saving the table with `saveAsTable`. When an External table -is dropped only its metadata is removed. +You can specify the table path via the `path` option, e.g. `df.write.option("path", "/some/path").saveAsTable("t")`. +When the table is dropped, the specified table path will not be removed and the table data is still there. +If you do not specify a table path, Spark SQL will generate a default table path to store the table data. +When the table is dropped, the default table path will be removed too. ## Parquet Files @@ -947,6 +943,50 @@ adds support for finding tables in the MetaStore and writing queries using HiveQ +### Specifying storage format for Hive tables + +When you create a Hive table, you need to define how this table should read/write data from/to file system, +i.e. the "input format" and "output format". You also need to define how this table should deserialize the data +to rows, or serialize rows to data, i.e. the "serde". The following options can be used to specify the storage +format("serde", "input format", "output format"), e.g. 
`CREATE TABLE src(id int) USING hive OPTIONS(format 'parquet')`.
+By default, we will read the table files as plain text.
+
+<table class="table">
+  <tr><th>Property Name</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>format</code></td>
+    <td>
+      A format is a package of storage format specifications, including "serde", "input format" and
+      "output format". Currently we support 6 formats: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
+    </td>
+  </tr>
+  <tr>
+    <td><code>inputFormat, outputFormat</code></td>
+    <td>
+      These 2 options specify the name of a corresponding `InputFormat` and `OutputFormat` class as a string literal,
+      e.g. `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`. These 2 options must appear as a pair, and you cannot
+      specify them if you already specified the `format` option.
+    </td>
+  </tr>
+  <tr>
+    <td><code>serde</code></td>
+    <td>
+      This option specifies the name of a serde class. When the `format` option is specified, do not specify this
+      option if the given `format` already includes the serde information. Currently "sequencefile", "textfile" and
+      "rcfile" don't include the serde information, so you can use this option only with these 3 formats.
+    </td>
+  </tr>
+  <tr>
+    <td><code>fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim</code></td>
+    <td>
+      These options can only be used with the "textfile" format. They define how to read delimited files into rows;
+      see the sketch right after this table.
+    </td>
+  </tr>
+</table>
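As a quick illustration of the options above, here is a minimal sketch (the table name and columns are made up for this example; only the `format` and `fieldDelim` option names come from the table above):

```sql
-- Sketch only: a Hive serde table stored as comma-delimited text files.
-- 'format' picks the textfile storage format; the delimiter options apply to textfile only.
CREATE TABLE src_csv (id INT, name STRING)
USING hive
OPTIONS (format 'textfile', fieldDelim ',')
```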
+ ### Interacting with Different Versions of Hive Metastore One of the most important pieces of Spark SQL's Hive support is interaction with Hive metastore, diff --git a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java index 052153c9e973..8d06d38cf2c6 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java @@ -64,7 +64,7 @@ public static void main(String[] args) { .enableHiveSupport() .getOrCreate(); - spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); + spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive"); spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL diff --git a/examples/src/main/python/sql/hive.py b/examples/src/main/python/sql/hive.py index ad83fe1cf14b..ba01544a5bd2 100644 --- a/examples/src/main/python/sql/hive.py +++ b/examples/src/main/python/sql/hive.py @@ -44,7 +44,7 @@ .getOrCreate() # spark is an existing SparkSession - spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") + spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries are expressed in HiveQL diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R index 373a36dba14f..e647f0e1e9f1 100644 --- a/examples/src/main/r/RSparkSQLExample.R +++ b/examples/src/main/r/RSparkSQLExample.R @@ -195,7 +195,7 @@ head(teenagers) # $example on:spark_hive$ # enableHiveSupport defaults to TRUE sparkR.session(enableHiveSupport = TRUE) -sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries can be expressed in HiveQL. diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala index ded18dacf1fe..d29ed958fe8f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala @@ -50,7 +50,7 @@ object SparkHiveExample { import spark.implicits._ import spark.sql - sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") + sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index a34087cb6cd4..3222a9cdc2c4 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -69,16 +69,18 @@ statement | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? #dropDatabase | createTableHeader ('(' colTypeList ')')? tableProvider - (OPTIONS tablePropertyList)? + (OPTIONS options=tablePropertyList)? 
(PARTITIONED BY partitionColumnNames=identifierList)? - bucketSpec? (AS? query)? #createTableUsing + bucketSpec? locationSpec? + (COMMENT comment=STRING)? + (AS? query)? #createTable | createTableHeader ('(' columns=colTypeList ')')? - (COMMENT STRING)? + (COMMENT comment=STRING)? (PARTITIONED BY '(' partitionColumns=colTypeList ')')? bucketSpec? skewSpec? rowFormat? createFileFormat? locationSpec? (TBLPROPERTIES tablePropertyList)? - (AS? query)? #createTable + (AS? query)? #createHiveTable | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier LIKE source=tableIdentifier #createTableLike | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala index a7f7a8a66382..29e49a58375b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala @@ -27,6 +27,8 @@ class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String] override def get(k: String): Option[String] = baseMap.get(k.toLowerCase) + override def contains(k: String): Boolean = baseMap.contains(k.toLowerCase) + override def + [B1 >: String](kv: (String, B1)): Map[String, B1] = baseMap + kv.copy(_1 = kv._1.toLowerCase) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 14a983e43b00..65fdd506ef86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -342,11 +342,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a data source table, returning a [[CreateTable]] logical plan. + * Create a table, returning a [[CreateTable]] logical plan. * * Expected format: * {{{ - * CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name + * CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name * USING table_provider * [OPTIONS table_property_list] * [PARTITIONED BY (col_name, col_name, ...)] @@ -354,19 +354,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * [SORTED BY (col_name [ASC|DESC], ...)] * INTO num_buckets BUCKETS * ] + * [TBLPROPERTIES (property_name=property_value, ...)] + * [COMMENT table_comment] * [AS select_statement]; * }}} */ - override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) { + override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) if (external) { operationNotAllowed("CREATE EXTERNAL TABLE ... 
USING", ctx) } - val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) val provider = ctx.tableProvider.qualifiedName.getText - if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) { - throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING") - } val schema = Option(ctx.colTypeList()).map(createSchema) val partitionColumnNames = Option(ctx.partitionColumnNames) @@ -374,10 +373,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { .getOrElse(Array.empty[String]) val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec) - // TODO: this may be wrong for non file-based data source like JDBC, which should be external - // even there is no `path` in options. We should consider allow the EXTERNAL keyword. + val location = Option(ctx.locationSpec).map(visitLocationSpec) val storage = DataSource.buildStorageFormatFromOptions(options) - val tableType = if (storage.locationUri.isDefined) { + + if (location.isDefined && storage.locationUri.isDefined) { + throw new ParseException("Cannot specify LOCATION when there is 'path' in OPTIONS.", ctx) + } + val customLocation = storage.locationUri.orElse(location) + + val tableType = if (customLocation.isDefined) { CatalogTableType.EXTERNAL } else { CatalogTableType.MANAGED @@ -386,12 +390,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val tableDesc = CatalogTable( identifier = table, tableType = tableType, - storage = storage, + storage = storage.copy(locationUri = customLocation), schema = schema.getOrElse(new StructType), provider = Some(provider), partitionColumnNames = partitionColumnNames, - bucketSpec = bucketSpec - ) + bucketSpec = bucketSpec, + comment = Option(ctx.comment).map(string)) // Determine the storage mode. val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists @@ -1011,10 +1015,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a table, returning a [[CreateTable]] logical plan. + * Create a Hive serde table, returning a [[CreateTable]] logical plan. * - * This is not used to create datasource tables, which is handled through - * "CREATE TABLE ... USING ...". + * This is a legacy syntax for Hive compatibility, we recommend users to use the Spark SQL + * CREATE TABLE syntax to create Hive serde table, e.g. "CREATE TABLE ... USING hive ..." * * Note: several features are currently not supported - temporary tables, bucketing, * skewed columns and storage handlers (STORED BY). @@ -1032,7 +1036,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * [AS select_statement]; * }}} */ - override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { + override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) { val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) // TODO: implement temporary tables if (temp) { @@ -1046,7 +1050,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { if (ctx.bucketSpec != null) { operationNotAllowed("CREATE TABLE ... 
CLUSTERED BY", ctx) } - val comment = Option(ctx.STRING).map(string) val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil) val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil) val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) @@ -1104,7 +1107,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { provider = Some(DDLUtils.HIVE_PROVIDER), partitionColumnNames = partitionCols.map(_.name), properties = properties, - comment = comment) + comment = Option(ctx.comment).map(string)) val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ba82ec156e85..8db2b03a7201 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -419,8 +419,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object DDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTable(tableDesc, mode, None) - if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER => + case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore) ExecutedCommandExec(cmd) :: Nil @@ -432,8 +431,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule // `CreateTables` - case CreateTable(tableDesc, mode, Some(query)) - if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER => + case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isDatasourceTable(tableDesc) => val cmd = CreateDataSourceTableAsSelectCommand( tableDesc, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 522158b64167..fd93eca3b282 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -763,8 +763,12 @@ case class AlterTableSetLocationCommand( object DDLUtils { val HIVE_PROVIDER = "hive" + def isHiveTable(table: CatalogTable): Boolean = { + table.provider.isDefined && table.provider.get.toLowerCase == HIVE_PROVIDER + } + def isDatasourceTable(table: CatalogTable): Boolean = { - table.provider.isDefined && table.provider.get != HIVE_PROVIDER + table.provider.isDefined && table.provider.get.toLowerCase != HIVE_PROVIDER } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 07b16671f744..94ba814fa51f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -109,7 +109,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl throw new AnalysisException("Saving data into a view is not allowed.") } - if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) { + if (DDLUtils.isHiveTable(existingTable)) { throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " + "not supported yet. 
Please use the insertInto() API as an alternative.") } @@ -233,7 +233,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl checkDuplication(normalizedPartitionCols, "partition") if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) { - if (table.provider.get == DDLUtils.HIVE_PROVIDER) { + if (DDLUtils.isHiveTable(table)) { // When we hit this branch, it means users didn't specify schema for the table to be // created, as we always include partition columns in table schema for hive serde tables. // The real schema will be inferred at hive metastore by hive serde, plus the given @@ -380,8 +380,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { object HiveOnlyCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { - case CreateTable(tableDesc, _, Some(_)) - if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER => + case CreateTable(tableDesc, _, Some(_)) if DDLUtils.isHiveTable(tableDesc) => throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT") case _ => // OK diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 1a5e5226c2ec..6aa0fd99a7ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -236,7 +236,7 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed4, expected4) } - test("create table - table file format") { + test("create hive table - table file format") { val allSources = Seq("parquet", "parquetfile", "orc", "orcfile", "avro", "avrofile", "sequencefile", "rcfile", "textfile") @@ -251,7 +251,7 @@ class DDLCommandSuite extends PlanTest { } } - test("create table - row format and table file format") { + test("create hive table - row format and table file format") { val createTableStart = "CREATE TABLE my_tab ROW FORMAT" val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'" val query1 = s"$createTableStart SERDE 'anything' $fileFormat" @@ -268,7 +268,7 @@ class DDLCommandSuite extends PlanTest { assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt")) } - test("create table - row format serde and generic file format") { + test("create hive table - row format serde and generic file format") { val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile") val supportedSources = Set("sequencefile", "rcfile", "textfile") @@ -287,7 +287,7 @@ class DDLCommandSuite extends PlanTest { } } - test("create table - row format delimited and generic file format") { + test("create hive table - row format delimited and generic file format") { val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile") val supportedSources = Set("textfile") @@ -306,7 +306,7 @@ class DDLCommandSuite extends PlanTest { } } - test("create external table - location must be specified") { + test("create hive external table - location must be specified") { assertUnsupported( sql = "CREATE EXTERNAL TABLE my_tab", containsThesePhrases = Seq("create external table", "location")) @@ -316,7 +316,7 @@ class DDLCommandSuite extends PlanTest { assert(ct.tableDesc.storage.locationUri == Some("/something/anything")) } - test("create table - property values must be set") { + test("create hive table - property values must be 
set") { assertUnsupported( sql = "CREATE TABLE my_tab TBLPROPERTIES('key_without_value', 'key_with_value'='x')", containsThesePhrases = Seq("key_without_value")) @@ -326,14 +326,14 @@ class DDLCommandSuite extends PlanTest { containsThesePhrases = Seq("key_without_value")) } - test("create table - location implies external") { + test("create hive table - location implies external") { val query = "CREATE TABLE my_tab LOCATION '/something/anything'" val ct = parseAs[CreateTable](query) assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) assert(ct.tableDesc.storage.locationUri == Some("/something/anything")) } - test("create table using - with partitioned by") { + test("create table - with partitioned by") { val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + "USING parquet PARTITIONED BY (a)" @@ -357,7 +357,7 @@ class DDLCommandSuite extends PlanTest { } } - test("create table using - with bucket") { + test("create table - with bucket") { val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" @@ -379,6 +379,57 @@ class DDLCommandSuite extends PlanTest { } } + test("create table - with comment") { + val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet"), + comment = Some("abc")) + + parser.parsePlan(sql) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $sql") + } + } + + test("create table - with location") { + val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty.copy(locationUri = Some("/tmp/file")), + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet")) + + parser.parsePlan(v1) match { + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) + case other => + fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + + s"got ${other.getClass.getName}: $v1") + } + + val v2 = + """ + |CREATE TABLE my_tab(a INT, b STRING) + |USING parquet + |OPTIONS (path '/tmp/file') + |LOCATION '/tmp/file' + """.stripMargin + val e = intercept[ParseException] { + parser.parsePlan(v2) + } + assert(e.message.contains("Cannot specify LOCATION when there is 'path' in OPTIONS")) + } + // ALTER TABLE table_name RENAME TO new_table_name; // ALTER VIEW view_name RENAME TO new_view_name; test("alter table/view: rename table/view") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index fde6d4a94762..474a2c868e20 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -215,7 +215,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat 
tableDefinition.storage.locationUri } - if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) { + if (DDLUtils.isHiveTable(tableDefinition)) { val tableWithDataSourceProps = tableDefinition.copy( // We can't leave `locationUri` empty and count on Hive metastore to set a default table // location, because Hive metastore uses hive.metastore.warehouse.dir to generate default @@ -533,7 +533,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } else { val oldTableDef = getRawTable(db, withStatsProps.identifier.table) - val newStorage = if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) { + val newStorage = if (DDLUtils.isHiveTable(tableDefinition)) { tableDefinition.storage } else { // We can't alter the table storage of data source table directly for 2 reasons: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index bea073fb4848..52892f1c7270 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -64,6 +64,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) AnalyzeCreateTable(sparkSession) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: + new DetermineHiveSerde(conf) :: (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 773c4a39d854..4a9bd110ec99 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -18,14 +18,79 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec} import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ +import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} + + +/** + * Determine the serde/format of the Hive serde table, according to the storage properties. 
+ */ +class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case c @ CreateTable(t, _, _) if DDLUtils.isHiveTable(t) && t.storage.inputFormat.isEmpty => + if (t.bucketSpec.nonEmpty) { + throw new AnalysisException("Cannot create bucketed Hive serde table.") + } + + val defaultStorage: CatalogStorageFormat = { + val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") + val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType) + CatalogStorageFormat( + locationUri = None, + inputFormat = defaultHiveSerde.flatMap(_.inputFormat) + .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")), + outputFormat = defaultHiveSerde.flatMap(_.outputFormat) + .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), + serde = defaultHiveSerde.flatMap(_.serde) + .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")), + compressed = false, + properties = Map()) + } + + val options = new HiveOptions(t.storage.properties) + + val fileStorage = if (options.format.isDefined) { + HiveSerDe.sourceToSerDe(options.format.get) match { + case Some(s) => + CatalogStorageFormat.empty.copy( + inputFormat = s.inputFormat, + outputFormat = s.outputFormat, + serde = s.serde) + case None => + throw new IllegalArgumentException(s"invalid format: '${options.format.get}'") + } + } else if (options.inputFormat.isDefined) { + CatalogStorageFormat.empty.copy( + inputFormat = options.inputFormat, + outputFormat = options.outputFormat) + } else { + CatalogStorageFormat.empty + } + + val rowStorage = if (options.serde.isDefined) { + CatalogStorageFormat.empty.copy(serde = options.serde) + } else { + CatalogStorageFormat.empty + } + + val storage = t.storage.copy( + inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), + outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat), + serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), + properties = options.serdeProperties) + + c.copy(tableDesc = t.copy(storage = storage)) + } +} private[hive] trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. @@ -49,15 +114,7 @@ private[hive] trait HiveStrategies { InsertIntoHiveTable( table, partition, planLater(child), overwrite, ifNotExists) :: Nil - case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" => - val newTableDesc = if (tableDesc.storage.serde.isEmpty) { - // add default serde - tableDesc.withNewStorage( - serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - } else { - tableDesc - } - + case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => // Currently we will never hit this branch, as SQL string API can only use `Ignore` or // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde // tables yet. 
@@ -68,7 +125,7 @@ private[hive] trait HiveStrategies { val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase) val cmd = CreateHiveTableAsSelectCommand( - newTableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))), + tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))), query, mode == SaveMode.Ignore) ExecutedCommandExec(cmd) :: Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala new file mode 100644 index 000000000000..49e8854ea7b8 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap + +/** + * Options for the Hive data source. + */ +class HiveOptions(@transient private val parameters: CaseInsensitiveMap) extends Serializable { + import HiveOptions._ + + def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters)) + + val format = parameters.get(FORMAT).map(_.toLowerCase) + val inputFormat = parameters.get(INPUT_FORMAT) + val outputFormat = parameters.get(OUTPUT_FORMAT) + + if (inputFormat.isDefined != outputFormat.isDefined) { + throw new IllegalArgumentException("Cannot specify only inputFormat or outputFormat, you " + + "have to specify both of them.") + } + + if (format.isDefined && inputFormat.isDefined) { + throw new IllegalArgumentException("Cannot specify format and inputFormat/outputFormat " + + "together for Hive data source.") + } + + val serde = parameters.get(SERDE) + + for (f <- format if serde.isDefined) { + if (!Set("sequencefile", "textfile", "rcfile").contains(f)) { + throw new IllegalArgumentException(s"format '$f' already specifies a serde.") + } + } + + val containsDelimiters = delimiterOptions.keys.exists(parameters.contains) + + for (f <- format if f != "textfile" && containsDelimiters) { + throw new IllegalArgumentException("Cannot specify delimiters as they are only compatible " + + s"with format 'textfile', not $f.") + } + + for (lineDelim <- parameters.get("lineDelim") if lineDelim != "\n") { + throw new IllegalArgumentException("Hive data source only support newline '\\n' as " + + s"line delimiter, but given: $lineDelim.") + } + + def serdeProperties: Map[String, String] = parameters.filterKeys { + k => !lowerCasedOptionNames.contains(k.toLowerCase) + }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v } +} + +object HiveOptions { + private val lowerCasedOptionNames = collection.mutable.Set[String]() + + private def newOption(name: String): String = { + 
lowerCasedOptionNames += name.toLowerCase + name + } + + val FORMAT = newOption("format") + val INPUT_FORMAT = newOption("inputFormat") + val OUTPUT_FORMAT = newOption("outputFormat") + val SERDE = newOption("serde") + + // A map from the public delimiter option keys to the underlying Hive serde property keys. + val delimiterOptions = Map( + "fieldDelim" -> "field.delim", + "escapeDelim" -> "escape.delim", + // The following typo is inherited from Hive... + "collectionDelim" -> "colelction.delim", + "mapkeyDelim" -> "mapkey.delim", + "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase -> v } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index d13e29b3029b..714eb9a14d35 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformati import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.StructType @@ -51,6 +50,12 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(e.getMessage.toLowerCase.contains("operation not allowed")) } + private def analyzeCreateTable(sql: String): CatalogTable = { + TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect { + case CreateTable(tableDesc, mode, _) => tableDesc + }.head + } + test("Test CTAS #1") { val s1 = """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view @@ -76,7 +81,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) - assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) + assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) } test("Test CTAS #2") { @@ -107,7 +112,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe")) - assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) + assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) } test("Test CTAS #3") { @@ -592,4 +597,77 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client assert(hiveClient.getConf("hive.in.test", "") == "true") } + + test("create hive serde table with new syntax - basic") { + val sql = + """ + |CREATE TABLE t + |(id int, name string COMMENT 'blabla') + |USING hive + |OPTIONS (format 'parquet', my_prop 1) + |LOCATION '/tmp/file' + |COMMENT 'BLABLA' + """.stripMargin + + val table = analyzeCreateTable(sql) + assert(table.schema == new StructType() + .add("id", "int") + .add("name", "string", nullable = true, comment = "blabla")) + assert(table.provider == Some(DDLUtils.HIVE_PROVIDER)) + 
assert(table.storage.locationUri == Some("/tmp/file")) + assert(table.storage.properties == Map("my_prop" -> "1")) + assert(table.comment == Some("BLABLA")) + + assert(table.storage.inputFormat == + Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")) + assert(table.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) + assert(table.storage.serde == + Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) + } + + test("create hive serde table with new syntax - with partition and bucketing") { + val v1 = "CREATE TABLE t (c1 int, c2 int) USING hive PARTITIONED BY (c2)" + val table = analyzeCreateTable(v1) + assert(table.schema == new StructType().add("c1", "int").add("c2", "int")) + assert(table.partitionColumnNames == Seq("c2")) + // check the default formats + assert(table.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + assert(table.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) + assert(table.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) + + val v2 = "CREATE TABLE t (c1 int, c2 int) USING hive CLUSTERED BY (c2) INTO 4 BUCKETS" + val e = intercept[AnalysisException](analyzeCreateTable(v2)) + assert(e.message.contains("Cannot create bucketed Hive serde table")) + } + + test("create hive serde table with new syntax - Hive options error checking") { + val v1 = "CREATE TABLE t (c1 int) USING hive OPTIONS (inputFormat 'abc')" + val e1 = intercept[IllegalArgumentException](analyzeCreateTable(v1)) + assert(e1.getMessage.contains("Cannot specify only inputFormat or outputFormat")) + + val v2 = "CREATE TABLE t (c1 int) USING hive OPTIONS " + + "(format 'x', inputFormat 'a', outputFormat 'b')" + val e2 = intercept[IllegalArgumentException](analyzeCreateTable(v2)) + assert(e2.getMessage.contains("Cannot specify format and inputFormat/outputFormat together")) + + val v3 = "CREATE TABLE t (c1 int) USING hive OPTIONS (format 'parquet', serde 'a')" + val e3 = intercept[IllegalArgumentException](analyzeCreateTable(v3)) + assert(e3.getMessage.contains("format 'parquet' already specifies a serde")) + + val v4 = "CREATE TABLE t (c1 int) USING hive OPTIONS (format 'parquet', 'fieldDelim' ' ')" + val e4 = intercept[IllegalArgumentException](analyzeCreateTable(v4)) + assert(e4.getMessage.contains( + "Cannot specify delimiters as they are only compatible with format 'textfile'")) + + // The value of 'format' option is case-insensitive. 
+ val v5 = "CREATE TABLE t (c1 int) USING hive OPTIONS (format 'TEXTFILE', 'lineDelim' ',')" + val e5 = intercept[IllegalArgumentException](analyzeCreateTable(v5)) + assert(e5.getMessage.contains("Hive data source only support newline '\\n' as line delimiter")) + + val v6 = "CREATE TABLE t (c1 int) USING hive OPTIONS (format 'wrong')" + val e6 = intercept[IllegalArgumentException](analyzeCreateTable(v6)) + assert(e6.getMessage.contains("invalid format: 'wrong'")) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index 6fee45824ea3..2f02bb5d3b64 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -68,6 +68,6 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite { val rawTable = externalCatalog.client.getTable("db1", "hive_tbl") assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER)) - assert(externalCatalog.getTable("db1", "hive_tbl").provider == Some(DDLUtils.HIVE_PROVIDER)) + assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl"))) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 0f787be0bbd2..846016c77bb4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -1193,21 +1193,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv } } - test("create a data source table using hive") { - val tableName = "tab1" - withTable (tableName) { - val e = intercept[AnalysisException] { - sql( - s""" - |CREATE TABLE $tableName - |(col1 int) - |USING hive - """.stripMargin) - }.getMessage - assert(e.contains("Cannot create hive serde table with CREATE TABLE USING")) - } - } - test("create a temp view using hive") { val tableName = "tab1" withTable (tableName) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index f313db641b15..d1b10c6c3fbc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1198,4 +1198,23 @@ class HiveDDLSuite assert(e.message.contains("unknown is not a valid partition column")) } } + + test("create hive serde table with new syntax") { + withTable("t", "t2") { + sql("CREATE TABLE t(id int) USING hive OPTIONS(format 'orc')") + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(DDLUtils.isHiveTable(table)) + assert(table.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) + assert(spark.table("t").collect().isEmpty) + + sql("INSERT INTO t SELECT 1") + checkAnswer(spark.table("t"), Row(1)) + + sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2") + val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2")) + assert(DDLUtils.isHiveTable(table2)) + assert(table2.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + checkAnswer(spark.table("t2"), Row(1, "a")) + } + } } From 25af2cb9873bc46f62f53d93143d5e6d0ffe0898 Mon Sep 17 00:00:00 
2001 From: Wenchen Fan Date: Fri, 30 Dec 2016 16:38:32 +0800 Subject: [PATCH 2/4] address comments --- docs/sql-programming-guide.md | 12 +++++++----- .../apache/spark/sql/execution/SparkSqlParser.scala | 2 +- .../spark/sql/execution/datasources/rules.scala | 13 +++++++++++++ 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index ff433ce352cc..c5b6e5efff19 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -522,10 +522,11 @@ Hive metastore. Persistent tables will still exist even after your Spark program long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the `table` method on a `SparkSession` with the name of the table. -You can specify the table path via the `path` option, e.g. `df.write.option("path", "/some/path").saveAsTable("t")`. -When the table is dropped, the specified table path will not be removed and the table data is still there. -If you do not specify a table path, Spark SQL will generate a default table path to store the table data. -When the table is dropped, the default table path will be removed too. +For file-based data source, e.g. text, parquet, json, etc. you can specify a custom table path via the +`path` option, e.g. `df.write.option("path", "/some/path").saveAsTable("t")`. When the table is dropped, +the custom table path will not be removed and the table data is still there. If no custom table path is +specifed, Spark will write data to a default table path under the warehouse directory. When the table is +dropped, the default table path will be removed too. ## Parquet Files @@ -949,7 +950,8 @@ When you create a Hive table, you need to define how this table should read/writ i.e. the "input format" and "output format". You also need to define how this table should deserialize the data to rows, or serialize rows to data, i.e. the "serde". The following options can be used to specify the storage format("serde", "input format", "output format"), e.g. `CREATE TABLE src(id int) USING hive OPTIONS(format 'parquet')`. -By default, we will read the table files as plain text. +By default, we will read the table files as plain text. Note that, Hive storage handler is not supported yet when +creating table, you can create a table using storage handler at Hive side, and use Spark SQL to read it. 
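To make the storage-handler note above concrete, a hedged sketch of the described workflow follows; the table name, the HBase storage handler class, and the column mapping are illustrative examples, not part of this patch. The table is created from the Hive side, and Spark SQL only reads it:

```sql
-- Illustrative only: created via the Hive CLI, not via Spark's CREATE TABLE ... USING hive,
-- because storage handlers (STORED BY) are not supported when creating tables from Spark SQL.
--   CREATE TABLE hbase_src (key INT, value STRING)
--   STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
--   WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:value');
-- Once the table exists in the shared metastore, Spark SQL can query it as usual:
SELECT key, value FROM hbase_src
```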
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 65fdd506ef86..a8e4039b6086 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -354,7 +354,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * [SORTED BY (col_name [ASC|DESC], ...)] * INTO num_buckets BUCKETS * ] - * [TBLPROPERTIES (property_name=property_value, ...)] + * [LOCATION path] * [COMMENT table_comment] * [AS select_statement]; * }}} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 94ba814fa51f..cbccfd0c2829 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -449,6 +449,19 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // The relation in l is not an InsertableRelation. failAnalysis(s"$l does not allow insertion.") + case CreateTable(table, _, query) if DDLUtils.isHiveTable(table) => + if (table.bucketSpec.isDefined) { + throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.") + } + if (table.partitionColumnNames.nonEmpty && query.isDefined) { + val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " + + "create a partitioned table using Hive's file formats. " + + "Please use the syntax of \"CREATE TABLE tableName USING dataSource " + + "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " + + "CTAS statement." + throw new AnalysisException(errorMessage) + } + case _ => // OK } } From 561b92516fca96576fc21cefeb4c9a37ad77f061 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 31 Dec 2016 20:30:50 +0800 Subject: [PATCH 3/4] document serde properties --- docs/sql-programming-guide.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 35fd5094c1ad..27ed6a62eddf 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -996,6 +996,8 @@ creating table, you can create a table using storage handler at Hive side, and u
   <tr><th>Property Name</th><th>Meaning</th></tr>
+All other properties defined with `OPTIONS` will be regarded as Hive serde properties. + ### Interacting with Different Versions of Hive Metastore One of the most important pieces of Spark SQL's Hive support is interaction with Hive metastore, From 08ec4a7e0d4d25da2e5c40fbf497ce7d067a82c5 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 5 Jan 2017 13:33:46 +0800 Subject: [PATCH 4/4] address comments --- docs/sql-programming-guide.md | 18 ++-- .../spark/sql/execution/SparkSqlParser.scala | 18 +--- .../sql/execution/datasources/rules.scala | 13 --- .../apache/spark/sql/internal/HiveSerDe.scala | 84 +++++++++++-------- .../sql/execution/SparkSqlParserSuite.scala | 3 +- .../execution/command/DDLCommandSuite.scala | 12 ++- .../spark/sql/hive/HiveStrategies.scala | 36 ++++---- .../sql/hive/execution/HiveOptions.scala | 34 +++++--- .../spark/sql/hive/orc/OrcFileOperator.scala | 2 +- .../spark/sql/hive/HiveDDLCommandSuite.scala | 53 ++++++++---- .../sql/hive/execution/HiveDDLSuite.scala | 50 +++++++---- .../hive/orc/OrcHadoopFsRelationSuite.scala | 2 - 12 files changed, 181 insertions(+), 144 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 27ed6a62eddf..0f6e3446559b 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -956,17 +956,17 @@ adds support for finding tables in the MetaStore and writing queries using HiveQ When you create a Hive table, you need to define how this table should read/write data from/to file system, i.e. the "input format" and "output format". You also need to define how this table should deserialize the data to rows, or serialize rows to data, i.e. the "serde". The following options can be used to specify the storage -format("serde", "input format", "output format"), e.g. `CREATE TABLE src(id int) USING hive OPTIONS(format 'parquet')`. +format("serde", "input format", "output format"), e.g. `CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')`. By default, we will read the table files as plain text. Note that, Hive storage handler is not supported yet when creating table, you can create a table using storage handler at Hive side, and use Spark SQL to read it. - + @@ -975,23 +975,23 @@ creating table, you can create a table using storage handler at Hive side, and u
   <tr><th>Property Name</th><th>Meaning</th></tr>
-    format
-    A format is kind of a package of storage format specification, including "serde", "input format" and
-    "output format". Currently we supports 6 formats: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
+    fileFormat
+    A fileFormat is a package of storage format specifications, including "serde", "input format" and
+    "output format". Currently we support 6 fileFormats: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
     These 2 options specify the name of a corresponding `InputFormat` and `OutputFormat` class as a string literal,
     e.g. `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`. These 2 options must appear as a pair, and you cannot
-    specify them if you already specified the `format` option.
+    specify them if you already specified the `fileFormat` option (see the sketch after this table).
     serde
-    This option specifies the name of a serde class. When the `format` option is specified, do not specify this option
-    if the given `format` already include the information of serde. Currently "sequencefile", "textfile" and "rcfile"
-    don't include the serde information and you can use this option with these 3 formats.
+    This option specifies the name of a serde class. When the `fileFormat` option is specified, do not specify this
+    option if the given `fileFormat` already includes the serde information. Currently "sequencefile", "textfile" and
+    "rcfile" don't include the serde information, so you can use this option only with these 3 fileFormats.
     fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim
-    These options can only be used with "textfile" format. They define how to read delimited files into rows.
+    These options can only be used with the "textfile" fileFormat. They define how to read delimited files into rows.
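To illustrate the `inputFormat`/`outputFormat`/`serde` options referenced above, a minimal sketch follows; the table name is made up, and the ORC class names are the ones listed in `HiveSerDe.serdeMap` in this patch:

```sql
-- Sketch only: naming the InputFormat/OutputFormat classes directly (they must be given as a pair),
-- together with the matching ORC serde class.
CREATE TABLE orc_src (id INT)
USING hive
OPTIONS (
  inputFormat 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat',
  outputFormat 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat',
  serde 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
)
```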
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index a8e4039b6086..41768d451261 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -377,7 +377,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val storage = DataSource.buildStorageFormatFromOptions(options) if (location.isDefined && storage.locationUri.isDefined) { - throw new ParseException("Cannot specify LOCATION when there is 'path' in OPTIONS.", ctx) + throw new ParseException( + "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + + "you can only specify one of them.", ctx) } val customLocation = storage.locationUri.orElse(location) @@ -1060,19 +1062,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val schema = StructType(dataCols ++ partitionCols) // Storage format - val defaultStorage: CatalogStorageFormat = { - val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") - val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType) - CatalogStorageFormat( - locationUri = None, - inputFormat = defaultHiveSerde.flatMap(_.inputFormat) - .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")), - outputFormat = defaultHiveSerde.flatMap(_.outputFormat) - .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), - serde = defaultHiveSerde.flatMap(_.serde), - compressed = false, - properties = Map()) - } + val defaultStorage = HiveSerDe.getDefaultStorage(conf) validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx) val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) .getOrElse(CatalogStorageFormat.empty) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index cbccfd0c2829..94ba814fa51f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -449,19 +449,6 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // The relation in l is not an InsertableRelation. failAnalysis(s"$l does not allow insertion.") - case CreateTable(table, _, query) if DDLUtils.isHiveTable(table) => - if (table.bucketSpec.isDefined) { - throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.") - } - if (table.partitionColumnNames.nonEmpty && query.isDefined) { - val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " + - "create a partitioned table using Hive's file formats. " + - "Please use the syntax of \"CREATE TABLE tableName USING dataSource " + - "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " + - "CTAS statement." 
- throw new AnalysisException(errorMessage) - } - case _ => // OK } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala index 52e648a917d8..ca46a1151e3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala @@ -17,12 +17,49 @@ package org.apache.spark.sql.internal +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat + case class HiveSerDe( inputFormat: Option[String] = None, outputFormat: Option[String] = None, serde: Option[String] = None) object HiveSerDe { + val serdeMap = Map( + "sequencefile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), + outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")), + + "rcfile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"), + serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")), + + "orc" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"), + serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")), + + "parquet" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), + serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")), + + "textfile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), + + "avro" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"), + serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))) + /** * Get the Hive SerDe information from the data source abbreviation string or classname. 
* @@ -31,41 +68,6 @@ object HiveSerDe { * @return HiveSerDe associated with the specified source */ def sourceToSerDe(source: String): Option[HiveSerDe] = { - val serdeMap = Map( - "sequencefile" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), - outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")), - - "rcfile" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"), - serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")), - - "orc" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"), - serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")), - - "parquet" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), - serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")), - - "textfile" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), - - "avro" -> - HiveSerDe( - inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"), - serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))) - val key = source.toLowerCase match { case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet" case s if s.startsWith("org.apache.spark.sql.orc") => "orc" @@ -77,4 +79,16 @@ object HiveSerDe { serdeMap.get(key) } + + def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = { + val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") + val defaultHiveSerde = sourceToSerDe(defaultStorageType) + CatalogStorageFormat.empty.copy( + inputFormat = defaultHiveSerde.flatMap(_.inputFormat) + .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")), + outputFormat = defaultHiveSerde.flatMap(_.outputFormat) + .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), + serde = defaultHiveSerde.flatMap(_.serde) + .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index b070138be05d..15e490fb30a2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -121,7 +121,8 @@ class SparkSqlParserSuite extends PlanTest { tableType: CatalogTableType = CatalogTableType.MANAGED, storage: CatalogStorageFormat = CatalogStorageFormat.empty.copy( inputFormat = HiveSerDe.sourceToSerDe("textfile").get.inputFormat, - outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat), + outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat, + serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")), schema: StructType = new StructType, provider: Option[String] = Some("hive"), partitionColumnNames: Seq[String] = Seq.empty, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 6aa0fd99a7ac..76bb9e5929a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -245,7 +245,8 @@ class DDLCommandSuite extends PlanTest { val ct = parseAs[CreateTable](query) val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) - assert(ct.tableDesc.storage.serde == hiveSerde.get.serde) + assert(ct.tableDesc.storage.serde == + hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))) assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat) } @@ -262,8 +263,10 @@ class DDLCommandSuite extends PlanTest { assert(parsed1.tableDesc.storage.serde == Some("anything")) assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt")) assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt")) + val parsed2 = parseAs[CreateTable](query2) - assert(parsed2.tableDesc.storage.serde.isEmpty) + assert(parsed2.tableDesc.storage.serde == + Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt")) assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt")) } @@ -297,7 +300,8 @@ class DDLCommandSuite extends PlanTest { val ct = parseAs[CreateTable](query) val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) - assert(ct.tableDesc.storage.serde == hiveSerde.get.serde) + assert(ct.tableDesc.storage.serde == + hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))) assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat) } else { @@ -427,7 +431,7 @@ class DDLCommandSuite extends PlanTest { val e = intercept[ParseException] { parser.parsePlan(v2) } - assert(e.message.contains("Cannot specify LOCATION when there is 'path' in OPTIONS")) + assert(e.message.contains("you can only specify one of them.")) } // ALTER TABLE table_name RENAME TO new_table_name; diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 4a9bd110ec99..6d5cc5778aef 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -36,39 +36,33 @@ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} */ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case c @ CreateTable(t, _, _) if DDLUtils.isHiveTable(t) && t.storage.inputFormat.isEmpty => - if (t.bucketSpec.nonEmpty) { - throw new AnalysisException("Cannot create bucketed Hive serde table.") + case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) && t.storage.serde.isEmpty => + if (t.bucketSpec.isDefined) { + throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.") } - - val defaultStorage: CatalogStorageFormat = { - val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") - val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType) - CatalogStorageFormat( - locationUri = None, - inputFormat = defaultHiveSerde.flatMap(_.inputFormat) - 
.orElse(Some("org.apache.hadoop.mapred.TextInputFormat")), - outputFormat = defaultHiveSerde.flatMap(_.outputFormat) - .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), - serde = defaultHiveSerde.flatMap(_.serde) - .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")), - compressed = false, - properties = Map()) + if (t.partitionColumnNames.nonEmpty && query.isDefined) { + val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " + + "create a partitioned table using Hive's file formats. " + + "Please use the syntax of \"CREATE TABLE tableName USING dataSource " + + "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " + + "CTAS statement." + throw new AnalysisException(errorMessage) } + val defaultStorage = HiveSerDe.getDefaultStorage(conf) val options = new HiveOptions(t.storage.properties) - val fileStorage = if (options.format.isDefined) { - HiveSerDe.sourceToSerDe(options.format.get) match { + val fileStorage = if (options.fileFormat.isDefined) { + HiveSerDe.sourceToSerDe(options.fileFormat.get) match { case Some(s) => CatalogStorageFormat.empty.copy( inputFormat = s.inputFormat, outputFormat = s.outputFormat, serde = s.serde) case None => - throw new IllegalArgumentException(s"invalid format: '${options.format.get}'") + throw new IllegalArgumentException(s"invalid fileFormat: '${options.fileFormat.get}'") } - } else if (options.inputFormat.isDefined) { + } else if (options.hasInputOutputFormat) { CatalogStorageFormat.empty.copy( inputFormat = options.inputFormat, outputFormat = options.outputFormat) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala index 49e8854ea7b8..35b7a681f12e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala @@ -20,14 +20,15 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap /** - * Options for the Hive data source. + * Options for the Hive data source. Note that rule `DetermineHiveSerde` will extract Hive + * serde/format information from these options. 
*/ class HiveOptions(@transient private val parameters: CaseInsensitiveMap) extends Serializable { import HiveOptions._ def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters)) - val format = parameters.get(FORMAT).map(_.toLowerCase) + val fileFormat = parameters.get(FILE_FORMAT).map(_.toLowerCase) val inputFormat = parameters.get(INPUT_FORMAT) val outputFormat = parameters.get(OUTPUT_FORMAT) @@ -36,24 +37,35 @@ class HiveOptions(@transient private val parameters: CaseInsensitiveMap) extends "have to specify both of them.") } - if (format.isDefined && inputFormat.isDefined) { - throw new IllegalArgumentException("Cannot specify format and inputFormat/outputFormat " + + def hasInputOutputFormat: Boolean = inputFormat.isDefined + + if (fileFormat.isDefined && inputFormat.isDefined) { + throw new IllegalArgumentException("Cannot specify fileFormat and inputFormat/outputFormat " + "together for Hive data source.") } val serde = parameters.get(SERDE) - for (f <- format if serde.isDefined) { - if (!Set("sequencefile", "textfile", "rcfile").contains(f)) { - throw new IllegalArgumentException(s"format '$f' already specifies a serde.") + if (fileFormat.isDefined && serde.isDefined) { + if (!Set("sequencefile", "textfile", "rcfile").contains(fileFormat.get)) { + throw new IllegalArgumentException( + s"fileFormat '${fileFormat.get}' already specifies a serde.") } } val containsDelimiters = delimiterOptions.keys.exists(parameters.contains) - for (f <- format if f != "textfile" && containsDelimiters) { - throw new IllegalArgumentException("Cannot specify delimiters as they are only compatible " + - s"with format 'textfile', not $f.") + if (containsDelimiters) { + if (serde.isDefined) { + throw new IllegalArgumentException("Cannot specify delimiters with a custom serde.") + } + if (fileFormat.isEmpty) { + throw new IllegalArgumentException("Cannot specify delimiters without fileFormat.") + } + if (fileFormat.get != "textfile") { + throw new IllegalArgumentException("Cannot specify delimiters as they are only compatible " + + s"with fileFormat 'textfile', not ${fileFormat.get}.") + } } for (lineDelim <- parameters.get("lineDelim") if lineDelim != "\n") { @@ -74,7 +86,7 @@ object HiveOptions { name } - val FORMAT = newOption("format") + val FILE_FORMAT = newOption("fileFormat") val INPUT_FORMAT = newOption("inputFormat") val OUTPUT_FORMAT = newOption("outputFormat") val SERDE = newOption("serde") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 3f1f86c278db..5a3fcd7a759c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.types.StructType -private[orc] object OrcFileOperator extends Logging { +private[hive] object OrcFileOperator extends Logging { /** * Retrieves an ORC file reader from a given path. The path can point to either a directory or a * single ORC file. 
If it points to a directory, it picks any non-empty ORC file within that diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 714eb9a14d35..b67e5f6fe57a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -130,7 +130,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) - assert(desc.storage.serde.isEmpty) + assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) assert(desc.properties == Map()) } @@ -310,7 +310,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle Some("org.apache.hadoop.mapred.TextInputFormat")) assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) - assert(desc.storage.serde.isEmpty) + assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) assert(desc.storage.properties.isEmpty) assert(desc.properties.isEmpty) assert(desc.comment.isEmpty) @@ -417,7 +417,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle val (desc2, _) = extractTableDesc(query2) assert(desc1.storage.inputFormat == Some("winput")) assert(desc1.storage.outputFormat == Some("wowput")) - assert(desc1.storage.serde.isEmpty) + assert(desc1.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) @@ -604,7 +604,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle |CREATE TABLE t |(id int, name string COMMENT 'blabla') |USING hive - |OPTIONS (format 'parquet', my_prop 1) + |OPTIONS (fileFormat 'parquet', my_prop 1) |LOCATION '/tmp/file' |COMMENT 'BLABLA' """.stripMargin @@ -638,8 +638,16 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) val v2 = "CREATE TABLE t (c1 int, c2 int) USING hive CLUSTERED BY (c2) INTO 4 BUCKETS" - val e = intercept[AnalysisException](analyzeCreateTable(v2)) - assert(e.message.contains("Cannot create bucketed Hive serde table")) + val e2 = intercept[AnalysisException](analyzeCreateTable(v2)) + assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet")) + + val v3 = + """ + |CREATE TABLE t (c1 int, c2 int) USING hive + |PARTITIONED BY (c2) + |CLUSTERED BY (c2) INTO 4 BUCKETS""".stripMargin + val e3 = intercept[AnalysisException](analyzeCreateTable(v3)) + assert(e3.message.contains("Creating bucketed Hive serde table is not supported yet")) } test("create hive serde table with new syntax - Hive options error checking") { @@ -648,26 +656,35 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(e1.getMessage.contains("Cannot specify only inputFormat or outputFormat")) val v2 = "CREATE TABLE t (c1 int) USING hive OPTIONS " + - "(format 'x', inputFormat 'a', outputFormat 'b')" + 
"(fileFormat 'x', inputFormat 'a', outputFormat 'b')" val e2 = intercept[IllegalArgumentException](analyzeCreateTable(v2)) - assert(e2.getMessage.contains("Cannot specify format and inputFormat/outputFormat together")) + assert(e2.getMessage.contains( + "Cannot specify fileFormat and inputFormat/outputFormat together")) - val v3 = "CREATE TABLE t (c1 int) USING hive OPTIONS (format 'parquet', serde 'a')" + val v3 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', serde 'a')" val e3 = intercept[IllegalArgumentException](analyzeCreateTable(v3)) - assert(e3.getMessage.contains("format 'parquet' already specifies a serde")) + assert(e3.getMessage.contains("fileFormat 'parquet' already specifies a serde")) - val v4 = "CREATE TABLE t (c1 int) USING hive OPTIONS (format 'parquet', 'fieldDelim' ' ')" + val v4 = "CREATE TABLE t (c1 int) USING hive OPTIONS (serde 'a', fieldDelim ' ')" val e4 = intercept[IllegalArgumentException](analyzeCreateTable(v4)) - assert(e4.getMessage.contains( - "Cannot specify delimiters as they are only compatible with format 'textfile'")) + assert(e4.getMessage.contains("Cannot specify delimiters with a custom serde")) - // The value of 'format' option is case-insensitive. - val v5 = "CREATE TABLE t (c1 int) USING hive OPTIONS (format 'TEXTFILE', 'lineDelim' ',')" + val v5 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fieldDelim ' ')" val e5 = intercept[IllegalArgumentException](analyzeCreateTable(v5)) - assert(e5.getMessage.contains("Hive data source only support newline '\\n' as line delimiter")) + assert(e5.getMessage.contains("Cannot specify delimiters without fileFormat")) - val v6 = "CREATE TABLE t (c1 int) USING hive OPTIONS (format 'wrong')" + val v6 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', fieldDelim ' ')" val e6 = intercept[IllegalArgumentException](analyzeCreateTable(v6)) - assert(e6.getMessage.contains("invalid format: 'wrong'")) + assert(e6.getMessage.contains( + "Cannot specify delimiters as they are only compatible with fileFormat 'textfile'")) + + // The value of 'fileFormat' option is case-insensitive. 
+ val v7 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'TEXTFILE', lineDelim ',')" + val e7 = intercept[IllegalArgumentException](analyzeCreateTable(v7)) + assert(e7.getMessage.contains("Hive data source only support newline '\\n' as line delimiter")) + + val v8 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'wrong')" + val e8 = intercept[IllegalArgumentException](analyzeCreateTable(v8)) + assert(e8.getMessage.contains("invalid fileFormat: 'wrong'")) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index d1b10c6c3fbc..a5ef34adbed4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, Cat import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.orc.OrcFileOperator import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION @@ -1200,21 +1201,40 @@ class HiveDDLSuite } test("create hive serde table with new syntax") { - withTable("t", "t2") { - sql("CREATE TABLE t(id int) USING hive OPTIONS(format 'orc')") - val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(DDLUtils.isHiveTable(table)) - assert(table.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) - assert(spark.table("t").collect().isEmpty) - - sql("INSERT INTO t SELECT 1") - checkAnswer(spark.table("t"), Row(1)) - - sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2") - val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2")) - assert(DDLUtils.isHiveTable(table2)) - assert(table2.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - checkAnswer(spark.table("t2"), Row(1, "a")) + withTable("t", "t2", "t3") { + withTempPath { path => + sql( + s""" + |CREATE TABLE t(id int) USING hive + |OPTIONS(fileFormat 'orc', compression 'Zlib') + |LOCATION '${path.getCanonicalPath}' + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(DDLUtils.isHiveTable(table)) + assert(table.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) + assert(table.storage.properties.get("compression") == Some("Zlib")) + assert(spark.table("t").collect().isEmpty) + + sql("INSERT INTO t SELECT 1") + checkAnswer(spark.table("t"), Row(1)) + // Check if this is compressed as ZLIB. 
+ val maybeOrcFile = path.listFiles().find(_.getName.endsWith("part-00000")) + assert(maybeOrcFile.isDefined) + val orcFilePath = maybeOrcFile.get.toPath.toString + val expectedCompressionKind = + OrcFileOperator.getFileReader(orcFilePath).get.getCompression + assert("ZLIB" === expectedCompressionKind.name()) + + sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2") + val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2")) + assert(DDLUtils.isHiveTable(table2)) + assert(table2.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + checkAnswer(spark.table("t2"), Row(1, "a")) + + sql("CREATE TABLE t3(a int, p int) USING hive PARTITIONED BY (p)") + sql("INSERT INTO t3 PARTITION(p=1) SELECT 0") + checkAnswer(spark.table("t3"), Row(0, 1)) + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 463c368fc42b..e678cf6f22c2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -93,8 +93,6 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { .orc(path) // Check if this is compressed as ZLIB. - val conf = spark.sessionState.newHadoopConf() - val fs = FileSystem.getLocal(conf) val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc")) assert(maybeOrcFile.isDefined) val orcFilePath = maybeOrcFile.get.toPath.toString
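
For reviewers who want to try the behavior this patch enables end to end, the following is a minimal, illustrative sketch (not part of the change itself). It exercises the unified `CREATE TABLE ... USING hive` syntax with the renamed `fileFormat` option and the pass-through `compression` property shown in the new HiveDDLSuite test, plus a delimited text table, which the new HiveOptions checks only allow together with `fileFormat 'textfile'`. The object name, table names, and the local master setting are hypothetical and chosen only for the sketch.

    // Illustrative sketch only; assumes a Spark build with Hive support on the classpath.
    import org.apache.spark.sql.SparkSession

    object UnifiedCreateTableSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")                       // hypothetical local run
          .appName("unified-create-table-sketch")
          .enableHiveSupport()                      // Hive serde tables need Hive support
          .getOrCreate()

        // Hive serde table stored as ORC; 'fileFormat' replaces the old 'format' option,
        // and 'compression' is kept in the table's storage properties.
        spark.sql(
          """CREATE TABLE demo_orc(id INT) USING hive
            |OPTIONS(fileFormat 'orc', compression 'Zlib')""".stripMargin)

        // Delimiter options such as fieldDelim are only accepted with fileFormat 'textfile';
        // combining them with another fileFormat, or with an explicit serde, is rejected.
        spark.sql(
          """CREATE TABLE demo_text(id INT, name STRING) USING hive
            |OPTIONS(fileFormat 'textfile', fieldDelim ',')""".stripMargin)

        spark.sql("INSERT INTO demo_orc SELECT 1")
        spark.sql("SELECT * FROM demo_orc").show()

        spark.stop()
      }
    }

Omitting OPTIONS entirely falls back to HiveSerDe.getDefaultStorage, i.e. the format configured by hive.default.fileformat (plain text with LazySimpleSerDe by default), as asserted in the updated parser and DDL command suites above.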