diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 3222a9cdc2c4e..164b5df06746e 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -82,7 +82,7 @@ statement
         (TBLPROPERTIES tablePropertyList)?
         (AS? query)?                                                   #createHiveTable
     | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
-        LIKE source=tableIdentifier                                    #createTableLike
+        LIKE source=tableIdentifier locationSpec?                      #createTableLike
     | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
         (identifier | FOR COLUMNS identifierSeq)?                      #analyze
     | ALTER (TABLE | VIEW) from=tableIdentifier
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 41768d451261a..ca76a10f79467 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1141,13 +1141,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * For example:
    * {{{
    *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
-   *   LIKE [other_db_name.]existing_table_name
+   *   LIKE [other_db_name.]existing_table_name [locationSpec]
    * }}}
    */
   override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
     val targetTable = visitTableIdentifier(ctx.target)
     val sourceTable = visitTableIdentifier(ctx.source)
-    CreateTableLikeCommand(targetTable, sourceTable, ctx.EXISTS != null)
+    val location = Option(ctx.locationSpec).map(visitLocationSpec)
+    CreateTableLikeCommand(targetTable, sourceTable, location, ctx.EXISTS != null)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 246894813c3b9..5d5b0707be1bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 /**
- * A command to create a MANAGED table with the same definition of the given existing table.
+ * A command to create a table with the same definition of the given existing table.
  * In the target table definition, the table comment is always empty but the column comments
  * are identical to the ones defined in the source table.
  *
@@ -52,12 +52,13 @@ import org.apache.spark.util.Utils
  * The syntax of using this command in SQL is:
  * {{{
  *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
- *   LIKE [other_db_name.]existing_table_name
+ *   LIKE [other_db_name.]existing_table_name [locationSpec]
  * }}}
  */
 case class CreateTableLikeCommand(
     targetTable: TableIdentifier,
     sourceTable: TableIdentifier,
+    location: Option[String],
     ifNotExists: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -70,12 +71,19 @@ case class CreateTableLikeCommand(
       sourceTableDesc.provider
     }
 
+    // If the location is specified, we create an external table internally.
+    // Otherwise, we create a managed table.
+    val tblType = if (location.isEmpty) {
+      CatalogTableType.MANAGED
+    } else {
+      CatalogTableType.EXTERNAL
+    }
+
     val newTableDesc =
       CatalogTable(
         identifier = targetTable,
-        tableType = CatalogTableType.MANAGED,
-        // We are creating a new managed table, which should not have custom table location.
-        storage = sourceTableDesc.storage.copy(locationUri = None),
+        tableType = tblType,
+        storage = sourceTableDesc.storage.copy(locationUri = location),
         schema = sourceTableDesc.schema,
         provider = newProvider,
         partitionColumnNames = sourceTableDesc.partitionColumnNames,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index b67e5f6fe57a1..d6384913b88e9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -387,11 +387,11 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     val query2 = s"$baseQuery SERDE 'org.apache.poof.serde.Baff' WITH SERDEPROPERTIES ('k1'='v1')"
     val query3 =
       s"""
-       |$baseQuery DELIMITED FIELDS TERMINATED BY 'x' ESCAPED BY 'y'
-       |COLLECTION ITEMS TERMINATED BY 'a'
-       |MAP KEYS TERMINATED BY 'b'
-       |LINES TERMINATED BY '\n'
-       |NULL DEFINED AS 'c'
+        |$baseQuery DELIMITED FIELDS TERMINATED BY 'x' ESCAPED BY 'y'
+        |COLLECTION ITEMS TERMINATED BY 'a'
+        |MAP KEYS TERMINATED BY 'b'
+        |LINES TERMINATED BY '\n'
+        |NULL DEFINED AS 'c'
       """.stripMargin
     val (desc1, _) = extractTableDesc(query1)
     val (desc2, _) = extractTableDesc(query2)
@@ -518,24 +518,48 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
 
   test("create table like") {
     val v1 = "CREATE TABLE table1 LIKE table2"
-    val (target, source, exists) = parser.parsePlan(v1).collect {
-      case CreateTableLikeCommand(t, s, allowExisting) => (t, s, allowExisting)
+    val (target, source, location, exists) = parser.parsePlan(v1).collect {
+      case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting)
     }.head
     assert(exists == false)
     assert(target.database.isEmpty)
     assert(target.table == "table1")
     assert(source.database.isEmpty)
     assert(source.table == "table2")
+    assert(location.isEmpty)
 
     val v2 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2"
-    val (target2, source2, exists2) = parser.parsePlan(v2).collect {
-      case CreateTableLikeCommand(t, s, allowExisting) => (t, s, allowExisting)
+    val (target2, source2, location2, exists2) = parser.parsePlan(v2).collect {
+      case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting)
     }.head
     assert(exists2)
     assert(target2.database.isEmpty)
     assert(target2.table == "table1")
     assert(source2.database.isEmpty)
     assert(source2.table == "table2")
+    assert(location2.isEmpty)
+
+    val v3 = "CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'"
+    val (target3, source3, location3, exists3) = parser.parsePlan(v3).collect {
+      case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting)
+    }.head
+    assert(!exists3)
+    assert(target3.database.isEmpty)
+    assert(target3.table == "table1")
+    assert(source3.database.isEmpty)
+    assert(source3.table == "table2")
+    assert(location3 == Some("/spark/warehouse"))
+
+    val v4 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'"
+    val (target4, source4, location4, exists4) = parser.parsePlan(v4).collect {
+      case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting)
+    }.head
+    assert(exists4)
+    assert(target4.database.isEmpty)
+    assert(target4.table == "table1")
+    assert(source4.database.isEmpty)
+    assert(source4.table == "table2")
+    assert(location4 == Some("/spark/warehouse"))
   }
 
   test("load data") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index edef30823b55c..53a0eaf373219 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
-  import spark.implicits._
+  import testImplicits._
 
   override def afterEach(): Unit = {
     try {
@@ -50,8 +50,8 @@ class HiveDDLSuite
   }
 
   // check if the directory for recording the data of the table exists.
   private def tableDirectoryExists(
-    tableIdentifier: TableIdentifier,
-    dbPath: Option[String] = None): Boolean = {
+      tableIdentifier: TableIdentifier,
+      dbPath: Option[String] = None): Boolean = {
     val expectedTablePath = if (dbPath.isEmpty) {
       hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
@@ -79,6 +79,25 @@ class HiveDDLSuite
     }
   }
 
+  test("create a hive table without schema") {
+    import testImplicits._
+    withTempPath { tempDir =>
+      withTable("tab1", "tab2") {
+        (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
+
+        var e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING hive") }.getMessage
+        assert(e.contains("Unable to infer the schema. The schema specification is required to " +
+          "create the table `default`.`tab1`"))
+
+        e = intercept[AnalysisException] {
+          sql(s"CREATE TABLE tab2 location '${tempDir.getCanonicalPath}'")
+        }.getMessage
+        assert(e.contains("Unable to infer the schema. The schema specification is required to " +
+          "create the table `default`.`tab2`"))
+      }
+    }
+  }
+
   test("drop external tables in default database") {
     withTempDir { tmpDir =>
       val tabName = "tab1"
@@ -199,7 +218,7 @@ class HiveDDLSuite
     val e = intercept[AnalysisException] {
       sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")
     }
-    assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
+    assert(e.message == "Found duplicate column(s) in table definition of `default`.`tbl`: a")
   }
 
   test("add/drop partition with location - managed table") {
@@ -222,8 +241,8 @@ class HiveDDLSuite
       sql(
         s"""
            |ALTER TABLE $tab ADD
-           |PARTITION (ds='2008-04-08', hr=11) LOCATION '$part1Path'
-           |PARTITION (ds='2008-04-08', hr=12) LOCATION '$part2Path'
+           |PARTITION (ds='2008-04-08', hr=11) LOCATION '${part1Path.toURI}'
+           |PARTITION (ds='2008-04-08', hr=12) LOCATION '${part2Path.toURI}'
          """.stripMargin)
 
       assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
@@ -522,9 +541,9 @@ class HiveDDLSuite
     assume(oldPart.storage.properties.filterKeys(expectedSerdeProps.contains) !=
       expectedSerdeProps, "bad test: serde properties were already set")
     sql(s"""ALTER TABLE boxes PARTITION (width=4)
-      | SET SERDE '$expectedSerde'
-      | WITH SERDEPROPERTIES ($expectedSerdePropsString)
-      |""".stripMargin)
+        | SET SERDE '$expectedSerde'
+        | WITH SERDEPROPERTIES ($expectedSerdePropsString)
+        |""".stripMargin)
     val newPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" -> "4"))
     assert(newPart.storage.serde == Some(expectedSerde))
     assume(newPart.storage.properties.filterKeys(expectedSerdeProps.contains) ==
@@ -656,8 +675,9 @@ class HiveDDLSuite
     assert(sql("DESC FORMATTED view1").collect().containsSlice(
       Seq(
         Row("# View Information", "", ""),
-        Row("View Original Text:", "SELECT * FROM tbl", ""),
-        Row("View Expanded Text:", "SELECT * FROM tbl", "")
+        Row("View Text:", "SELECT * FROM tbl", ""),
+        Row("View Default Database:", "default", ""),
+        Row("View Query Output Columns:", "[a]", "")
       )
     ))
   }
@@ -708,8 +728,8 @@ class HiveDDLSuite
   }
 
   test("create/drop database - location without pre-created directory") {
-     withTempPath { tmpDir =>
-       createDatabaseWithLocation(tmpDir, dirExists = false)
+    withTempPath { tmpDir =>
+      createDatabaseWithLocation(tmpDir, dirExists = false)
     }
   }
 
@@ -813,54 +833,105 @@ class HiveDDLSuite
   }
 
   test("CREATE TABLE LIKE a temporary view") {
+    // CREATE TABLE LIKE a temporary view.
+    withCreateTableLikeTempView(None)
+
+    // CREATE TABLE LIKE a temporary view location ...
+    withTempDir { tmpDir =>
+      withCreateTableLikeTempView(Some(tmpDir.toURI.toString))
+    }
+  }
+
+  private def withCreateTableLikeTempView(location: Option[String]): Unit = {
     val sourceViewName = "tab1"
     val targetTabName = "tab2"
+    var createdTableType = CatalogTableType.MANAGED
     withTempView(sourceViewName) {
       withTable(targetTabName) {
         spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
           .createTempView(sourceViewName)
-        sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
+        if (location.isEmpty) {
+          sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName ")
+        } else {
+          createdTableType = CatalogTableType.EXTERNAL
+          sql(s"CREATE TABLE $targetTabName " +
+            s"LIKE $sourceViewName LOCATION '$location'")
+        }
 
         val sourceTable = spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(
           TableIdentifier(sourceViewName))
         val targetTable = spark.sessionState.catalog.getTableMetadata(
           TableIdentifier(targetTabName, Some("default")))
 
-        checkCreateTableLike(sourceTable, targetTable)
+        checkCreateTableLike(sourceTable, targetTable, createdTableType)
       }
     }
   }
 
   test("CREATE TABLE LIKE a data source table") {
+    // CREATE TABLE LIKE a data source table.
+    withCreateTableLikeDSTable(None)
+
+    // CREATE TABLE LIKE a data source table location ...
+    withTempDir { tmpDir =>
+      withCreateTableLikeDSTable(Some(tmpDir.toURI.toString))
+    }
+  }
+
+  private def withCreateTableLikeDSTable(location: Option[String]): Unit = {
     val sourceTabName = "tab1"
     val targetTabName = "tab2"
+    var createdTableType = CatalogTableType.MANAGED
     withTable(sourceTabName, targetTabName) {
       spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
         .write.format("json").saveAsTable(sourceTabName)
 
-      sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+      if (location.isEmpty) {
+        sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+      } else {
+        createdTableType = CatalogTableType.EXTERNAL
+        sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName LOCATION '$location'")
+      }
       val sourceTable =
-        spark.sessionState.catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+        spark.sessionState.catalog.getTableMetadata(
+          TableIdentifier(sourceTabName, Some("default")))
       val targetTable =
-        spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+        spark.sessionState.catalog.getTableMetadata(
+          TableIdentifier(targetTabName, Some("default")))
       // The table type of the source table should be a Hive-managed data source table
       assert(DDLUtils.isDatasourceTable(sourceTable))
       assert(sourceTable.tableType == CatalogTableType.MANAGED)
 
-      checkCreateTableLike(sourceTable, targetTable)
+      checkCreateTableLike(sourceTable, targetTable, createdTableType)
     }
   }
 
   test("CREATE TABLE LIKE an external data source table") {
+    // CREATE TABLE LIKE an external data source table.
+    withCreateTableLikeExtDSTable(None)
+
+    // CREATE TABLE LIKE an external data source table location ...
+    withTempDir { tmpDir =>
+      withCreateTableLikeExtDSTable(Some(tmpDir.toURI.toString))
+    }
+  }
+
+  private def withCreateTableLikeExtDSTable(location: Option[String]): Unit = {
     val sourceTabName = "tab1"
     val targetTabName = "tab2"
+    var createdTableType = CatalogTableType.MANAGED
    withTable(sourceTabName, targetTabName) {
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
          .write.format("parquet").save(path)
        sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '${dir.toURI}')")
-        sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+        if (location.isEmpty) {
+          sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+        } else {
+          createdTableType = CatalogTableType.EXTERNAL
+          sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName LOCATION '$location'")
+        }
 
        // The source table should be an external data source table
        val sourceTable = spark.sessionState.catalog.getTableMetadata(
@@ -871,32 +942,62 @@ class HiveDDLSuite
         assert(DDLUtils.isDatasourceTable(sourceTable))
         assert(sourceTable.tableType == CatalogTableType.EXTERNAL)
 
-        checkCreateTableLike(sourceTable, targetTable)
+        checkCreateTableLike(sourceTable, targetTable, createdTableType)
       }
     }
   }
 
   test("CREATE TABLE LIKE a managed Hive serde table") {
-    val catalog = spark.sessionState.catalog
+    // CREATE TABLE LIKE a managed Hive serde table.
+    withCreateTableLikeManagedHiveTable(None)
+
+    // CREATE TABLE LIKE a managed Hive serde table location ...
+    withTempDir { tmpDir =>
+      withCreateTableLikeManagedHiveTable(Some(tmpDir.toURI.toString))
+    }
+  }
+
+  private def withCreateTableLikeManagedHiveTable(location: Option[String]): Unit = {
     val sourceTabName = "tab1"
     val targetTabName = "tab2"
+    var createdTableType = CatalogTableType.MANAGED
+    val catalog = spark.sessionState.catalog
     withTable(sourceTabName, targetTabName) {
       sql(s"CREATE TABLE $sourceTabName TBLPROPERTIES('prop1'='value1') AS SELECT 1 key, 'a'")
 
-      sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
-      val sourceTable = catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+      if (location.isEmpty) {
+        sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+      } else {
+        createdTableType = CatalogTableType.EXTERNAL
+        sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName LOCATION '$location'")
+      }
+
+      val sourceTable = catalog.getTableMetadata(
+        TableIdentifier(sourceTabName, Some("default")))
       assert(sourceTable.tableType == CatalogTableType.MANAGED)
       assert(sourceTable.properties.get("prop1").nonEmpty)
-      val targetTable = catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+      val targetTable = catalog.getTableMetadata(
+        TableIdentifier(targetTabName, Some("default")))
 
-      checkCreateTableLike(sourceTable, targetTable)
+      checkCreateTableLike(sourceTable, targetTable, createdTableType)
     }
   }
 
   test("CREATE TABLE LIKE an external Hive serde table") {
+    // CREATE TABLE LIKE an external Hive serde table.
+    withCreateTableLikeExtHiveTable(None)
+
+    // CREATE TABLE LIKE an external Hive serde table location ...
+    withTempDir { tmpDir =>
+      withCreateTableLikeExtHiveTable(Some(tmpDir.toURI.toString))
+    }
+  }
+
+  private def withCreateTableLikeExtHiveTable(location: Option[String]): Unit = {
     val catalog = spark.sessionState.catalog
+    var createdTableType = CatalogTableType.MANAGED
     withTempDir { tmpDir =>
-      val basePath = tmpDir.toURI
+      val basePath1 = tmpDir.toURI
       val sourceTabName = "tab1"
       val targetTabName = "tab2"
       withTable(sourceTabName, targetTabName) {
@@ -906,58 +1007,94 @@ class HiveDDLSuite
             |CREATE EXTERNAL TABLE $sourceTabName (key INT comment 'test', value STRING)
             |COMMENT 'Apache Spark'
             |PARTITIONED BY (ds STRING, hr STRING)
-            |LOCATION '$basePath'
-          """.stripMargin)
+            |LOCATION '$basePath1'
+           """.stripMargin)
         for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
           sql(
             s"""
               |INSERT OVERWRITE TABLE $sourceTabName
               |partition (ds='$ds',hr='$hr')
               |SELECT 1, 'a'
-            """.stripMargin)
+             """.stripMargin)
         }
 
-        sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
-        val sourceTable = catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
+        if (location.isEmpty) {
+          sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
+        } else {
+          createdTableType = CatalogTableType.EXTERNAL
+          sql(s"CREATE TABLE $targetTabName " +
+            s"LIKE $sourceTabName LOCATION '$location'")
+        }
+
+        val sourceTable = catalog.getTableMetadata(
+          TableIdentifier(sourceTabName, Some("default")))
         assert(sourceTable.tableType == CatalogTableType.EXTERNAL)
         assert(sourceTable.comment == Option("Apache Spark"))
-        val targetTable = catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
+        val targetTable = catalog.getTableMetadata(
+          TableIdentifier(targetTabName, Some("default")))
 
-        checkCreateTableLike(sourceTable, targetTable)
+        checkCreateTableLike(sourceTable, targetTable, createdTableType)
       }
     }
   }
 
   test("CREATE TABLE LIKE a view") {
+    // CREATE TABLE LIKE a view.
+    withCreateTableLikeView(None)
+
+    // CREATE TABLE LIKE a view location ...
+    withTempDir { tmpDir =>
+      withCreateTableLikeView(Some(tmpDir.toURI.toString))
+    }
+  }
+
+  private def withCreateTableLikeView(location: Option[String]): Unit = {
     val sourceTabName = "tab1"
     val sourceViewName = "view"
     val targetTabName = "tab2"
+    var createdTableType = CatalogTableType.MANAGED
     withTable(sourceTabName, targetTabName) {
       withView(sourceViewName) {
         spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
           .write.format("json").saveAsTable(sourceTabName)
         sql(s"CREATE VIEW $sourceViewName AS SELECT * FROM $sourceTabName")
-        sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
+
+        if (location.isEmpty) {
+          sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
+        } else {
+          createdTableType = CatalogTableType.EXTERNAL
+          sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName LOCATION '$location'")
+        }
 
         val sourceView = spark.sessionState.catalog.getTableMetadata(
           TableIdentifier(sourceViewName, Some("default")))
         // The original source should be a VIEW with an empty path
         assert(sourceView.tableType == CatalogTableType.VIEW)
-        assert(sourceView.viewText.nonEmpty && sourceView.viewOriginalText.nonEmpty)
+        assert(sourceView.viewText.nonEmpty)
+        assert(sourceView.viewDefaultDatabase == Some("default"))
+        assert(sourceView.viewQueryColumnNames == Seq("a", "b", "c", "d"))
         val targetTable = spark.sessionState.catalog.getTableMetadata(
           TableIdentifier(targetTabName, Some("default")))
 
-        checkCreateTableLike(sourceView, targetTable)
+        checkCreateTableLike(sourceView, targetTable, createdTableType)
       }
     }
   }
 
-  private def checkCreateTableLike(sourceTable: CatalogTable, targetTable: CatalogTable): Unit = {
-    // The created table should be a MANAGED table with empty view text and original text.
-    assert(targetTable.tableType == CatalogTableType.MANAGED,
-      "the created table must be a Hive managed table")
-    assert(targetTable.viewText.isEmpty && targetTable.viewOriginalText.isEmpty,
-      "the view text and original text in the created table must be empty")
+  private def checkCreateTableLike(
+      sourceTable: CatalogTable,
+      targetTable: CatalogTable,
+      tableType: CatalogTableType): Unit = {
+    // The created table should be a MANAGED table or EXTERNAL table with empty view text
+    // and original text.
+    assert(targetTable.tableType == tableType,
+      s"the created table must be a Hive ${tableType.name} table")
+    assert(targetTable.viewText.isEmpty,
+      "the view text in the created table must be empty")
+    assert(targetTable.viewDefaultDatabase.isEmpty,
+      "the view default database in the created table must be empty")
+    assert(targetTable.viewQueryColumnNames.isEmpty,
+      "the view query output columns in the created table must be empty")
     assert(targetTable.comment.isEmpty,
       "the comment in the created table must be empty")
     assert(targetTable.unsupportedFeatures.isEmpty,
@@ -984,7 +1121,7 @@ class HiveDDLSuite
       "the table properties of source tables should not be copied in the created table")
 
     if (DDLUtils.isDatasourceTable(sourceTable) ||
-      sourceTable.tableType == CatalogTableType.VIEW) {
+        sourceTable.tableType == CatalogTableType.VIEW) {
       assert(DDLUtils.isDatasourceTable(targetTable),
         "the target table should be a data source table")
     } else {
@@ -1185,7 +1322,7 @@ class HiveDDLSuite
       assert(e2.getMessage.contains(forbiddenPrefix + "foo"))
 
       val e3 = intercept[AnalysisException] {
-        sql(s"CREATE TABLE tbl TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
+        sql(s"CREATE TABLE tbl (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
       }
       assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
     }
@@ -1260,9 +1397,9 @@ class HiveDDLSuite
     withTempPath { path =>
       sql(
         s"""
-           |CREATE TABLE t(id int) USING hive
-           |OPTIONS(fileFormat 'orc', compression 'Zlib')
-           |LOCATION '${path.getCanonicalPath}'
+          |CREATE TABLE t(id int) USING hive
+          |OPTIONS(fileFormat 'orc', compression 'Zlib')
+          |LOCATION '${path.toURI}'
         """.stripMargin)
       val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
       assert(DDLUtils.isHiveTable(table))
@@ -1319,7 +1456,7 @@ class HiveDDLSuite
   }
 
   test("create hive serde table with DataFrameWriter.saveAsTable") {
-    withTable("t", "t2") {
+    withTable("t", "t1") {
       Seq(1 -> "a").toDF("i", "j")
         .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a"))
@@ -1350,17 +1487,8 @@ class HiveDDLSuite
 
     assert(table.storage.serde == Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
 
-    sql("INSERT INTO t SELECT 2, 'b'")
-    checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
-
-    val e = intercept[AnalysisException] {
-      Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
-    }
-    assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
-      "to create a partitioned table using Hive"))
-
     val e2 = intercept[AnalysisException] {
-      Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
+      Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t1")
     }
     assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
 
@@ -1371,6 +1499,51 @@ class HiveDDLSuite
     }
   }
 
+  test("append data to hive serde table") {
+    withTable("t", "t1") {
+      Seq(1 -> "a").toDF("i", "j")
+        .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a"))
+
+      sql("INSERT INTO t SELECT 2, 'b'")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      Seq(3 -> "c").toDF("i", "j")
+        .write.format("hive").mode("append").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+
+      Seq("c" -> 3).toDF("i", "j")
+        .write.format("hive").mode("append").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
+        :: Row(null, "3") :: Nil)
+
+      Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
+
+      val e = intercept[AnalysisException] {
+        Seq(5 -> "e").toDF("i", "j")
+          .write.format("hive").mode("append").saveAsTable("t1")
+      }
+      assert(e.message.contains("The format of the existing table default.t1 is " +
+        "`ParquetFileFormat`. It doesn't match the specified format `HiveFileFormat`."))
+    }
+  }
+
+  test("create partitioned hive serde table as select") {
+    withTable("t", "t1") {
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        Seq(10 -> "y").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t")
+        checkAnswer(spark.table("t"), Row("y", 10) :: Nil)
+
+        Seq((1, 2, 3)).toDF("i", "j", "k").write.mode("overwrite").format("hive")
+          .partitionBy("j", "k").saveAsTable("t")
+        checkAnswer(spark.table("t"), Row(1, 2, 3) :: Nil)
+
+        spark.sql("create table t1 using hive partitioned by (i) as select 1 as i, 'a' as j")
+        checkAnswer(spark.table("t1"), Row("a", 1) :: Nil)
+      }
+    }
+  }
+
   test("read/write files with hive data source is not allowed") {
     withTempDir { dir =>
       val e = intercept[AnalysisException] {
@@ -1382,6 +1555,54 @@ class HiveDDLSuite
       Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
     }
     assert(e2.message.contains("Hive data source can only be used with tables"))
+
+    val e3 = intercept[AnalysisException] {
+      spark.readStream.format("hive").load(dir.getAbsolutePath)
+    }
+    assert(e3.message.contains("Hive data source can only be used with tables"))
+
+    val e4 = intercept[AnalysisException] {
+      spark.readStream.schema(new StructType()).parquet(dir.getAbsolutePath)
+        .writeStream.format("hive").start(dir.getAbsolutePath)
+    }
+    assert(e4.message.contains("Hive data source can only be used with tables"))
+    }
+  }
+
+  test("partitioned table should always put partition columns at the end of table schema") {
+    def getTableColumns(tblName: String): Seq[String] = {
+      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tblName)).schema.map(_.name)
+    }
+
+    withTable("t", "t1", "t2", "t3", "t4", "t5", "t6") {
+      sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)")
+      assert(getTableColumns("t") == Seq("a", "c", "d", "b"))
+
+      sql("CREATE TABLE t1 USING parquet PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
+      assert(getTableColumns("t1") == Seq("a", "c", "d", "b"))
+
+      Seq((1, 1, 1, 1)).toDF("a", "b", "c", "d").write.partitionBy("d", "b").saveAsTable("t2")
+      assert(getTableColumns("t2") == Seq("a", "c", "d", "b"))
+
+      withTempPath { path =>
+        val dataPath = new File(new File(path, "d=1"), "b=1").getCanonicalPath
+        Seq(1 -> 1).toDF("a", "c").write.save(dataPath)
+
+        sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.getCanonicalPath}'")
+        assert(getTableColumns("t3") == Seq("a", "c", "d", "b"))
+      }
+
+      sql("CREATE TABLE t4(a int, b int, c int, d int) USING hive PARTITIONED BY (d, b)")
+      assert(getTableColumns("t4") == Seq("a", "c", "d", "b"))
+
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        sql("CREATE TABLE t5 USING hive PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
+        assert(getTableColumns("t5") == Seq("a", "c", "d", "b"))
+
+        Seq((1, 1, 1, 1)).toDF("a", "b", "c", "d").write.format("hive")
+          .partitionBy("d", "b").saveAsTable("t6")
+        assert(getTableColumns("t6") == Seq("a", "c", "d", "b"))
+      }
     }
   }
 }