
Commit d3b6066

cloud-fan authored and rxin committed
[SPARK-17183][SPARK-17983][SPARK-18101][SQL] put hive serde table schema to table properties like data source table
## What changes were proposed in this pull request?

For data source tables, we put their table schema, partition columns, etc. into table properties, to work around some hive metastore issues, e.g. not case-preserving, bad decimal type support, etc. We should also do this for hive serde tables, to reduce the difference between hive serde tables and data source tables, e.g. column names should be case-preserving.

## How was this patch tested?

Existing tests, and a new test in `HiveExternalCatalog`.

Author: Wenchen Fan <[email protected]>

Closes #14750 from cloud-fan/minor1.

(cherry picked from commit 95ec4e2)
Signed-off-by: Reynold Xin <[email protected]>
1 parent 42386e7 commit d3b6066
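For context, a minimal sketch of the property-based workaround the description refers to, assuming the usual JSON round trip through `StructType.json` and `DataType.fromJson`; the property keys, helper names, and chunk size below are illustrative, not necessarily the exact ones `HiveExternalCatalog` uses:

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Illustrative property keys; the real keys used by HiveExternalCatalog may differ.
val SCHEMA_NUM_PARTS = "spark.sql.sources.schema.numParts"
def schemaPartKey(i: Int): String = s"spark.sql.sources.schema.part.$i"

// Serialize the case-preserving schema to JSON and split it into chunks, because the
// Hive metastore caps the length of a single table-property value.
def schemaToProperties(schema: StructType, chunkSize: Int = 4000): Map[String, String] = {
  val parts = schema.json.grouped(chunkSize).toSeq
  Map(SCHEMA_NUM_PARTS -> parts.length.toString) ++
    parts.zipWithIndex.map { case (part, i) => schemaPartKey(i) -> part }
}

// Reassemble the chunks and parse the JSON back into a StructType when the table is read,
// so column-name case and exact types survive the metastore round trip.
def schemaFromProperties(props: Map[String, String]): StructType = {
  val json = (0 until props(SCHEMA_NUM_PARTS).toInt).map(i => props(schemaPartKey(i))).mkString
  DataType.fromJson(json).asInstanceOf[StructType]
}
```

Storing the JSON in numbered parts sidesteps the metastore's per-value length limit while keeping the full, case-preserving schema recoverable on read.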

Showing 17 changed files with 245 additions and 97 deletions.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 7 additions & 1 deletion
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.catalog

-import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Expression

@@ -39,6 +39,12 @@ abstract class ExternalCatalog {
     }
   }

+  protected def requireTableExists(db: String, table: String): Unit = {
+    if (!tableExists(db, table)) {
+      throw new NoSuchTableException(db = db, table = table)
+    }
+  }
+
   protected def requireFunctionExists(db: String, funcName: String): Unit = {
     if (!functionExists(db, funcName)) {
       throw new NoSuchFunctionException(db = db, func = funcName)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 0 additions & 6 deletions
@@ -64,12 +64,6 @@ class InMemoryCatalog(
     catalog(db).tables(table).partitions.contains(spec)
   }

-  private def requireTableExists(db: String, table: String): Unit = {
-    if (!tableExists(db, table)) {
-      throw new NoSuchTableException(db = db, table = table)
-    }
-  }
-
   private def requireTableNotExists(db: String, table: String): Unit = {
     if (tableExists(db, table)) {
       throw new TableAlreadyExistsException(db = db, table = table)

sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala

Lines changed: 24 additions & 0 deletions
@@ -250,4 +250,28 @@ object DataType {
       case (fromDataType, toDataType) => fromDataType == toDataType
     }
   }
+
+  /**
+   * Compares two types, ignoring nullability of ArrayType, MapType, StructType, and ignoring case
+   * sensitivity of field names in StructType.
+   */
+  private[sql] def equalsIgnoreCaseAndNullability(from: DataType, to: DataType): Boolean = {
+    (from, to) match {
+      case (ArrayType(fromElement, _), ArrayType(toElement, _)) =>
+        equalsIgnoreCaseAndNullability(fromElement, toElement)
+
+      case (MapType(fromKey, fromValue, _), MapType(toKey, toValue, _)) =>
+        equalsIgnoreCaseAndNullability(fromKey, toKey) &&
+          equalsIgnoreCaseAndNullability(fromValue, toValue)
+
+      case (StructType(fromFields), StructType(toFields)) =>
+        fromFields.length == toFields.length &&
+          fromFields.zip(toFields).forall { case (l, r) =>
+            l.name.equalsIgnoreCase(r.name) &&
+              equalsIgnoreCaseAndNullability(l.dataType, r.dataType)
+          }
+
+      case (fromDataType, toDataType) => fromDataType == toDataType
+    }
+  }
 }
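A small illustration of what the new comparison accepts; the field names are made up, and since the method is `private[sql]` this only compiles inside Spark's own `org.apache.spark.sql` packages:

```scala
import org.apache.spark.sql.types._

// Schema as the metastore hands it back: lower-cased names, everything nullable.
val fromMetastore = new StructType()
  .add("hello", IntegerType, nullable = true)
  .add("world", IntegerType, nullable = true)

// Schema as the user declared it: mixed case, one non-nullable column.
val declared = new StructType()
  .add("HelLo", IntegerType, nullable = false)
  .add("WoRLd", IntegerType, nullable = true)

// Strict equality fails, but the lenient check treats the two as the same shape.
assert(fromMetastore != declared)
assert(DataType.equalsIgnoreCaseAndNullability(fromMetastore, declared))
```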

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 20 additions & 0 deletions
@@ -270,6 +270,26 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(catalog.listTables("db2", "*1").toSet == Set("tbl1"))
   }

+  test("column names should be case-preserving and column nullability should be retained") {
+    val catalog = newBasicCatalog()
+    val tbl = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = new StructType()
+        .add("HelLo", "int", nullable = false)
+        .add("WoRLd", "int", nullable = true),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("WoRLd"),
+      bucketSpec = Some(BucketSpec(4, Seq("HelLo"), Nil)))
+    catalog.createTable(tbl, ignoreIfExists = false)
+
+    val readBack = catalog.getTable("db1", "tbl")
+    assert(readBack.schema == tbl.schema)
+    assert(readBack.partitionColumnNames == tbl.partitionColumnNames)
+    assert(readBack.bucketSpec == tbl.bucketSpec)
+  }
+
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 5 additions & 5 deletions
@@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions, Union}
-import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
-import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType

 /**
@@ -359,7 +359,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }

   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    if (source.toLowerCase == "hive") {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
       throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 2 additions & 2 deletions
@@ -331,7 +331,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
-    if (provider.toLowerCase == "hive") {
+    if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) {
       throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING")
     }
     val schema = Option(ctx.colTypeList()).map(createSchema)
@@ -1034,7 +1034,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       tableType = tableType,
       storage = storage,
       schema = schema,
-      provider = Some("hive"),
+      provider = Some(DDLUtils.HIVE_PROVIDER),
       partitionColumnNames = partitionCols.map(_.name),
       properties = properties,
       comment = comment)
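For context, a rough sketch of what the two `DDLUtils.HIVE_PROVIDER` checks above reject and what still works, assuming a `SparkSession` named `spark` with Hive support enabled; the table name is made up:

```scala
val df = spark.range(10).toDF("i")

// Rejected by DataFrameWriter.saveAsTable: "hive" is not accepted as a source here.
// df.write.format("hive").saveAsTable("tbl")        // throws AnalysisException

// Rejected by the parser: hive serde tables cannot be created with CREATE TABLE ... USING.
// spark.sql("CREATE TABLE tbl (i INT) USING hive")  // throws AnalysisException

// Hive serde tables still go through the dedicated Hive DDL syntax.
spark.sql("CREATE TABLE tbl (i INT) STORED AS parquet")
```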

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 4 additions & 2 deletions
@@ -415,7 +415,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive" =>
+      case CreateTable(tableDesc, mode, None)
+          if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
        val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
        ExecutedCommandExec(cmd) :: Nil

@@ -427,7 +428,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
      // `CreateTables`

-      case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get != "hive" =>
+      case CreateTable(tableDesc, mode, Some(query))
+          if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER =>
        val cmd =
          CreateDataSourceTableAsSelectCommand(
            tableDesc,

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 3 additions & 1 deletion
@@ -687,8 +687,10 @@ case class AlterTableSetLocationCommand(


 object DDLUtils {
+  val HIVE_PROVIDER = "hive"
+
   def isDatasourceTable(table: CatalogTable): Boolean = {
-    table.provider.isDefined && table.provider.get != "hive"
+    table.provider.isDefined && table.provider.get != HIVE_PROVIDER
   }

   /**

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 3 additions & 2 deletions
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrd
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 import org.apache.spark.sql.types.{AtomicType, StructType}
@@ -127,7 +128,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
     checkDuplication(normalizedPartitionCols, "partition")

     if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
-      if (tableDesc.provider.get == "hive") {
+      if (tableDesc.provider.get == DDLUtils.HIVE_PROVIDER) {
        // When we hit this branch, it means users didn't specify schema for the table to be
        // created, as we always include partition columns in table schema for hive serde tables.
        // The real schema will be inferred at hive metastore by hive serde, plus the given
@@ -292,7 +293,7 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) {
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case CreateTable(tableDesc, _, Some(_))
-          if tableDesc.provider.get == "hive" =>
+          if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
        throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")

      case _ => // OK
