diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index b44d2ee69e1d1..73db4945a817e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -689,12 +689,7 @@ class SessionCatalog( child = parser.parsePlan(viewText)) SubqueryAlias(table, child) } else { - val tableRelation = CatalogRelation( - metadata, - // we assume all the columns are nullable. - metadata.dataSchema.asNullable.toAttributes, - metadata.partitionSchema.asNullable.toAttributes) - SubqueryAlias(table, tableRelation) + SubqueryAlias(table, UnresolvedCatalogRelation(metadata)) } } else { SubqueryAlias(table, tempTables(table)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 9531456434a15..8d17719641d16 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal} -import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.util.quoteIdentifier @@ -400,11 +399,22 @@ object CatalogTypes { type TablePartitionSpec = Map[String, String] } +/** + * A placeholder for a table relation, which will be replaced by concrete relation like + * `LogicalRelation` or `HiveTableRelation`, during analysis. + */ +case class UnresolvedCatalogRelation(tableMeta: CatalogTable) extends LeafNode { + assert(tableMeta.identifier.database.isDefined) + override lazy val resolved: Boolean = false + override def output: Seq[Attribute] = Nil +} /** - * A [[LogicalPlan]] that represents a table. + * A `LogicalPlan` that represents a hive table. + * + * TODO: remove this after we completely make hive as a data source. */ -case class CatalogRelation( +case class HiveTableRelation( tableMeta: CatalogTable, dataCols: Seq[AttributeReference], partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation { @@ -418,7 +428,7 @@ case class CatalogRelation( def isPartitioned: Boolean = partitionCols.nonEmpty override def equals(relation: Any): Boolean = relation match { - case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output + case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output case _ => false } @@ -426,7 +436,7 @@ case class CatalogRelation( Objects.hashCode(tableMeta.identifier, output) } - override lazy val canonicalized: LogicalPlan = copy( + override lazy val canonicalized: HiveTableRelation = copy( tableMeta = tableMeta.copy( storage = CatalogStorageFormat.empty, createTime = -1 @@ -439,15 +449,12 @@ case class CatalogRelation( }) override def computeStats(): Statistics = { - // For data source tables, we will create a `LogicalRelation` and won't call this method, for - // hive serde tables, we will always generate a statistics. - // TODO: unify the table stats generation. tableMeta.stats.map(_.toPlanStats(output)).getOrElse { throw new IllegalStateException("table stats must be specified.") } } - override def newInstance(): LogicalPlan = copy( + override def newInstance(): HiveTableRelation = copy( dataCols = dataCols.map(_.newInstance()), partitionCols = partitionCols.map(_.newInstance())) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 8f856a0daad15..2867aed61ec08 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -529,14 +528,14 @@ abstract class SessionCatalogSuite extends AnalysisTest { catalog.setCurrentDatabase("db2") // If we explicitly specify the database, we'll look up the relation in that database assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head - .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1) + .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1) // Otherwise, we'll first look up a temporary table with the same name assert(catalog.lookupRelation(TableIdentifier("tbl1")) == SubqueryAlias("tbl1", tempTable1)) // Then, if that does not exist, look up the relation in the current database catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head - .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1) + .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 079f699a181f5..af0936d85dbb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -24,11 +24,11 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand} +import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.StructType @@ -372,8 +372,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { // Get all input data source or hive relations of the query. val srcRelations = df.logicalPlan.collect { case LogicalRelation(src: BaseRelation, _, _) => src - case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) => - relation.tableMeta.identifier + case relation: HiveTableRelation => relation.tableMeta.identifier } val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed @@ -383,8 +382,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { throw new AnalysisException( s"Cannot overwrite table $tableName that is also being read from") // check hive table relation when overwrite mode - case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) - && srcRelations.contains(relation.tableMeta.identifier) => + case relation: HiveTableRelation + if srcRelations.contains(relation.tableMeta.identifier) => throw new AnalysisException( s"Cannot overwrite table $tableName that is also being read from") case _ => // OK diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index aa968d8b3c34d..a9887eb95279f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -36,7 +36,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.CatalogRelation +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions} @@ -2965,7 +2965,7 @@ class Dataset[T] private[sql]( fsBasedRelation.inputFiles case fr: FileRelation => fr.inputFiles - case r: CatalogRelation if DDLUtils.isHiveTable(r.tableMeta) => + case r: HiveTableRelation => r.tableMeta.storage.locationUri.map(_.toString).toArray }.flatten files.toSet.toArray diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index 02f45abaa3f08..301c4f02647d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -99,7 +99,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic val partitionData = fsRelation.location.listFiles(Nil, Nil) LocalRelation(partAttrs, partitionData.map(_.values)) - case relation: CatalogRelation => + case relation: HiveTableRelation => val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) val caseInsensitiveProperties = CaseInsensitiveMap(relation.tableMeta.storage.properties) @@ -135,7 +135,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) Some((AttributeSet(partAttrs), l)) - case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty => + case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty => val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) Some((AttributeSet(partAttrs), relation)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 587b9b450ea2a..237017742770a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -25,12 +25,12 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName} import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ @@ -207,15 +207,16 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast /** - * Replaces [[CatalogRelation]] with data source table if its table provider is not hive. + * Replaces [[UnresolvedCatalogRelation]] with concrete relation logical plans. + * + * TODO: we should remove the special handling for hive tables after completely making hive as a + * data source. */ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { - private def readDataSourceTable(r: CatalogRelation): LogicalPlan = { - val table = r.tableMeta + private def readDataSourceTable(table: CatalogTable): LogicalPlan = { val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) - val catalogProxy = sparkSession.sessionState.catalog - - val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() { + val catalog = sparkSession.sessionState.catalog + catalog.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() { override def call(): LogicalPlan = { val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) val dataSource = @@ -232,24 +233,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table) } - }).asInstanceOf[LogicalRelation] + }) + } - if (r.output.isEmpty) { - // It's possible that the table schema is empty and need to be inferred at runtime. For this - // case, we don't need to change the output of the cached plan. - plan - } else { - plan.copy(output = r.output) - } + private def readHiveTable(table: CatalogTable): LogicalPlan = { + HiveTableRelation( + table, + // Hive table columns are always nullable. + table.dataSchema.asNullable.toAttributes, + table.partitionSchema.asNullable.toAttributes) } override def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _) - if DDLUtils.isDatasourceTable(r.tableMeta) => - i.copy(table = readDataSourceTable(r)) + case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) + if DDLUtils.isDatasourceTable(tableMeta) => + i.copy(table = readDataSourceTable(tableMeta)) + + case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) => + i.copy(table = readHiveTable(tableMeta)) + + case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) => + readDataSourceTable(tableMeta) - case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) => - readDataSourceTable(r) + case UnresolvedCatalogRelation(tableMeta) => + readHiveTable(tableMeta) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index cb8dc1e041a9b..84acca242aa41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -382,7 +382,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit def apply(plan: LogicalPlan): LogicalPlan = plan transform { case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved => table match { - case relation: CatalogRelation => + case relation: HiveTableRelation => val metadata = relation.tableMeta preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames) case LogicalRelation(h: HadoopFsRelation, _, catalogTable) => @@ -427,7 +427,7 @@ object PreReadCheck extends (LogicalPlan => Unit) { private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = { operator match { - case _: CatalogRelation => 1 + case _: HiveTableRelation => 1 case _ @ LogicalRelation(_: HadoopFsRelation, _, _) => 1 case _: LeafNode => 0 // UNION ALL has multiple children, but these children do not concurrently use InputFileBlock. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala index 41569762d3c59..5916cd76b8789 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala @@ -24,7 +24,7 @@ import scala.collection.mutable import scala.util.Random import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation} import org.apache.spark.sql.catalyst.plans.logical.ColumnStat import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -171,7 +171,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils // Analyze only one column. sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1") val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect { - case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta) + case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta) case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get) }.head val emptyColStat = ColumnStat(0, None, None, 0, 4, 4) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 808dc013f170b..8bab059ed5e84 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -112,7 +112,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } def convertToLogicalRelation( - relation: CatalogRelation, + relation: HiveTableRelation, options: Map[String, String], fileFormatClass: Class[_ <: FileFormat], fileType: String): LogicalRelation = { @@ -210,7 +210,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } - // The inferred schema may have different filed names as the table schema, we should respect + // The inferred schema may have different field names as the table schema, we should respect // it, but also respect the exprId in table relation output. assert(result.output.length == relation.output.length && result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType }) @@ -221,7 +221,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } private def inferIfNeeded( - relation: CatalogRelation, + relation: HiveTableRelation, options: Map[String, String], fileFormat: FileFormat, fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 9c60d22d35ce1..ae1e7e72e8c3f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -21,10 +21,9 @@ import java.io.IOException import java.util.Locale import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation} @@ -116,7 +115,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case relation: CatalogRelation + case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => val table = relation.tableMeta val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { @@ -147,7 +146,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { */ object HiveAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists) + case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists) if DDLUtils.isHiveTable(r.tableMeta) => InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists) @@ -171,13 +170,13 @@ object HiveAnalysis extends Rule[LogicalPlan] { case class RelationConversions( conf: SQLConf, sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] { - private def isConvertible(relation: CatalogRelation): Boolean = { + private def isConvertible(relation: HiveTableRelation): Boolean = { val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) } - private def convert(relation: CatalogRelation): LogicalRelation = { + private def convert(relation: HiveTableRelation): LogicalRelation = { val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) if (serde.contains("parquet")) { val options = Map(ParquetOptions.MERGE_SCHEMA -> @@ -194,14 +193,14 @@ case class RelationConversions( override def apply(plan: LogicalPlan): LogicalPlan = { plan transformUp { // Write path - case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifPartitionNotExists) + case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists) // Inserting into partitioned table is not supported in Parquet/Orc data source (yet). if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && !r.isPartitioned && isConvertible(r) => InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists) // Read path - case relation: CatalogRelation + case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) => convert(relation) } @@ -229,7 +228,7 @@ private[hive] trait HiveStrategies { */ object HiveTableScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalOperation(projectList, predicates, relation: CatalogRelation) => + case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) => // Filter out all predicates that only deal with partition keys, these are given to the // hive table scan operator to be used for partition pruning. val partitionKeyIds = AttributeSet(relation.partitionCols) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index e191071efbf18..896f24f2e223d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.catalog.CatalogRelation +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.execution._ @@ -50,7 +50,7 @@ import org.apache.spark.util.Utils private[hive] case class HiveTableScanExec( requestedAttributes: Seq[Attribute], - relation: CatalogRelation, + relation: HiveTableRelation, partitionPruningPred: Seq[Expression])( @transient private val sparkSession: SparkSession) extends LeafExecNode { @@ -205,7 +205,7 @@ case class HiveTableScanExec( val input: AttributeSeq = relation.output HiveTableScanExec( requestedAttributes.map(QueryPlan.normalizeExprId(_, input)), - relation.canonicalized.asInstanceOf[CatalogRelation], + relation.canonicalized, QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index c785aca985820..e01198dd53178 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -1372,6 +1372,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv hiveClient.createTable(tableDesc, ignoreIfExists = false) checkAnswer(spark.table("old"), Row(1, "a")) + checkAnswer(sql("select * from old"), Row(1, "a")) val expectedSchema = StructType(Seq( StructField("i", IntegerType, nullable = true), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 36566bffb9335..71cf79c473b46 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation} import org.apache.spark.sql.catalyst.plans.logical.ColumnStat import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.command.DDLUtils @@ -72,7 +72,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto |LOCATION '${tempDir.toURI}'""".stripMargin) val relation = spark.table("csv_table").queryExecution.analyzed.children.head - .asInstanceOf[CatalogRelation] + .asInstanceOf[HiveTableRelation] val properties = relation.tableMeta.ignoredProperties assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0") @@ -905,7 +905,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto test("estimates the size of a test Hive serde tables") { val df = sql("""SELECT * FROM src""") val sizes = df.queryExecution.analyzed.collect { - case relation: CatalogRelation => relation.stats.sizeInBytes + case relation: HiveTableRelation => relation.stats.sizeInBytes } assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}") assert(sizes(0).equals(BigInt(5812)), @@ -965,7 +965,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto () => (), metastoreQuery, metastoreAnswer, - implicitly[ClassTag[CatalogRelation]] + implicitly[ClassTag[HiveTableRelation]] ) } @@ -979,7 +979,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto // Assert src has a size smaller than the threshold. val sizes = df.queryExecution.analyzed.collect { - case relation: CatalogRelation => relation.stats.sizeInBytes + case relation: HiveTableRelation => relation.stats.sizeInBytes } assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index a949e5e829e14..eea96eae5a242 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.TestUtils import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException} -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} @@ -454,7 +454,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { case LogicalRelation(r: HadoopFsRelation, _, _) => if (!isDataSourceTable) { fail( - s"${classOf[CatalogRelation].getCanonicalName} is expected, but found " + + s"${classOf[HiveTableRelation].getCanonicalName} is expected, but found " + s"${HadoopFsRelation.getClass.getCanonicalName}.") } userSpecifiedLocation match { @@ -464,11 +464,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } assert(catalogTable.provider.get === format) - case r: CatalogRelation => + case r: HiveTableRelation => if (isDataSourceTable) { fail( s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " + - s"${classOf[CatalogRelation].getCanonicalName}.") + s"${classOf[HiveTableRelation].getCanonicalName}.") } userSpecifiedLocation match { case Some(location) => @@ -948,7 +948,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") { sql("CREATE TABLE explodeTest (key bigInt)") table("explodeTest").queryExecution.analyzed match { - case SubqueryAlias(_, r: CatalogRelation) => // OK + case SubqueryAlias(_, r: HiveTableRelation) => // OK case _ => fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 8c855730c31f2..60ccd996d6d58 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.CatalogRelation +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator} import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHive._ @@ -475,7 +475,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } else { queryExecution.analyzed.collectFirst { - case _: CatalogRelation => () + case _: HiveTableRelation => () }.getOrElse { fail(s"Expecting no conversion from orc to data sources, " + s"but got:\n$queryExecution") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 23f21e6b9931e..303884da19f09 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -21,7 +21,7 @@ import java.io.File import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.CatalogRelation +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources._ @@ -812,7 +812,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { } } else { queryExecution.analyzed.collectFirst { - case _: CatalogRelation => + case _: HiveTableRelation => }.getOrElse { fail(s"Expecting no conversion from parquet to data sources, " + s"but got:\n$queryExecution")