[SPARK-17470][SQL] unify path for data source table and locationUri for hive serde table #15024
@@ -196,18 +196,32 @@ class InMemoryCatalog(
         throw new TableAlreadyExistsException(db = db, table = table)
       }
     } else {
-      if (tableDefinition.tableType == CatalogTableType.MANAGED) {
-        val dir = new Path(catalog(db).db.locationUri, table)
+      // Set the default table location if this is a managed table and its location is not
+      // specified.
+      // Ideally we should not create a managed table with location, but Hive serde table can
+      // specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have
+      // to create the table directory and write out data before we create this table, to avoid
+      // exposing a partial written table.
+      val needDefaultTableLocation =
+        tableDefinition.tableType == CatalogTableType.MANAGED &&
+          tableDefinition.storage.locationUri.isEmpty
+
+      val tableWithLocation = if (needDefaultTableLocation) {
+        val defaultTableLocation = new Path(catalog(db).db.locationUri, table)
         try {
-          val fs = dir.getFileSystem(hadoopConfig)
-          fs.mkdirs(dir)
+          val fs = defaultTableLocation.getFileSystem(hadoopConfig)
+          fs.mkdirs(defaultTableLocation)
         } catch {
           case e: IOException =>
             throw new SparkException(s"Unable to create table $table as failed " +
-              s"to create its directory $dir", e)
+              s"to create its directory $defaultTableLocation", e)
         }
+        tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString))
+      } else {
+        tableDefinition
       }
-      catalog(db).tables.put(table, new TableDesc(tableDefinition))
+
+      catalog(db).tables.put(table, new TableDesc(tableWithLocation))
     }
   }
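The comment block above documents the key behavioral change in `createTable`: a default location is assigned only when a managed table arrives without one, and the resolved location is written back into the table's `storage.locationUri`. A minimal sketch of that flow, using plain Hadoop APIs and an illustrative helper name (this is not Spark's actual method):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative sketch only: derive and create a default location for a managed table
// that was submitted without one, mirroring the createTable logic above.
def assignDefaultLocation(
    dbLocationUri: String,
    tableName: String,
    hadoopConf: Configuration = new Configuration()): String = {
  val dir = new Path(dbLocationUri, tableName) // <dbLocation>/<tableName>
  val fs = dir.getFileSystem(hadoopConf)
  fs.mkdirs(dir)                               // create the directory up front
  dir.toUri.toString                           // this URI is recorded in storage.locationUri
}
```

Persisting the resolved location on the table itself is what lets `dropTable` and `renameTable` below stop re-deriving `<dbLocation>/<tableName>`.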
@@ -218,8 +232,12 @@ class InMemoryCatalog(
       purge: Boolean): Unit = synchronized {
     requireDbExists(db)
     if (tableExists(db, table)) {
-      if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
-        val dir = new Path(catalog(db).db.locationUri, table)
+      val tableMeta = getTable(db, table)
+      if (tableMeta.tableType == CatalogTableType.MANAGED) {
+        assert(tableMeta.storage.locationUri.isDefined,
+          "Managed table should always have table location, as we will assign a default location " +
+          "to it if it doesn't have one.")
+        val dir = new Path(tableMeta.storage.locationUri.get)
         try {
           val fs = dir.getFileSystem(hadoopConfig)
           fs.delete(dir, true)
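Because every managed table now carries its location, `dropTable` deletes whatever directory the table metadata points at instead of recomputing the path from the database location. A hedged sketch of that idea with a hypothetical helper (not Spark's API):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative sketch only: remove the directory recorded in a managed table's locationUri.
def deleteManagedTableDir(locationUri: Option[String], hadoopConf: Configuration): Unit = {
  val uri = locationUri.getOrElse(
    sys.error("Managed table should always have a table location"))
  val dir = new Path(uri)
  val fs = dir.getFileSystem(hadoopConf)
  fs.delete(dir, true) // recursive delete of the whole table directory
}
```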
@@ -244,7 +262,10 @@ class InMemoryCatalog(
     oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))

     if (oldDesc.table.tableType == CatalogTableType.MANAGED) {
-      val oldDir = new Path(catalog(db).db.locationUri, oldName)
+      assert(oldDesc.table.storage.locationUri.isDefined,
+        "Managed table should always have table location, as we will assign a default location " +
+        "to it if it doesn't have one.")
+      val oldDir = new Path(oldDesc.table.storage.locationUri.get)
       val newDir = new Path(catalog(db).db.locationUri, newName)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
@@ -254,6 +275,7 @@ class InMemoryCatalog(
         throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
           s"to rename its directory $oldDir", e)
       }
+      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString))
     }

     catalog(db).tables.put(newName, oldDesc)
@@ -57,13 +57,14 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
+    val pathOption = table.storage.locationUri.map("path" -> _)
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
         userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
-        options = table.storage.properties).resolveRelation()
+        options = table.storage.properties ++ pathOption).resolveRelation()

     dataSource match {
       case fs: HadoopFsRelation =>
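The `pathOption` pattern introduced above recurs throughout this PR: the optional `locationUri` is mapped to an optional `"path" -> value` pair and appended to the options map, so a `path` entry appears only when a location actually exists. A small, self-contained illustration of the Scala idiom (values are made up):

```scala
// Appending an Option[(K, V)] to a Map adds the pair only when it is defined.
val baseOptions = Map("compression" -> "snappy")

val withLocation = baseOptions ++ Some("file:/tmp/t1").map("path" -> _)
// Map(compression -> snappy, path -> file:/tmp/t1)

val withoutLocation = baseOptions ++ (None: Option[String]).map("path" -> _)
// Map(compression -> snappy)
```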
@@ -85,14 +86,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }

-    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
-      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
-    } else {
-      table.storage.properties
-    }
-
     val newTable = table.copy(
-      storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,
       partitionColumnNames = partitionColumnNames,
       // If metastore partition management for file source tables is enabled, we start off with

Contributor (on `val newTable = table.copy(`): where do we assign the default location?
@@ -140,12 +134,6 @@ case class CreateDataSourceTableAsSelectCommand(
     val tableIdentWithDB = table.identifier.copy(database = Some(db))
     val tableName = tableIdentWithDB.unquotedString

-    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
-      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
-    } else {
-      table.storage.properties
-    }
-
     var createMetastoreTable = false
     var existingSchema = Option.empty[StructType]
     if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
@@ -162,13 +150,7 @@ case class CreateDataSourceTableAsSelectCommand(
           return Seq.empty[Row]
         case SaveMode.Append =>
           // Check if the specified data source match the data source of the existing table.
-          val dataSource = DataSource(
-            sparkSession = sparkSession,
-            userSpecifiedSchema = Some(query.schema.asNullable),
-            partitionColumns = table.partitionColumnNames,
-            bucketSpec = table.bucketSpec,
-            className = provider,
-            options = optionsWithPath)
+          val existingProvider = DataSource.lookupDataSource(provider)
           // TODO: Check that options from the resolved relation match the relation that we are
           // inserting into (i.e. using the same compression).
@@ -178,7 +160,7 @@ case class CreateDataSourceTableAsSelectCommand(
           case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
             // check if the file formats match
             l.relation match {
-              case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
+              case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
                 throw new AnalysisException(
                   s"The file format of the existing table $tableName is " +
                   s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
@@ -213,13 +195,20 @@ case class CreateDataSourceTableAsSelectCommand(
       case None => data
     }

+    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+      Some(sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.locationUri
+    }
+
     // Create the relation based on the data of df.
+    val pathOption = tableLocation.map("path" -> _)
     val dataSource = DataSource(
       sparkSession,
       className = provider,
       partitionColumns = table.partitionColumnNames,
       bucketSpec = table.bucketSpec,
-      options = optionsWithPath)
+      options = table.storage.properties ++ pathOption)

     val result = try {
       dataSource.write(mode, df)
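The `tableLocation` computation above is the heart of the CTAS change: a managed table gets the session catalog's default path, while an external table keeps whatever location the user supplied. Reduced to its essentials (illustrative function and values, not Spark's API):

```scala
// Illustrative sketch of the managed-vs-external location decision.
def resolveTableLocation(
    isManaged: Boolean,
    userLocation: Option[String],
    defaultPath: => String): Option[String] = {
  if (isManaged) Some(defaultPath) else userLocation
}

// A managed table always ends up with a location; an external table keeps the user's choice.
assert(resolveTableLocation(isManaged = true, None, "file:/warehouse/db1.db/t1")
  .contains("file:/warehouse/db1.db/t1"))
assert(resolveTableLocation(isManaged = false, Some("s3://bucket/t1"), "unused")
  .contains("s3://bucket/t1"))
```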
@@ -230,7 +219,7 @@ case class CreateDataSourceTableAsSelectCommand(
       }
       if (createMetastoreTable) {
         val newTable = table.copy(
-          storage = table.storage.copy(properties = optionsWithPath),
+          storage = table.storage.copy(locationUri = tableLocation),
           // We will use the schema of resolved.relation as the schema of the table (instead of
           // the schema of df). It is important since the nullability may be changed by the relation
           // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
@@ -485,14 +485,6 @@ case class AlterTableRecoverPartitionsCommand(
     }
   }

-  private def getBasePath(table: CatalogTable): Option[String] = {
-    if (table.provider == Some("hive")) {
-      table.storage.locationUri
-    } else {
-      new CaseInsensitiveMap(table.storage.properties).get("path")
-    }
-  }
-
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
@@ -503,13 +495,12 @@ case class AlterTableRecoverPartitionsCommand(
         s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
     }

-    val tablePath = getBasePath(table)
-    if (tablePath.isEmpty) {
+    if (table.storage.locationUri.isEmpty) {
       throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
         s"location provided: $tableIdentWithDB")
     }

-    val root = new Path(tablePath.get)
+    val root = new Path(table.storage.locationUri.get)
     logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
@@ -688,15 +679,7 @@ case class AlterTableSetLocationCommand(
         catalog.alterPartitions(table.identifier, Seq(newPart))
       case None =>
         // No partition spec is specified, so we set the location for the table itself
-        val newTable =
-          if (DDLUtils.isDatasourceTable(table)) {
-            table.withNewStorage(
-              locationUri = Some(location),
-              properties = table.storage.properties ++ Map("path" -> location))
-          } else {
-            table.withNewStorage(locationUri = Some(location))
-          }
-        catalog.alterTable(newTable)
+        catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
     }
     Seq.empty[Row]
   }
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -62,25 +63,6 @@ case class CreateTableLikeCommand(
     val catalog = sparkSession.sessionState.catalog
     val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)

-    // Storage format
-    val newStorage =
-      if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
-        val newPath = catalog.defaultTablePath(targetTable)
-        CatalogStorageFormat.empty.copy(properties = Map("path" -> newPath))
-      } else if (DDLUtils.isDatasourceTable(sourceTableDesc)) {
-        val newPath = catalog.defaultTablePath(targetTable)
-        val newSerdeProp =
-          sourceTableDesc.storage.properties.filterKeys(_.toLowerCase != "path") ++
-            Map("path" -> newPath)
-        sourceTableDesc.storage.copy(
-          locationUri = None,
-          properties = newSerdeProp)
-      } else {
-        sourceTableDesc.storage.copy(
-          locationUri = None,
-          properties = sourceTableDesc.storage.properties)
-      }
-
     val newProvider = if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
       Some(sparkSession.sessionState.conf.defaultDataSourceName)
     } else {
@@ -91,7 +73,8 @@ case class CreateTableLikeCommand(
     CatalogTable(
       identifier = targetTable,
       tableType = CatalogTableType.MANAGED,
-      storage = newStorage,
+      // We are creating a new managed table, which should not have custom table location.
+      storage = sourceTableDesc.storage.copy(locationUri = None),
       schema = sourceTableDesc.schema,
       provider = newProvider,
       partitionColumnNames = sourceTableDesc.partitionColumnNames,

Contributor (on `storage = sourceTableDesc.storage.copy(locationUri = None),`): When will we set the location? Is it set by hive metastore?
@@ -170,13 +153,6 @@ case class AlterTableRenameCommand(
           case NonFatal(e) => log.warn(e.toString, e)
         }
       }
-      // For datasource tables, we also need to update the "path" serde property
-      if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
-        val newPath = catalog.defaultTablePath(newName)
-        val newTable = table.withNewStorage(
-          properties = table.storage.properties ++ Map("path" -> newPath))
-        catalog.alterTable(newTable)
-      }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan
       // back into the hive metastore cache
       catalog.refreshTable(oldName)
@@ -367,8 +343,9 @@ case class TruncateTableCommand(
       DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
     }
     val locations =
-      if (DDLUtils.isDatasourceTable(table)) {
-        Seq(table.storage.properties.get("path"))
+      // TODO: The `InMemoryCatalog` doesn't support listPartition with partial partition spec.
+      if (spark.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") {
+        Seq(table.storage.locationUri)
       } else if (table.partitionColumnNames.isEmpty) {
         Seq(table.storage.locationUri)
       } else {
@@ -916,17 +893,18 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
   }

   private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
     val props = metadata.properties

     builder ++= s"USING ${metadata.provider.get}\n"

-    val dataSourceOptions = metadata.storage.properties.filterNot {
-      case (key, value) =>
+    val dataSourceOptions = metadata.storage.properties.map {
+      case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+    } ++ metadata.storage.locationUri.flatMap { location =>
+      if (metadata.tableType == MANAGED) {
         // If it's a managed table, omit PATH option. Spark SQL always creates external table
         // when the table creation DDL contains the PATH option.
-        key.toLowerCase == "path" && metadata.tableType == MANAGED
-    }.map {
-      case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+        None
+      } else {
+        Some(s"path '${escapeSingleQuotedString(location)}'")
+      }
     }

     if (dataSourceOptions.nonEmpty) {
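The rewritten `showDataSourceTableOptions` builds the OPTIONS clause from the storage properties and, for external tables only, appends a `path` entry derived from `locationUri`; managed tables omit `path` because emitting it would make the regenerated DDL create an external table. A rough sketch of the same rendering in plain Scala (quoting and escaping simplified, names illustrative):

```scala
// Simplified sketch: render data source options, adding `path` only for external tables.
def renderOptions(
    properties: Map[String, String],
    locationUri: Option[String],
    isManaged: Boolean): String = {
  val opts = properties.map { case (k, v) => s"`$k` '$v'" } ++
    locationUri.filterNot(_ => isManaged).map(loc => s"path '$loc'")
  if (opts.isEmpty) "" else opts.mkString("OPTIONS (\n  ", ",\n  ", "\n)\n")
}
// For an external parquet table this might yield:
// OPTIONS (
//   `compression` 'snappy',
//   path 'file:/data/t1'
// )
```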
For a managed table, when will its location uri be empty? When it is a data source table?

Ideally all managed tables should not set location uri, but we have 2 special cases: