
Commit dddda52

move hive hack for data source table into HiveExternalCatalog
1 parent b781ef8 commit dddda52

13 files changed: +252 additions, -401 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 2 additions & 0 deletions
@@ -112,6 +112,7 @@ case class BucketSpec(
  * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
  * future once we have a better understanding of how we want to handle skewed columns.
  *
+ * @param provider the name of the data source provider for this table, e.g. parquet, json, etc.
  * @param unsupportedFeatures is a list of string descriptions of features that are used by the
  *                            underlying table but not supported by Spark SQL yet.
  */
@@ -120,6 +121,7 @@ case class CatalogTable(
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: StructType,
+    provider: Option[String] = None,
     partitionColumnNames: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     owner: String = "",
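
With this change a CatalogTable records its data source provider as a first-class field instead of a hidden table property. A minimal sketch of how a caller might fill in the new field; the table name, schema, and "parquet" provider below are illustrative values, not taken from this commit:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.StructType

// Hypothetical managed table described through the new provider field.
val table = CatalogTable(
  identifier = TableIdentifier("events", Some("default")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty,
  schema = new StructType().add("id", "long").add("ts", "timestamp"),
  provider = Some("parquet"))  // previously only recorded via metastore table properties (DATASOURCE_PROVIDER in the removed helper)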

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 6 additions & 2 deletions
@@ -552,7 +552,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       identifier = TableIdentifier("my_table", Some("db1")),
       tableType = CatalogTableType.MANAGED,
       storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
-      schema = new StructType().add("a", "int").add("b", "string")
+      schema = new StructType().add("a", "int").add("b", "string"),
+      provider = Some("hive")
     )
 
     catalog.createTable("db1", table, ignoreIfExists = false)
@@ -571,7 +572,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       storage = CatalogStorageFormat(
         Some(Utils.createTempDir().getAbsolutePath),
         None, None, None, false, Map.empty),
-      schema = new StructType().add("a", "int").add("b", "string")
+      schema = new StructType().add("a", "int").add("b", "string"),
+      provider = Some("hive")
     )
     catalog.createTable("db1", externalTable, ignoreIfExists = false)
     assert(!exists(db.locationUri, "external_table"))
@@ -589,6 +591,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
         .add("col2", "string")
         .add("a", "int")
         .add("b", "string"),
+      provider = Some("hive"),
       partitionColumnNames = Seq("a", "b")
     )
     catalog.createTable("db1", table, ignoreIfExists = false)
@@ -692,6 +695,7 @@ abstract class CatalogTestUtils {
         .add("col2", "string")
         .add("a", "int")
         .add("b", "string"),
+      provider = Some("hive"),
       partitionColumnNames = Seq("a", "b"),
       bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 3 additions & 2 deletions
@@ -957,7 +957,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     // Storage format
     val defaultStorage: CatalogStorageFormat = {
       val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
-      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
+      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
       CatalogStorageFormat(
         locationUri = None,
         inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
@@ -1001,6 +1001,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       tableType = tableType,
       storage = storage,
       schema = schema,
+      provider = Some("hive"),
       partitionColumnNames = partitionCols.map(_.name),
       properties = properties,
       comment = comment)
@@ -1101,7 +1102,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   override def visitGenericFileFormat(
       ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
     val source = ctx.identifier.getText
-    HiveSerDe.sourceToSerDe(source, conf) match {
+    HiveSerDe.sourceToSerDe(source) match {
       case Some(s) =>
         CatalogStorageFormat.empty.copy(
           inputFormat = s.inputFormat,
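
Two things change in the parser: HiveSerDe.sourceToSerDe no longer takes a SQLConf argument, and every table created through Hive DDL syntax is tagged with provider = Some("hive"). A rough sketch of the lookup under the new signature; treat the behaviour described in the comments as an assumption, since HiveSerDe itself is not part of this diff:

import org.apache.spark.sql.internal.HiveSerDe

// Resolve a storage-format keyword to its Hive serde description, if one is known.
// Each field is an Option[String] holding a fully qualified class name.
HiveSerDe.sourceToSerDe("parquet") match {
  case Some(serde) =>
    println(serde.inputFormat)   // Hive input format class, if defined
    println(serde.outputFormat)  // Hive output format class, if defined
    println(serde.serde)         // serde class, if defined
  case None =>
    println("no Hive mapping for this format")  // caller falls back to a Spark-specific layout
}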

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 21 additions & 207 deletions
@@ -19,9 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import java.util.regex.Pattern
 
-import scala.collection.mutable
-import scala.util.control.NonFatal
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -30,7 +27,6 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 import org.apache.spark.sql.types._
 
@@ -124,16 +120,17 @@ case class CreateDataSourceTableCommand(
       res
     }
 
-    CreateDataSourceTableUtils.createDataSourceTable(
-      sparkSession = sparkSession,
-      tableIdent = tableIdent,
+    val table = CatalogTable(
+      identifier = tableIdent,
+      tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
       schema = dataSource.schema,
-      partitionColumns = partitionColumns,
-      bucketSpec = bucketSpec,
-      provider = provider,
-      options = optionsWithPath,
-      isExternal = isExternal)
+      provider = Some(provider),
+      partitionColumnNames = partitionColumns,
+      bucketSpec = bucketSpec
+    )
 
+    sessionState.catalog.createTable(table, ignoreIfExists)
     Seq.empty[Row]
   }
 }
@@ -235,7 +232,7 @@ case class CreateDataSourceTableAsSelectCommand(
         }
         existingSchema = Some(l.schema)
       case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-        existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata))
+        existingSchema = Some(s.metadata.schema)
       case o =>
         throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
     }
@@ -275,15 +272,17 @@ case class CreateDataSourceTableAsSelectCommand(
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-      CreateDataSourceTableUtils.createDataSourceTable(
-        sparkSession = sparkSession,
-        tableIdent = tableIdent,
-        schema = result.schema,
-        partitionColumns = partitionColumns,
-        bucketSpec = bucketSpec,
-        provider = provider,
-        options = optionsWithPath,
-        isExternal = isExternal)
+      val schema = result.schema
+      val table = CatalogTable(
+        identifier = tableIdent,
+        tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+        schema = schema,
+        provider = Some(provider),
+        partitionColumnNames = partitionColumns,
+        bucketSpec = bucketSpec
+      )
+      sessionState.catalog.createTable(table, ignoreIfExists = false)
     }
 
     // Refresh the cache of the table in the catalog.
@@ -324,189 +323,4 @@ object CreateDataSourceTableUtils extends Logging {
 
     matcher.matches()
   }
-
-  def createDataSourceTable(
-      sparkSession: SparkSession,
-      tableIdent: TableIdentifier,
-      schema: StructType,
-      partitionColumns: Array[String],
-      bucketSpec: Option[BucketSpec],
-      provider: String,
-      options: Map[String, String],
-      isExternal: Boolean): Unit = {
-    val tableProperties = new mutable.HashMap[String, String]
-    tableProperties.put(DATASOURCE_PROVIDER, provider)
-
-    // Serialized JSON schema string may be too long to be stored into a single metastore table
-    // property. In this case, we split the JSON string and store each part as a separate table
-    // property.
-    val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
-    val schemaJsonString = schema.json
-    // Split the JSON string.
-    val parts = schemaJsonString.grouped(threshold).toSeq
-    tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
-    parts.zipWithIndex.foreach { case (part, index) =>
-      tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
-    }
-
-    if (partitionColumns.length > 0) {
-      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
-      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
-        tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
-      }
-    }
-
-    if (bucketSpec.isDefined) {
-      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
-
-      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
-      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
-      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
-        tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
-      }
-
-      if (sortColumnNames.nonEmpty) {
-        tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
-        sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
-          tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
-        }
-      }
-    }
-
-    val tableType = if (isExternal) {
-      tableProperties.put("EXTERNAL", "TRUE")
-      CatalogTableType.EXTERNAL
-    } else {
-      tableProperties.put("EXTERNAL", "FALSE")
-      CatalogTableType.MANAGED
-    }
-
-    val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sparkSession.sessionState.conf)
-    val dataSource =
-      DataSource(
-        sparkSession,
-        userSpecifiedSchema = Some(schema),
-        partitionColumns = partitionColumns,
-        bucketSpec = bucketSpec,
-        className = provider,
-        options = options)
-
-    def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
-      CatalogTable(
-        identifier = tableIdent,
-        tableType = tableType,
-        schema = new StructType,
-        storage = CatalogStorageFormat(
-          locationUri = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None,
-          compressed = false,
-          properties = options
-        ),
-        properties = tableProperties.toMap)
-    }
-
-    def newHiveCompatibleMetastoreTable(
-        relation: HadoopFsRelation,
-        serde: HiveSerDe): CatalogTable = {
-      assert(partitionColumns.isEmpty)
-      assert(relation.partitionSchema.isEmpty)
-
-      CatalogTable(
-        identifier = tableIdent,
-        tableType = tableType,
-        storage = CatalogStorageFormat(
-          locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
-          inputFormat = serde.inputFormat,
-          outputFormat = serde.outputFormat,
-          serde = serde.serde,
-          compressed = false,
-          properties = options
-        ),
-        schema = relation.schema,
-        properties = tableProperties.toMap,
-        viewText = None)
-    }
-
-    // TODO: Support persisting partitioned data source relations in Hive compatible format
-    val qualifiedTableName = tableIdent.quotedString
-    val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
-    val resolvedRelation = dataSource.resolveRelation(checkPathExist = false)
-    val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) match {
-      case _ if skipHiveMetadata =>
-        val message =
-          s"Persisting partitioned data source relation $qualifiedTableName into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
-        (None, message)
-
-      case (Some(serde), relation: HadoopFsRelation) if relation.location.paths.length == 1 &&
-        relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty =>
-        val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
-        val message =
-          s"Persisting data source relation $qualifiedTableName with a single input path " +
-            s"into Hive metastore in Hive compatible format. Input path: " +
-            s"${relation.location.paths.head}."
-        (Some(hiveTable), message)
-
-      case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty =>
-        val message =
-          s"Persisting partitioned data source relation $qualifiedTableName into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
-            "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
-        (None, message)
-
-      case (Some(serde), relation: HadoopFsRelation) if relation.bucketSpec.nonEmpty =>
-        val message =
-          s"Persisting bucketed data source relation $qualifiedTableName into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
-            "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
-        (None, message)
-
-      case (Some(serde), relation: HadoopFsRelation) =>
-        val message =
-          s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
-            s"Input paths: " + relation.location.paths.mkString("\n", "\n", "")
-        (None, message)
-
-      case (Some(serde), _) =>
-        val message =
-          s"Data source relation $qualifiedTableName is not a " +
-            s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
-            "in Spark SQL specific format, which is NOT compatible with Hive."
-        (None, message)
-
-      case _ =>
-        val message =
-          s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
-            s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
-            s"Spark SQL specific format, which is NOT compatible with Hive."
-        (None, message)
-    }
-
-    (hiveCompatibleTable, logMessage) match {
-      case (Some(table), message) =>
-        // We first try to save the metadata of the table in a Hive compatible way.
-        // If Hive throws an error, we fall back to save its metadata in the Spark SQL
-        // specific way.
-        try {
-          logInfo(message)
-          sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
-        } catch {
-          case NonFatal(e) =>
-            val warningMessage =
-              s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
-                s"it into Hive metastore in Spark SQL specific format."
-            logWarning(warningMessage, e)
-            val table = newSparkSQLSpecificMetastoreTable()
-            sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
-        }
-
-      case (None, message) =>
-        logWarning(message)
-        val table = newSparkSQLSpecificMetastoreTable()
-        sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
-    }
-  }
 }
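
The deleted createDataSourceTable helper is the "hive hack" named in the commit title: it serialized the data source schema, partition columns, and bucketing spec into metastore table properties and then chose between a Hive-compatible and a Spark SQL specific CatalogTable. After this commit that encoding presumably lives inside HiveExternalCatalog rather than in the command layer. A condensed, self-contained sketch of the schema-splitting part of the removed logic; the literal property keys and the 4000-character threshold are assumptions standing in for the DATASOURCE_SCHEMA_* constants and the schemaStringLengthThreshold conf used by the deleted code:

import scala.collection.mutable
import org.apache.spark.sql.types.StructType

// Split a schema's JSON form across several table properties so that no single
// metastore property value exceeds the length threshold.
def schemaToTableProperties(schema: StructType, threshold: Int = 4000): Map[String, String] = {
  val props = new mutable.HashMap[String, String]
  val parts = schema.json.grouped(threshold).toSeq
  props.put("spark.sql.sources.schema.numParts", parts.size.toString)  // assumed key
  parts.zipWithIndex.foreach { case (part, index) =>
    props.put(s"spark.sql.sources.schema.part.$index", part)           // assumed key prefix
  }
  props.toMap
}

Reading the table back reverses the split by concatenating the parts before parsing the JSON, which is what DDLUtils.getSchemaFromTableProperties did before this change replaced that call with s.metadata.schema.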
