
Commit e635cbb

jiangxb1987 authored and cloud-fan committed
[SPARK-18801][SQL][FOLLOWUP] Alias the view with its child
## What changes were proposed in this pull request?

This PR is a follow-up to address the comments https://github.com/apache/spark/pull/16233/files#r95669988 and https://github.com/apache/spark/pull/16233/files#r95662299.

We try to wrap the child by:

1. Generate the `queryOutput` by:
   1.1. If the query column names are defined, map the column names to attributes in the child output by name;
   1.2. Else set the child output attributes to `queryOutput`.
2. Map the `queryOutput` to the view output by index; if the corresponding attributes don't match, try to up cast and alias the attribute in `queryOutput` to the attribute in the view output.
3. Add a Project over the child, with the new output generated by the previous steps.

If the view output doesn't have the same number of columns as either the child output or the query column names, throw an AnalysisException.

## How was this patch tested?

Add new test cases in `SQLViewSuite`.

Author: jiangxingbo <[email protected]>

Closes #16561 from jiangxb1987/alias-view.
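To make the `SELECT * FROM ...` scenario in step 1.1 concrete, here is a minimal sketch against the public Spark SQL API. The session value `spark`, the table name `base`, and the view name `v` are illustrative assumptions, and it presumes a Spark version where `ALTER TABLE ... ADD COLUMNS` is supported for the table's format:

```scala
// Create a table and a SELECT * view over it.
spark.sql("CREATE TABLE base (id INT, name STRING) USING parquet")
spark.sql("CREATE VIEW v AS SELECT * FROM base")

// The view's catalog entry now records the query output column names
// (id, name) in its table properties, not just the view text.
spark.sql("ALTER TABLE base ADD COLUMNS (extra STRING)")

// On resolution, the saved names let the analyzer pick exactly
// (id, name) out of the table's new output, preserving the original
// column ordering and omitting the extra column.
spark.sql("SELECT * FROM v").printSchema()
// root
//  |-- id: integer (nullable = true)
//  |-- name: string (nullable = true)
```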
1 parent 61e48f5

File tree

5 files changed: +214 −57 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 5 additions & 19 deletions
@@ -2281,32 +2281,18 @@ class Analyzer(
         "type of the field in the target object")
     }
 
-    private def illegalNumericPrecedence(from: DataType, to: DataType): Boolean = {
-      val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from)
-      val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to)
-      toPrecedence > 0 && fromPrecedence > toPrecedence
-    }
-
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case p if !p.childrenResolved => p
       case p if p.resolved => p
 
       case p => p transformExpressions {
         case u @ UpCast(child, _, _) if !child.resolved => u
 
-        case UpCast(child, dataType, walkedTypePath) => (child.dataType, dataType) match {
-          case (from: NumericType, to: DecimalType) if !to.isWiderThan(from) =>
-            fail(child, to, walkedTypePath)
-          case (from: DecimalType, to: NumericType) if !from.isTighterThan(to) =>
-            fail(child, to, walkedTypePath)
-          case (from, to) if illegalNumericPrecedence(from, to) =>
-            fail(child, to, walkedTypePath)
-          case (TimestampType, DateType) =>
-            fail(child, DateType, walkedTypePath)
-          case (StringType, to: NumericType) =>
-            fail(child, to, walkedTypePath)
-          case _ => Cast(child, dataType.asNullable)
-        }
+        case UpCast(child, dataType, walkedTypePath)
+            if Cast.mayTruncate(child.dataType, dataType) =>
+          fail(child, dataType, walkedTypePath)
+
+        case UpCast(child, dataType, walkedTypePath) => Cast(child, dataType.asNullable)
       }
     }
   }
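The rewrite above folds the five lossy-cast cases into the shared `Cast.mayTruncate` predicate (added in Cast.scala further down). From user code the check surfaces through `Dataset.as[T]`, which plants `UpCast`s over the query output. A rough sketch of the expected behavior in a spark-shell session; the case class names are illustrative:

```scala
import scala.util.Try

case class Narrow(value: Int)
case class Wide(value: Long)

// spark.range produces a bigint column. Upcasting bigint -> int may
// truncate, so analysis should fail with "Cannot up cast ... as it may
// truncate" rather than silently narrowing.
val bigints = spark.range(3).toDF("value")
println(Try(bigints.as[Narrow]).isFailure) // true

// The widening direction int -> bigint is lossless and resolves fine.
val ints = bigints.selectExpr("cast(value as int) as value")
ints.as[Wide].collect() // Array(Wide(0), Wide(1), Wide(2))
```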

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala

Lines changed: 53 additions & 13 deletions
@@ -28,22 +28,60 @@ import org.apache.spark.sql.catalyst.rules.Rule
  */
 
 /**
- * Make sure that a view's child plan produces the view's output attributes. We wrap the child
- * with a Project and add an alias for each output attribute. The attributes are resolved by
- * name. This should be only done after the batch of Resolution, because the view attributes are
- * not completely resolved during the batch of Resolution.
+ * Make sure that a view's child plan produces the view's output attributes. We try to wrap the
+ * child by:
+ * 1. Generate the `queryOutput` by:
+ *    1.1. If the query column names are defined, map the column names to attributes in the child
+ *         output by name (this is mostly for handling view queries like SELECT * FROM ..., the
+ *         schema of the referenced table/view may change after the view has been created, so we
+ *         have to save the output of the query to `viewQueryColumnNames`, and restore them during
+ *         view resolution; in this way, we are able to get the correct view column ordering and
+ *         omit the extra columns that we don't require);
+ *    1.2. Else set the child output attributes to `queryOutput`.
+ * 2. Map the `queryOutput` to the view output by index; if the corresponding attributes don't
+ *    match, try to up cast and alias the attribute in `queryOutput` to the attribute in the view
+ *    output.
+ * 3. Add a Project over the child, with the new output generated by the previous steps.
+ * If the view output doesn't have the same number of columns as either the child output or the
+ * query column names, throw an AnalysisException.
+ *
+ * This should be only done after the batch of Resolution, because the view attributes are not
+ * completely resolved during the batch of Resolution.
  */
 case class AliasViewChild(conf: CatalystConf) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case v @ View(_, output, child) if child.resolved =>
+    case v @ View(desc, output, child) if child.resolved && output != child.output =>
       val resolver = conf.resolver
-      val newOutput = output.map { attr =>
-        val originAttr = findAttributeByName(attr.name, child.output, resolver)
-        // The dataType of the output attributes may be not the same with that of the view output,
-        // so we should cast the attribute to the dataType of the view output attribute. If the
-        // cast can't perform, will throw an AnalysisException.
-        Alias(Cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
-          qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
+      val queryColumnNames = desc.viewQueryColumnNames
+      val queryOutput = if (queryColumnNames.nonEmpty) {
+        // If the view output doesn't have the same number of columns with the query column names,
+        // throw an AnalysisException.
+        if (output.length != queryColumnNames.length) {
+          throw new AnalysisException(
+            s"The view output ${output.mkString("[", ",", "]")} doesn't have the same number of " +
+              s"columns with the query column names ${queryColumnNames.mkString("[", ",", "]")}")
+        }
+        desc.viewQueryColumnNames.map { colName =>
+          findAttributeByName(colName, child.output, resolver)
+        }
+      } else {
+        // For views created before Spark 2.2.0, the view text is already fully qualified, and the
+        // plan output is the same as the view output.
+        child.output
+      }
+      // Map the attributes in the query output to the attributes in the view output by index.
+      val newOutput = output.zip(queryOutput).map {
+        case (attr, originAttr) if attr != originAttr =>
+          // The dataType of the output attributes may not be the same as that of the view
+          // output, so we should cast the attribute to the dataType of the view output attribute.
+          // Will throw an AnalysisException if the cast can't be performed or might truncate.
+          if (Cast.mayTruncate(originAttr.dataType, attr.dataType)) {
+            throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " +
+              s"${originAttr.dataType.simpleString} to ${attr.simpleString} as it may truncate\n")
+          } else {
+            Alias(Cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
+              qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
+          }
+        case (_, originAttr) => originAttr
       }
       v.copy(child = Project(newOutput, child))
   }

@@ -74,7 +112,9 @@ object EliminateView extends Rule[LogicalPlan] {
     // The child should have the same output attributes with the View operator, so we simply
     // remove the View operator.
     case View(_, output, child) =>
-      assert(output == child.output, "The output of the child is different from the view output")
+      assert(output == child.output,
+        s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
+          s"view output ${output.mkString("[", ",", "]")}")
       child
   }
 }
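Two observable consequences of `AliasViewChild`, sketched from the SQL surface. The table name `t` and view name `w` are illustrative, and both outcomes presume the view was created by a Spark version that saves the query column names:

```scala
import scala.util.Try

spark.sql("CREATE TABLE t (a INT, b INT) USING parquet")
spark.sql("CREATE VIEW w AS SELECT * FROM t") // records column names [a, b]

// 1. If the underlying table later loses a column the view recorded,
//    resolution can no longer find attribute `b` in the child output,
//    so the view fails analysis instead of returning wrong data.
spark.sql("DROP TABLE t")
spark.sql("CREATE TABLE t (a INT) USING parquet")
println(Try(spark.sql("SELECT * FROM w").collect()).isFailure) // true

// 2. If a column's type instead drifts to something the recorded type
//    cannot hold losslessly (e.g. a becomes BIGINT while w expects INT),
//    Cast.mayTruncate rejects the up cast with an AnalysisException
//    rather than inserting a silent narrowing cast.
```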

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 46 additions & 3 deletions
@@ -19,12 +19,15 @@ package org.apache.spark.sql.catalyst.catalog
 
 import java.util.Date
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
+

@@ -178,6 +181,8 @@ case class CatalogTable(
     unsupportedFeatures: Seq[String] = Seq.empty,
     tracksPartitionsInCatalog: Boolean = false) {
 
+  import CatalogTable._
+
   /** schema of this table's partition columns */
   def partitionSchema: StructType = StructType(schema.filter {
     c => partitionColumnNames.contains(c.name)

@@ -198,9 +203,44 @@ case class CatalogTable(
 
   /**
    * Return the default database name we use to resolve a view, should be None if the CatalogTable
-   * is not a View.
+   * is not a View or was created by older versions of Spark (before 2.2.0).
+   */
+  def viewDefaultDatabase: Option[String] = properties.get(VIEW_DEFAULT_DATABASE)
+
+  /**
+   * Return the output column names of the query that creates a view. The column names are used to
+   * resolve a view, and should be empty if the CatalogTable is not a View or was created by older
+   * versions of Spark (before 2.2.0).
+   */
+  def viewQueryColumnNames: Seq[String] = {
+    for {
+      numCols <- properties.get(VIEW_QUERY_OUTPUT_NUM_COLUMNS).toSeq
+      index <- 0 until numCols.toInt
+    } yield properties.getOrElse(
+      s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index",
+      throw new AnalysisException("Corrupted view query output column names in catalog: " +
+        s"$numCols parts expected, but part $index is missing.")
+    )
+  }
+
+  /**
+   * Insert/Update the view query output column names in `properties`.
    */
-  def viewDefaultDatabase: Option[String] = properties.get(CatalogTable.VIEW_DEFAULT_DATABASE)
+  def withQueryColumnNames(columns: Seq[String]): CatalogTable = {
+    val props = new mutable.HashMap[String, String]
+    if (columns.nonEmpty) {
+      props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
+      columns.zipWithIndex.foreach { case (colName, index) =>
+        props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
+      }
+    }
+
+    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+    // while `CatalogTable` should be serializable.
+    copy(properties = properties.filterNot { case (key, _) =>
+      key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
+    } ++ props)
+  }
 
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(

@@ -254,6 +294,9 @@ case class CatalogTable(
 
 object CatalogTable {
   val VIEW_DEFAULT_DATABASE = "view.default.database"
+  val VIEW_QUERY_OUTPUT_PREFIX = "view.query.out."
+  val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols"
+  val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col."
 }
 
 /**
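The column names are flattened into the catalog table's string-to-string `properties` map under the `view.query.out.` prefix. The following self-contained sketch mirrors (rather than reuses) the encode/decode scheme above, so the on-catalog layout can be seen without a real `CatalogTable`:

```scala
// Mirrors CatalogTable.withQueryColumnNames / viewQueryColumnNames:
// one "numCols" entry plus one "col.<index>" entry per output column.
val prefix = "view.query.out."

def encode(columns: Seq[String]): Map[String, String] =
  if (columns.isEmpty) Map.empty
  else Map(prefix + "numCols" -> columns.length.toString) ++
    columns.zipWithIndex.map { case (name, i) => (s"${prefix}col.$i", name) }

def decode(props: Map[String, String]): Seq[String] =
  for {
    numCols <- props.get(prefix + "numCols").toSeq
    i <- 0 until numCols.toInt
  } yield props(s"${prefix}col.$i")

val props = encode(Seq("id", "name"))
// Map(view.query.out.numCols -> 2,
//     view.query.out.col.0 -> id, view.query.out.col.1 -> name)
assert(decode(props) == Seq("id", "name"))
```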

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 20 additions & 1 deletion
@@ -21,7 +21,7 @@ import java.math.{BigDecimal => JavaBigDecimal}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.types._

@@ -89,6 +89,25 @@ object Cast {
     case _ => false
   }
 
+  /**
+   * Return true iff we may truncate during casting `from` type to `to` type. e.g. long -> int,
+   * timestamp -> date.
+   */
+  def mayTruncate(from: DataType, to: DataType): Boolean = (from, to) match {
+    case (from: NumericType, to: DecimalType) if !to.isWiderThan(from) => true
+    case (from: DecimalType, to: NumericType) if !from.isTighterThan(to) => true
+    case (from, to) if illegalNumericPrecedence(from, to) => true
+    case (TimestampType, DateType) => true
+    case (StringType, to: NumericType) => true
+    case _ => false
+  }
+
+  private def illegalNumericPrecedence(from: DataType, to: DataType): Boolean = {
+    val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from)
+    val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to)
+    toPrecedence > 0 && fromPrecedence > toPrecedence
+  }
+
   def forceNullable(from: DataType, to: DataType): Boolean = (from, to) match {
     case (NullType, _) => true
     case (_, _) if from == to => false
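A few representative inputs for the new predicate, following directly from the cases above. This sketch assumes it runs inside Spark's own code base (or a test suite), since `Cast` and the catalyst type objects are internal APIs:

```scala
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types._

// Narrowing numeric precedence: long -> int may truncate.
assert(Cast.mayTruncate(LongType, IntegerType))
// Widening is safe: int -> long never truncates.
assert(!Cast.mayTruncate(IntegerType, LongType))
// timestamp -> date drops the time component.
assert(Cast.mayTruncate(TimestampType, DateType))
// string -> numeric may fail or lose information.
assert(Cast.mayTruncate(StringType, IntegerType))
// A decimal that provably fits in a long is not a truncating cast.
assert(!Cast.mayTruncate(DecimalType(5, 0), LongType))
```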
