
Commit 4e06830

support creating hive table with DataFrameWriter and Catalog
1 parent faabe69 commit 4e06830

File tree: 3 files changed (+65, -9 lines)


sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 2 additions & 5 deletions
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
@@ -361,10 +360,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }

   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
-      throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
-    }
-
     val catalog = df.sparkSession.sessionState.catalog
     val tableExists = catalog.tableExists(tableIdent)
     val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
@@ -385,6 +380,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         }
         EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
           // Only do the check if the table is a data source table (the relation is a BaseRelation).
+          // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
+          // for creating hive tables.
          case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
            throw new AnalysisException(
              s"Cannot overwrite table $tableName that is also being read from")

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 0 additions & 4 deletions
@@ -347,10 +347,6 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
-    if (source.toLowerCase == "hive") {
-      throw new AnalysisException("Cannot create hive serde table with createExternalTable API.")
-    }
-
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
     val tableDesc = CatalogTable(
       identifier = tableIdent,
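
The same relaxation applies to Catalog.createExternalTable, so the public Catalog API can now register Hive serde tables as well. A minimal sketch based on the Catalog test added below (it assumes a Hive-enabled SparkSession named spark and an existing directory dir for the table location):

    import org.apache.spark.sql.types.StructType

    // Register an external Hive serde table over Parquet files at the given path.
    spark.catalog.createExternalTable(
      "t",
      "hive",
      new StructType().add("i", "int"),
      Map("path" -> dir.getCanonicalPath, "fileFormat" -> "parquet"))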

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 63 additions & 0 deletions
@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType

 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1289,4 +1290,66 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("create hive serde table with Catalog") {
+    withTable("t") {
+      withTempDir { dir =>
+        val df = spark.catalog.createExternalTable(
+          "t",
+          "hive",
+          new StructType().add("i", "int"),
+          Map("path" -> dir.getCanonicalPath, "fileFormat" -> "parquet"))
+        assert(df.collect().isEmpty)
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(DDLUtils.isHiveTable(table))
+        assert(table.storage.inputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+        assert(table.storage.outputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+        assert(table.storage.serde ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+        sql("INSERT INTO t SELECT 1")
+        checkAnswer(spark.table("t"), Row(1))
+      }
+    }
+  }
+
+  test("create hive serde table with DataFrameWriter.saveAsTable") {
+    withTable("t", "t2") {
+      Seq(1 -> "a").toDF("i", "j")
+        .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a"))
+
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.isHiveTable(table))
+      assert(table.storage.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
+      assert(table.storage.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
+      assert(table.storage.serde ==
+        Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
+
+      sql("INSERT INTO t SELECT 2, 'b'")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      val e = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
+      }
+      assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
+        "to create a partitioned table using Hive"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
+      }
+      assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
+
+      val e3 = intercept[AnalysisException] {
+        spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
+      }
+      assert(e3.message.contains(
+        "CTAS for hive serde tables does not support append or overwrite semantics"))
+    }
+  }
 }
