@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression

import org.apache.spark.sql.types.StructType

/**
* Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -104,6 +104,19 @@ abstract class ExternalCatalog {
*/
def alterTable(tableDefinition: CatalogTable): Unit

/**
* Alter the schema of a table identified by the provided database and table name. The new schema
* should still contain the existing bucket columns and partition columns used by the table. This
* method will also update any Spark SQL-related parameters stored as Hive table properties (such
* as the schema itself).
*
* @param db Name of the database the table to alter belongs to
* @param table Name of the table whose schema is to be altered
* @param schema Updated schema to be used for the table (must contain existing partition and
*               bucket columns)
*/
def alterTableSchema(db: String, table: String, schema: StructType): Unit
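A minimal usage sketch (not part of this diff) of the contract described above: the replacement schema is derived from the table's current schema, so the existing partition and bucket columns are retained. The catalog instance and the added column name are assumptions for illustration.

import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object AlterSchemaExample {
  // `catalog` may be any ExternalCatalog implementation; "event_version" is a made-up column.
  def addColumn(catalog: ExternalCatalog, db: String, table: String): Unit = {
    val current: StructType = catalog.getTable(db, table).schema
    // Append a new nullable column; all existing (including partition/bucket) columns stay.
    val updated = current.add(StructField("event_version", IntegerType, nullable = true))
    catalog.alterTableSchema(db, table, updated)
  }
}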

def getTable(db: String, table: String): CatalogTable

def getTableOption(db: String, table: String): Option[CatalogTable]
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.types.StructType

/**
* An in-memory (ephemeral) implementation of the system catalog.
@@ -297,6 +298,15 @@ class InMemoryCatalog(
catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
}

override def alterTableSchema(
db: String,
table: String,
schema: StructType): Unit = synchronized {
requireTableExists(db, table)
val origTable = catalog(db).tables(table).table
catalog(db).tables(table).table = origTable.copy(schema = schema)
}

override def getTable(db: String, table: String): CatalogTable = synchronized {
requireTableExists(db, table)
catalog(db).tables(table).table
@@ -163,6 +163,11 @@ case class BucketSpec(
* @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
* catalog. If false, it is inferred automatically based on file
* structure.
* @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive.
*                            When using a Hive Metastore, this flag is set to false if a
*                            case-sensitive schema was unable to be read from the table properties.
*                            Used to trigger case-sensitive schema inference at query time, when
*                            configured.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -180,7 +185,8 @@ case class CatalogTable(
viewText: Option[String] = None,
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
tracksPartitionsInCatalog: Boolean = false) {
tracksPartitionsInCatalog: Boolean = false,
schemaPreservesCase: Boolean = true) {

Contributor:
Shall we create a special table property? object CatalogTable defines some specific properties for views and we can follow that. If we keep adding more parameters, we may blow up CatalogTable one day...

Contributor:
and when we fix the schema and try to write it back, remember to remove this property first.

Author (@budde, Feb 22, 2017):
I considered taking this approach but I think adding this as a parameter to CatalogTable itself is more explicit and less flaky. I share your concern that adding more and more parameters to CatalogTable could make this less usable, especially since params like schemaPreservesCase really only matter when dealing with Hive tables.

However, I don't think dumping more and more parameters into properties is a great solution either. As you've pointed out, we would need to filter out the properties only used internally by Spark before writing them to the catalog. HiveExternalCatalog already filters out Spark SQL-specific properties from the CatalogTable returned by HiveClient. Adding additional internal properties would put us in a place where properties contains:

  • Actual properties key/value pairs returned from the Hive metastore table which should be preserved by HiveExternalCatalog and written back when altering the table.
  • Spark SQL-specific properties that are stored in the Hive metastore table but filtered out by HiveExternalCatalog when used by Spark internally. These properties must be restored before writing back.
  • Spark SQL internal-only properties that are added after reading the table from the metastore and must be removed before writing it.

And that's not even to mention that we'd have to serialize/deserialize this value to and from a (String, String) pair just to pass information between HiveExternalCatalog and HiveMetastoreCatalog.

I think that if CatalogTable ends up with too many datasource-specific internal parameters then maybe it makes more sense to introduce a new Map element, e.g. internalProperties, so these don't get mixed in with the table properties.
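For illustration only, a rough sketch of the alternative discussed in this thread: keeping Spark-internal flags in a separate map instead of as top-level CatalogTable parameters or user-visible table properties. The name internalProperties comes from the comment above; everything else here is hypothetical and not part of this PR.

object InternalPropertiesSketch {
  // Hypothetical shape: a separate bag for Spark-internal metadata so it never
  // leaks into the Hive metastore's table properties.
  case class TableSketch(
      properties: Map[String, String] = Map.empty,          // user/Hive properties, round-tripped
      internalProperties: Map[String, String] = Map.empty)  // Spark-only, never persisted

  val SCHEMA_PRESERVES_CASE = "schemaPreservesCase"

  // What a catalog would persist: the internal map is intentionally dropped.
  def toHiveProperties(table: TableSketch): Map[String, String] = table.properties

  // Re-derived after reading from the metastore rather than stored there.
  def withCaseSensitivity(table: TableSketch, preserved: Boolean): TableSketch =
    table.copy(internalProperties =
      table.internalProperties + (SCHEMA_PRESERVES_CASE -> preserved.toString))
}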


import CatalogTable._

@@ -28,7 +28,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils


@@ -240,6 +240,19 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
}
}

test("alter table schema") {
val catalog = newBasicCatalog()
val tbl1 = catalog.getTable("db2", "tbl1")
val newSchema = StructType(Seq(
StructField("new_field_1", IntegerType),
StructField("new_field_2", StringType),
StructField("a", IntegerType),
StructField("b", StringType)))
catalog.alterTableSchema("db2", "tbl1", newSchema)
val newTbl1 = catalog.getTable("db2", "tbl1")
assert(newTbl1.schema == newSchema)
}

test("get table") {
assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
}
@@ -491,7 +491,8 @@ class TreeNodeSuite extends SparkFunSuite {
"lastAccessTime" -> -1,
"tracksPartitionsInCatalog" -> false,
"properties" -> JNull,
"unsupportedFeatures" -> List.empty[String]))
"unsupportedFeatures" -> List.empty[String],
"schemaPreservesCase" -> JBool(true)))

// For unknown case class, returns JNull.
val bigValue = new Array[Int](10000)
@@ -475,71 +475,6 @@ object ParquetFileFormat extends Logging {
}
}

/**
* Reconciles Hive Metastore case insensitivity issue and data type conflicts between Metastore
* schema and Parquet schema.
*
* Hive doesn't retain case information, while Parquet is case sensitive. On the other hand, the
* schema read from Parquet files may be incomplete (e.g. older versions of Parquet don't
* distinguish binary and string). This method generates a correct schema by merging Metastore
* schema data types and Parquet schema field names.
*/
def mergeMetastoreParquetSchema(
metastoreSchema: StructType,
parquetSchema: StructType): StructType = {
def schemaConflictMessage: String =
s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
|${metastoreSchema.prettyJson}
|
|Parquet schema:
|${parquetSchema.prettyJson}
""".stripMargin

val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)

assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)

val ordinalMap = metastoreSchema.zipWithIndex.map {
case (field, index) => field.name.toLowerCase -> index
}.toMap

val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))

StructType(metastoreSchema.zip(reorderedParquetSchema).map {
// Uses Parquet field names but retains Metastore data types.
case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
mSchema.copy(name = pSchema.name)
case _ =>
throw new SparkException(schemaConflictMessage)
})
}

/**
* Returns the original schema from the Parquet file with any missing nullable fields from the
* Hive Metastore schema merged in.
*
* When constructing a DataFrame from a collection of structured data, the resulting object has
* a schema corresponding to the union of the fields present in each element of the collection.
* Spark SQL simply assigns a null value to any field that isn't present for a particular row.
* In some cases, it is possible that a given table partition stored as a Parquet file doesn't
* contain a particular nullable field in its schema despite that field being present in the
* table schema obtained from the Hive Metastore. This method returns a schema representing the
* Parquet file schema along with any additional nullable fields from the Metastore schema
* merged in.
*/
private[parquet] def mergeMissingNullableFields(
metastoreSchema: StructType,
parquetSchema: StructType): StructType = {
val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
val missingFields = metastoreSchema
.map(_.name.toLowerCase)
.diff(parquetSchema.map(_.name.toLowerCase))
.map(fieldMap(_))
.filter(_.nullable)
StructType(parquetSchema ++ missingFields)
}

/**
* Reads Parquet footers in multi-threaded manner.
* If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
@@ -296,6 +296,25 @@ object SQLConf {
.longConf
.createWithDefault(250 * 1024 * 1024)

object HiveCaseSensitiveInferenceMode extends Enumeration {

Author (@budde, Feb 16, 2017):
Is there a more appropriate place I can put this Enumeration?

Contributor:
we can follow PARQUET_COMPRESSION and write the string literal directly.

Author (@budde):
I'm trying to avoid using string literals. If we want to change the possible values for this param we would need to find each and every place the literal value is used and update it. I think this is too flaky and runs the risk of introducing bugs that will only be apparent at runtime. Expressing this as an enumeration gives us some level of type safety and at the very least will cause a compiler error if the possible values are changed and comparisons elsewhere in the code aren't updated.

I'm willing to remove the enumeration if it isn't consistent with Spark code practices but at the very least the possible values should be expressed as constants rather than literals.

val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
}
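To make the type-safety argument above concrete, here is a small sketch (not part of the PR) contrasting string-literal comparisons with comparisons against the enumeration; the enumeration is mirrored locally so the snippet stands alone.

object InferenceModeSketch {
  // Local mirror of the enumeration added above, so the example is self-contained.
  object Mode extends Enumeration {
    val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
  }
  import Mode._

  // With bare string literals, a renamed or removed option only surfaces at runtime:
  def shouldInferWithLiterals(mode: String): Boolean =
    mode == "INFER_AND_SAVE" || mode == "INFER_ONLY"

  // With the enumeration, every comparison site references a checked identifier, so
  // renaming or removing a value breaks the build instead of misbehaving at runtime:
  def shouldInfer(mode: Mode.Value): Boolean = mode match {
    case INFER_AND_SAVE | INFER_ONLY => true
    case NEVER_INFER                 => false
  }
}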

val HIVE_CASE_SENSITIVE_INFERENCE = buildConf("spark.sql.hive.caseSensitiveInferenceMode")
.doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " +
"table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible file " +
"formats such as Parquet are. Spark SQL must use a case-preserving schema when querying " +
"any table backed by files containing case-sensitive field names or queries may not return " +
"accurate results. Valid options include INFER_AND_SAVE (the default mode-- infer the " +
"case-sensitive schema from the underlying data files and write it back to the table " +
"properties), INFER_ONLY (infer the schema but don't attempt to write it to the table " +
"properties) and NEVER_INFER (fallback to using the case-insensitive metastore schema " +
"instead of inferring).")
.stringConf
.transform(_.toUpperCase())
.checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
.createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
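As a usage sketch (not part of this diff), the new mode would be supplied through the ordinary SparkSession configuration. The configuration key comes from the definition above; the chosen value and application name are just examples.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("case-sensitive-inference-example")
  .enableHiveSupport()
  .config("spark.sql.hive.caseSensitiveInferenceMode", "INFER_ONLY") // or INFER_AND_SAVE / NEVER_INFER
  .getOrCreate()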

val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -792,6 +811,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)

def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))

def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)

def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
@@ -368,88 +368,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
}

test("merge with metastore schema") {
// Field type conflict resolution
assertResult(
StructType(Seq(
StructField("lowerCase", StringType),
StructField("UPPERCase", DoubleType, nullable = false)))) {

ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("lowercase", StringType),
StructField("uppercase", DoubleType, nullable = false))),

StructType(Seq(
StructField("lowerCase", BinaryType),
StructField("UPPERCase", IntegerType, nullable = true))))
}

// MetaStore schema is subset of parquet schema
assertResult(
StructType(Seq(
StructField("UPPERCase", DoubleType, nullable = false)))) {

ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false))),

StructType(Seq(
StructField("lowerCase", BinaryType),
StructField("UPPERCase", IntegerType, nullable = true))))
}

// Metastore schema contains additional non-nullable fields.
assert(intercept[Throwable] {
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
StructField("lowerCase", BinaryType, nullable = false))),

StructType(Seq(
StructField("UPPERCase", IntegerType, nullable = true))))
}.getMessage.contains("detected conflicting schemas"))

// Conflicting non-nullable field names
intercept[Throwable] {
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
}

test("merge missing nullable fields from Metastore schema") {
// Standard case: Metastore schema contains additional nullable fields not present
// in the Parquet file schema.
assertResult(
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true))),
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true))))
}

// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
ParquetFileFormat.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = false))),
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true))))
}.getMessage.contains("detected conflicting schemas"))
}

test("schema merging failure error message") {
import testImplicits._

@@ -597,6 +597,25 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}

override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
requireTableExists(db, table)
val rawTable = getRawTable(db, table)
val withNewSchema = rawTable.copy(schema = schema)

Member:
Why do we do two copies?

Author (@budde):
At line 606 we need to invoke tableMetaToTableProps() with a CatalogTable record containing the new table schema in order to generate the Hive table properties based on the new schema.

// Add table metadata such as table schema, partition columns, etc. to table properties.
val updatedTable = withNewSchema.copy(
properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
try {
client.alterTable(updatedTable)
} catch {
case NonFatal(e) =>
val warningMessage =
s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
"compatible way. Updating Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
}
}

override def getTable(db: String, table: String): CatalogTable = withClient {
restoreTableMetadata(getRawTable(db, table))
}
@@ -690,10 +709,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
"different from the schema when this table was created by Spark SQL" +
s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
"from Hive metastore which is not case preserving.")
hiveTable
hiveTable.copy(schemaPreservesCase = false)

Contributor:
this is not only about case-preserving, maybe we should leave it unchanged

Author (@budde):
In this case we are discarding the schema obtained from the table properties and explicitly falling back to the case-insensitive schema obtained from the metastore. schemaPreservesCase needs to be set to false here for the same reason it does at line 702.
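The inference decision itself lives outside this diff (in HiveMetastoreCatalog), so the following is only a sketch of the check the flag enables, with a hypothetical helper name and an assumed import path for SQLConf.

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._

object SchemaInferenceSketch {
  // Infer a case-sensitive schema only when the metastore copy lost case information
  // and the configured mode permits inference.
  def shouldInferSchema(table: CatalogTable, mode: HiveCaseSensitiveInferenceMode.Value): Boolean =
    !table.schemaPreservesCase && (mode == INFER_AND_SAVE || mode == INFER_ONLY)
}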

}
} else {
hiveTable
hiveTable.copy(schemaPreservesCase = false)
}
}
