@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression

import org.apache.spark.sql.types.StructType

/**
* Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -104,6 +104,19 @@ abstract class ExternalCatalog {
*/
def alterTable(tableDefinition: CatalogTable): Unit

/**
* Alter the schema of a table identified by the provided database and table name. The new schema
* should still contain the existing bucket columns and partition columns used by the table. This
* method will also update any Spark SQL-related parameters stored as Hive table properties (such
* as the schema itself).
*
* @param db Name of the database containing the table to alter
* @param table Name of the table whose schema is to be altered
* @param schema Updated schema to be used for the table (must contain existing partition and
* bucket columns)
*/
def alterTableSchema(db: String, table: String, schema: StructType): Unit

def getTable(db: String, table: String): CatalogTable

def getTableOption(db: String, table: String): Option[CatalogTable]
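The snippet below is a minimal usage sketch of the new API (not part of this diff): the caller passes a complete replacement schema that still contains the table's existing partition and bucket columns. The catalog instance, database, table, and column names are illustrative.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumes `externalCatalog` is an ExternalCatalog implementation that already
// holds a table "db1"."tbl1" with columns (a INT, b STRING) and no partition
// or bucket columns.
val newSchema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType),
  StructField("c", StringType)))  // newly appended column

externalCatalog.alterTableSchema("db1", "tbl1", newSchema)
assert(externalCatalog.getTable("db1", "tbl1").schema == newSchema)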
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.types.StructType

/**
* An in-memory (ephemeral) implementation of the system catalog.
@@ -297,6 +298,15 @@ class InMemoryCatalog(
catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
}

override def alterTableSchema(
db: String,
table: String,
schema: StructType): Unit = synchronized {
requireTableExists(db, table)
val origTable = catalog(db).tables(table).table
catalog(db).tables(table).table = origTable.copy(schema = schema)
}

override def getTable(db: String, table: String): CatalogTable = synchronized {
requireTableExists(db, table)
catalog(db).tables(table).table
@@ -158,6 +158,11 @@ case class BucketSpec(
* @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
* catalog. If false, it is inferred automatically based on file
* structure.
* @param schemaPreservesCase Whether the schema resolved for this table preserves case.
* When using a Hive Metastore, this flag is set to false if a case-sensitive
* schema could not be read from the table properties. Used to trigger
* case-sensitive schema inference at query time, when configured.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -176,7 +181,8 @@ case class CatalogTable(
viewText: Option[String] = None,
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
tracksPartitionsInCatalog: Boolean = false) {
tracksPartitionsInCatalog: Boolean = false,
schemaPreservesCase: Boolean = true) {

/** schema of this table's partition columns */
def partitionSchema: StructType = StructType(schema.filter {
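The sketch below shows, under stated assumptions, how a consumer of CatalogTable might combine the new flag with the caseSensitiveInferenceMode setting added to SQLConf later in this change; the actual trigger logic lives in the Hive support code and is not shown in this hunk. The import path and the surrounding values (`table`, `conf`) are assumptions.

import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode

// Assumes `table: CatalogTable` and `conf: SQLConf` are in scope.
// Schema inference is only worth attempting when the stored schema lost its
// case information and the user has not disabled inference.
val shouldInferSchema =
  !table.schemaPreservesCase &&
    conf.caseSensitiveInferenceMode != HiveCaseSensitiveInferenceMode.NEVER_INFER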
@@ -26,7 +26,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils


@@ -239,6 +239,19 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
}
}

test("alter table schema") {
val catalog = newBasicCatalog()
val tbl1 = catalog.getTable("db2", "tbl1")
val newSchema = StructType(Seq(
StructField("new_field_1", IntegerType),
StructField("new_field_2", StringType),
StructField("a", IntegerType),
StructField("b", StringType)))
catalog.alterTableSchema("db2", "tbl1", newSchema)
val newTbl1 = catalog.getTable("db2", "tbl1")
assert(newTbl1.schema == newSchema)
}

test("get table") {
assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
}
@@ -491,7 +491,8 @@ class TreeNodeSuite extends SparkFunSuite {
"lastAccessTime" -> -1,
"tracksPartitionsInCatalog" -> false,
"properties" -> JNull,
"unsupportedFeatures" -> List.empty[String]))
"unsupportedFeatures" -> List.empty[String],
"schemaPreservesCase" -> JBool(true)))

// For unknown case class, returns JNull.
val bigValue = new Array[Int](10000)
@@ -486,71 +486,6 @@ object ParquetFileFormat extends Logging {
}
}

/**
* Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
* schema and Parquet schema.
*
* Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
* schema read from Parquet files may be incomplete (e.g. older versions of Parquet doesn't
* distinguish binary and string). This method generates a correct schema by merging Metastore
* schema data types and Parquet schema field names.
*/
def mergeMetastoreParquetSchema(
metastoreSchema: StructType,
parquetSchema: StructType): StructType = {
def schemaConflictMessage: String =
s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
|${metastoreSchema.prettyJson}
|
|Parquet schema:
|${parquetSchema.prettyJson}
""".stripMargin

val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)

assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)

val ordinalMap = metastoreSchema.zipWithIndex.map {
case (field, index) => field.name.toLowerCase -> index
}.toMap

val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))

StructType(metastoreSchema.zip(reorderedParquetSchema).map {
// Uses Parquet field names but retains Metastore data types.
case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
mSchema.copy(name = pSchema.name)
case _ =>
throw new SparkException(schemaConflictMessage)
})
}

/**
* Returns the original schema from the Parquet file with any missing nullable fields from the
* Hive Metastore schema merged in.
*
* When constructing a DataFrame from a collection of structured data, the resulting object has
* a schema corresponding to the union of the fields present in each element of the collection.
* Spark SQL simply assigns a null value to any field that isn't present for a particular row.
* In some cases, it is possible that a given table partition stored as a Parquet file doesn't
* contain a particular nullable field in its schema despite that field being present in the
* table schema obtained from the Hive Metastore. This method returns a schema representing the
* Parquet file schema along with any additional nullable fields from the Metastore schema
* merged in.
*/
private[parquet] def mergeMissingNullableFields(
metastoreSchema: StructType,
parquetSchema: StructType): StructType = {
val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
val missingFields = metastoreSchema
.map(_.name.toLowerCase)
.diff(parquetSchema.map(_.name.toLowerCase))
.map(fieldMap(_))
.filter(_.nullable)
StructType(parquetSchema ++ missingFields)
}

/**
* Reads Parquet footers in multi-threaded manner.
* If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
@@ -285,6 +285,25 @@ object SQLConf {
.longConf
.createWithDefault(250 * 1024 * 1024)

object HiveCaseSensitiveInferenceMode extends Enumeration {
val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
}

val HIVE_CASE_SENSITIVE_INFERENCE = SQLConfigBuilder("spark.sql.hive.caseSensitiveInferenceMode")
.doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " +
"table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible file " +
"formats such as Parquet are. Spark SQL must use a case-preserving schema when querying " +
"any table backed by files containing case-sensitive field names or queries may not return " +
"accurate results. Valid options include INFER_AND_SAVE (the default mode-- infer the " +
"case-sensitive schema from the underlying data files and write it back to the table " +
"properties), INFER_ONLY (infer the schema but don't attempt to write it to the table " +
"properties) and NEVER_INFER (fallback to using the case-insensitive metastore schema " +
"instead of inferring).")
.stringConf
.transform(_.toUpperCase())
.checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
.createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)

val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -723,6 +742,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)

def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))

def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)

def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
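For users of the new setting, a hedged example of selecting an inference mode at runtime; the property key and the valid values come from the config definition above, and `spark` is assumed to be an existing SparkSession.

// Illustrative only: switch from the INFER_AND_SAVE default to inferring the
// case-sensitive schema without writing it back to the Hive table properties.
spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "INFER_ONLY")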
@@ -368,88 +368,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
}

test("merge with metastore schema") {
// Field type conflict resolution
assertResult(
StructType(Seq(
StructField("lowerCase", StringType),
StructField("UPPERCase", DoubleType, nullable = false)))) {

ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("lowercase", StringType),
StructField("uppercase", DoubleType, nullable = false))),

StructType(Seq(
StructField("lowerCase", BinaryType),
StructField("UPPERCase", IntegerType, nullable = true))))
}

// MetaStore schema is subset of parquet schema
assertResult(
StructType(Seq(
StructField("UPPERCase", DoubleType, nullable = false)))) {

ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false))),

StructType(Seq(
StructField("lowerCase", BinaryType),
StructField("UPPERCase", IntegerType, nullable = true))))
}

// Metastore schema contains additional non-nullable fields.
assert(intercept[Throwable] {
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
StructField("lowerCase", BinaryType, nullable = false))),

StructType(Seq(
StructField("UPPERCase", IntegerType, nullable = true))))
}.getMessage.contains("detected conflicting schemas"))

// Conflicting non-nullable field names
intercept[Throwable] {
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
}

test("merge missing nullable fields from Metastore schema") {
// Standard case: Metastore schema contains additional nullable fields not present
// in the Parquet file schema.
assertResult(
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true))),
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true))))
}

// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = false))),
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true))))
}.getMessage.contains("detected conflicting schemas"))
}

test("schema merging failure error message") {
import testImplicits._

@@ -597,6 +597,25 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}

override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
requireTableExists(db, table)
val rawTable = getRawTable(db, table)
val withNewSchema = rawTable.copy(schema = schema)
// Add table metadata such as table schema, partition columns, etc. to table properties.
val updatedTable = withNewSchema.copy(
properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
try {
client.alterTable(updatedTable)
} catch {
case NonFatal(e) =>
val warningMessage =
s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
"compatible way. Updating Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
}
}

override def getTable(db: String, table: String): CatalogTable = withClient {
restoreTableMetadata(getRawTable(db, table))
}
@@ -690,10 +709,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
"different from the schema when this table was created by Spark SQL" +
s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
"from Hive metastore which is not case preserving.")
hiveTable
hiveTable.copy(schemaPreservesCase = false)
}
} else {
hiveTable
hiveTable.copy(schemaPreservesCase = false)
}
}
