From 3c992a9eb39e3258776e52d0524b8bc46bc3ee08 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 14 Jul 2016 00:08:43 -0700 Subject: [PATCH 01/13] fix. --- .../apache/spark/sql/catalog/Catalog.scala | 3 + .../command/createDataSourceTables.scala | 113 +++++++++++------- .../spark/sql/execution/command/ddl.scala | 4 + .../spark/sql/internal/CatalogImpl.scala | 39 +++++- .../sql/hive/MetastoreDataSourcesSuite.scala | 10 +- 5 files changed, 123 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 91ed9b3258a12..38f7fd5e07870 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -222,6 +222,9 @@ abstract class Catalog { * If this table is cached as an InMemoryRelation, drop the original cached version and make the * new version cached lazily. * + * If the table's schema is inferred at runtime, infer the schema again and update the schema + * in the external catalog. + * * @since 2.0.0 */ def refreshTable(tableName: String): Unit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index c38eca5156e5a..41240e675545b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -52,7 +52,7 @@ case class CreateDataSourceTableCommand( userSpecifiedSchema: Option[StructType], provider: String, options: Map[String, String], - partitionColumns: Array[String], + userSpecifiedPartitionColumns: Array[String], bucketSpec: Option[BucketSpec], ignoreIfExists: Boolean, managedIfNoPath: Boolean) @@ -95,17 +95,37 @@ case class CreateDataSourceTableCommand( } // Create the relation to validate the arguments before writing the metadata to the metastore. - DataSource( - sparkSession = sparkSession, - userSpecifiedSchema = userSpecifiedSchema, - className = provider, - bucketSpec = None, - options = optionsWithPath).resolveRelation(checkPathExist = false) + val dataSource: HadoopFsRelation = + DataSource( + sparkSession = sparkSession, + userSpecifiedSchema = userSpecifiedSchema, + className = provider, + bucketSpec = None, + options = optionsWithPath) + .resolveRelation(checkPathExist = false).asInstanceOf[HadoopFsRelation] + + val partitionColumns = + if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionColumns.length > 0) { + // The table does not have a specified schema, which means that the schema will be inferred + // when we load the table. So, we are not expecting partition columns and we will discover + // partitions when we load the table. However, if there are specified partition columns, + // we simply ignore them and provide a warning message. + logWarning( + s"The schema and partitions of table $tableIdent will be inferred when it is loaded. 
" + + s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will " + + "be ignored.") + dataSource.partitionSchema.fieldNames + } else { + userSpecifiedPartitionColumns + } + + val schemaType = if (userSpecifiedSchema.isEmpty) SchemaType.INFERRED else SchemaType.USER CreateDataSourceTableUtils.createDataSourceTable( sparkSession = sparkSession, tableIdent = tableIdent, - userSpecifiedSchema = userSpecifiedSchema, + schema = dataSource.schema, + schemaType = schemaType, partitionColumns = partitionColumns, bucketSpec = bucketSpec, provider = provider, @@ -256,7 +276,8 @@ case class CreateDataSourceTableAsSelectCommand( CreateDataSourceTableUtils.createDataSourceTable( sparkSession = sparkSession, tableIdent = tableIdent, - userSpecifiedSchema = Some(result.schema), + schema = result.schema, + schemaType = SchemaType.USER, partitionColumns = partitionColumns, bucketSpec = bucketSpec, provider = provider, @@ -270,6 +291,11 @@ case class CreateDataSourceTableAsSelectCommand( } } +case class SchemaType private(name: String) +object SchemaType { + val USER = new SchemaType("USER") + val INFERRED = new SchemaType("INFERRED") +} object CreateDataSourceTableUtils extends Logging { @@ -279,6 +305,7 @@ object CreateDataSourceTableUtils extends Logging { val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path" val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema" val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "." + val DATASOURCE_SCHEMA_TYPE = DATASOURCE_SCHEMA_PREFIX + "type" val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" @@ -303,10 +330,36 @@ object CreateDataSourceTableUtils extends Logging { matcher.matches() } + def createTablePropertiesForSchema( + sparkSession: SparkSession, + schema: StructType, + partitionColumns: Array[String], + tableProperties: mutable.HashMap[String, String]): Unit = { + // Saves the schema. Serialized JSON schema string may be too long to be stored into a single + // metastore SerDe property. In this case, we split the JSON string and store each part as + // a separate table property. + val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold + val schemaJsonString = schema.json + // Split the JSON string. + val parts = schemaJsonString.grouped(threshold).toSeq + tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString) + parts.zipWithIndex.foreach { case (part, index) => + tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part) + } + + if (partitionColumns.length > 0) { + tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString) + partitionColumns.zipWithIndex.foreach { case (partCol, index) => + tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol) + } + } + } + def createDataSourceTable( sparkSession: SparkSession, tableIdent: TableIdentifier, - userSpecifiedSchema: Option[StructType], + schema: StructType, + schemaType: SchemaType, partitionColumns: Array[String], bucketSpec: Option[BucketSpec], provider: String, @@ -314,29 +367,15 @@ object CreateDataSourceTableUtils extends Logging { isExternal: Boolean): Unit = { val tableProperties = new mutable.HashMap[String, String] tableProperties.put(DATASOURCE_PROVIDER, provider) + tableProperties.put(DATASOURCE_SCHEMA_TYPE, schemaType.name) - // Saves optional user specified schema. 
Serialized JSON schema string may be too long to be - // stored into a single metastore SerDe property. In this case, we split the JSON string and - // store each part as a separate SerDe property. - userSpecifiedSchema.foreach { schema => - val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold - val schemaJsonString = schema.json - // Split the JSON string. - val parts = schemaJsonString.grouped(threshold).toSeq - tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString) - parts.zipWithIndex.foreach { case (part, index) => - tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part) - } - } - - if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) { - tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString) - partitionColumns.zipWithIndex.foreach { case (partCol, index) => - tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol) - } - } + createTablePropertiesForSchema( + sparkSession, + schema, + partitionColumns, + tableProperties) - if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) { + if (bucketSpec.isDefined) { val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString) @@ -353,16 +392,6 @@ object CreateDataSourceTableUtils extends Logging { } } - if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) { - // The table does not have a specified schema, which means that the schema will be inferred - // when we load the table. So, we are not expecting partition columns and we will discover - // partitions when we load the table. However, if there are specified partition columns, - // we simply ignore them and provide a warning message. - logWarning( - s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " + - s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.") - } - val tableType = if (isExternal) { tableProperties.put("EXTERNAL", "TRUE") CatalogTableType.EXTERNAL @@ -375,7 +404,7 @@ object CreateDataSourceTableUtils extends Logging { val dataSource = DataSource( sparkSession, - userSpecifiedSchema = userSpecifiedSchema, + userSpecifiedSchema = Some(schema), partitionColumns = partitionColumns, bucketSpec = bucketSpec, className = provider, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 226f61ef404ae..ee6dd7c54d9e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -485,6 +485,10 @@ object DDLUtils { isDatasourceTable(table.properties) } + def isSchemaInferred(table: CatalogTable): Boolean = { + table.properties.get(DATASOURCE_SCHEMA_TYPE) == Option(SchemaType.INFERRED.name) + } + /** * If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view, * issue an exception [[AnalysisException]]. 
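[Illustrative sketch, not part of the patch] A minimal, self-contained Scala example of how the spark.sql.sources.schema.type property introduced in this commit is meant to round-trip: createDataSourceTable writes "USER" or "INFERRED", and the new DDLUtils.isSchemaInferred-style check reads it back. The plain Map below stands in for CatalogTable.properties; everything apart from the property key and the two values is illustrative.

object SchemaTypeFlagSketch {
  val DATASOURCE_SCHEMA_TYPE = "spark.sql.sources.schema.type"

  // Written at CREATE TABLE time: "USER" when a schema was supplied, "INFERRED" otherwise.
  def withSchemaType(props: Map[String, String], inferred: Boolean): Map[String, String] =
    props + (DATASOURCE_SCHEMA_TYPE -> (if (inferred) "INFERRED" else "USER"))

  // Read back later when deciding whether the stored schema may be re-inferred.
  def isSchemaInferred(props: Map[String, String]): Boolean =
    props.get(DATASOURCE_SCHEMA_TYPE).contains("INFERRED")

  def main(args: Array[String]): Unit = {
    val inferredTable = withSchemaType(Map("spark.sql.sources.provider" -> "parquet"), inferred = true)
    val userTable     = withSchemaType(Map("spark.sql.sources.provider" -> "parquet"), inferred = false)
    assert(isSchemaInferred(inferredTable))
    assert(!isSchemaInferred(userTable))
  }
}
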
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 44babcc93a1de..f358716909ca2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.internal import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.Experimental @@ -27,7 +28,8 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifie import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.LocalRelation -import org.apache.spark.sql.execution.datasources.CreateTableUsing +import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils} +import org.apache.spark.sql.execution.datasources.{CreateTableUsing, DataSource, HadoopFsRelation} import org.apache.spark.sql.types.StructType @@ -346,6 +348,40 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { sparkSession.sharedState.cacheManager.lookupCachedData(qName).nonEmpty } + /** + * Refresh the inferred schema stored in the external catalog for data source tables. + */ + private def refreshInferredSchema(tableIdent: TableIdentifier): Unit = { + val table = sessionCatalog.getTableMetadataOption(tableIdent) + table.foreach { tableDesc => + if (DDLUtils.isDatasourceTable(tableDesc) && DDLUtils.isSchemaInferred(tableDesc)) { + val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(tableDesc) + val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(tableDesc) + val dataSource = + DataSource( + sparkSession, + userSpecifiedSchema = None, + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, + className = tableDesc.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER), + options = tableDesc.storage.serdeProperties) + .resolveRelation().asInstanceOf[HadoopFsRelation] + + // TODO: If we do not remove them, the table properties might contain useless part. + // Should we remove them? + // val tableProperties = tableDesc.properties.filterKeys { k => + // k != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS && + // !k.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS) } + val properties = new mutable.HashMap[String, String] + CreateDataSourceTableUtils.createTablePropertiesForSchema( + sparkSession, dataSource.schema, dataSource.partitionSchema.fieldNames, properties) + + val newTable = tableDesc.copy(properties = tableDesc.properties ++ properties) + sessionCatalog.alterTable(newTable) + } + } + } + /** * Refresh the cache entry for a table, if any. For Hive metastore table, the metadata * is refreshed. 
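[Illustrative usage, not part of the patch] Condensed from the DDLSuite test added later in this series, a hypothetical walk-through of what the new refresh path is meant to do. It assumes an active SparkSession named spark and a scratch directory string path; the table and column names are made up.

import org.apache.spark.sql.SaveMode

// Write partitioned JSON data and create a table over it without specifying a schema,
// so both the schema and the partition columns are inferred.
spark.range(10).selectExpr("id AS col1", "CAST(id AS STRING) AS col2")
  .write.format("json").partitionBy("col1").save(path)
spark.sql(s"CREATE TABLE tab1 USING json OPTIONS (path '$path')")

// Overwrite the files with a different layout, then refresh the table. As of this
// commit, refreshTable re-infers the schema and stores it in the external catalog
// (later commits in this series revert to keeping the stored schema unchanged).
spark.range(10).selectExpr("id AS newCol1", "CAST(id AS STRING) AS newCol2")
  .write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path)
spark.catalog.refreshTable("tab1")
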
@@ -355,6 +391,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ override def refreshTable(tableName: String): Unit = { val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + refreshInferredSchema(tableIdent) sessionCatalog.refreshTable(tableIdent) // If this table is cached as an InMemoryRelation, drop the original diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 12d250d4fb604..b80486e602807 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ +import org.apache.spark.sql.execution.command.SchemaType import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -703,7 +704,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv createDataSourceTable( sparkSession = spark, tableIdent = TableIdentifier("wide_schema"), - userSpecifiedSchema = Some(schema), + schema = schema, + schemaType = SchemaType.USER, partitionColumns = Array.empty[String], bucketSpec = None, provider = "json", @@ -988,7 +990,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv createDataSourceTable( sparkSession = spark, tableIdent = TableIdentifier("not_skip_hive_metadata"), - userSpecifiedSchema = Some(schema), + schema = schema, + schemaType = SchemaType.USER, partitionColumns = Array.empty[String], bucketSpec = None, provider = "parquet", @@ -1003,7 +1006,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv createDataSourceTable( sparkSession = spark, tableIdent = TableIdentifier("skip_hive_metadata"), - userSpecifiedSchema = Some(schema), + schema = schema, + schemaType = SchemaType.USER, partitionColumns = Array.empty[String], bucketSpec = None, provider = "parquet", From 3be0dc0b7cfd942459c598c0d35f3d67a2c020ba Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 14 Jul 2016 12:19:40 -0700 Subject: [PATCH 02/13] fix. --- .../command/createDataSourceTables.scala | 16 +++++----- .../spark/sql/internal/CatalogImpl.scala | 30 ++++++++++++------- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 41240e675545b..36ae932d95277 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -330,12 +330,16 @@ object CreateDataSourceTableUtils extends Logging { matcher.matches() } - def createTablePropertiesForSchema( + /** + * Saves the schema (including partition info) into the table properties. + * Overwrites the schema, if already existed. 
+ */ + def saveSchema( sparkSession: SparkSession, schema: StructType, partitionColumns: Array[String], tableProperties: mutable.HashMap[String, String]): Unit = { - // Saves the schema. Serialized JSON schema string may be too long to be stored into a single + // Serialized JSON schema string may be too long to be stored into a single // metastore SerDe property. In this case, we split the JSON string and store each part as // a separate table property. val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold @@ -367,13 +371,9 @@ object CreateDataSourceTableUtils extends Logging { isExternal: Boolean): Unit = { val tableProperties = new mutable.HashMap[String, String] tableProperties.put(DATASOURCE_PROVIDER, provider) - tableProperties.put(DATASOURCE_SCHEMA_TYPE, schemaType.name) - createTablePropertiesForSchema( - sparkSession, - schema, - partitionColumns, - tableProperties) + tableProperties.put(DATASOURCE_SCHEMA_TYPE, schemaType.name) + saveSchema(sparkSession, schema, partitionColumns, tableProperties) if (bucketSpec.isDefined) { val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 47663118f7b05..4a613c152be84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -371,16 +371,20 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { options = tableDesc.storage.serdeProperties) .resolveRelation().asInstanceOf[HadoopFsRelation] - // TODO: If we do not remove them, the table properties might contain useless part. - // Should we remove them? - // val tableProperties = tableDesc.properties.filterKeys { k => - // k != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS && - // !k.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS) } - val properties = new mutable.HashMap[String, String] - CreateDataSourceTableUtils.createTablePropertiesForSchema( - sparkSession, dataSource.schema, dataSource.partitionSchema.fieldNames, properties) - - val newTable = tableDesc.copy(properties = tableDesc.properties ++ properties) + val schemaProperties = new mutable.HashMap[String, String] + CreateDataSourceTableUtils.saveSchema( + sparkSession, dataSource.schema, dataSource.partitionSchema.fieldNames, schemaProperties) + + val tablePropertiesWithoutSchema = tableDesc.properties.filterKeys { k => + // Keep the properties that are not for schema or partition columns + k != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS && + !k.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PART_PREFIX) && + k != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTCOLS && + !k.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PARTCOL_PREFIX) } + + val newTable = tableDesc.copy(properties = tablePropertiesWithoutSchema ++ schemaProperties) + + // Alter the schema-related table properties that are stored in external catalog. sessionCatalog.alterTable(newTable) } } @@ -395,7 +399,13 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ override def refreshTable(tableName: String): Unit = { val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + // Refresh the schema in external catalog, if it is a data source table whose schema is inferred + // at runtime. 
For user-specified schema, we do not infer and update the schema. + // TODO: Support column-related ALTER TABLE DDL commands, and then users can update + // the user-specified schema. refreshInferredSchema(tableIdent) + // Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively. + // Non-temp tables: refresh the metadata cache. sessionCatalog.refreshTable(tableIdent) // If this table is cached as an InMemoryRelation, drop the original From c6afbbb9941113d6a78bfd3aaa627653ba0f6151 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 19 Jul 2016 00:16:09 -0700 Subject: [PATCH 03/13] address comments and added test cases --- .../command/createDataSourceTables.scala | 40 +++--- .../spark/sql/execution/command/ddl.scala | 2 +- .../spark/sql/execution/command/tables.scala | 23 ++-- .../spark/sql/internal/CatalogImpl.scala | 20 ++- .../sql/execution/command/DDLSuite.scala | 127 +++++++++++++++++- .../sql/hive/MetastoreDataSourcesSuite.scala | 7 +- 6 files changed, 176 insertions(+), 43 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 36ae932d95277..a15ee6896b656 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -104,28 +104,30 @@ case class CreateDataSourceTableCommand( options = optionsWithPath) .resolveRelation(checkPathExist = false).asInstanceOf[HadoopFsRelation] + if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionColumns.length > 0) { + // The table does not have a specified schema, which means that the schema will be inferred + // when we load the table. So, we are not expecting partition columns and we will discover + // partitions when we load the table. However, if there are specified partition columns, + // we simply ignore them and provide a warning message. + logWarning( + s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " + + s"ignored. The schema and partition columns of table $tableIdent are inferred. " + + s"Schema: ${dataSource.schema.simpleString}; " + + s"Partition columns: ${dataSource.partitionSchema.fieldNames}") + } + val partitionColumns = - if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionColumns.length > 0) { - // The table does not have a specified schema, which means that the schema will be inferred - // when we load the table. So, we are not expecting partition columns and we will discover - // partitions when we load the table. However, if there are specified partition columns, - // we simply ignore them and provide a warning message. - logWarning( - s"The schema and partitions of table $tableIdent will be inferred when it is loaded. 
" + - s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will " + - "be ignored.") + if (userSpecifiedSchema.isEmpty) { dataSource.partitionSchema.fieldNames } else { userSpecifiedPartitionColumns } - val schemaType = if (userSpecifiedSchema.isEmpty) SchemaType.INFERRED else SchemaType.USER - CreateDataSourceTableUtils.createDataSourceTable( sparkSession = sparkSession, tableIdent = tableIdent, schema = dataSource.schema, - schemaType = schemaType, + isSchemaInferred = userSpecifiedSchema.isEmpty, partitionColumns = partitionColumns, bucketSpec = bucketSpec, provider = provider, @@ -277,7 +279,7 @@ case class CreateDataSourceTableAsSelectCommand( sparkSession = sparkSession, tableIdent = tableIdent, schema = result.schema, - schemaType = SchemaType.USER, + isSchemaInferred = false, partitionColumns = partitionColumns, bucketSpec = bucketSpec, provider = provider, @@ -291,12 +293,6 @@ case class CreateDataSourceTableAsSelectCommand( } } -case class SchemaType private(name: String) -object SchemaType { - val USER = new SchemaType("USER") - val INFERRED = new SchemaType("INFERRED") -} - object CreateDataSourceTableUtils extends Logging { val DATASOURCE_PREFIX = "spark.sql.sources." @@ -305,7 +301,7 @@ object CreateDataSourceTableUtils extends Logging { val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path" val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema" val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "." - val DATASOURCE_SCHEMA_TYPE = DATASOURCE_SCHEMA_PREFIX + "type" + val DATASOURCE_SCHEMA_ISINFERRED = DATASOURCE_SCHEMA_PREFIX + "isInferred" val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" @@ -363,7 +359,7 @@ object CreateDataSourceTableUtils extends Logging { sparkSession: SparkSession, tableIdent: TableIdentifier, schema: StructType, - schemaType: SchemaType, + isSchemaInferred: Boolean, partitionColumns: Array[String], bucketSpec: Option[BucketSpec], provider: String, @@ -372,7 +368,7 @@ object CreateDataSourceTableUtils extends Logging { val tableProperties = new mutable.HashMap[String, String] tableProperties.put(DATASOURCE_PROVIDER, provider) - tableProperties.put(DATASOURCE_SCHEMA_TYPE, schemaType.name) + tableProperties.put(DATASOURCE_SCHEMA_ISINFERRED, isSchemaInferred.toString.toUpperCase) saveSchema(sparkSession, schema, partitionColumns, tableProperties) if (bucketSpec.isDefined) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index f37948062f482..65febc461eb6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -488,7 +488,7 @@ object DDLUtils { } def isSchemaInferred(table: CatalogTable): Boolean = { - table.properties.get(DATASOURCE_SCHEMA_TYPE) == Option(SchemaType.INFERRED.name) + table.properties.get(DATASOURCE_SCHEMA_ISINFERRED) == Option(true.toString.toUpperCase) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 6651c33a3a9d0..f2024ddcfdcd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -413,15 +413,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF } else { val metadata = catalog.getTableMetadata(table) - if (DDLUtils.isDatasourceTable(metadata)) { - DDLUtils.getSchemaFromTableProperties(metadata) match { - case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, result) - case None => describeSchema(catalog.lookupRelation(table).schema, result) - } - } else { - describeSchema(metadata.schema, result) - } - + describeSchema(metadata, result) if (isExtended) { describeExtended(metadata, result) } else if (isFormatted) { @@ -518,6 +510,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF } } + private def describeSchema( + tableDesc: CatalogTable, + buffer: ArrayBuffer[Row]): Unit = { + if (DDLUtils.isDatasourceTable(tableDesc)) { + DDLUtils.getSchemaFromTableProperties(tableDesc) match { + case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, buffer) + case None => append(buffer, "# Schema of this table is inferred at runtime", "", "") + } + } else { + describeSchema(tableDesc.schema, buffer) + } + } + private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = { schema.foreach { column => append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 4a613c152be84..ed74362f7e7e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -375,12 +375,22 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { CreateDataSourceTableUtils.saveSchema( sparkSession, dataSource.schema, dataSource.partitionSchema.fieldNames, schemaProperties) + def isPropertyForInferredSchema(key: String): Boolean = { + key match { + case CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS => true + case CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTCOLS => true + case _ + if key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PART_PREFIX) || + key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PARTCOL_PREFIX) + => true + case _ => false + } + } + + // Keep the properties that are not for schema or partition columns val tablePropertiesWithoutSchema = tableDesc.properties.filterKeys { k => - // Keep the properties that are not for schema or partition columns - k != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS && - !k.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PART_PREFIX) && - k != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTCOLS && - !k.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PARTCOL_PREFIX) } + !isPropertyForInferredSchema(k) + } val newTable = tableDesc.copy(properties = tablePropertiesWithoutSchema ++ schemaProperties) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 169250d9bb1c2..1ef7acb195d02 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterEach import org.apache.spark.internal.config._ 
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat} @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { @@ -252,6 +252,129 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + test("Create data source table with partitioning columns but no schema") { + import testImplicits._ + + val tabName = "tab1" + withTempPath { dir => + val pathToPartitionedTable = new File(dir, "partitioned") + val pathToNonPartitionedTable = new File(dir, "nonPartitioned") + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") + df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath) + df.write.format("parquet").partitionBy("num").save(pathToPartitionedTable.getCanonicalPath) + + Seq(pathToPartitionedTable, pathToNonPartitionedTable).foreach { path => + withTable(tabName) { + spark.sql( + s""" + |CREATE TABLE $tabName + |USING parquet + |OPTIONS ( + | path '$path' + |) + |PARTITIONED BY (inexistentColumns) + """.stripMargin) + val catalog = spark.sessionState.catalog + val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName)) + + val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) + assert(tableSchema.nonEmpty, "the schema of data source tables are always recorded") + val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) + + if (tableMetadata.storage.serdeProperties.get("path") == + Option(pathToPartitionedTable.getCanonicalPath)) { + assert(partCols == Seq("num")) + assert(tableSchema == + Option(StructType(StructField("str", StringType, nullable = true) :: + StructField("num", IntegerType, nullable = true) :: Nil))) + } else { + assert(partCols.isEmpty) + assert(tableSchema == + Option(StructType(StructField("num", IntegerType, nullable = true) :: + StructField("str", StringType, nullable = true) :: Nil))) + } + } + } + } + } + + test("Refresh table after changing the data source table partitioning") { + import testImplicits._ + + val tabName = "tab1" + val catalog = spark.sessionState.catalog + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString, i, i)) + .toDF("col1", "col2", "col3", "col4") + df.write.format("json").partitionBy("col1", "col3").save(path) + val schema = StructType( + StructField("col2", StringType, nullable = true) :: + StructField("col4", LongType, nullable = true) :: + StructField("col1", IntegerType, nullable = true) :: + StructField("col3", IntegerType, nullable = true) :: Nil) + val partitionCols = Seq("col1", "col3") + + // Ensure the schema is split to multiple properties. 
+ withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "500") { + withTable(tabName) { + spark.sql( + s""" + |CREATE TABLE $tabName + |USING json + |OPTIONS ( + | path '$path' + |) + """.stripMargin) + val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName)) + val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) + assert(tableSchema == Option(schema)) + val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) + assert(partCols == partitionCols) + + // Change the schema + val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)) + .toDF("newCol1", "newCol2") + newDF.write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path) + val newSchema = StructType( + StructField("newCol2", StringType, nullable = true) :: + StructField("newCol1", IntegerType, nullable = true) :: Nil) + + // No change on the schema + val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) + val tableSchemaBeforeRefresh = + DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh) + assert(tableSchemaBeforeRefresh == Option(schema)) + val partColsBeforeRefresh = + DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh) + assert(partColsBeforeRefresh == partitionCols) + + // Refresh + spark.catalog.refreshTable(tabName) + + val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) + val tableSchemaAfterRefresh = + DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh) + assert(tableSchemaAfterRefresh == Option(newSchema)) + val partColsAfterRefresh = + DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh) + assert(partColsAfterRefresh == Seq("newCol1")) + + // Refresh after no change + spark.catalog.refreshTable(tabName) + + val tableMetadataNoChangeAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) + val tableSchemaNoChangeAfterRefresh = + DDLUtils.getSchemaFromTableProperties(tableMetadataNoChangeAfterRefresh) + assert(tableSchemaNoChangeAfterRefresh == Option(newSchema)) + val partColsNoChangeAfterRefresh = + DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataNoChangeAfterRefresh) + assert(partColsNoChangeAfterRefresh == Seq("newCol1")) + } + } + } + } + test("desc table for parquet data source table using in-memory catalog") { assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") val tabName = "tab1" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index b80486e602807..c558c1b87ac9b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ -import org.apache.spark.sql.execution.command.SchemaType import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -705,7 +704,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sparkSession = spark, tableIdent = 
TableIdentifier("wide_schema"), schema = schema, - schemaType = SchemaType.USER, + isSchemaInferred = false, partitionColumns = Array.empty[String], bucketSpec = None, provider = "json", @@ -991,7 +990,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sparkSession = spark, tableIdent = TableIdentifier("not_skip_hive_metadata"), schema = schema, - schemaType = SchemaType.USER, + isSchemaInferred = false, partitionColumns = Array.empty[String], bucketSpec = None, provider = "parquet", @@ -1007,7 +1006,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sparkSession = spark, tableIdent = TableIdentifier("skip_hive_metadata"), schema = schema, - schemaType = SchemaType.USER, + isSchemaInferred = false, partitionColumns = Array.empty[String], bucketSpec = None, provider = "parquet", From 55c2c5e2623478a79971af3b0513727b03c1ee87 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 19 Jul 2016 00:22:03 -0700 Subject: [PATCH 04/13] updated the test case --- .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 1ef7acb195d02..4cd0779e3eb73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -316,7 +316,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val partitionCols = Seq("col1", "col3") // Ensure the schema is split to multiple properties. - withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "500") { + withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "1") { withTable(tabName) { spark.sql( s""" From a043ca28fc06082bc8b4104d9b38f2fbf1aa337a Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 19 Jul 2016 11:44:51 -0700 Subject: [PATCH 05/13] address comments --- .../apache/spark/sql/catalog/Catalog.scala | 3 - .../command/createDataSourceTables.scala | 42 ++++--------- .../spark/sql/execution/command/ddl.scala | 4 -- .../spark/sql/internal/CatalogImpl.scala | 59 +------------------ .../sql/execution/command/DDLSuite.scala | 20 +------ .../sql/hive/MetastoreDataSourcesSuite.scala | 7 +-- 6 files changed, 20 insertions(+), 115 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index b42289b412728..1aed245fdd332 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -223,9 +223,6 @@ abstract class Catalog { * If this table is cached as an InMemoryRelation, drop the original cached version and make the * new version cached lazily. * - * If the table's schema is inferred at runtime, infer the schema again and update the schema - * in the external catalog. 
- * * @since 2.0.0 */ def refreshTable(tableName: String): Unit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index a15ee6896b656..11892bf0b3b64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -127,7 +127,6 @@ case class CreateDataSourceTableCommand( sparkSession = sparkSession, tableIdent = tableIdent, schema = dataSource.schema, - isSchemaInferred = userSpecifiedSchema.isEmpty, partitionColumns = partitionColumns, bucketSpec = bucketSpec, provider = provider, @@ -279,7 +278,6 @@ case class CreateDataSourceTableAsSelectCommand( sparkSession = sparkSession, tableIdent = tableIdent, schema = result.schema, - isSchemaInferred = false, partitionColumns = partitionColumns, bucketSpec = bucketSpec, provider = provider, @@ -293,6 +291,7 @@ case class CreateDataSourceTableAsSelectCommand( } } + object CreateDataSourceTableUtils extends Logging { val DATASOURCE_PREFIX = "spark.sql.sources." @@ -301,7 +300,6 @@ object CreateDataSourceTableUtils extends Logging { val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path" val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema" val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "." - val DATASOURCE_SCHEMA_ISINFERRED = DATASOURCE_SCHEMA_PREFIX + "isInferred" val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" @@ -326,18 +324,21 @@ object CreateDataSourceTableUtils extends Logging { matcher.matches() } - /** - * Saves the schema (including partition info) into the table properties. - * Overwrites the schema, if already existed. - */ - def saveSchema( + def createDataSourceTable( sparkSession: SparkSession, + tableIdent: TableIdentifier, schema: StructType, partitionColumns: Array[String], - tableProperties: mutable.HashMap[String, String]): Unit = { - // Serialized JSON schema string may be too long to be stored into a single - // metastore SerDe property. In this case, we split the JSON string and store each part as - // a separate table property. + bucketSpec: Option[BucketSpec], + provider: String, + options: Map[String, String], + isExternal: Boolean): Unit = { + val tableProperties = new mutable.HashMap[String, String] + tableProperties.put(DATASOURCE_PROVIDER, provider) + + // Saves optional user specified schema. Serialized JSON schema string may be too long to be + // stored into a single metastore SerDe property. In this case, we split the JSON string and + // store each part as a separate SerDe property. val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold val schemaJsonString = schema.json // Split the JSON string. 
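[Illustrative sketch, not part of the patch] The split/rebuild scheme inlined above in one self-contained example: the schema JSON is chunked into spark.sql.sources.schema.part.N properties, the chunk count goes under spark.sql.sources.schema.numParts, and the reader concatenates the parts again (see getSchemaFromTableProperties further down). The key literals are spelled out here and a plain Map stands in for the table properties.

object SchemaSplitSketch {
  val NUMPARTS    = "spark.sql.sources.schema.numParts"
  val PART_PREFIX = "spark.sql.sources.schema.part."

  // Write path: split the JSON into threshold-sized chunks and index them.
  def save(schemaJson: String, threshold: Int): Map[String, String] = {
    val parts = schemaJson.grouped(threshold).toSeq
    val chunkProps = parts.zipWithIndex.map { case (part, i) => s"$PART_PREFIX$i" -> part }
    (chunkProps :+ (NUMPARTS -> parts.size.toString)).toMap
  }

  // Read path: stick all parts back together into a single schema string.
  def load(props: Map[String, String]): String = {
    val numParts = props(NUMPARTS).toInt
    (0 until numParts).map(i => props(s"$PART_PREFIX$i")).mkString
  }

  def main(args: Array[String]): Unit = {
    val json =
      """{"type":"struct","fields":[{"name":"num","type":"integer","nullable":true,"metadata":{}}]}"""
    assert(load(save(json, threshold = 8)) == json) // round-trips for any positive threshold
  }
}
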
@@ -353,23 +354,6 @@ object CreateDataSourceTableUtils extends Logging { tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol) } } - } - - def createDataSourceTable( - sparkSession: SparkSession, - tableIdent: TableIdentifier, - schema: StructType, - isSchemaInferred: Boolean, - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], - provider: String, - options: Map[String, String], - isExternal: Boolean): Unit = { - val tableProperties = new mutable.HashMap[String, String] - tableProperties.put(DATASOURCE_PROVIDER, provider) - - tableProperties.put(DATASOURCE_SCHEMA_ISINFERRED, isSchemaInferred.toString.toUpperCase) - saveSchema(sparkSession, schema, partitionColumns, tableProperties) if (bucketSpec.isDefined) { val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 65febc461eb6b..a3a057a5628fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -487,10 +487,6 @@ object DDLUtils { isDatasourceTable(table.properties) } - def isSchemaInferred(table: CatalogTable): Boolean = { - table.properties.get(DATASOURCE_SCHEMA_ISINFERRED) == Option(true.toString.toUpperCase) - } - /** * If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view, * issue an exception [[AnalysisException]]. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index ed74362f7e7e9..a516ea122e89f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.internal import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.Experimental @@ -28,8 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifie import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.LocalRelation -import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils} -import org.apache.spark.sql.execution.datasources.{CreateTableUsing, DataSource, HadoopFsRelation} +import org.apache.spark.sql.execution.datasources.CreateTableUsing import org.apache.spark.sql.types.StructType @@ -352,68 +350,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { sparkSession.sharedState.cacheManager.lookupCachedData(qName).nonEmpty } - /** - * Refresh the inferred schema stored in the external catalog for data source tables. 
- */ - private def refreshInferredSchema(tableIdent: TableIdentifier): Unit = { - val table = sessionCatalog.getTableMetadataOption(tableIdent) - table.foreach { tableDesc => - if (DDLUtils.isDatasourceTable(tableDesc) && DDLUtils.isSchemaInferred(tableDesc)) { - val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(tableDesc) - val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(tableDesc) - val dataSource = - DataSource( - sparkSession, - userSpecifiedSchema = None, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - className = tableDesc.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER), - options = tableDesc.storage.serdeProperties) - .resolveRelation().asInstanceOf[HadoopFsRelation] - - val schemaProperties = new mutable.HashMap[String, String] - CreateDataSourceTableUtils.saveSchema( - sparkSession, dataSource.schema, dataSource.partitionSchema.fieldNames, schemaProperties) - - def isPropertyForInferredSchema(key: String): Boolean = { - key match { - case CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS => true - case CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTCOLS => true - case _ - if key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PART_PREFIX) || - key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PARTCOL_PREFIX) - => true - case _ => false - } - } - - // Keep the properties that are not for schema or partition columns - val tablePropertiesWithoutSchema = tableDesc.properties.filterKeys { k => - !isPropertyForInferredSchema(k) - } - - val newTable = tableDesc.copy(properties = tablePropertiesWithoutSchema ++ schemaProperties) - - // Alter the schema-related table properties that are stored in external catalog. - sessionCatalog.alterTable(newTable) - } - } - } - /** * Refresh the cache entry for a table, if any. For Hive metastore table, the metadata - * is refreshed. + * is refreshed. For data source tables, the schema will not be inferred and refreshed. * * @group cachemgmt * @since 2.0.0 */ override def refreshTable(tableName: String): Unit = { val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - // Refresh the schema in external catalog, if it is a data source table whose schema is inferred - // at runtime. For user-specified schema, we do not infer and update the schema. - // TODO: Support column-related ALTER TABLE DDL commands, and then users can update - // the user-specified schema. - refreshInferredSchema(tableIdent) // Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively. // Non-temp tables: refresh the metadata cache. 
sessionCatalog.refreshTable(tableIdent) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 4cd0779e3eb73..d2f38da0d5e61 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -336,9 +336,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)) .toDF("newCol1", "newCol2") newDF.write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path) - val newSchema = StructType( - StructField("newCol2", StringType, nullable = true) :: - StructField("newCol1", IntegerType, nullable = true) :: Nil) // No change on the schema val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) @@ -349,27 +346,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh) assert(partColsBeforeRefresh == partitionCols) - // Refresh + // Refresh does not affect the schema spark.catalog.refreshTable(tabName) val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) val tableSchemaAfterRefresh = DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh) - assert(tableSchemaAfterRefresh == Option(newSchema)) + assert(tableSchemaAfterRefresh == Option(schema)) val partColsAfterRefresh = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh) - assert(partColsAfterRefresh == Seq("newCol1")) - - // Refresh after no change - spark.catalog.refreshTable(tabName) - - val tableMetadataNoChangeAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) - val tableSchemaNoChangeAfterRefresh = - DDLUtils.getSchemaFromTableProperties(tableMetadataNoChangeAfterRefresh) - assert(tableSchemaNoChangeAfterRefresh == Option(newSchema)) - val partColsNoChangeAfterRefresh = - DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataNoChangeAfterRefresh) - assert(partColsNoChangeAfterRefresh == Seq("newCol1")) + assert(partColsAfterRefresh == partitionCols) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index c558c1b87ac9b..ec36bd8f34ab9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -191,10 +191,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sql("REFRESH TABLE jsonTable") - // Check that the refresh worked + // After refresh, schema is not changed. 
checkAnswer( sql("SELECT * FROM jsonTable"), - Row("a1", "b1", "c1")) + Row("a1", "b1")) } } } @@ -704,7 +704,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sparkSession = spark, tableIdent = TableIdentifier("wide_schema"), schema = schema, - isSchemaInferred = false, partitionColumns = Array.empty[String], bucketSpec = None, provider = "json", @@ -990,7 +989,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sparkSession = spark, tableIdent = TableIdentifier("not_skip_hive_metadata"), schema = schema, - isSchemaInferred = false, partitionColumns = Array.empty[String], bucketSpec = None, provider = "parquet", @@ -1006,7 +1004,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sparkSession = spark, tableIdent = TableIdentifier("skip_hive_metadata"), schema = schema, - isSchemaInferred = false, partitionColumns = Array.empty[String], bucketSpec = None, provider = "parquet", From e93081918b170d3fbd08d992ef251c83af9e433d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 19 Jul 2016 19:32:17 -0700 Subject: [PATCH 06/13] address comments --- .../command/createDataSourceTables.scala | 26 +++++++++++-------- .../spark/sql/execution/command/tables.scala | 2 +- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 11892bf0b3b64..a6e8ecd2d544e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.HiveSerDe -import org.apache.spark.sql.sources.InsertableRelation +import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation} import org.apache.spark.sql.types._ /** @@ -95,14 +95,25 @@ case class CreateDataSourceTableCommand( } // Create the relation to validate the arguments before writing the metadata to the metastore. - val dataSource: HadoopFsRelation = + val dataSource: BaseRelation = DataSource( sparkSession = sparkSession, userSpecifiedSchema = userSpecifiedSchema, className = provider, bucketSpec = None, options = optionsWithPath) - .resolveRelation(checkPathExist = false).asInstanceOf[HadoopFsRelation] + .resolveRelation(checkPathExist = false) + + val partitionColumns = + dataSource match { + case r: HadoopFsRelation => + if (userSpecifiedSchema.isEmpty) { + r.partitionSchema.fieldNames + } else { + userSpecifiedPartitionColumns + } + case _ => Array.empty[String] + } if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionColumns.length > 0) { // The table does not have a specified schema, which means that the schema will be inferred @@ -113,16 +124,9 @@ case class CreateDataSourceTableCommand( s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " + s"ignored. The schema and partition columns of table $tableIdent are inferred. 
" + s"Schema: ${dataSource.schema.simpleString}; " + - s"Partition columns: ${dataSource.partitionSchema.fieldNames}") + s"Partition columns: $partitionColumns") } - val partitionColumns = - if (userSpecifiedSchema.isEmpty) { - dataSource.partitionSchema.fieldNames - } else { - userSpecifiedPartitionColumns - } - CreateDataSourceTableUtils.createDataSourceTable( sparkSession = sparkSession, tableIdent = tableIdent, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index f2024ddcfdcd6..4ab38e65f6bca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -516,7 +516,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF if (DDLUtils.isDatasourceTable(tableDesc)) { DDLUtils.getSchemaFromTableProperties(tableDesc) match { case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, buffer) - case None => append(buffer, "# Schema of this table is inferred at runtime", "", "") + case None => append(buffer, "# Schema of this table is corrupted", "", "") } } else { describeSchema(tableDesc.schema, buffer) From 727ecf87463d6fe02cd29e0bbf3f488c043b1962 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 19 Jul 2016 22:17:10 -0700 Subject: [PATCH 07/13] address comments --- .../command/createDataSourceTables.scala | 2 +- .../spark/sql/execution/command/ddl.scala | 34 ++--- .../spark/sql/execution/command/tables.scala | 15 +-- .../datasources/DataSourceStrategy.scala | 4 +- .../sql/execution/command/DDLSuite.scala | 124 ++++++++++++------ 5 files changed, 112 insertions(+), 67 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index a6e8ecd2d544e..5c0dd5fe79817 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -238,7 +238,7 @@ case class CreateDataSourceTableAsSelectCommand( } existingSchema = Some(l.schema) case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => - existingSchema = DDLUtils.getSchemaFromTableProperties(s.metadata) + existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata)) case o => throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index a3a057a5628fe..b98c3a5429ced 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -522,31 +522,31 @@ object DDLUtils { table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS) } - // A persisted data source table may not store its schema in the catalog. In this case, its schema - // will be inferred at runtime when the table is referenced. - def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = { + // A persisted data source table always store its schema in the catalog. 
+ def getSchemaFromTableProperties(metadata: CatalogTable): StructType = { require(isDatasourceTable(metadata)) + val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted." val props = metadata.properties if (props.isDefinedAt(DATASOURCE_SCHEMA)) { // Originally, we used spark.sql.sources.schema to store the schema of a data source table. // After SPARK-6024, we removed this flag. // Although we are not using spark.sql.sources.schema any more, we need to still support. - props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType]) - } else { - metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts => - val parts = (0 until numParts.toInt).map { index => - val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull - if (part == null) { - throw new AnalysisException( - "Could not read schema from the metastore because it is corrupted " + - s"(missing part $index of the schema, $numParts parts are expected).") - } - - part + val schema = props.get(DATASOURCE_SCHEMA).get + DataType.fromJson(schema).asInstanceOf[StructType] + } else if (props.isDefinedAt(DATASOURCE_SCHEMA_NUMPARTS)) { + val numParts = props.get(DATASOURCE_SCHEMA_NUMPARTS).get + val parts = (0 until numParts.toInt).map { index => + val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull + if (part == null) { + throw new AnalysisException(msgSchemaCorrupted + + s" (missing part $index of the schema, $numParts parts are expected).") } - // Stick all parts back to a single schema string. - DataType.fromJson(parts.mkString).asInstanceOf[StructType] + part } + // Stick all parts back to a single schema string. + DataType.fromJson(parts.mkString).asInstanceOf[StructType] + } else { + throw new AnalysisException(msgSchemaCorrupted) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 4ab38e65f6bca..3da8a74316a8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -514,10 +514,8 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF tableDesc: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { if (DDLUtils.isDatasourceTable(tableDesc)) { - DDLUtils.getSchemaFromTableProperties(tableDesc) match { - case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, buffer) - case None => append(buffer, "# Schema of this table is corrupted", "", "") - } + val schema = DDLUtils.getSchemaFromTableProperties(tableDesc) + describeSchema(schema, buffer) } else { describeSchema(tableDesc.schema, buffer) } @@ -884,12 +882,9 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman private def showDataSourceTableDataColumns( metadata: CatalogTable, builder: StringBuilder): Unit = { - DDLUtils.getSchemaFromTableProperties(metadata).foreach { schema => - val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}") - builder ++= columns.mkString("(", ", ", ")") - } - - builder ++= "\n" + val schema = DDLUtils.getSchemaFromTableProperties(metadata) + val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}") + builder ++= columns.mkString("(", ", ", ")\n") } private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = { diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 0841636d3309f..1d7b9f8a90088 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -205,7 +205,7 @@ private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[Logi */ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = { - val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table) + val schema = DDLUtils.getSchemaFromTableProperties(table) // We only need names at here since userSpecifiedSchema we loaded from the metastore // contains partition columns. We can always get datatypes of partitioning columns @@ -218,7 +218,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[ val dataSource = DataSource( sparkSession, - userSpecifiedSchema = userSpecifiedSchema, + userSpecifiedSchema = Some(schema), partitionColumns = partitionColumns, bucketSpec = bucketSpec, className = table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index d2f38da0d5e61..d7f2e503f025d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -252,48 +252,98 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } - test("Create data source table with partitioning columns but no schema") { + test("Create partitioned data source table with partitioning columns but no schema") { import testImplicits._ - val tabName = "tab1" withTempPath { dir => val pathToPartitionedTable = new File(dir, "partitioned") + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") + df.write.format("parquet").partitionBy("num").save(pathToPartitionedTable.getCanonicalPath) + val tabName = "tab1" + withTable(tabName) { + spark.sql( + s""" + |CREATE TABLE $tabName + |USING parquet + |OPTIONS ( + | path '$pathToPartitionedTable' + |) + |PARTITIONED BY (inexistentColumns) + """.stripMargin) + val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)) + + val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) + assert(tableSchema == + StructType(StructField("str", StringType, nullable = true) :: + StructField("num", IntegerType, nullable = true) :: Nil)) + + val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) + assert(partCols == Seq("num")) + } + } + } + + test("Create non-partitioned data source table with partitioning columns but no schema") { + import testImplicits._ + + withTempPath { dir => val pathToNonPartitionedTable = new File(dir, "nonPartitioned") val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath) - df.write.format("parquet").partitionBy("num").save(pathToPartitionedTable.getCanonicalPath) + val tabName = "tab1" + withTable(tabName) { + spark.sql( + s""" + 
|CREATE TABLE $tabName + |USING parquet + |OPTIONS ( + | path '$pathToNonPartitionedTable' + |) + |PARTITIONED BY (inexistentColumns) + """.stripMargin) + val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)) - Seq(pathToPartitionedTable, pathToNonPartitionedTable).foreach { path => - withTable(tabName) { - spark.sql( - s""" - |CREATE TABLE $tabName - |USING parquet - |OPTIONS ( - | path '$path' - |) - |PARTITIONED BY (inexistentColumns) - """.stripMargin) - val catalog = spark.sessionState.catalog - val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName)) + val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) + assert(tableSchema == + StructType(StructField("num", IntegerType, nullable = true) :: + StructField("str", StringType, nullable = true) :: Nil)) - val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) - assert(tableSchema.nonEmpty, "the schema of data source tables are always recorded") - val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) + val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) + assert(partCols.isEmpty) + } + } + } - if (tableMetadata.storage.serdeProperties.get("path") == - Option(pathToPartitionedTable.getCanonicalPath)) { - assert(partCols == Seq("num")) - assert(tableSchema == - Option(StructType(StructField("str", StringType, nullable = true) :: - StructField("num", IntegerType, nullable = true) :: Nil))) - } else { - assert(partCols.isEmpty) - assert(tableSchema == - Option(StructType(StructField("num", IntegerType, nullable = true) :: - StructField("str", StringType, nullable = true) :: Nil))) - } - } + test("Describe Table with Corrupted Schema") { + import testImplicits._ + + val tabName = "tab1" + withTempPath { dir => + val path = dir.getCanonicalPath + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2") + df.write.format("json").save(path) + + withTable(tabName) { + sql( + s""" + |CREATE TABLE $tabName + |USING json + |OPTIONS ( + | path '$path' + |) + """.stripMargin) + + val catalog = spark.sessionState.catalog + val table = catalog.getTableMetadata(TableIdentifier(tabName)) + val newProperties = table.properties.filterKeys(key => + key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS) + val newTable = table.copy(properties = newProperties) + catalog.alterTable(newTable) + + val e = intercept[AnalysisException] { + sql(s"DESC $tabName") + }.getMessage + assert(e.contains(s"Could not read schema from the metastore because it is corrupted")) } } } @@ -328,7 +378,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { """.stripMargin) val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName)) val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) - assert(tableSchema == Option(schema)) + assert(tableSchema == schema) val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) assert(partCols == partitionCols) @@ -341,7 +391,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) val tableSchemaBeforeRefresh = DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh) - assert(tableSchemaBeforeRefresh == Option(schema)) + assert(tableSchemaBeforeRefresh == schema) val partColsBeforeRefresh = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh) 
assert(partColsBeforeRefresh == partitionCols) @@ -352,7 +402,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) val tableSchemaAfterRefresh = DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh) - assert(tableSchemaAfterRefresh == Option(schema)) + assert(tableSchemaAfterRefresh == schema) val partColsAfterRefresh = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh) assert(partColsAfterRefresh == partitionCols) @@ -522,7 +572,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible assert(table.properties(DATASOURCE_PROVIDER) == "parquet") assert(DDLUtils.getSchemaFromTableProperties(table) == - Some(new StructType().add("a", IntegerType).add("b", IntegerType))) + new StructType().add("a", IntegerType).add("b", IntegerType)) assert(DDLUtils.getPartitionColumnsFromTableProperties(table) == Seq("a")) } @@ -538,7 +588,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible assert(table.properties(DATASOURCE_PROVIDER) == "parquet") assert(DDLUtils.getSchemaFromTableProperties(table) == - Some(new StructType().add("a", IntegerType).add("b", IntegerType))) + new StructType().add("a", IntegerType).add("b", IntegerType)) assert(DDLUtils.getBucketSpecFromTableProperties(table) == Some(BucketSpec(5, Seq("a"), Seq("b")))) } From 1ee1743906b41ffcc182cb8c74b4134bce8a3006 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 19 Jul 2016 22:23:38 -0700 Subject: [PATCH 08/13] address comments --- .../sql/execution/command/createDataSourceTables.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 5c0dd5fe79817..74a4150673055 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -101,8 +101,7 @@ case class CreateDataSourceTableCommand( userSpecifiedSchema = userSpecifiedSchema, className = provider, bucketSpec = None, - options = optionsWithPath) - .resolveRelation(checkPathExist = false) + options = optionsWithPath).resolveRelation(checkPathExist = false) val partitionColumns = dataSource match { @@ -341,8 +340,8 @@ object CreateDataSourceTableUtils extends Logging { tableProperties.put(DATASOURCE_PROVIDER, provider) // Saves optional user specified schema. Serialized JSON schema string may be too long to be - // stored into a single metastore SerDe property. In this case, we split the JSON string and - // store each part as a separate SerDe property. + // stored into a single metastore table property. In this case, we split the JSON string and + // store each part as a separate table property. val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold val schemaJsonString = schema.json // Split the JSON string. 
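The mechanism these patches keep refining: the serialized schema JSON is chunked across numbered table properties when the table is created, and the read path now always rebuilds a StructType from those chunks or fails with the "corrupted" error instead of returning an Option. Below is a rough standalone sketch of that round trip in Scala; writeSchemaProps, readSchemaProps, and the example.* property keys are invented for illustration and do not match the real constants in CreateDataSourceTableUtils or DDLUtils.

import scala.collection.mutable
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructType}

// Placeholder property keys; the real ones are defined in CreateDataSourceTableUtils.
val numPartsKey = "example.schema.numParts"
val partKeyPrefix = "example.schema.part."

// Write path: split the schema's JSON form into chunks of at most `threshold` characters
// and store each chunk under a numbered property key.
def writeSchemaProps(schema: StructType, threshold: Int): Map[String, String] = {
  val props = mutable.LinkedHashMap[String, String]()
  val parts = schema.json.grouped(threshold).toSeq
  props.put(numPartsKey, parts.size.toString)
  parts.zipWithIndex.foreach { case (part, index) =>
    props.put(s"$partKeyPrefix$index", part)
  }
  props.toMap
}

// Read path: reassemble the JSON string and deserialize it, failing loudly if the part
// count or any individual part is missing (the "corrupted schema" case).
def readSchemaProps(props: Map[String, String]): StructType = {
  val numParts = props.getOrElse(numPartsKey,
    sys.error("Could not read schema: the part count property is missing.")).toInt
  val json = (0 until numParts).map { index =>
    props.getOrElse(s"$partKeyPrefix$index",
      sys.error(s"Could not read schema: missing part $index, $numParts parts are expected."))
  }.mkString
  DataType.fromJson(json).asInstanceOf[StructType]
}

// Round trip with an artificially small threshold, similar in spirit to the tests that set
// SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD to "1" to force the schema into many parts.
val schema = new StructType().add("num", IntegerType).add("str", StringType)
assert(readSchemaProps(writeSchemaProps(schema, threshold = 1)) == schema)

Splitting exists because a single metastore table property value can be too short to hold the whole JSON string, and dropping the part-count property is exactly what the "Describe Table with Corrupted Schema" test above simulates.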
From b404eecfd69dd73124157b339d6d68939ad040aa Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 20 Jul 2016 13:41:09 -0700 Subject: [PATCH 09/13] address comments --- .../command/createDataSourceTables.scala | 42 ++++----- .../spark/sql/execution/command/ddl.scala | 28 +++--- .../sql/execution/command/DDLSuite.scala | 92 ++++++++++--------- 3 files changed, 81 insertions(+), 81 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 74a4150673055..401a479802159 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -103,27 +103,25 @@ case class CreateDataSourceTableCommand( bucketSpec = None, options = optionsWithPath).resolveRelation(checkPathExist = false) - val partitionColumns = - dataSource match { - case r: HadoopFsRelation => - if (userSpecifiedSchema.isEmpty) { - r.partitionSchema.fieldNames - } else { - userSpecifiedPartitionColumns - } + val partitionColumns = if (userSpecifiedSchema.nonEmpty) { + userSpecifiedPartitionColumns + } else { + val res = dataSource match { + case r: HadoopFsRelation => r.partitionSchema.fieldNames case _ => Array.empty[String] } - - if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionColumns.length > 0) { - // The table does not have a specified schema, which means that the schema will be inferred - // when we load the table. So, we are not expecting partition columns and we will discover - // partitions when we load the table. However, if there are specified partition columns, - // we simply ignore them and provide a warning message. - logWarning( - s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " + - s"ignored. The schema and partition columns of table $tableIdent are inferred. " + - s"Schema: ${dataSource.schema.simpleString}; " + - s"Partition columns: $partitionColumns") + if (userSpecifiedPartitionColumns.length > 0) { + // The table does not have a specified schema, which means that the schema will be inferred + // when we load the table. So, we are not expecting partition columns and we will discover + // partitions when we load the table. However, if there are specified partition columns, + // we simply ignore them and provide a warning message. + logWarning( + s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " + + s"ignored. The schema and partition columns of table $tableIdent are inferred. " + + s"Schema: ${dataSource.schema.simpleString}; " + + s"Partition columns: $res") + } + res } CreateDataSourceTableUtils.createDataSourceTable( @@ -339,9 +337,9 @@ object CreateDataSourceTableUtils extends Logging { val tableProperties = new mutable.HashMap[String, String] tableProperties.put(DATASOURCE_PROVIDER, provider) - // Saves optional user specified schema. Serialized JSON schema string may be too long to be - // stored into a single metastore table property. In this case, we split the JSON string and - // store each part as a separate table property. + // Serialized JSON schema string may be too long to be stored into a single metastore table + // property. In this case, we split the JSON string and store each part as a separate table + // property. 
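+    // Each stored part holds at most schemaStringLengthThreshold characters of the JSON string.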
val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold val schemaJsonString = schema.json // Split the JSON string. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index b98c3a5429ced..ce37328b07b06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -527,26 +527,24 @@ object DDLUtils { require(isDatasourceTable(metadata)) val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted." val props = metadata.properties - if (props.isDefinedAt(DATASOURCE_SCHEMA)) { + props.get(DATASOURCE_SCHEMA).map { schema => // Originally, we used spark.sql.sources.schema to store the schema of a data source table. // After SPARK-6024, we removed this flag. // Although we are not using spark.sql.sources.schema any more, we need to still support. - val schema = props.get(DATASOURCE_SCHEMA).get DataType.fromJson(schema).asInstanceOf[StructType] - } else if (props.isDefinedAt(DATASOURCE_SCHEMA_NUMPARTS)) { - val numParts = props.get(DATASOURCE_SCHEMA_NUMPARTS).get - val parts = (0 until numParts.toInt).map { index => - val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull - if (part == null) { - throw new AnalysisException(msgSchemaCorrupted + - s" (missing part $index of the schema, $numParts parts are expected).") + } getOrElse { + props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts => + val parts = (0 until numParts.toInt).map { index => + val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull + if (part == null) { + throw new AnalysisException(msgSchemaCorrupted + + s" (missing part $index of the schema, $numParts parts are expected).") + } + part } - part - } - // Stick all parts back to a single schema string. - DataType.fromJson(parts.mkString).asInstanceOf[StructType] - } else { - throw new AnalysisException(msgSchemaCorrupted) + // Stick all parts back to a single schema string. 
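+        // Stick the parts back together into a single JSON string before deserializing it.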
+ DataType.fromJson(parts.mkString).asInstanceOf[StructType] + } getOrElse(throw new AnalysisException(msgSchemaCorrupted)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index d7f2e503f025d..42aa09df94e92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -252,63 +252,67 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } - test("Create partitioned data source table with partitioning columns but no schema") { - import testImplicits._ - - withTempPath { dir => - val pathToPartitionedTable = new File(dir, "partitioned") - val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") - df.write.format("parquet").partitionBy("num").save(pathToPartitionedTable.getCanonicalPath) - val tabName = "tab1" - withTable(tabName) { - spark.sql( - s""" - |CREATE TABLE $tabName - |USING parquet - |OPTIONS ( - | path '$pathToPartitionedTable' - |) - |PARTITIONED BY (inexistentColumns) - """.stripMargin) - val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)) + private def createDataSourceTableWithoutSchema( + path: File, + partitionCols: Option[Seq[String]]): (StructType, Seq[String]) = { + var tableSchema = StructType(Nil) + var partCols = Seq.empty[String] - val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) + val tabName = "tab1" + withTable(tabName) { + val partitionClause = + partitionCols.map(p => s"PARTITIONED BY ${p.mkString("(", ", ", ")")}").getOrElse("") + spark.sql( + s""" + |CREATE TABLE $tabName + |USING parquet + |OPTIONS ( + | path '$path' + |) + |$partitionClause + """.stripMargin) + val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)) + + tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) + partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) + } + (tableSchema, partCols) + } + + test("Create partitioned data source table without schema") { + import testImplicits._ + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") + + // Case 1: with partitioning columns but no schema: Option("inexistentColumns" :: Nil) + // Case 2: without schema and partitioning columns: None + Seq(Option("inexistentColumns" :: Nil), None).foreach { partitionCols => + withTempPath { pathToPartitionedTable => + df.write.format("parquet").partitionBy("num").save(pathToPartitionedTable.getCanonicalPath) + val (tableSchema, partCols) = + createDataSourceTableWithoutSchema(pathToPartitionedTable, partitionCols = partitionCols) assert(tableSchema == StructType(StructField("str", StringType, nullable = true) :: StructField("num", IntegerType, nullable = true) :: Nil)) - - val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) assert(partCols == Seq("num")) } } } - test("Create non-partitioned data source table with partitioning columns but no schema") { + test("Create non-partitioned data source table without schema") { import testImplicits._ - - withTempPath { dir => - val pathToNonPartitionedTable = new File(dir, "nonPartitioned") - val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") - df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath) - val tabName = "tab1" - 
withTable(tabName) { - spark.sql( - s""" - |CREATE TABLE $tabName - |USING parquet - |OPTIONS ( - | path '$pathToNonPartitionedTable' - |) - |PARTITIONED BY (inexistentColumns) - """.stripMargin) - val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)) - - val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") + + // Case 1: with partitioning columns but no schema: Option("inexistentColumns" :: Nil) + // Case 2: without schema and partitioning columns: None + Seq(Option("inexistentColumns" :: Nil), None).foreach { partitionCols => + withTempPath { pathToNonPartitionedTable => + df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath) + val (tableSchema, partCols) = + createDataSourceTableWithoutSchema( + pathToNonPartitionedTable, partitionCols = partitionCols) assert(tableSchema == StructType(StructField("num", IntegerType, nullable = true) :: StructField("str", StringType, nullable = true) :: Nil)) - - val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) assert(partCols.isEmpty) } } From 224b0489917e53116a7122ed8e97b8b7f9af4966 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 20 Jul 2016 14:23:43 -0700 Subject: [PATCH 10/13] more test cases --- .../command/createDataSourceTables.scala | 2 +- .../sql/execution/command/DDLSuite.scala | 83 +++++++++++++++---- 2 files changed, 69 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 401a479802159..ac5e36259313a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -119,7 +119,7 @@ case class CreateDataSourceTableCommand( s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " + s"ignored. The schema and partition columns of table $tableIdent are inferred. 
" + s"Schema: ${dataSource.schema.simpleString}; " + - s"Partition columns: $res") + s"Partition columns: ${res.mkString("(", ", ", ")")}") } res } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 42aa09df94e92..d4963517d186e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -252,19 +252,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } - private def createDataSourceTableWithoutSchema( + private def createDataSourceTable( path: File, - partitionCols: Option[Seq[String]]): (StructType, Seq[String]) = { + userSpecifiedSchema: Option[String], + userSpecifiedPartitionCols: Option[String]): (StructType, Seq[String]) = { var tableSchema = StructType(Nil) var partCols = Seq.empty[String] val tabName = "tab1" withTable(tabName) { val partitionClause = - partitionCols.map(p => s"PARTITIONED BY ${p.mkString("(", ", ", ")")}").getOrElse("") - spark.sql( + userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("") + val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("") + sql( s""" - |CREATE TABLE $tabName + |CREATE TABLE $tabName $schemaClause |USING parquet |OPTIONS ( | path '$path' @@ -279,17 +281,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { (tableSchema, partCols) } - test("Create partitioned data source table without schema") { + test("Create partitioned data source table without user specified schema") { import testImplicits._ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") - // Case 1: with partitioning columns but no schema: Option("inexistentColumns" :: Nil) + // Case 1: with partitioning columns but no schema: Option("inexistentColumns") // Case 2: without schema and partitioning columns: None - Seq(Option("inexistentColumns" :: Nil), None).foreach { partitionCols => + Seq(Option("inexistentColumns"), None).foreach { partitionCols => withTempPath { pathToPartitionedTable => - df.write.format("parquet").partitionBy("num").save(pathToPartitionedTable.getCanonicalPath) + df.write.format("parquet").partitionBy("num") + .save(pathToPartitionedTable.getCanonicalPath) val (tableSchema, partCols) = - createDataSourceTableWithoutSchema(pathToPartitionedTable, partitionCols = partitionCols) + createDataSourceTable( + pathToPartitionedTable, + userSpecifiedSchema = None, + userSpecifiedPartitionCols = partitionCols) assert(tableSchema == StructType(StructField("str", StringType, nullable = true) :: StructField("num", IntegerType, nullable = true) :: Nil)) @@ -298,18 +304,43 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } - test("Create non-partitioned data source table without schema") { + test("Create partitioned data source table with user specified schema") { + import testImplicits._ + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") + + // Case 1: with partitioning columns but no schema: Option("num") + // Case 2: without schema and partitioning columns: None + Seq(Option("num"), None).foreach { partitionCols => + withTempPath { pathToPartitionedTable => + df.write.format("parquet").partitionBy("num") + .save(pathToPartitionedTable.getCanonicalPath) + val (tableSchema, partCols) = + createDataSourceTable( + 
pathToPartitionedTable, + userSpecifiedSchema = Option("num int, str string"), + userSpecifiedPartitionCols = partitionCols) + assert(tableSchema == + StructType(StructField("num", IntegerType, nullable = true) :: + StructField("str", StringType, nullable = true) :: Nil)) + assert(partCols.mkString(", ") == partitionCols.getOrElse("")) + } + } + } + + test("Create non-partitioned data source table without user specified schema") { import testImplicits._ val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") - // Case 1: with partitioning columns but no schema: Option("inexistentColumns" :: Nil) + // Case 1: with partitioning columns but no schema: Option("inexistentColumns") // Case 2: without schema and partitioning columns: None - Seq(Option("inexistentColumns" :: Nil), None).foreach { partitionCols => + Seq(Option("inexistentColumns"), None).foreach { partitionCols => withTempPath { pathToNonPartitionedTable => df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath) val (tableSchema, partCols) = - createDataSourceTableWithoutSchema( - pathToNonPartitionedTable, partitionCols = partitionCols) + createDataSourceTable( + pathToNonPartitionedTable, + userSpecifiedSchema = None, + userSpecifiedPartitionCols = partitionCols) assert(tableSchema == StructType(StructField("num", IntegerType, nullable = true) :: StructField("str", StringType, nullable = true) :: Nil)) @@ -318,6 +349,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + test("Create non-partitioned data source table with user specified schema") { + import testImplicits._ + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") + + // Case 1: with partitioning columns but no schema: Option("inexistentColumns") + // Case 2: without schema and partitioning columns: None + Seq(Option("num"), None).foreach { partitionCols => + withTempPath { pathToNonPartitionedTable => + df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath) + val (tableSchema, partCols) = + createDataSourceTable( + pathToNonPartitionedTable, + userSpecifiedSchema = Option("num int, str string"), + userSpecifiedPartitionCols = partitionCols) + assert(tableSchema == + StructType(StructField("num", IntegerType, nullable = true) :: + StructField("str", StringType, nullable = true) :: Nil)) + assert(partCols.mkString(", ") == partitionCols.getOrElse("")) + } + } + } + test("Describe Table with Corrupted Schema") { import testImplicits._ From 264ad35a1a749e14f8d8a33e4977cddda0916204 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 20 Jul 2016 18:30:25 -0700 Subject: [PATCH 11/13] addressed comments --- .../sql/execution/command/DDLSuite.scala | 105 ++++++++---------- 1 file changed, 46 insertions(+), 59 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index d4963517d186e..d018ac3490cd9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -296,9 +296,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { pathToPartitionedTable, userSpecifiedSchema = None, userSpecifiedPartitionCols = partitionCols) - assert(tableSchema == - StructType(StructField("str", StringType, nullable = true) :: - StructField("num", IntegerType, nullable = true) :: 
Nil)) + assert(tableSchema == new StructType().add("str", StringType).add("num", IntegerType)) assert(partCols == Seq("num")) } } @@ -319,9 +317,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { pathToPartitionedTable, userSpecifiedSchema = Option("num int, str string"), userSpecifiedPartitionCols = partitionCols) - assert(tableSchema == - StructType(StructField("num", IntegerType, nullable = true) :: - StructField("str", StringType, nullable = true) :: Nil)) + assert(tableSchema == new StructType().add("num", IntegerType).add("str", StringType)) assert(partCols.mkString(", ") == partitionCols.getOrElse("")) } } @@ -341,9 +337,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { pathToNonPartitionedTable, userSpecifiedSchema = None, userSpecifiedPartitionCols = partitionCols) - assert(tableSchema == - StructType(StructField("num", IntegerType, nullable = true) :: - StructField("str", StringType, nullable = true) :: Nil)) + assert(tableSchema == new StructType().add("num", IntegerType).add("str", StringType)) assert(partCols.isEmpty) } } @@ -363,9 +357,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { pathToNonPartitionedTable, userSpecifiedSchema = Option("num int, str string"), userSpecifiedPartitionCols = partitionCols) - assert(tableSchema == - StructType(StructField("num", IntegerType, nullable = true) :: - StructField("str", StringType, nullable = true) :: Nil)) + assert(tableSchema == new StructType().add("num", IntegerType).add("str", StringType)) assert(partCols.mkString(", ") == partitionCols.getOrElse("")) } } @@ -415,55 +407,50 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString, i, i)) .toDF("col1", "col2", "col3", "col4") df.write.format("json").partitionBy("col1", "col3").save(path) - val schema = StructType( - StructField("col2", StringType, nullable = true) :: - StructField("col4", LongType, nullable = true) :: - StructField("col1", IntegerType, nullable = true) :: - StructField("col3", IntegerType, nullable = true) :: Nil) + val schema = new StructType() + .add("col2", StringType).add("col4", LongType) + .add("col1", IntegerType).add("col3", IntegerType) val partitionCols = Seq("col1", "col3") - // Ensure the schema is split to multiple properties. 
- withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "1") { - withTable(tabName) { - spark.sql( - s""" - |CREATE TABLE $tabName - |USING json - |OPTIONS ( - | path '$path' - |) - """.stripMargin) - val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName)) - val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) - assert(tableSchema == schema) - val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) - assert(partCols == partitionCols) - - // Change the schema - val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)) - .toDF("newCol1", "newCol2") - newDF.write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path) - - // No change on the schema - val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) - val tableSchemaBeforeRefresh = - DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh) - assert(tableSchemaBeforeRefresh == schema) - val partColsBeforeRefresh = - DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh) - assert(partColsBeforeRefresh == partitionCols) - - // Refresh does not affect the schema - spark.catalog.refreshTable(tabName) - - val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) - val tableSchemaAfterRefresh = - DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh) - assert(tableSchemaAfterRefresh == schema) - val partColsAfterRefresh = - DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh) - assert(partColsAfterRefresh == partitionCols) - } + withTable(tabName) { + spark.sql( + s""" + |CREATE TABLE $tabName + |USING json + |OPTIONS ( + | path '$path' + |) + """.stripMargin) + val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName)) + val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) + assert(tableSchema == schema) + val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) + assert(partCols == partitionCols) + + // Change the schema + val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)) + .toDF("newCol1", "newCol2") + newDF.write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path) + + // No change on the schema + val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) + val tableSchemaBeforeRefresh = + DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh) + assert(tableSchemaBeforeRefresh == schema) + val partColsBeforeRefresh = + DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh) + assert(partColsBeforeRefresh == partitionCols) + + // Refresh does not affect the schema + spark.catalog.refreshTable(tabName) + + val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) + val tableSchemaAfterRefresh = + DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh) + assert(tableSchemaAfterRefresh == schema) + val partColsAfterRefresh = + DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh) + assert(partColsAfterRefresh == partitionCols) } } } From 6492e98f80aae6d95b5683c4b2a1aab8e3edb94d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 27 Jul 2016 01:23:49 -0700 Subject: [PATCH 12/13] address comments --- .../sql/execution/command/DDLSuite.scala | 61 +++++++++---------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index d018ac3490cd9..58cbc79890bb7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -252,10 +252,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } - private def createDataSourceTable( + private def checkSchemaInCreatedDataSourceTable( path: File, userSpecifiedSchema: Option[String], - userSpecifiedPartitionCols: Option[String]): (StructType, Seq[String]) = { + userSpecifiedPartitionCols: Option[String], + expectedSchema: StructType, + expectedPartitionCols: Seq[String]): Unit = { var tableSchema = StructType(Nil) var partCols = Seq.empty[String] @@ -278,7 +280,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) } - (tableSchema, partCols) + assert(tableSchema == expectedSchema) + assert(partCols == expectedPartitionCols) } test("Create partitioned data source table without user specified schema") { @@ -291,13 +294,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { withTempPath { pathToPartitionedTable => df.write.format("parquet").partitionBy("num") .save(pathToPartitionedTable.getCanonicalPath) - val (tableSchema, partCols) = - createDataSourceTable( - pathToPartitionedTable, - userSpecifiedSchema = None, - userSpecifiedPartitionCols = partitionCols) - assert(tableSchema == new StructType().add("str", StringType).add("num", IntegerType)) - assert(partCols == Seq("num")) + checkSchemaInCreatedDataSourceTable( + pathToPartitionedTable, + userSpecifiedSchema = None, + userSpecifiedPartitionCols = partitionCols, + expectedSchema = new StructType().add("str", StringType).add("num", IntegerType), + expectedPartitionCols = Seq("num")) } } } @@ -312,13 +314,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { withTempPath { pathToPartitionedTable => df.write.format("parquet").partitionBy("num") .save(pathToPartitionedTable.getCanonicalPath) - val (tableSchema, partCols) = - createDataSourceTable( - pathToPartitionedTable, - userSpecifiedSchema = Option("num int, str string"), - userSpecifiedPartitionCols = partitionCols) - assert(tableSchema == new StructType().add("num", IntegerType).add("str", StringType)) - assert(partCols.mkString(", ") == partitionCols.getOrElse("")) + checkSchemaInCreatedDataSourceTable( + pathToPartitionedTable, + userSpecifiedSchema = Option("num int, str string"), + userSpecifiedPartitionCols = partitionCols, + expectedSchema = new StructType().add("num", IntegerType).add("str", StringType), + expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String])) } } } @@ -332,13 +333,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { Seq(Option("inexistentColumns"), None).foreach { partitionCols => withTempPath { pathToNonPartitionedTable => df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath) - val (tableSchema, partCols) = - createDataSourceTable( - pathToNonPartitionedTable, - userSpecifiedSchema = None, - userSpecifiedPartitionCols = partitionCols) - assert(tableSchema == new StructType().add("num", IntegerType).add("str", StringType)) - assert(partCols.isEmpty) + checkSchemaInCreatedDataSourceTable( + 
pathToNonPartitionedTable, + userSpecifiedSchema = None, + userSpecifiedPartitionCols = partitionCols, + expectedSchema = new StructType().add("num", IntegerType).add("str", StringType), + expectedPartitionCols = Seq.empty[String]) } } } @@ -352,13 +352,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { Seq(Option("num"), None).foreach { partitionCols => withTempPath { pathToNonPartitionedTable => df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath) - val (tableSchema, partCols) = - createDataSourceTable( - pathToNonPartitionedTable, - userSpecifiedSchema = Option("num int, str string"), - userSpecifiedPartitionCols = partitionCols) - assert(tableSchema == new StructType().add("num", IntegerType).add("str", StringType)) - assert(partCols.mkString(", ") == partitionCols.getOrElse("")) + checkSchemaInCreatedDataSourceTable( + pathToNonPartitionedTable, + userSpecifiedSchema = Option("num int, str string"), + userSpecifiedPartitionCols = partitionCols, + expectedSchema = new StructType().add("num", IntegerType).add("str", StringType), + expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String])) } } } From b694d8bd3666e54e4d3ab972edcb04f2be64669b Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 27 Jul 2016 08:11:51 -0700 Subject: [PATCH 13/13] fix after merge the latest changes --- .../org/apache/spark/sql/execution/command/tables.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 9e12dcc903d80..f85373c751110 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -431,12 +431,12 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { if (DDLUtils.isDatasourceTable(table)) { - val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table) val partColNames = DDLUtils.getPartitionColumnsFromTableProperties(table) - for (schema <- userSpecifiedSchema if partColNames.nonEmpty) { + if (partColNames.nonEmpty) { + val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table) append(buffer, "# Partition Information", "", "") append(buffer, s"# ${output.head.name}", output(1).name, output(2).name) - describeSchema(StructType(partColNames.map(schema(_))), buffer) + describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer) } } else { if (table.partitionColumns.nonEmpty) {