```diff
@@ -617,7 +617,7 @@ class Analyzer(

     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
-        lookupTableFromCatalog(u).canonicalized match {
+        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
          case v: View =>
            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
          case other => i.copy(table = other)
```
```diff
@@ -423,8 +423,15 @@ case class CatalogRelation(
     Objects.hashCode(tableMeta.identifier, output)
   }

-  /** Only compare table identifier. */
-  override lazy val cleanArgs: Seq[Any] = Seq(tableMeta.identifier)
+  override def preCanonicalized: LogicalPlan = copy(tableMeta = CatalogTable(
+    identifier = tableMeta.identifier,
+    tableType = tableMeta.tableType,
+    storage = CatalogStorageFormat.empty,
+    schema = tableMeta.schema,
+    partitionColumnNames = tableMeta.partitionColumnNames,
+    bucketSpec = tableMeta.bucketSpec,
+    createTime = -1
+  ))

   override def computeStats(conf: SQLConf): Statistics = {
     // For data source tables, we will create a `LogicalRelation` and won't call this method, for
```

**Contributor Author:** Actually we should compare more than the identifier, e.g. if the table schema is altered, the new table relation should not be considered the same as the old table relation, even after canonicalization. Also, it's tricky to remove the output of a plan during canonicalization, as the parent plan may rely on that output.
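A toy illustration of the point above (hypothetical `MiniTable`, not the real `CatalogTable`): because the canonical form keeps the schema but blanks out volatile fields like `createTime`, altering the schema changes the canonical form, while merely re-reading the same table does not.

```scala
// Hypothetical miniature of CatalogRelation's preCanonicalized.
final case class MiniTable(identifier: String, schema: Seq[String], createTime: Long)

// Erase the volatile field; keep identifier and schema for comparison.
def preCanonicalize(t: MiniTable): MiniTable = t.copy(createTime = -1L)

val t1 = MiniTable("db.t", Seq("a int"), createTime = 1000L)
val t2 = MiniTable("db.t", Seq("a int"), createTime = 2000L)              // same table, read later
val t3 = MiniTable("db.t", Seq("a int", "b string"), createTime = 3000L)  // schema altered

assert(preCanonicalize(t1) == preCanonicalize(t2)) // volatile field erased
assert(preCanonicalize(t1) != preCanonicalize(t3)) // schema change still detected
```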
```diff
@@ -359,9 +359,59 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType]
   override protected def innerChildren: Seq[QueryPlan[_]] = subqueries

   /**
-   * Canonicalized copy of this query plan.
+   * Returns a plan where a best effort attempt has been made to transform `this` in a way
+   * that preserves the result but removes cosmetic variations (case sensitivity, ordering for
+   * commutative operations, expression ids, etc.)
+   *
+   * Plans where `this.canonicalized == other.canonicalized` will always evaluate to the same
+   * result.
+   *
+   * Some nodes should overwrite this to provide proper canonicalization logic.
    */
+  lazy val canonicalized: PlanType = {
+    val canonicalizedChildren = children.map(_.canonicalized)
+    var id = -1
+    preCanonicalized.mapExpressions {
```
**Member:** Do we need to consider non-deterministic expressions?

**Contributor Author:** See `Expression.semanticEquals`: non-deterministic expressions will never compare equal to other expressions.
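A minimal sketch of that short-circuit (simplified stand-in types, not Spark's actual `Expression` API): `semanticEquals` requires both sides to be deterministic before it even compares canonical forms, so a non-deterministic expression never equals anything, including a structurally identical copy of itself.

```scala
// Simplified stand-ins for Catalyst expressions (hypothetical, for
// illustration only; Spark's real API differs).
sealed trait Expr { def deterministic: Boolean; def canonical: String }

final case class Add(l: String, r: String) extends Expr {
  val deterministic = true
  // Order operands so commutative variants canonicalize identically.
  def canonical: String = if (l <= r) s"add($l,$r)" else s"add($r,$l)"
}

final case class Rand(seed: Long) extends Expr {
  val deterministic = false
  def canonical: String = s"rand($seed)"
}

def semanticEquals(a: Expr, b: Expr): Boolean =
  a.deterministic && b.deterministic && a.canonical == b.canonical

// Commutative cosmetic difference: still semantically equal.
assert(semanticEquals(Add("a", "b"), Add("b", "a")))
// Non-deterministic: never equal, even to an identical copy.
assert(!semanticEquals(Rand(42L), Rand(42L)))
```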

```diff
+      case a: Alias =>
+        id += 1
+        // As the root of the expression, Alias will always take an arbitrary exprId, we need to
+        // normalize that for equality testing, by assigning expr ids from 0 incrementally. The
+        // alias name doesn't matter and should be erased.
+        Alias(normalizeExprId(a.child), "")(ExprId(id), a.qualifier, isGenerated = a.isGenerated)
+
+      case ar: AttributeReference if allAttributes.indexOf(ar.exprId) == -1 =>
+        // Top level `AttributeReference` may also be used for output like `Alias`, we should
+        // normalize the exprId too.
+        id += 1
+        ar.withExprId(ExprId(id))
+
+      case other => normalizeExprId(other)
+    }.withNewChildren(canonicalizedChildren)
+  }
```

```diff
-  protected lazy val canonicalized: PlanType = this
+  /**
+   * Do some simple transformation on this plan before canonicalizing. Implementations can
+   * override this method to provide customized canonicalization logic without rewriting the
+   * whole logic.
+   */
+  protected def preCanonicalized: PlanType = this
+
+  /**
+   * Normalize the exprIds in the given expression, by updating the exprId in `AttributeReference`
+   * with its referenced ordinal from input attributes. It's similar to `BindReferences` but we
+   * do not use `BindReferences` here as the plan may take the expression as a parameter with type
+   * `Attribute`, and replacing it with `BoundReference` would cause an error.
+   */
+  protected def normalizeExprId[T <: Expression](e: T, input: AttributeSeq = allAttributes): T = {
+    e.transformUp {
+      case ar: AttributeReference =>
+        val ordinal = input.indexOf(ar.exprId)
+        if (ordinal == -1) {
+          ar
```
**Member:** No need to normalize exprIds in this case?

**Contributor Author:** No: this case is actually unexpected. The attribute should either reference the input attributes or represent new output at the top level. We keep it unchanged so that the equality check will fail later.

```diff
+        } else {
+          ar.withExprId(ExprId(ordinal))
+        }
+    }.canonicalized.asInstanceOf[T]
+  }
```
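To see what the ordinal rewrite buys us, here is a small self-contained model (hypothetical `Attr` type, not Spark's `AttributeReference`): attribute ids that resolve against the input are renumbered by position, so two plans that differ only in freshly generated ids end up structurally identical.

```scala
// Hypothetical miniature of ordinal-based exprId normalization.
final case class Attr(name: String, exprId: Long)

// Rewrite each id to its ordinal in `input`; unresolved ids are kept
// unchanged so a later equality check fails, mirroring the code above.
def normalize(attrs: Seq[Attr], input: Seq[Attr]): Seq[Attr] =
  attrs.map { a =>
    val ordinal = input.indexWhere(_.exprId == a.exprId)
    if (ordinal == -1) a else a.copy(exprId = ordinal.toLong)
  }

// Two scans of the same table get fresh ids (10, 11) vs (20, 21), but
// both normalize to ordinals (0, 1), so the canonical forms are equal.
val scan1 = Seq(Attr("a", 10L), Attr("b", 11L))
val scan2 = Seq(Attr("a", 20L), Attr("b", 21L))
assert(normalize(scan1, scan1) == normalize(scan2, scan2))
```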

```diff
   /**
    * Returns true when the given query plan will return the same results as this query plan.
@@ -372,49 +422,19 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType]
    * enhancements like caching. However, it is not acceptable to return true if the results could
    * possibly be different.
    *
-   * By default this function performs a modified version of equality that is tolerant of cosmetic
-   * differences like attribute naming and or expression id differences. Operators that
-   * can do better should override this function.
+   * This function performs a modified version of equality that is tolerant of cosmetic
+   * differences like attribute naming and/or expression id differences.
    */
-  def sameResult(plan: PlanType): Boolean = {
-    val left = this.canonicalized
-    val right = plan.canonicalized
-    left.getClass == right.getClass &&
-      left.children.size == right.children.size &&
-      left.cleanArgs == right.cleanArgs &&
-      (left.children, right.children).zipped.forall(_ sameResult _)
-  }
+  final def sameResult(other: PlanType): Boolean = this.canonicalized == other.canonicalized
```
**Member:** To use `==`, do we need to overwrite many `equals` and `hashCode` functions?

**Contributor:** @gatorsmile Given that both sides are canonicalized, the default case class `equals` and `hashCode` methods should work, right?

**Member:** @dilipbiswal Yes and no. If it is a case class, the Scala compiler will define a default `equals` for you. If it is a plain class, you need to define it yourself. For example:

```scala
class NestedObj(i: Int)

val m = new NestedObj(3)
val n = new NestedObj(3)

assert(m != n)
```

**Contributor Author:** `TreeNode` requires its implementations to be `Product`; I think all of the `LogicalPlan`s and `SparkPlan`s are case classes.

**Contributor Author:** This is also how `Expression.semanticEquals` works.

**Member:** Yes, we already assume it in the existing solution. I just realized it. :)

We also need to ensure all the arguments of the case class are primitive types or come from a class with a defined `equals`:

```scala
class NestedObj(i: Int)

val m = new NestedObj(3)
val n = new NestedObj(3)

assert(m != n)

case class Obj(i: NestedObj)

val p = Obj(m)
val q = Obj(n)

assert(p != q)
```


```diff
   /**
    * Returns a `hashCode` for the calculation performed by this plan. Unlike the standard
    * `hashCode`, an attempt has been made to eliminate cosmetic differences.
    */
   final def semanticHash(): Int = canonicalized.hashCode()

   /**
    * All the attributes that are used for this plan.
    */
   lazy val allAttributes: AttributeSeq = children.flatMap(_.output)

-  protected def cleanExpression(e: Expression): Expression = e match {
-    case a: Alias =>
-      // As the root of the expression, Alias will always take an arbitrary exprId, we need
-      // to erase that for equality testing.
-      val cleanedExprId =
-        Alias(a.child, a.name)(ExprId(-1), a.qualifier, isGenerated = a.isGenerated)
-      BindReferences.bindReference(cleanedExprId, allAttributes, allowFailures = true)
-    case other =>
-      BindReferences.bindReference(other, allAttributes, allowFailures = true)
-  }
-
-  /** Args that have cleaned such that differences in expression id should not affect equality */
-  protected lazy val cleanArgs: Seq[Any] = {
-    def cleanArg(arg: Any): Any = arg match {
-      // Children are checked using sameResult above.
-      case tn: TreeNode[_] if containsChild(tn) => null
-      case e: Expression => cleanExpression(e).canonicalized
-      case other => other
-    }
-
-    mapProductIterator {
-      case s: Option[_] => s.map(cleanArg)
-      case s: Seq[_] => s.map(cleanArg)
-      case m: Map[_, _] => m.mapValues(cleanArg)
-      case other => cleanArg(other)
-    }.toSeq
-  }
 }
```
```diff
@@ -67,14 +67,6 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
     }
   }

-  override def sameResult(plan: LogicalPlan): Boolean = {
-    plan.canonicalized match {
-      case LocalRelation(otherOutput, otherData) =>
-        otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
-      case _ => false
-    }
-  }
-
   override def computeStats(conf: SQLConf): Statistics =
     Statistics(sizeInBytes =
       output.map(n => BigInt(n.dataType.defaultSize)).sum * data.length)
```
```diff
@@ -143,8 +143,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
    */
   def childrenResolved: Boolean = children.forall(_.resolved)

-  override lazy val canonicalized: LogicalPlan = EliminateSubqueryAliases(this)
-
   /**
    * Resolves a given schema to concrete [[Attribute]] references in this query plan. This function
    * should only be called on analyzed plans since it will throw [[AnalysisException]] for
```
```diff
@@ -803,6 +803,8 @@ case class SubqueryAlias(
     child: LogicalPlan)
   extends UnaryNode {

+  override lazy val canonicalized: LogicalPlan = child.canonicalized
+
   override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
 }
```
```diff
@@ -26,10 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 trait BroadcastMode {
   def transform(rows: Array[InternalRow]): Any

-  /**
-   * Returns true iff this [[BroadcastMode]] generates the same result as `other`.
-   */
-  def compatibleWith(other: BroadcastMode): Boolean
```
**Contributor (@rxin, Apr 5, 2017):** Why are we getting rid of this?

**Contributor Author:** `BroadcastMode` is a field of `BroadcastExchangeExec`. Since we need to canonicalize a `QueryPlan`, the `BroadcastMode` also needs to be canonicalized.
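A sketch of why the trait gains `canonicalized` (hypothetical simplified types, not Spark's API): a mode that carries key expressions must normalize their exprIds the same way the plan does, otherwise two exchanges that differ only in fresh ids would never be recognized as reusable.

```scala
// Hypothetical miniature of a BroadcastMode carrying expressions.
final case class Key(name: String, exprId: Long)

sealed trait Mode { def canonicalized: Mode }

case object IdentityMode extends Mode {
  def canonicalized: Mode = this // nothing to normalize
}

final case class HashedMode(keys: Seq[Key]) extends Mode {
  // Renumber exprIds by position, like normalizeExprId does for plans.
  def canonicalized: Mode =
    HashedMode(keys.zipWithIndex.map { case (k, i) => k.copy(exprId = i.toLong) })
}

// Same keys up to fresh exprIds: canonical forms match, enabling reuse.
assert(HashedMode(Seq(Key("a", 7L))).canonicalized ==
  HashedMode(Seq(Key("a", 42L))).canonicalized)
```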

```diff
+  def canonicalized: BroadcastMode
 }

 /**
@@ -39,7 +36,5 @@ case object IdentityBroadcastMode extends BroadcastMode {
   // TODO: pack the UnsafeRows into single bytes array.
   override def transform(rows: Array[InternalRow]): Array[InternalRow] = rows

-  override def compatibleWith(other: BroadcastMode): Boolean = {
-    this eq other
-  }
+  override def canonicalized: BroadcastMode = this
 }
```
```diff
@@ -119,7 +119,7 @@ case class RowDataSourceScanExec(
     val input = ctx.freshName("input")
     ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
     val exprRows = output.zipWithIndex.map { case (a, i) =>
-      new BoundReference(i, a.dataType, a.nullable)
+      BoundReference(i, a.dataType, a.nullable)
     }
     val row = ctx.freshName("row")
     ctx.INPUT_ROW = row
@@ -136,27 +136,25 @@
       """.stripMargin
   }

-  // Ignore rdd when checking results
-  override def sameResult(plan: SparkPlan): Boolean = plan match {
-    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
-    case _ => false
-  }
+  // Only care about `relation` and `metadata` when canonicalizing.
+  override def preCanonicalized: SparkPlan =
+    copy(rdd = null, outputPartitioning = null, metastoreTableIdentifier = None)
 }
```

```diff
 /**
  * Physical plan node for scanning data from HadoopFsRelations.
  *
  * @param relation The file-based relation to scan.
- * @param output Output attributes of the scan.
- * @param outputSchema Output schema of the scan.
+ * @param output Output attributes of the scan, including data attributes and partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
  * @param partitionFilters Predicates to use for partition pruning.
  * @param dataFilters Filters on non-partition columns.
  * @param metastoreTableIdentifier identifier for the table in the metastore.
  */
 case class FileSourceScanExec(
     @transient relation: HadoopFsRelation,
     output: Seq[Attribute],
-    outputSchema: StructType,
+    requiredSchema: StructType,
     partitionFilters: Seq[Expression],
     dataFilters: Seq[Expression],
     override val metastoreTableIdentifier: Option[TableIdentifier])
```
```diff
@@ -267,7 +265,7 @@ case class FileSourceScanExec(
     val metadata =
       Map(
         "Format" -> relation.fileFormat.toString,
-        "ReadSchema" -> outputSchema.catalogString,
+        "ReadSchema" -> requiredSchema.catalogString,
         "Batched" -> supportsBatch.toString,
         "PartitionFilters" -> seqToString(partitionFilters),
         "PushedFilters" -> seqToString(pushedDownFilters),
@@ -287,7 +285,7 @@
       sparkSession = relation.sparkSession,
       dataSchema = relation.dataSchema,
       partitionSchema = relation.partitionSchema,
-      requiredSchema = outputSchema,
+      requiredSchema = requiredSchema,
       filters = pushedDownFilters,
       options = relation.options,
       hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
@@ -515,14 +513,13 @@ case class FileSourceScanExec(
     }
   }

-  override def sameResult(plan: SparkPlan): Boolean = plan match {
-    case other: FileSourceScanExec =>
-      val thisPredicates = partitionFilters.map(cleanExpression)
-      val otherPredicates = other.partitionFilters.map(cleanExpression)
-      val result = relation == other.relation && metadata == other.metadata &&
-        thisPredicates.length == otherPredicates.length &&
-        thisPredicates.zip(otherPredicates).forall(p => p._1.semanticEquals(p._2))
-      result
-    case _ => false
+  override lazy val canonicalized: FileSourceScanExec = {
+    FileSourceScanExec(
+      relation,
+      output.map(normalizeExprId(_, output)),
+      requiredSchema,
+      partitionFilters.map(normalizeExprId(_, output)),
+      dataFilters.map(normalizeExprId(_, output)),
+      None)
   }
 }
```
```diff
@@ -87,13 +87,6 @@ case class ExternalRDD[T](
   override def newInstance(): ExternalRDD.this.type =
     ExternalRDD(outputObjAttr.newInstance(), rdd)(session).asInstanceOf[this.type]

-  override def sameResult(plan: LogicalPlan): Boolean = {
-    plan.canonicalized match {
-      case ExternalRDD(_, otherRDD) => rdd.id == otherRDD.id
-      case _ => false
-    }
-  }
-
   override protected def stringArgs: Iterator[Any] = Iterator(output)

   @transient override def computeStats(conf: SQLConf): Statistics = Statistics(
@@ -162,13 +155,6 @@ case class LogicalRDD(
     )(session).asInstanceOf[this.type]
   }

-  override def sameResult(plan: LogicalPlan): Boolean = {
-    plan.canonicalized match {
-      case LogicalRDD(_, otherRDD, _, _) => rdd.id == otherRDD.id
-      case _ => false
-    }
-  }
-
   override protected def stringArgs: Iterator[Any] = Iterator(output)

   @transient override def computeStats(conf: SQLConf): Statistics = Statistics(
```
```diff
@@ -33,7 +33,7 @@ case class LocalTableScanExec(
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

-  private val unsafeRows: Array[InternalRow] = {
+  private lazy val unsafeRows: Array[InternalRow] = {
     if (rows.isEmpty) {
       Array.empty
     } else {
```
```diff
@@ -342,8 +342,9 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "numGeneratedRows" -> SQLMetrics.createMetric(sparkContext, "number of generated rows"))

-  // output attributes should not affect the results
-  override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements)
+  override lazy val canonicalized: SparkPlan = {
+    RangeExec(range.canonicalized.asInstanceOf[org.apache.spark.sql.catalyst.plans.logical.Range])
+  }

   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
@@ -607,11 +608,6 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {

   override def outputOrdering: Seq[SortOrder] = child.outputOrdering

-  override def sameResult(o: SparkPlan): Boolean = o match {
-    case s: SubqueryExec => child.sameResult(s.child)
-    case _ => false
-  }
-
   @transient
   private lazy val relationFuture: Future[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
```
```diff
@@ -43,17 +43,8 @@ case class LogicalRelation(
     com.google.common.base.Objects.hashCode(relation, output)
   }

-  override def sameResult(otherPlan: LogicalPlan): Boolean = {
-    otherPlan.canonicalized match {
-      case LogicalRelation(otherRelation, _, _) => relation == otherRelation
-      case _ => false
-    }
-  }
-
-  // When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need
-  // LogicalRelation.cleanArgs to return Seq(relation), since expectedOutputAttribute's
-  // expId can be different but the relation is still the same.
-  override lazy val cleanArgs: Seq[Any] = Seq(relation)
+  // Only care about relation when canonicalizing.
+  override def preCanonicalized: LogicalPlan = copy(catalogTable = None)
```
**Member:** The builders of external data sources need to implement `equals` and `hashCode` if they want to utilize our cache management.

**Contributor Author:** Yes, it's the same behavior as before.

```diff
   @transient override def computeStats(conf: SQLConf): Statistics = {
     catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(
```
```diff
@@ -48,10 +48,8 @@ case class BroadcastExchangeExec(

   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)

-  override def sameResult(plan: SparkPlan): Boolean = plan match {
-    case p: BroadcastExchangeExec =>
-      mode.compatibleWith(p.mode) && child.sameResult(p.child)
-    case _ => false
+  override lazy val canonicalized: SparkPlan = {
+    BroadcastExchangeExec(mode.canonicalized, child.canonicalized)
   }

   @transient
```
```diff
@@ -48,10 +48,8 @@ abstract class Exchange extends UnaryExecNode {
 case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchange)
   extends LeafExecNode {

-  override def sameResult(plan: SparkPlan): Boolean = {
-    // Ignore this wrapper. `plan` could also be a ReusedExchange, so we reverse the order here.
-    plan.sameResult(child)
-  }
+  // Ignore this wrapper for canonicalizing.
+  override lazy val canonicalized: SparkPlan = child.canonicalized

   def doExecute(): RDD[InternalRow] = {
     child.execute()
```
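A tiny model of that wrapper transparency (hypothetical types, not Spark's classes): since the reuse node canonicalizes straight to its child, `sameResult` now holds between the wrapper and the original exchange in both directions, without the order-reversing trick the removed override needed.

```scala
// Hypothetical miniature of wrapper-transparent canonicalization.
sealed trait Plan { def canonical: Plan }
final case class MiniExchange(id: Int) extends Plan { def canonical: Plan = this }
final case class Reused(child: Plan) extends Plan { def canonical: Plan = child.canonical }

// sameResult(a, b) is now just equality of canonical forms.
def sameResult(a: Plan, b: Plan): Boolean = a.canonical == b.canonical

val e = MiniExchange(1)
assert(sameResult(Reused(e), e)) // wrapper vs original
assert(sameResult(e, Reused(e))) // order no longer matters
```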