@@ -689,12 +689,7 @@ class SessionCatalog(
child = parser.parsePlan(viewText))
SubqueryAlias(table, child)
} else {
- val tableRelation = CatalogRelation(
- metadata,
- // we assume all the columns are nullable.
- metadata.dataSchema.asNullable.toAttributes,
- metadata.partitionSchema.asNullable.toAttributes)
- SubqueryAlias(table, tableRelation)
+ SubqueryAlias(table, UnresolvedCatalogRelation(metadata))
}
} else {
SubqueryAlias(table, tempTables(table))
@@ -28,7 +28,6 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
- import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -400,11 +399,22 @@ object CatalogTypes {
type TablePartitionSpec = Map[String, String]
}

+ /**
+ * A placeholder for a table relation, which will be replaced by concrete relation like
+ * `LogicalRelation` or `HiveTableRelation`, during analysis.
+ */
+ case class UnresolvedCatalogRelation(tableMeta: CatalogTable) extends LeafNode {
+ assert(tableMeta.identifier.database.isDefined)
+ override lazy val resolved: Boolean = false
+ override def output: Seq[Attribute] = Nil
+ }

/**
- * A [[LogicalPlan]] that represents a table.
+ * A `LogicalPlan` that represents a hive table.
+ *
+ * TODO: remove this after we completely make hive as a data source.
*/
- case class CatalogRelation(
+ case class HiveTableRelation(
tableMeta: CatalogTable,
dataCols: Seq[AttributeReference],
partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
@@ -418,15 +428,15 @@ case class CatalogRelation(
def isPartitioned: Boolean = partitionCols.nonEmpty

override def equals(relation: Any): Boolean = relation match {
- case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
+ case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
case _ => false
}

override def hashCode(): Int = {
Objects.hashCode(tableMeta.identifier, output)
}

- override lazy val canonicalized: LogicalPlan = copy(
+ override lazy val canonicalized: HiveTableRelation = copy(
tableMeta = tableMeta.copy(
storage = CatalogStorageFormat.empty,
createTime = -1
@@ -439,15 +449,12 @@ case class CatalogRelation(
})

override def computeStats(): Statistics = {
- // For data source tables, we will create a `LogicalRelation` and won't call this method, for
- // hive serde tables, we will always generate a statistics.
- // TODO: unify the table stats generation.
tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
throw new IllegalStateException("table stats must be specified.")
}
}

- override def newInstance(): LogicalPlan = copy(
+ override def newInstance(): HiveTableRelation = copy(
dataCols = dataCols.map(_.newInstance()),
partitionCols = partitionCols.map(_.newInstance()))
}
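Note on the design (this note is not part of the patch): the catalog can now hand out a cheap, metadata-only `UnresolvedCatalogRelation`, and a later analyzer rule rewrites it into either a data source `LogicalRelation` or a `HiveTableRelation`. Below is a minimal, dependency-free Scala sketch of that flow, using simplified stand-in classes rather than Spark's real ones; all names inside it are illustrative.

object PlaceholderResolutionSketch {
  // Simplified stand-in for CatalogTable: just enough metadata to pick a provider.
  final case class TableMeta(db: String, table: String, provider: String)

  sealed trait Plan { def resolved: Boolean }
  // Metadata-only placeholder: no output attributes, never considered resolved.
  final case class UnresolvedCatalogRelation(meta: TableMeta) extends Plan {
    override def resolved: Boolean = false
  }
  final case class HiveTableRelation(meta: TableMeta) extends Plan {
    override def resolved: Boolean = true
  }
  final case class DataSourceRelation(meta: TableMeta) extends Plan {
    override def resolved: Boolean = true
  }

  // Stand-in for the analyzer rule: choose the concrete relation by provider.
  def resolve(plan: Plan): Plan = plan match {
    case UnresolvedCatalogRelation(meta) if meta.provider == "hive" => HiveTableRelation(meta)
    case UnresolvedCatalogRelation(meta) => DataSourceRelation(meta)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // The catalog lookup now returns only the placeholder...
    val lookedUp: Plan = UnresolvedCatalogRelation(TableMeta("db1", "tbl1", "hive"))
    assert(!lookedUp.resolved)
    // ...and analysis turns it into the concrete relation.
    assert(resolve(lookedUp) == HiveTableRelation(TableMeta("db1", "tbl1", "hive")))
  }
}

The real rule that performs this rewrite in Spark is FindDataSourceTable, shown further down in this diff.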
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
- import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -529,14 +528,14 @@ abstract class SessionCatalogSuite extends AnalysisTest {
catalog.setCurrentDatabase("db2")
// If we explicitly specify the database, we'll look up the relation in that database
assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
- .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+ .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
// Otherwise, we'll first look up a temporary table with the same name
assert(catalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1))
// Then, if that does not exist, look up the relation in the current database
catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head
- .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+ .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
}
}

@@ -24,11 +24,11 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
- import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
+ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
- import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
+ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType

@@ -372,8 +372,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// Get all input data source or hive relations of the query.
val srcRelations = df.logicalPlan.collect {
case LogicalRelation(src: BaseRelation, _, _) => src
- case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
- relation.tableMeta.identifier
+ case relation: HiveTableRelation => relation.tableMeta.identifier
}

val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -383,8 +382,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
// check hive table relation when overwrite mode
- case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
- && srcRelations.contains(relation.tableMeta.identifier) =>
+ case relation: HiveTableRelation
+ if srcRelations.contains(relation.tableMeta.identifier) =>
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
case _ => // OK
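A hedged usage sketch of the check above, assuming a Hive-enabled spark-shell where `spark` is predefined and an illustrative table name; with this patch the Hive case is matched directly on `HiveTableRelation` instead of testing `DDLUtils.isHiveTable`.

// Sketch only: exact message text may differ; assumes a Hive-enabled session.
import org.apache.spark.sql.AnalysisException

spark.sql("CREATE TABLE IF NOT EXISTS demo_hive_tbl (id INT) STORED AS TEXTFILE")
val df = spark.table("demo_hive_tbl")
try {
  // Destination and source are the same Hive table, so the overwrite is rejected.
  df.write.mode("overwrite").saveAsTable("demo_hive_tbl")
} catch {
  case e: AnalysisException => println(s"Rejected as expected: ${e.getMessage}")
}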
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -36,7 +36,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
- import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+ import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
@@ -2965,7 +2965,7 @@ class Dataset[T] private[sql](
fsBasedRelation.inputFiles
case fr: FileRelation =>
fr.inputFiles
- case r: CatalogRelation if DDLUtils.isHiveTable(r.tableMeta) =>
+ case r: HiveTableRelation =>
r.tableMeta.storage.locationUri.map(_.toString).toArray
}.flatten
files.toSet.toArray
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+ import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -99,7 +99,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
val partitionData = fsRelation.location.listFiles(Nil, Nil)
LocalRelation(partAttrs, partitionData.map(_.values))

- case relation: CatalogRelation =>
+ case relation: HiveTableRelation =>
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
val caseInsensitiveProperties =
CaseInsensitiveMap(relation.tableMeta.storage.properties)
@@ -135,7 +135,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
Some((AttributeSet(partAttrs), l))

- case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+ case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
Some((AttributeSet(partAttrs), relation))

@@ -25,12 +25,12 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
- import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
+ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
- import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
@@ -207,15 +207,16 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast


/**
- * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[UnresolvedCatalogRelation]] with concrete relation logical plans.
+ *
+ * TODO: we should remove the special handling for hive tables after completely making hive as a
+ * data source.
*/
class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
- private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
- val table = r.tableMeta
+ private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
- val catalogProxy = sparkSession.sessionState.catalog

- val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
+ val catalog = sparkSession.sessionState.catalog
+ catalog.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
override def call(): LogicalPlan = {
val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
val dataSource =
@@ -232,24 +233,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]

LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
}
- }).asInstanceOf[LogicalRelation]
+ })
+ }

- if (r.output.isEmpty) {
- // It's possible that the table schema is empty and need to be inferred at runtime. For this
- // case, we don't need to change the output of the cached plan.
- plan
- } else {
- plan.copy(output = r.output)
- }
+ private def readHiveTable(table: CatalogTable): LogicalPlan = {
+ HiveTableRelation(
+ table,
+ // Hive table columns are always nullable.
+ table.dataSchema.asNullable.toAttributes,
+ table.partitionSchema.asNullable.toAttributes)
}

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
- if DDLUtils.isDatasourceTable(r.tableMeta) =>
- i.copy(table = readDataSourceTable(r))
+ case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
+ if DDLUtils.isDatasourceTable(tableMeta) =>
+ i.copy(table = readDataSourceTable(tableMeta))

+ case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
+ i.copy(table = readHiveTable(tableMeta))

+ case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
+ readDataSourceTable(tableMeta)

- case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
- readDataSourceTable(r)
+ case UnresolvedCatalogRelation(tableMeta) =>
+ readHiveTable(tableMeta)
}
}

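To observe the rule end to end, a hedged check (again assuming a Hive-enabled spark-shell with `spark` predefined; table names are illustrative): after analysis the placeholder is gone, and the concrete node depends on the table's provider.

import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation

spark.sql("CREATE TABLE IF NOT EXISTS demo_parquet_tbl (id INT) USING parquet")
spark.sql("CREATE TABLE IF NOT EXISTS demo_textfile_tbl (id INT) STORED AS TEXTFILE")

// Data source table: the placeholder is rewritten into a LogicalRelation.
assert(spark.table("demo_parquet_tbl").queryExecution.analyzed.collect {
  case r: LogicalRelation => r
}.nonEmpty)

// Hive serde table: the placeholder is rewritten into a HiveTableRelation.
assert(spark.table("demo_textfile_tbl").queryExecution.analyzed.collect {
  case r: HiveTableRelation => r
}.nonEmpty)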
@@ -382,7 +382,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
table match {
- case relation: CatalogRelation =>
+ case relation: HiveTableRelation =>
val metadata = relation.tableMeta
preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
@@ -427,7 +427,7 @@ object PreReadCheck extends (LogicalPlan => Unit) {

private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = {
operator match {
- case _: CatalogRelation => 1
+ case _: HiveTableRelation => 1
case _ @ LogicalRelation(_: HadoopFsRelation, _, _) => 1
case _: LeafNode => 0
// UNION ALL has multiple children, but these children do not concurrently use InputFileBlock.
@@ -24,7 +24,7 @@ import scala.collection.mutable
import scala.util.Random

import org.apache.spark.sql.catalyst.TableIdentifier
- import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+ import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -171,7 +171,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
// Analyze only one column.
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
- case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
+ case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta)
case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
}.head
val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
@@ -112,7 +112,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}

def convertToLogicalRelation(
- relation: CatalogRelation,
+ relation: HiveTableRelation,
options: Map[String, String],
fileFormatClass: Class[_ <: FileFormat],
fileType: String): LogicalRelation = {
@@ -210,7 +210,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
logicalRelation
})
}
- // The inferred schema may have different filed names as the table schema, we should respect
+ // The inferred schema may have different field names as the table schema, we should respect
// it, but also respect the exprId in table relation output.
assert(result.output.length == relation.output.length &&
result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
@@ -221,7 +221,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}

private def inferIfNeeded(
- relation: CatalogRelation,
+ relation: HiveTableRelation,
options: Map[String, String],
fileFormat: FileFormat,
fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {