@@ -2272,32 +2272,18 @@ class Analyzer(
"type of the field in the target object")
}

private def illegalNumericPrecedence(from: DataType, to: DataType): Boolean = {
val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from)
val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to)
toPrecedence > 0 && fromPrecedence > toPrecedence
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case p if !p.childrenResolved => p
case p if p.resolved => p

case p => p transformExpressions {
case u @ UpCast(child, _, _) if !child.resolved => u

case UpCast(child, dataType, walkedTypePath) => (child.dataType, dataType) match {
case (from: NumericType, to: DecimalType) if !to.isWiderThan(from) =>
fail(child, to, walkedTypePath)
case (from: DecimalType, to: NumericType) if !from.isTighterThan(to) =>
fail(child, to, walkedTypePath)
case (from, to) if illegalNumericPrecedence(from, to) =>
fail(child, to, walkedTypePath)
case (TimestampType, DateType) =>
fail(child, DateType, walkedTypePath)
case (StringType, to: NumericType) =>
fail(child, to, walkedTypePath)
case _ => Cast(child, dataType.asNullable)
}
case UpCast(child, dataType, walkedTypePath)
if Cast.mayTruncate(child.dataType, dataType) =>
fail(child, dataType, walkedTypePath)

case UpCast(child, dataType, walkedTypePath) => Cast(child, dataType.asNullable)
}
}
}
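
A hypothetical user-level illustration (not part of the patch) of the check above: a Dataset encoder that needs a narrowing up cast should now be rejected through `Cast.mayTruncate`. The demo assumes a local SparkSession; the exact error message wording is only sketched in the comment.

import org.apache.spark.sql.SparkSession

object UpCastDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("upcast-demo").getOrCreate()
    import spark.implicits._
    // range() produces a bigint column; asking for Dataset[Int] requires an up cast from
    // bigint to int, which Cast.mayTruncate flags, so analysis should fail with an
    // AnalysisException along the lines of "Cannot up cast `id` ... as it may truncate".
    try spark.range(3).as[Int].collect()
    catch { case e: org.apache.spark.sql.AnalysisException => println(e.getMessage) }
    spark.stop()
  }
}
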
@@ -28,22 +28,60 @@ import org.apache.spark.sql.catalyst.rules.Rule
*/

/**
* Make sure that a view's child plan produces the view's output attributes. We wrap the child
* with a Project and add an alias for each output attribute. The attributes are resolved by
* name. This should be only done after the batch of Resolution, because the view attributes are
* not completely resolved during the batch of Resolution.
* Make sure that a view's child plan produces the view's output attributes. We try to wrap the
* child by:
* 1. Generate the `queryOutput` by:
* 1.1. If the query column names are defined, map the column names to attributes in the child
* output by name (this is mostly for handling view queries like SELECT * FROM ..., where the
* schema of the referenced table/view may change after the view has been created, so we
* have to save the output of the query to `viewQueryColumnNames`, and restore them during
* view resolution; in this way, we are able to get the correct view column ordering and
* omit the extra columns that we don't require);
* 1.2. Else set the child output attributes to `queryOutput`.
* 2. Map the `queryOutput` to the view output by index; if the corresponding attributes don't
* match, try to up cast and alias the attribute in `queryOutput` to the attribute in the view
* output.
* 3. Add a Project over the child, with the new output generated by the previous steps.
* If the view output doesn't have the same number of columns as either the child output or the
* query column names, throw an AnalysisException.
*
* This should be only done after the batch of Resolution, because the view attributes are not
* completely resolved during the batch of Resolution.
*/
case class AliasViewChild(conf: CatalystConf) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case v @ View(_, output, child) if child.resolved =>
case v @ View(desc, output, child) if child.resolved && output != child.output =>
val resolver = conf.resolver
val newOutput = output.map { attr =>
val originAttr = findAttributeByName(attr.name, child.output, resolver)
// The dataType of the output attributes may be not the same with that of the view output,
// so we should cast the attribute to the dataType of the view output attribute. If the
// cast can't perform, will throw an AnalysisException.
Alias(Cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
// If the view output doesn't have the same number of columns as the query column names,
// throw an AnalysisException.
if (output.length != queryColumnNames.length) {
throw new AnalysisException(
s"The view output ${output.mkString("[", ",", "]")} doesn't have the same number of " +
s"columns with the query column names ${queryColumnNames.mkString("[", ",", "]")}")
}
desc.viewQueryColumnNames.map { colName =>
findAttributeByName(colName, child.output, resolver)
}
} else {
// For views created before Spark 2.2.0, the view text is already fully qualified, so the plan
// output is the same as the view output.
child.output
}
Contributor:

how about

val queryOutput = if (queryColumnNames.nonEmpty) {
  if (output.length != queryColumnNames.length) throw ...
  desc.viewQueryColumnNames.map { colName =>
    findAttributeByName(colName, child.output, resolver)
  }
} else {
  // For view created before Spark 2.1, the view text is already fully qualified, the plan output is view output.
  child.output
}

// Map the attributes in the query output to the attributes in the view output by index.
val newOutput = output.zip(queryOutput).map {
Contributor:

Seems we need to check the size of output and queryOutput.

Contributor Author:

For views created by older versions of Spark, the view text is fully qualified, so the output is the same as the view output. Otherwise we have already checked that the output has the same length as queryColumnNames, so perhaps we don't need to check the size of output and queryOutput here.

case (attr, originAttr) if attr != originAttr =>
// The dataType of the output attributes may not be the same as that of the view output,
// so we should cast the attribute to the dataType of the view output attribute. An
// AnalysisException will be thrown if the cast cannot be performed or might truncate.
if (Cast.mayTruncate(originAttr.dataType, attr.dataType)) {
throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " +
s"${originAttr.dataType.simpleString} to ${attr.simpleString} as it may truncate\n")
} else {
Alias(Cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
}
case (_, originAttr) => originAttr
}
v.copy(child = Project(newOutput, child))
}
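
For concreteness, here is a hypothetical end-to-end walkthrough (not part of the patch) of what the saved query column names enable. The table/view names are invented, and it assumes a local SparkSession whose session catalog supports permanent views.

import org.apache.spark.sql.SparkSession

object ViewColumnNamesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("view-demo").getOrCreate()
    spark.sql("CREATE TABLE t (a INT, b INT) USING parquet")
    spark.sql("CREATE VIEW v AS SELECT * FROM t")   // query column names [a, b] saved as view properties
    spark.sql("DROP TABLE t")
    spark.sql("CREATE TABLE t (b INT, a INT, c INT) USING parquet")  // columns reordered, one added
    // AliasViewChild maps the saved names [a, b] onto t's new output by name, so v should keep
    // its original two-column schema and the extra column c should be omitted.
    spark.sql("SELECT * FROM v").printSchema()
    spark.stop()
  }
}
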
@@ -74,7 +112,9 @@ object EliminateView extends Rule[LogicalPlan] {
// The child should have the same output attributes as the View operator, so we simply
// remove the View operator.
case View(_, output, child) =>
assert(output == child.output, "The output of the child is different from the view output")
assert(output == child.output,
s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
s"view output ${output.mkString("[", ",", "]")}")
child
}
}
@@ -19,12 +19,15 @@ package org.apache.spark.sql.catalyst.catalog

import java.util.Date

import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.StructType



/**
@@ -178,6 +181,8 @@ case class CatalogTable(
unsupportedFeatures: Seq[String] = Seq.empty,
tracksPartitionsInCatalog: Boolean = false) {

import CatalogTable._

/** schema of this table's partition columns */
def partitionSchema: StructType = StructType(schema.filter {
c => partitionColumnNames.contains(c.name)
@@ -198,9 +203,44 @@

/**
* Return the default database name we use to resolve a view, should be None if the CatalogTable
* is not a View.
* is not a View or was created by older versions of Spark (before 2.2.0).
*/
def viewDefaultDatabase: Option[String] = properties.get(VIEW_DEFAULT_DATABASE)

/**
* Return the output column names of the query that creates a view. The column names are used to
* resolve a view, and should be empty if the CatalogTable is not a View or was created by older
* versions of Spark (before 2.2.0).
*/
def viewQueryColumnNames: Seq[String] = {
for {
numCols <- properties.get(VIEW_QUERY_OUTPUT_NUM_COLUMNS).toSeq
index <- 0 until numCols.toInt
} yield properties.getOrElse(
s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index",
throw new AnalysisException("Corrupted view query output column names in catalog: " +
s"$numCols parts expected, but part $index is missing.")
)
}

/**
* Insert/Update the view query output column names in `properties`.
*/
def viewDefaultDatabase: Option[String] = properties.get(CatalogTable.VIEW_DEFAULT_DATABASE)
def withQueryColumnNames(columns: Seq[String]): CatalogTable = {
val props = new mutable.HashMap[String, String]
if (columns.nonEmpty) {
props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
columns.zipWithIndex.foreach { case (colName, index) =>
props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
}
}

// We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
// while `CatalogTable` should be serializable.
copy(properties = properties.filterNot { case (key, _) =>
key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
} ++ props)
}
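
As a small standalone sketch (not part of the patch), the same encoding can be exercised against a plain Map, since constructing a full CatalogTable needs identifier/storage/schema arguments; the key names mirror the constants in the companion object below.

object ViewPropsSketch {
  val Prefix = "view.query.out."
  val NumCols = Prefix + "numCols"
  val ColPrefix = Prefix + "col."

  // Store the number of columns plus one indexed property per column name.
  def encode(columns: Seq[String]): Map[String, String] =
    Map(NumCols -> columns.length.toString) ++
      columns.zipWithIndex.map { case (name, i) => s"$ColPrefix$i" -> name }

  // Read the count back and look up each indexed property, like viewQueryColumnNames does.
  def decode(props: Map[String, String]): Seq[String] =
    for {
      numCols <- props.get(NumCols).toSeq
      index <- 0 until numCols.toInt
    } yield props(s"$ColPrefix$index")

  def main(args: Array[String]): Unit = {
    val props = encode(Seq("a", "b"))
    println(props)          // Map(view.query.out.numCols -> 2, view.query.out.col.0 -> a, ...)
    println(decode(props))  // List(a, b)
  }
}
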

/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
@@ -254,6 +294,9 @@ case class CatalogTable(

object CatalogTable {
val VIEW_DEFAULT_DATABASE = "view.default.database"
val VIEW_QUERY_OUTPUT_PREFIX = "view.query.out."
val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols"
val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col."
}

/**
@@ -21,7 +21,7 @@ import java.math.{BigDecimal => JavaBigDecimal}

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types._
@@ -89,6 +89,25 @@ object Cast {
case _ => false
}

/**
* Return true iff we may truncate during casting `from` type to `to` type, e.g. long -> int,
* timestamp -> date.
*/
def mayTruncate(from: DataType, to: DataType): Boolean = (from, to) match {
case (from: NumericType, to: DecimalType) if !to.isWiderThan(from) => true
case (from: DecimalType, to: NumericType) if !from.isTighterThan(to) => true
case (from, to) if illegalNumericPrecedence(from, to) => true
case (TimestampType, DateType) => true
case (StringType, to: NumericType) => true
case _ => false
}

private def illegalNumericPrecedence(from: DataType, to: DataType): Boolean = {
val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from)
val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to)
toPrecedence > 0 && fromPrecedence > toPrecedence
}
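
As a quick sanity check (not part of the patch), the new helper should behave as follows for a few representative type pairs; this is a minimal sketch assuming the catalyst module is on the classpath.

import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types._

object MayTruncateCheck {
  def main(args: Array[String]): Unit = {
    println(Cast.mayTruncate(LongType, IntegerType))           // true: narrowing numeric precedence
    println(Cast.mayTruncate(IntegerType, LongType))           // false: widening is safe
    println(Cast.mayTruncate(TimestampType, DateType))         // true: drops the time component
    println(Cast.mayTruncate(StringType, DoubleType))          // true: the string may not fit or parse
    println(Cast.mayTruncate(IntegerType, DecimalType(5, 2)))  // true: decimal(5,2) is not wider than int
  }
}
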

def forceNullable(from: DataType, to: DataType): Boolean = (from, to) match {
case (NullType, _) => true
case (_, _) if from == to => false