Skip to content

Commit 4de7131

Browse files
LantaoJin authored and cloud-fan committed
[SPARK-29421][SQL] Supporting Create Table Like Using Provider
### What changes were proposed in this pull request?

Hive supports specifying a new file format with `STORED AS` in `CREATE TABLE LIKE`:

```sql
CREATE TABLE tbl(a int) STORED AS TEXTFILE;
CREATE TABLE tbl2 LIKE tbl STORED AS PARQUET;
```

We add a similar syntax to Spark, split into two features:

1. specify a different table provider in CREATE TABLE LIKE
2. Hive compatibility

In this PR, we address the first one:

- [ ] Using `USING provider` to specify a different table provider in CREATE TABLE LIKE.
- [ ] Using `STORED AS file_format` in CREATE TABLE LIKE to address Hive compatibility.

### Why are the changes needed?

The `CREATE TABLE tb1 LIKE tb2` command creates an empty table tb1 based on the definition of table tb2. The most common use case is to create tb1 with the same schema as tb2. The inconvenience is that this command also copies the file format from tb2, so the input/output format and serde cannot be changed. The ability to change the file format is useful in scenarios such as upgrading a table from a low-performance file format to a high-performance one (parquet, orc).

### Does this PR introduce any user-facing change?

Yes. It adds a new syntax based on the current CREATE TABLE LIKE (CTL) syntax:

```sql
CREATE TABLE tbl2 LIKE tbl [USING parquet];
```

### How was this patch tested?

Modified some existing unit tests.

Closes #26097 from LantaoJin/SPARK-29421.

Authored-by: lajin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 1844015 commit 4de7131

File tree

6 files changed

+209
-89
lines changed

6 files changed

+209
-89
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ statement
119119
(TBLPROPERTIES tableProps=tablePropertyList))*
120120
(AS? query)? #createHiveTable
121121
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
122-
LIKE source=tableIdentifier locationSpec? #createTableLike
122+
LIKE source=tableIdentifier tableProvider? locationSpec? #createTableLike
123123
| replaceTableHeader ('(' colTypeList ')')? tableProvider
124124
((OPTIONS options=tablePropertyList) |
125125
(PARTITIONED BY partitioning=transformList) |

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -632,14 +632,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
632632
* For example:
633633
* {{{
634634
* CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
635-
* LIKE [other_db_name.]existing_table_name [locationSpec]
635+
* LIKE [other_db_name.]existing_table_name [USING provider] [locationSpec]
636636
* }}}
637637
*/
638638
override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
639639
val targetTable = visitTableIdentifier(ctx.target)
640640
val sourceTable = visitTableIdentifier(ctx.source)
641+
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
641642
val location = Option(ctx.locationSpec).map(visitLocationSpec)
642-
CreateTableLikeCommand(targetTable, sourceTable, location, ctx.EXISTS != null)
643+
CreateTableLikeCommand(targetTable, sourceTable, provider, location, ctx.EXISTS != null)
643644
}
644645

645646
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.command
1919

20-
import java.io.File
2120
import java.net.{URI, URISyntaxException}
22-
import java.nio.file.FileSystems
2321

2422
import scala.collection.mutable.ArrayBuffer
2523
import scala.util.Try
@@ -29,7 +27,7 @@ import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
2927

3028
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
3129
import org.apache.spark.sql.catalyst.TableIdentifier
32-
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute, UnresolvedRelation}
30+
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
3331
import org.apache.spark.sql.catalyst.catalog._
3432
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
3533
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -57,23 +55,34 @@ import org.apache.spark.sql.util.SchemaUtils
5755
* The CatalogTable attributes copied from the source table are storage(inputFormat, outputFormat,
5856
* serde, compressed, properties), schema, provider, partitionColumnNames, bucketSpec.
5957
*
58+
* Use "CREATE TABLE t1 LIKE t2 USING file_format"
59+
* to specify a new file format for t1 from a data source table t2.
60+
*
6061
* The syntax of using this command in SQL is:
6162
* {{{
6263
* CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
63-
* LIKE [other_db_name.]existing_table_name [locationSpec]
64+
* LIKE [other_db_name.]existing_table_name [USING provider] [locationSpec]
6465
* }}}
6566
*/
6667
case class CreateTableLikeCommand(
6768
targetTable: TableIdentifier,
6869
sourceTable: TableIdentifier,
70+
provider: Option[String],
6971
location: Option[String],
7072
ifNotExists: Boolean) extends RunnableCommand {
7173

7274
override def run(sparkSession: SparkSession): Seq[Row] = {
7375
val catalog = sparkSession.sessionState.catalog
7476
val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
7577

76-
val newProvider = if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
78+
val newProvider = if (provider.isDefined) {
79+
if (!DDLUtils.isHiveTable(provider)) {
80+
// Validate the user-supplied provider; for an invalid provider the lookup below throws
81+
// AnalysisException, ClassNotFoundException, or NoClassDefFoundError
82+
DataSource.lookupDataSource(provider.get, sparkSession.sessionState.conf)
83+
}
84+
provider
85+
} else if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
7786
Some(sparkSession.sessionState.conf.defaultDataSourceName)
7887
} else {
7988
sourceTableDesc.provider

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1107,47 +1107,75 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
11071107

11081108
test("create table like") {
11091109
val v1 = "CREATE TABLE table1 LIKE table2"
1110-
val (target, source, location, exists) = parser.parsePlan(v1).collect {
1111-
case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting)
1110+
val (target, source, provider, location, exists) = parser.parsePlan(v1).collect {
1111+
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
11121112
}.head
11131113
assert(exists == false)
11141114
assert(target.database.isEmpty)
11151115
assert(target.table == "table1")
11161116
assert(source.database.isEmpty)
11171117
assert(source.table == "table2")
11181118
assert(location.isEmpty)
1119+
assert(provider.isEmpty)
11191120

11201121
val v2 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2"
1121-
val (target2, source2, location2, exists2) = parser.parsePlan(v2).collect {
1122-
case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting)
1122+
val (target2, source2, provider2, location2, exists2) = parser.parsePlan(v2).collect {
1123+
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
11231124
}.head
11241125
assert(exists2)
11251126
assert(target2.database.isEmpty)
11261127
assert(target2.table == "table1")
11271128
assert(source2.database.isEmpty)
11281129
assert(source2.table == "table2")
11291130
assert(location2.isEmpty)
1131+
assert(provider2.isEmpty)
11301132

11311133
val v3 = "CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'"
1132-
val (target3, source3, location3, exists3) = parser.parsePlan(v3).collect {
1133-
case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting)
1134+
val (target3, source3, provider3, location3, exists3) = parser.parsePlan(v3).collect {
1135+
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
11341136
}.head
11351137
assert(!exists3)
11361138
assert(target3.database.isEmpty)
11371139
assert(target3.table == "table1")
11381140
assert(source3.database.isEmpty)
11391141
assert(source3.table == "table2")
11401142
assert(location3 == Some("/spark/warehouse"))
1143+
assert(provider3.isEmpty)
11411144

1142-
val v4 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'"
1143-
val (target4, source4, location4, exists4) = parser.parsePlan(v4).collect {
1144-
case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting)
1145+
val v4 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'"
1146+
val (target4, source4, provider4, location4, exists4) = parser.parsePlan(v4).collect {
1147+
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
11451148
}.head
11461149
assert(exists4)
11471150
assert(target4.database.isEmpty)
11481151
assert(target4.table == "table1")
11491152
assert(source4.database.isEmpty)
11501153
assert(source4.table == "table2")
11511154
assert(location4 == Some("/spark/warehouse"))
1155+
assert(provider4.isEmpty)
1156+
1157+
val v5 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING parquet"
1158+
val (target5, source5, provider5, location5, exists5) = parser.parsePlan(v5).collect {
1159+
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
1160+
}.head
1161+
assert(exists5)
1162+
assert(target5.database.isEmpty)
1163+
assert(target5.table == "table1")
1164+
assert(source5.database.isEmpty)
1165+
assert(source5.table == "table2")
1166+
assert(location5.isEmpty)
1167+
assert(provider5 == Some("parquet"))
1168+
1169+
val v6 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 USING ORC"
1170+
val (target6, source6, provider6, location6, exists6) = parser.parsePlan(v6).collect {
1171+
case CreateTableLikeCommand(t, s, p, l, allowExisting) => (t, s, p, l, allowExisting)
1172+
}.head
1173+
assert(exists6)
1174+
assert(target6.database.isEmpty)
1175+
assert(target6.table == "table1")
1176+
assert(source6.database.isEmpty)
1177+
assert(source6.table == "table2")
1178+
assert(location6.isEmpty)
1179+
assert(provider6 == Some("ORC"))
11521180
}
11531181
}

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,19 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
168168
assert(e.message.contains("It doesn't match the specified format"))
169169
}
170170
}
171+
172+
test("throw exception if Create Table LIKE USING Hive built-in ORC in in-memory catalog") {
173+
val catalog = spark.sessionState.catalog
174+
withTable("s", "t") {
175+
sql("CREATE TABLE s(a INT, b INT) USING parquet")
176+
val source = catalog.getTableMetadata(TableIdentifier("s"))
177+
assert(source.provider == Some("parquet"))
178+
val e = intercept[AnalysisException] {
179+
sql("CREATE TABLE t LIKE s USING org.apache.spark.sql.hive.orc")
180+
}.getMessage
181+
assert(e.contains("Hive built-in ORC data source must be used with Hive support enabled"))
182+
}
183+
}
171184
}
172185

173186
abstract class DDLSuite extends QueryTest with SQLTestUtils {
@@ -2820,4 +2833,34 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
28202833
}
28212834
}
28222835
}
2836+
2837+
test("Create Table LIKE USING provider") {
2838+
val catalog = spark.sessionState.catalog
2839+
withTable("s", "t1", "t2", "t3", "t4") {
2840+
sql("CREATE TABLE s(a INT, b INT) USING parquet")
2841+
val source = catalog.getTableMetadata(TableIdentifier("s"))
2842+
assert(source.provider == Some("parquet"))
2843+
2844+
sql("CREATE TABLE t1 LIKE s USING orc")
2845+
val table1 = catalog.getTableMetadata(TableIdentifier("t1"))
2846+
assert(table1.provider == Some("orc"))
2847+
2848+
sql("CREATE TABLE t2 LIKE s USING hive")
2849+
val table2 = catalog.getTableMetadata(TableIdentifier("t2"))
2850+
assert(table2.provider == Some("hive"))
2851+
2852+
val e1 = intercept[ClassNotFoundException] {
2853+
sql("CREATE TABLE t3 LIKE s USING unknown")
2854+
}.getMessage
2855+
assert(e1.contains("Failed to find data source"))
2856+
2857+
withGlobalTempView("src") {
2858+
val globalTempDB = spark.sharedState.globalTempViewManager.database
2859+
sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1 AS a, '2' AS b")
2860+
sql(s"CREATE TABLE t4 LIKE $globalTempDB.src USING parquet")
2861+
val table = catalog.getTableMetadata(TableIdentifier("t4"))
2862+
assert(table.provider == Some("parquet"))
2863+
}
2864+
}
2865+
}
28232866
}

0 commit comments

Comments
 (0)