@@ -176,6 +176,7 @@ class Analyzer(
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalAndUsingJoin ::
ResolveOutputRelation ::
Contributor:
I feel this rule doesn't need to be in the giant resolution batch. Shall we put it in the Post-Hoc Resolution batch, which runs only once?

Contributor Author (@rdblue, Jul 5, 2018):
This rule may add Projection, UpCast, and Alias nodes to the plan, so some rules in this batch should run after the output is resolved. ResolveUpCast rewrites the casts that were inserted and throws an exception if a cast would truncate, so it needs to run after this rule.

I could create a batch just after resolution for output resolution, and run only this rule and ResolveUpCast in it. I think the optimizer will handle collapsing Projection nodes, and aliases are only resolved in this batch, so adding resolved aliases shouldn't be a problem.

Would you like a separate batch for output resolution?

Contributor:
Ah, OK, then let's just keep it here.
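
For context, a minimal sketch of the alternative discussed above: a dedicated batch, run once right after the main resolution batch, containing only the output-resolution rules. The batch name and placement here are hypothetical and not part of this change.

```scala
// Hypothetical alternative (not in this patch): run output resolution once,
// in its own batch, immediately after the main Resolution batch.
Batch("Output Resolution", Once,
  ResolveOutputRelation,
  ResolveUpCast)
```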

ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
@@ -2227,6 +2228,98 @@ class Analyzer(
}
}

/**
* Resolves columns of an output table from the data in a logical plan. This rule will:
*
* - Reorder columns when the write is by name
* - Insert safe casts when data types do not match
* - Insert aliases when column names do not match
* - Detect plans that are not compatible with the output table and throw AnalysisException
Contributor:
We need some tests to verify these behaviors.

Contributor Author:
Agreed. I'm already working on a suite for canWrite and will be adding better tests shortly. (A sketch of this kind of check appears after DataType.canWrite below.)

*/
object ResolveOutputRelation extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case append @ AppendData(table, query, isByName)
if table.resolved && query.resolved && !append.resolved =>
val projection = resolveOutputColumns(table.name, table.output, query, isByName)

if (projection != query) {
append.copy(query = projection)
} else {
append
}
}

def resolveOutputColumns(
tableName: String,
expected: Seq[Attribute],
query: LogicalPlan,
byName: Boolean): LogicalPlan = {

if (expected.size < query.output.size) {
throw new AnalysisException(
s"""Cannot write to '$tableName', too many data columns:
|Table columns: ${expected.map(_.name).mkString(", ")}
|Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
}

val errors = new mutable.ArrayBuffer[String]()
val resolved: Seq[NamedExpression] = if (byName) {
expected.flatMap { tableAttr =>
query.resolveQuoted(tableAttr.name, resolver) match {
case Some(queryExpr) =>
checkField(tableAttr, queryExpr, err => errors += err)
Contributor:
Nit: it seems we can pass the errors buffer directly instead of a function.

Contributor Author:
I'd much rather pass functions than mutable state into other methods (side effects). Plus, a function is cleaner because it doesn't require a particular kind of storage from the caller. If this were in a tight loop there would be an argument for changing it, but this only happens once per plan. (See the sketch after this method.)

case None =>
errors += s"Cannot find data for output column '${tableAttr.name}'"
None
}
}

} else {
if (expected.size > query.output.size) {
Contributor Author:
That check is the other direction: not enough columns.

When matching by position, we need the same number of columns, so we add this check (we already know there aren't too few columns, so this checks for too many). When matching by name, we can call out the specific columns that are missing, which is why the validation differs between the two cases (see the example after this rule).

throw new AnalysisException(
s"""Cannot write to '$tableName', not enough data columns:
|Table columns: ${expected.map(_.name).mkString(", ")}
|Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
}

query.output.zip(expected).flatMap {
Contributor:
Are these checks duplicated?

Contributor Author:
This handles both append cases: write by name and write by position. This block checks by position. I'll see if I can refactor the checks into a private method.

Contributor Author:
I refactored these into a helper method.

case (queryExpr, tableAttr) =>
checkField(tableAttr, queryExpr, err => errors += err)
}
}

if (errors.nonEmpty) {
throw new AnalysisException(
s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}")
}

Project(resolved, query)
}
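
To illustrate the design choice discussed above (passing an error callback rather than the mutable buffer itself), a simplified sketch; the helper names here are made up:

```scala
import scala.collection.mutable

// Alternative raised in review: the helper is coupled to one specific collection type.
def checkWithBuffer(errors: mutable.ArrayBuffer[String]): Unit =
  errors += "example error"

// Approach used in this patch: the helper only knows how to report an error,
// not how the caller stores errors.
def checkWithCallback(addError: String => Unit): Unit =
  addError("example error")

val errors = mutable.ArrayBuffer.empty[String]
checkWithCallback(errors += _)  // the caller picks the storage
```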

private def checkField(
tableAttr: Attribute,
queryExpr: NamedExpression,
addError: String => Unit): Option[NamedExpression] = {

if (queryExpr.nullable && !tableAttr.nullable) {
addError(s"Cannot write nullable values to non-null column '${tableAttr.name}'")
None

} else if (!DataType.canWrite(
tableAttr.dataType, queryExpr.dataType, resolver, tableAttr.name, addError)) {
None

} else {
// always add an UpCast. it will be removed in the optimizer if it is unnecessary.
Some(Alias(
UpCast(queryExpr, tableAttr.dataType, Seq()), tableAttr.name
)(
explicitMetadata = Option(tableAttr.metadata)
))
}
}
}
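
As a rough illustration of the two validation paths in resolveOutputColumns above, assuming the table is named 'table' with columns (id bigint, data string) and the query produces only id; the placeholder values are hypothetical:

```scala
import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan}

val table: NamedRelation = ???   // assumed: name "table", columns (id bigint, data string)
val query: LogicalPlan = ???     // assumed: produces only an `id` column

// By name: the missing column is called out explicitly. Analysis fails with roughly:
//   Cannot write incompatible data to table 'table':
//   - Cannot find data for output column 'data'
AppendData.byName(table, query)

// By position: only the column counts are compared up front. Analysis fails with roughly:
//   Cannot write to 'table', not enough data columns:
//   Table columns: id, data
//   Data columns: id
AppendData.byPosition(table, query)
```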

private def commonNaturalJoinProcessing(
left: LogicalPlan,
right: LogicalPlan,
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

trait NamedRelation extends LogicalPlan {
def name: String
}
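
For illustration only, a minimal hypothetical implementation of this trait (the class name is made up; a test suite would typically define something similar):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Hypothetical leaf relation with a fixed name and output; not part of this change.
case class TestNamedRelation(name: String, output: Seq[Attribute])
  extends LeafNode with NamedRelation
```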
@@ -18,13 +18,12 @@
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.{AliasIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning,
RangePartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.util.random.RandomSampler
@@ -353,6 +352,37 @@ case class Join(
}
}

/**
* Append data to an existing table.
*/
case class AppendData(
table: NamedRelation,
query: LogicalPlan,
isByName: Boolean) extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq(query)
Contributor:
Why is table not a child? As it stands, we can't transform the table relation.

Contributor Author:
What transforms would be enabled by making table a child? Do we want to transform the relation? It is fixed, so I didn't think that was a good idea.

Contributor Author:
I also see that InsertIntoTable doesn't list the relation as a child.

override def output: Seq[Attribute] = Seq.empty

override lazy val resolved: Boolean = {
query.output.size == table.output.size && query.output.zip(table.output).forall {
case (inAttr, outAttr) =>
// names and types must match, nullability must be compatible
inAttr.name == outAttr.name &&
DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
(outAttr.nullable || !inAttr.nullable)
}
}
}

object AppendData {
def byName(table: NamedRelation, df: LogicalPlan): AppendData = {
new AppendData(table, df, true)
}

def byPosition(table: NamedRelation, query: LogicalPlan): AppendData = {
new AppendData(table, query, false)
}
}
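
A brief usage sketch of the two factory methods; `table` and `query` are assumed placeholders:

```scala
import org.apache.spark.sql.catalyst.analysis.NamedRelation

val table: NamedRelation = ???
val query: LogicalPlan = ???

// By name: the analyzer may reorder and alias the query's columns to match `table`.
val byName = AppendData.byName(table, query)

// By position: columns are matched in order; only the counts are checked up front.
val byPosition = AppendData.byPosition(table, query)

// Either plan reports resolved == true only once its query's output matches the
// table's columns by name, type, and compatible nullability (see `resolved` above).
```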

/**
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
* concrete implementations during analysis.
123 changes: 122 additions & 1 deletion sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -27,7 +27,8 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
@@ -336,4 +337,124 @@ object DataType {
case (fromDataType, toDataType) => fromDataType == toDataType
}
}

private val SparkGeneratedName = """col\d+""".r
private def isSparkGeneratedName(name: String): Boolean = name match {
case SparkGeneratedName(_*) => true
case _ => false
}
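
For reference, the pattern match above is anchored to the whole name, so only names of the form col<digits> are treated as Spark-generated; a standalone sketch of the same behavior:

```scala
val SparkGeneratedName = """col\d+""".r

Seq("col1", "col42", "user_id", "1col").map {
  case SparkGeneratedName(_*) => true   // the whole name matches col<digits>
  case _ => false
}
// => List(true, true, false, false)
```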

/**
* Returns true if the write data type can be read using the read data type.
*
* The write type is compatible with the read type if:
* - Both types are arrays, the array element types are compatible, and element nullability is
* compatible (read allows nulls or write does not contain nulls).
* - Both types are maps and the map key and value types are compatible, and value nullability
* is compatible (read allows nulls or write does not contain nulls).
* - Both types are structs and each field in the read struct is present in the write struct and
* compatible (including nullability), or is nullable if the write struct does not contain the
* field. Write-side structs are not compatible if they contain fields that are not present in
* the read-side struct.
* - Both types are atomic and the write type can be safely cast to the read type.
*
* Extra fields in write-side structs are not allowed to avoid accidentally writing data that
* the read schema will not read, and to ensure map key equality is not changed when data is read.
*
* @param write a write-side data type to validate against the read type
* @param read a read-side data type
* @return true if data written with the write type can be read using the read type
*/
def canWrite(
write: DataType,
read: DataType,
resolver: Resolver,
context: String,
addError: String => Unit = (_: String) => {}): Boolean = {
(write, read) match {
case (wArr: ArrayType, rArr: ArrayType) =>
// run compatibility check first to produce all error messages
val typesCompatible =
canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)

if (wArr.containsNull && !rArr.containsNull) {
addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
false
} else {
typesCompatible
}

case (wMap: MapType, rMap: MapType) =>
// map keys cannot include data fields not in the read schema without changing equality when
// read. map keys can be missing fields as long as they are nullable in the read schema.

// run compatibility check first to produce all error messages
val keyCompatible =
canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError)
val valueCompatible =
canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
val typesCompatible = keyCompatible && valueCompatible

if (wMap.valueContainsNull && !rMap.valueContainsNull) {
addError(s"Cannot write nullable values to map of non-nulls: '$context'")
false
} else {
typesCompatible
}

case (StructType(writeFields), StructType(readFields)) =>
var fieldCompatible = true
readFields.zip(writeFields).foreach {
case (rField, wField) =>
val namesMatch = resolver(wField.name, rField.name) || isSparkGeneratedName(wField.name)
Contributor:
Do we need to check for Spark-generated names? If the number of fields is the same, the generated names also match.

Contributor:
Ah, I saw the test cases; that makes sense to me.

val fieldContext = s"$context.${rField.name}"
val typesCompatible =
canWrite(wField.dataType, rField.dataType, resolver, fieldContext, addError)

if (!namesMatch) {
addError(s"Struct '$context' field name does not match (may be out of order): " +
s"expected '${rField.name}', found '${wField.name}'")
fieldCompatible = false
} else if (!rField.nullable && wField.nullable) {
addError(s"Cannot write nullable values to non-null field: '$fieldContext'")
fieldCompatible = false
} else if (!typesCompatible) {
// errors are added in the recursive call to canWrite above
fieldCompatible = false
}
}

if (readFields.size > writeFields.size) {
val missingFieldsStr = readFields.takeRight(readFields.size - writeFields.size)
.map(f => s"'${f.name}'").mkString(", ")
if (missingFieldsStr.nonEmpty) {
addError(s"Struct '$context' missing fields: $missingFieldsStr")
fieldCompatible = false
}

} else if (writeFields.size > readFields.size) {
val extraFieldsStr = writeFields.takeRight(writeFields.size - readFields.size)
.map(f => s"'${f.name}'").mkString(", ")
addError(s"Cannot write extra fields to struct '$context': $extraFieldsStr")
fieldCompatible = false
}

fieldCompatible

case (w: AtomicType, r: AtomicType) =>
if (!Cast.canSafeCast(w, r)) {
addError(s"Cannot safely cast '$context': $w to $r")
false
} else {
true
}

case (w, r) if w.sameType(r) && !w.isInstanceOf[NullType] =>
true

case (w, r) =>
addError(s"Cannot write '$context': $w is incompatible with $r")
false
}
}
}
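
As a rough sketch of the kind of check a canWrite test suite might exercise (referenced in the review discussion above), collecting errors through the addError callback; the schemas are illustrative:

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.types._

val readSchema  = new StructType().add("a", IntegerType).add("b", StringType)
val writeSchema = new StructType().add("a", LongType).add("b", StringType)

val errors = mutable.ArrayBuffer.empty[String]
val compatible = DataType.canWrite(
  writeSchema, readSchema, caseSensitiveResolution, "table", errors += _)

// compatible == false: writing a long into an int column is not a safe cast, so
// errors should contain a message like "Cannot safely cast 'table.a': ..."
```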