From c4e0fe3a24fbab2b1fb1615f0ac2c56bf230eda3 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 15 Aug 2017 09:04:56 -0700 Subject: [PATCH] [SPARK-18464][SQL][FOLLOWUP] support old table which doesn't store schema in table properties This is a follow-up of https://github.com/apache/spark/pull/15900 , to fix one more bug: When table schema is empty and need to be inferred at runtime, we should not resolve parent plans before the schema has been inferred, or the parent plans will be resolved against an empty schema and may get wrong result for something like `select *` The fix logic is: introduce `UnresolvedCatalogRelation` as a placeholder. Then we replace it with `LogicalRelation` or `HiveTableRelation` during analysis, so that it's guaranteed that we won't resolve parent plans until the schema has been inferred. regression test Author: Wenchen Fan Closes #18907 from cloud-fan/bug. --- .../sql/catalyst/catalog/SessionCatalog.scala | 7 +-- .../sql/catalyst/catalog/interface.scala | 22 ++++++--- .../catalog/SessionCatalogSuite.scala | 4 +- .../apache/spark/sql/DataFrameWriter.scala | 9 ++-- .../scala/org/apache/spark/sql/Dataset.scala | 4 +- .../execution/OptimizeMetadataOnlyQuery.scala | 6 +-- .../datasources/DataSourceStrategy.scala | 47 +++++++++++-------- .../sql/execution/datasources/rules.scala | 2 +- .../spark/sql/StatisticsCollectionSuite.scala | 4 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 6 +-- .../spark/sql/hive/HiveStrategies.scala | 16 +++---- .../hive/execution/HiveTableScanExec.scala | 6 +-- .../sql/hive/MetastoreDataSourcesSuite.scala | 1 + .../spark/sql/hive/StatisticsSuite.scala | 10 ++-- .../sql/hive/execution/SQLQuerySuite.scala | 10 ++-- .../spark/sql/hive/orc/OrcQuerySuite.scala | 4 +- .../apache/spark/sql/hive/parquetSuites.scala | 4 +- 17 files changed, 86 insertions(+), 76 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 8d9fb4c452fc9..df8f9aae1e1fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -679,12 +679,7 @@ class SessionCatalog( child = parser.parsePlan(viewText)) SubqueryAlias(table, child) } else { - val tableRelation = CatalogRelation( - metadata, - // we assume all the columns are nullable. - metadata.dataSchema.asNullable.toAttributes, - metadata.partitionSchema.asNullable.toAttributes) - SubqueryAlias(table, tableRelation) + SubqueryAlias(table, UnresolvedCatalogRelation(metadata)) } } else { SubqueryAlias(table, tempTables(table)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 976d78749bfdc..5c8e5709a34f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -397,11 +397,22 @@ object CatalogTypes { type TablePartitionSpec = Map[String, String] } +/** + * A placeholder for a table relation, which will be replaced by concrete relation like + * `LogicalRelation` or `HiveTableRelation`, during analysis. + */ +case class UnresolvedCatalogRelation(tableMeta: CatalogTable) extends LeafNode { + assert(tableMeta.identifier.database.isDefined) + override lazy val resolved: Boolean = false + override def output: Seq[Attribute] = Nil +} /** - * A [[LogicalPlan]] that represents a table. + * A `LogicalPlan` that represents a hive table. + * + * TODO: remove this after we completely make hive as a data source. */ -case class CatalogRelation( +case class HiveTableRelation( tableMeta: CatalogTable, dataCols: Seq[AttributeReference], partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation { @@ -415,7 +426,7 @@ case class CatalogRelation( def isPartitioned: Boolean = partitionCols.nonEmpty override def equals(relation: Any): Boolean = relation match { - case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output + case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output case _ => false } @@ -434,15 +445,12 @@ case class CatalogRelation( )) override def computeStats(conf: SQLConf): Statistics = { - // For data source tables, we will create a `LogicalRelation` and won't call this method, for - // hive serde tables, we will always generate a statistics. - // TODO: unify the table stats generation. tableMeta.stats.map(_.toPlanStats(output)).getOrElse { throw new IllegalStateException("table stats must be specified.") } } - override def newInstance(): LogicalPlan = copy( + override def newInstance(): HiveTableRelation = copy( dataCols = dataCols.map(_.newInstance()), partitionCols = partitionCols.map(_.newInstance())) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 5ee729ef5cfad..9c1b6385d049b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -506,14 +506,14 @@ abstract class SessionCatalogSuite extends PlanTest { catalog.setCurrentDatabase("db2") // If we explicitly specify the database, we'll look up the relation in that database assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head - .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1) + .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1) // Otherwise, we'll first look up a temporary table with the same name assert(catalog.lookupRelation(TableIdentifier("tbl1")) == SubqueryAlias("tbl1", tempTable1)) // Then, if that does not exist, look up the relation in the current database catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head - .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1) + .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index b71c5eb843eec..0259fffeab2db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand} @@ -372,8 +372,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { // Get all input data source or hive relations of the query. val srcRelations = df.logicalPlan.collect { case LogicalRelation(src: BaseRelation, _, _) => src - case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) => - relation.tableMeta.identifier + case relation: HiveTableRelation => relation.tableMeta.identifier } val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed @@ -383,8 +382,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { throw new AnalysisException( s"Cannot overwrite table $tableName that is also being read from") // check hive table relation when overwrite mode - case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) - && srcRelations.contains(relation.tableMeta.identifier) => + case relation: HiveTableRelation + if srcRelations.contains(relation.tableMeta.identifier) => throw new AnalysisException( s"Cannot overwrite table $tableName that is also being read from") case _ => // OK diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 00a5edfab9446..a775fb8ed4ed3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -35,7 +35,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.CatalogRelation +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -2777,7 +2777,7 @@ class Dataset[T] private[sql]( fsBasedRelation.inputFiles case fr: FileRelation => fr.inputFiles - case r: CatalogRelation if DDLUtils.isHiveTable(r.tableMeta) => + case r: HiveTableRelation => r.tableMeta.storage.locationUri.map(_.toString).toArray }.flatten files.toSet.toArray diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index 3c046ce494285..d59b3c6f0caf2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -101,7 +101,7 @@ case class OptimizeMetadataOnlyQuery( val partitionData = fsRelation.location.listFiles(Nil, Nil) LocalRelation(partAttrs, partitionData.map(_.values)) - case relation: CatalogRelation => + case relation: HiveTableRelation => val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) val caseInsensitiveProperties = CaseInsensitiveMap(relation.tableMeta.storage.properties) @@ -137,7 +137,7 @@ case class OptimizeMetadataOnlyQuery( val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) Some(AttributeSet(partAttrs), l) - case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty => + case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty => val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) Some(AttributeSet(partAttrs), relation) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index ded9303de55fe..04ee081a0f9ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName} import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -207,15 +207,16 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast /** - * Replaces [[CatalogRelation]] with data source table if its table provider is not hive. + * Replaces [[UnresolvedCatalogRelation]] with concrete relation logical plans. + * + * TODO: we should remove the special handling for hive tables after completely making hive as a + * data source. */ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { - private def readDataSourceTable(r: CatalogRelation): LogicalPlan = { - val table = r.tableMeta + private def readDataSourceTable(table: CatalogTable): LogicalPlan = { val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) - val catalogProxy = sparkSession.sessionState.catalog - - val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() { + val catalog = sparkSession.sessionState.catalog + catalog.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() { override def call(): LogicalPlan = { val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_)) val dataSource = @@ -232,24 +233,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table) } - }).asInstanceOf[LogicalRelation] + }) + } - if (r.output.isEmpty) { - // It's possible that the table schema is empty and need to be inferred at runtime. For this - // case, we don't need to change the output of the cached plan. - plan - } else { - plan.copy(output = r.output) - } + private def readHiveTable(table: CatalogTable): LogicalPlan = { + HiveTableRelation( + table, + // Hive table columns are always nullable. + table.dataSchema.asNullable.toAttributes, + table.partitionSchema.asNullable.toAttributes) } override def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _) - if DDLUtils.isDatasourceTable(r.tableMeta) => - i.copy(table = readDataSourceTable(r)) + case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) + if DDLUtils.isDatasourceTable(tableMeta) => + i.copy(table = readDataSourceTable(tableMeta)) + + case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) => + i.copy(table = readHiveTable(tableMeta)) + + case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) => + readDataSourceTable(tableMeta) - case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) => - readDataSourceTable(r) + case UnresolvedCatalogRelation(tableMeta) => + readHiveTable(tableMeta) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 45f2a41f24937..9647f2c0edccb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -376,7 +376,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit def apply(plan: LogicalPlan): LogicalPlan = plan transform { case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved => table match { - case relation: CatalogRelation => + case relation: HiveTableRelation => val metadata = relation.tableMeta preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames) case LogicalRelation(h: HadoopFsRelation, _, catalogTable) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index ae0f219b281ea..86d19af9dd548 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -24,7 +24,7 @@ import scala.collection.mutable import scala.util.Random import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -304,7 +304,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils // Analyze only one column. sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1") val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect { - case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta) + case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta) case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get) }.head val emptyColStat = ColumnStat(0, None, None, 0, 4, 4) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 9b3cbb63a21b0..e1fee9af420dc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -112,7 +112,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } def convertToLogicalRelation( - relation: CatalogRelation, + relation: HiveTableRelation, options: Map[String, String], fileFormatClass: Class[_ <: FileFormat], fileType: String): LogicalRelation = { @@ -212,7 +212,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } - // The inferred schema may have different filed names as the table schema, we should respect + // The inferred schema may have different field names as the table schema, we should respect // it, but also respect the exprId in table relation output. assert(result.output.length == relation.output.length && result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType }) @@ -223,7 +223,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } private def inferIfNeeded( - relation: CatalogRelation, + relation: HiveTableRelation, options: Map[String, String], fileFormat: FileFormat, fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 4f090d545cd18..53e500ea78fc4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation} @@ -116,7 +116,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case relation: CatalogRelation + case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => val table = relation.tableMeta // TODO: check if this estimate is valid for tables after partition pruning. @@ -160,7 +160,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { */ object HiveAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists) + case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists) if DDLUtils.isHiveTable(r.tableMeta) => InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists) @@ -184,13 +184,13 @@ object HiveAnalysis extends Rule[LogicalPlan] { case class RelationConversions( conf: SQLConf, sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] { - private def isConvertible(relation: CatalogRelation): Boolean = { + private def isConvertible(relation: HiveTableRelation): Boolean = { val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) } - private def convert(relation: CatalogRelation): LogicalRelation = { + private def convert(relation: HiveTableRelation): LogicalRelation = { val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) if (serde.contains("parquet")) { val options = Map(ParquetOptions.MERGE_SCHEMA -> @@ -207,14 +207,14 @@ case class RelationConversions( override def apply(plan: LogicalPlan): LogicalPlan = { plan transformUp { // Write path - case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifPartitionNotExists) + case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists) // Inserting into partitioned table is not supported in Parquet/Orc data source (yet). if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && !r.isPartitioned && isConvertible(r) => InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists) // Read path - case relation: CatalogRelation + case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) => convert(relation) } @@ -242,7 +242,7 @@ private[hive] trait HiveStrategies { */ object HiveTableScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalOperation(projectList, predicates, relation: CatalogRelation) => + case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) => // Filter out all predicates that only deal with partition keys, these are given to the // hive table scan operator to be used for partition pruning. val partitionKeyIds = AttributeSet(relation.partitionCols) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index e191071efbf18..75b076b07f4bd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.catalog.CatalogRelation +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.execution._ @@ -50,7 +50,7 @@ import org.apache.spark.util.Utils private[hive] case class HiveTableScanExec( requestedAttributes: Seq[Attribute], - relation: CatalogRelation, + relation: HiveTableRelation, partitionPruningPred: Seq[Expression])( @transient private val sparkSession: SparkSession) extends LeafExecNode { @@ -205,7 +205,7 @@ case class HiveTableScanExec( val input: AttributeSeq = relation.output HiveTableScanExec( requestedAttributes.map(QueryPlan.normalizeExprId(_, input)), - relation.canonicalized.asInstanceOf[CatalogRelation], + relation.canonicalized.asInstanceOf[HiveTableRelation], QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index b554694815571..06a30b726549e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -1377,6 +1377,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv hiveClient.createTable(tableDesc, ignoreIfExists = false) checkAnswer(spark.table("old"), Row(1, "a")) + checkAnswer(sql("select * from old"), Row(1, "a")) val expectedSchema = StructType(Seq( StructField("i", IntegerType, nullable = true), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 819180d8b972e..a9caad897c589 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -23,7 +23,7 @@ import scala.reflect.ClassTag import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation} import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.joins._ @@ -60,7 +60,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto |LOCATION '${tempDir.toURI}'""".stripMargin) val relation = spark.table("csv_table").queryExecution.analyzed.children.head - .asInstanceOf[CatalogRelation] + .asInstanceOf[HiveTableRelation] val properties = relation.tableMeta.properties assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0") @@ -497,7 +497,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto test("estimates the size of a test Hive serde tables") { val df = sql("""SELECT * FROM src""") val sizes = df.queryExecution.analyzed.collect { - case relation: CatalogRelation => relation.stats(conf).sizeInBytes + case relation: HiveTableRelation => relation.stats(conf).sizeInBytes } assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}") assert(sizes(0).equals(BigInt(5812)), @@ -557,7 +557,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto () => (), metastoreQuery, metastoreAnswer, - implicitly[ClassTag[CatalogRelation]] + implicitly[ClassTag[HiveTableRelation]] ) } @@ -571,7 +571,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto // Assert src has a size smaller than the threshold. val sizes = df.queryExecution.analyzed.collect { - case relation: CatalogRelation => relation.stats(conf).sizeInBytes + case relation: HiveTableRelation => relation.stats(conf).sizeInBytes } assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 002ddd44e3bdb..dc79bfa96725c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.TestUtils import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException} -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} @@ -454,7 +454,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { case LogicalRelation(r: HadoopFsRelation, _, _) => if (!isDataSourceTable) { fail( - s"${classOf[CatalogRelation].getCanonicalName} is expected, but found " + + s"${classOf[HiveTableRelation].getCanonicalName} is expected, but found " + s"${HadoopFsRelation.getClass.getCanonicalName}.") } userSpecifiedLocation match { @@ -464,11 +464,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } assert(catalogTable.provider.get === format) - case r: CatalogRelation => + case r: HiveTableRelation => if (isDataSourceTable) { fail( s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " + - s"${classOf[CatalogRelation].getCanonicalName}.") + s"${classOf[HiveTableRelation].getCanonicalName}.") } userSpecifiedLocation match { case Some(location) => @@ -948,7 +948,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") { sql("CREATE TABLE explodeTest (key bigInt)") table("explodeTest").queryExecution.analyzed match { - case SubqueryAlias(_, r: CatalogRelation) => // OK + case SubqueryAlias(_, r: HiveTableRelation) => // OK case _ => fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 8c855730c31f2..60ccd996d6d58 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.CatalogRelation +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator} import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHive._ @@ -475,7 +475,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } else { queryExecution.analyzed.collectFirst { - case _: CatalogRelation => () + case _: HiveTableRelation => () }.getOrElse { fail(s"Expecting no conversion from orc to data sources, " + s"but got:\n$queryExecution") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 23f21e6b9931e..303884da19f09 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -21,7 +21,7 @@ import java.io.File import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.CatalogRelation +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources._ @@ -812,7 +812,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { } } else { queryExecution.analyzed.collectFirst { - case _: CatalogRelation => + case _: HiveTableRelation => }.getOrElse { fail(s"Expecting no conversion from parquet to data sources, " + s"but got:\n$queryExecution")