@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types._

/**
@@ -52,7 +52,7 @@ case class CreateDataSourceTableCommand(
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String],
partitionColumns: Array[String],
userSpecifiedPartitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
ignoreIfExists: Boolean,
managedIfNoPath: Boolean)
@@ -95,17 +95,39 @@ case class CreateDataSourceTableCommand(
}

// Create the relation to validate the arguments before writing the metadata to the metastore.
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
bucketSpec = None,
options = optionsWithPath).resolveRelation(checkPathExist = false)
val dataSource: BaseRelation =
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
bucketSpec = None,
options = optionsWithPath).resolveRelation(checkPathExist = false)

val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
userSpecifiedPartitionColumns
} else {
val res = dataSource match {
case r: HadoopFsRelation => r.partitionSchema.fieldNames
case _ => Array.empty[String]
}
if (userSpecifiedPartitionColumns.length > 0) {
Contributor:
Should we throw an exception for this case?

Member (Author):
Here, I just keep the existing behavior.

To be honest, I think we should throw an exception whenever it makes sense. It sounds like the job log is not being read by most users. Will submit a follow-up PR to make that change. Thanks!
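
For reference, the stricter behavior floated here could look roughly like the sketch below. It reuses the userSpecifiedSchema and userSpecifiedPartitionColumns fields of this command and the AnalysisException already thrown elsewhere in this file; it illustrates the follow-up idea and is not part of this PR.

    if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionColumns.nonEmpty) {
      // Sketch only: fail fast instead of logging a warning and silently ignoring
      // the user-specified partition columns.
      throw new AnalysisException(
        s"Partition columns (${userSpecifiedPartitionColumns.mkString(", ")}) were specified, " +
          "but no schema was given, so the schema and partition columns will be inferred. " +
          "Either drop the partition columns or provide an explicit schema.")
    }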

// The table does not have a specified schema, which means that the schema will be inferred
// when we load the table. So, we are not expecting partition columns and we will discover
// partitions when we load the table. However, if there are specified partition columns,
// we simply ignore them and provide a warning message.
logWarning(
s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " +
s"ignored. The schema and partition columns of table $tableIdent are inferred. " +
s"Schema: ${dataSource.schema.simpleString}; " +
s"Partition columns: ${res.mkString("(", ", ", ")")}")
}
res
}

CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = userSpecifiedSchema,
schema = dataSource.schema,
Contributor:
seems we should still use the user-specified schema, right?

Contributor:
I think from the code, it is not very clear that dataSource.schema will be userSpecifiedSchema?

gatorsmile (Member, Author), Aug 8, 2016:
Here, dataSource.schema could be the inferred schema. Previously, we did not store the inferred schema. After this PR, we do, and thus we use dataSource.schema.

Actually, after re-checking the code, I found the schema might be adjusted a little even if users specify the schema. For example, the nullability could be changed:

I think we should make such a change, but maybe we should test and log it?
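
A toy illustration of that point, using only org.apache.spark.sql.types (a sketch: the nullability change is simulated by hand here, whereas per the comment above it is the data source that adjusts the schema while resolving the relation):

    import org.apache.spark.sql.types._

    // The user declares `id` as non-nullable...
    val userSpecified = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true)))

    // ...but the resolved relation may report every column as nullable.
    val asReportedBySource = StructType(userSpecified.map(_.copy(nullable = true)))

    // Same columns, different nullability, so the two schemas are not equal.
    assert(userSpecified.fieldNames.sameElements(asReportedBySource.fieldNames))
    assert(userSpecified != asReportedBySource)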

partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
@@ -213,7 +235,7 @@ case class CreateDataSourceTableAsSelectCommand(
}
existingSchema = Some(l.schema)
case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
existingSchema = DDLUtils.getSchemaFromTableProperties(s.metadata)
existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata))
case o =>
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
@@ -256,7 +278,7 @@ case class CreateDataSourceTableAsSelectCommand(
CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = Some(result.schema),
schema = result.schema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
@@ -306,7 +328,7 @@ object CreateDataSourceTableUtils extends Logging {
def createDataSourceTable(
sparkSession: SparkSession,
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
schema: StructType,
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
provider: String,
@@ -315,28 +337,26 @@ object CreateDataSourceTableUtils extends Logging {
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put(DATASOURCE_PROVIDER, provider)

// Saves optional user specified schema. Serialized JSON schema string may be too long to be
// stored into a single metastore SerDe property. In this case, we split the JSON string and
// store each part as a separate SerDe property.
userSpecifiedSchema.foreach { schema =>
val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}
// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
// property.
val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}

if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
if (partitionColumns.length > 0) {
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}

if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
if (bucketSpec.isDefined) {
val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get

tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
Expand All @@ -353,16 +373,6 @@ object CreateDataSourceTableUtils extends Logging {
}
}

if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
// The table does not have a specified schema, which means that the schema will be inferred
// when we load the table. So, we are not expecting partition columns and we will discover
// partitions when we load the table. However, if there are specified partition columns,
// we simply ignore them and provide a warning message.
logWarning(
s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
}

val tableType = if (isExternal) {
tableProperties.put("EXTERNAL", "TRUE")
CatalogTableType.EXTERNAL
@@ -375,7 +385,7 @@
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
className = provider,
@@ -521,31 +521,29 @@ object DDLUtils {
table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
}

// A persisted data source table may not store its schema in the catalog. In this case, its schema
// will be inferred at runtime when the table is referenced.
def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
// A persisted data source table always store its schema in the catalog.
def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
require(isDatasourceTable(metadata))
val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted."
val props = metadata.properties
if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
props.get(DATASOURCE_SCHEMA).map { schema =>
// Originally, we used spark.sql.sources.schema to store the schema of a data source table.
// After SPARK-6024, we removed this flag.
// Although we are not using spark.sql.sources.schema any more, we need to still support.
props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType])
} else {
metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
DataType.fromJson(schema).asInstanceOf[StructType]
} getOrElse {
Contributor:
I am not sure if getOrElse makes the code easier to follow.

props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
val parts = (0 until numParts.toInt).map { index =>
val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
if (part == null) {
throw new AnalysisException(
"Could not read schema from the metastore because it is corrupted " +
s"(missing part $index of the schema, $numParts parts are expected).")
throw new AnalysisException(msgSchemaCorrupted +
s" (missing part $index of the schema, $numParts parts are expected).")
}

part
}
// Stick all parts back to a single schema string.
DataType.fromJson(parts.mkString).asInstanceOf[StructType]
}
} getOrElse(throw new AnalysisException(msgSchemaCorrupted))
Contributor:
ah, this getOrElse is too far from the get(DATASOURCE_SCHEMA)... Actually, I prefer the if/else.

Member (Author):
: )
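
For reference, the if/else shape being discussed could look roughly like the sketch below, written against the same constants and helpers that appear in this diff; it is an illustration of the alternative, not code from this PR.

    // Sketch of the if/else alternative; intended to behave the same as the
    // getOrElse version shown above.
    def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
      require(isDatasourceTable(metadata))
      val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted."
      val props = metadata.properties
      if (props.contains(DATASOURCE_SCHEMA)) {
        // Old layout: the whole JSON schema stored under the single spark.sql.sources.schema key.
        DataType.fromJson(props(DATASOURCE_SCHEMA)).asInstanceOf[StructType]
      } else if (props.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
        // Current layout: the JSON schema split across numbered part keys.
        val numParts = props(DATASOURCE_SCHEMA_NUMPARTS).toInt
        val parts = (0 until numParts).map { index =>
          props.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index") match {
            case Some(part) => part
            case None =>
              throw new AnalysisException(msgSchemaCorrupted +
                s" (missing part $index of the schema, $numParts parts are expected).")
          }
        }
        // Stick all parts back together into a single schema string.
        DataType.fromJson(parts.mkString).asInstanceOf[StructType]
      } else {
        throw new AnalysisException(msgSchemaCorrupted)
      }
    }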

}
}

@@ -416,15 +416,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
} else {
val metadata = catalog.getTableMetadata(table)

if (DDLUtils.isDatasourceTable(metadata)) {
DDLUtils.getSchemaFromTableProperties(metadata) match {
case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, result)
case None => describeSchema(catalog.lookupRelation(table).schema, result)
}
} else {
describeSchema(metadata.schema, result)
}

describeSchema(metadata, result)
if (isExtended) {
describeExtended(metadata, result)
} else if (isFormatted) {
@@ -439,12 +431,12 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF

private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
if (DDLUtils.isDatasourceTable(table)) {
val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
val partColNames = DDLUtils.getPartitionColumnsFromTableProperties(table)
for (schema <- userSpecifiedSchema if partColNames.nonEmpty) {
if (partColNames.nonEmpty) {
val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
append(buffer, "# Partition Information", "", "")
append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
describeSchema(StructType(partColNames.map(schema(_))), buffer)
describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer)
}
} else {
if (table.partitionColumns.nonEmpty) {
@@ -518,6 +510,17 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
}

private def describeSchema(
tableDesc: CatalogTable,
buffer: ArrayBuffer[Row]): Unit = {
if (DDLUtils.isDatasourceTable(tableDesc)) {
val schema = DDLUtils.getSchemaFromTableProperties(tableDesc)
describeSchema(schema, buffer)
} else {
describeSchema(tableDesc.schema, buffer)
}
}

private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
@@ -876,12 +879,9 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman

private def showDataSourceTableDataColumns(
metadata: CatalogTable, builder: StringBuilder): Unit = {
DDLUtils.getSchemaFromTableProperties(metadata).foreach { schema =>
val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
builder ++= columns.mkString("(", ", ", ")")
}

builder ++= "\n"
val schema = DDLUtils.getSchemaFromTableProperties(metadata)
val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
builder ++= columns.mkString("(", ", ", ")\n")
}

private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
@@ -205,7 +205,7 @@ private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[Logi
*/
private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
val schema = DDLUtils.getSchemaFromTableProperties(table)

// We only need names at here since userSpecifiedSchema we loaded from the metastore
// contains partition columns. We can always get datatypes of partitioning columns
@@ -218,7 +218,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
className = table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
@@ -352,13 +352,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {

/**
* Refresh the cache entry for a table, if any. For Hive metastore table, the metadata
* is refreshed.
* is refreshed. For data source tables, the schema will not be inferred and refreshed.
*
* @group cachemgmt
* @since 2.0.0
*/
override def refreshTable(tableName: String): Unit = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
// Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
// Non-temp tables: refresh the metadata cache.
sessionCatalog.refreshTable(tableIdent)

// If this table is cached as an InMemoryRelation, drop the original
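
To close, a minimal usage sketch of the refresh behavior documented in the hunk above (a sketch; the SparkSession value spark and the table name are illustrative):

    // After files under the table's location change, refreshing invalidates cached
    // metadata (and, for temporary views, cached data). Per the updated doc comment,
    // the schema stored for a data source table is reused rather than re-inferred.
    spark.catalog.refreshTable("my_db.my_table")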