Skip to content

Commit 22db44a

Browse files
committed
review comments
1 parent 7346dca commit 22db44a

File tree

4 files changed

+32
-28
lines changed

4 files changed

+32
-28
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ abstract class SubqueryExpression(
4747
plan: LogicalPlan,
4848
children: Seq[Expression],
4949
exprId: ExprId) extends PlanExpression[LogicalPlan] {
50-
5150
override lazy val resolved: Boolean = childrenResolved && plan.resolved
5251
override lazy val references: AttributeSet =
5352
if (plan.resolved) super.references -- plan.outputSet else super.references
@@ -59,11 +58,10 @@ abstract class SubqueryExpression(
5958
children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
6059
case _ => false
6160
}
62-
6361
def canonicalize(attrs: AttributeSeq): SubqueryExpression = {
6462
// Normalize the outer references in the subquery plan.
6563
val subPlan = plan.transformAllExpressions {
66-
case OuterReference(r) => plan.normalizeExprId(r, attrs)
64+
case OuterReference(r) => QueryPlan.normalizeExprId(r, attrs)
6765
}
6866
withNewPlan(subPlan).canonicalized.asInstanceOf[SubqueryExpression]
6967
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -377,15 +377,16 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
377377
// As the root of the expression, Alias will always take an arbitrary exprId, we need to
378378
// normalize that for equality testing, by assigning expr id from 0 incrementally. The
379379
// alias name doesn't matter and should be erased.
380-
Alias(normalizeExprId(a.child), "")(ExprId(id), a.qualifier, isGenerated = a.isGenerated)
380+
val normalizedChild = QueryPlan.normalizeExprId(a.child, allAttributes)
381+
Alias(normalizedChild, "")(ExprId(id), a.qualifier, isGenerated = a.isGenerated)
381382

382383
case ar: AttributeReference if allAttributes.indexOf(ar.exprId) == -1 =>
383384
// Top level `AttributeReference` may also be used for output like `Alias`, we should
384385
// normalize the epxrId too.
385386
id += 1
386387
ar.withExprId(ExprId(id))
387388

388-
case other => normalizeExprId(other)
389+
case other => QueryPlan.normalizeExprId(other, allAttributes)
389390
}.withNewChildren(canonicalizedChildren)
390391
}
391392

@@ -395,24 +396,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
395396
*/
396397
protected def preCanonicalized: PlanType = this
397398

398-
/**
399-
* Normalize the exprIds in the given expression, by updating the exprId in `AttributeReference`
400-
* with its referenced ordinal from input attributes. It's similar to `BindReferences` but we
401-
* do not use `BindReferences` here as the plan may take the expression as a parameter with type
402-
* `Attribute`, and replace it with `BoundReference` will cause error.
403-
*/
404-
def normalizeExprId[T <: Expression](e: T, input: AttributeSeq = allAttributes): T = {
405-
e.transformUp {
406-
case s: SubqueryExpression => s.canonicalize(input)
407-
case ar: AttributeReference =>
408-
val ordinal = input.indexOf(ar.exprId)
409-
if (ordinal == -1) {
410-
ar
411-
} else {
412-
ar.withExprId(ExprId(ordinal))
413-
}
414-
}.canonicalized.asInstanceOf[T]
415-
}
416399

417400
/**
418401
* Returns true when the given query plan will return the same results as this query plan.
@@ -439,3 +422,24 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
439422
*/
440423
lazy val allAttributes: AttributeSeq = children.flatMap(_.output)
441424
}
425+
426+
object QueryPlan {
427+
/**
428+
* Normalize the exprIds in the given expression, by updating the exprId in `AttributeReference`
429+
* with its referenced ordinal from input attributes. It's similar to `BindReferences` but we
430+
* do not use `BindReferences` here as the plan may take the expression as a parameter with type
431+
* `Attribute`, and replace it with `BoundReference` will cause error.
432+
*/
433+
def normalizeExprId[T <: Expression](e: T, input: AttributeSeq): T = {
434+
e.transformUp {
435+
case s: SubqueryExpression => s.canonicalize(input)
436+
case ar: AttributeReference =>
437+
val ordinal = input.indexOf(ar.exprId)
438+
if (ordinal == -1) {
439+
ar
440+
} else {
441+
ar.withExprId(ExprId(ordinal))
442+
}
443+
}.canonicalized.asInstanceOf[T]
444+
}
445+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
2828
import org.apache.spark.sql.catalyst.catalog.BucketSpec
2929
import org.apache.spark.sql.catalyst.expressions._
3030
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
31+
import org.apache.spark.sql.catalyst.plans.QueryPlan
3132
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
3233
import org.apache.spark.sql.execution.datasources._
3334
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -516,10 +517,10 @@ case class FileSourceScanExec(
516517
override lazy val canonicalized: FileSourceScanExec = {
517518
FileSourceScanExec(
518519
relation,
519-
output.map(normalizeExprId(_, output)),
520+
output.map(QueryPlan.normalizeExprId(_, output)),
520521
requiredSchema,
521-
partitionFilters.map(normalizeExprId(_, output)),
522-
dataFilters.map(normalizeExprId(_, output)),
522+
partitionFilters.map(QueryPlan.normalizeExprId(_, output)),
523+
dataFilters.map(QueryPlan.normalizeExprId(_, output)),
523524
None)
524525
}
525526
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.spark.sql.SparkSession
3232
import org.apache.spark.sql.catalyst.InternalRow
3333
import org.apache.spark.sql.catalyst.catalog.CatalogRelation
3434
import org.apache.spark.sql.catalyst.expressions._
35+
import org.apache.spark.sql.catalyst.plans.QueryPlan
3536
import org.apache.spark.sql.execution._
3637
import org.apache.spark.sql.execution.metric.SQLMetrics
3738
import org.apache.spark.sql.hive._
@@ -203,9 +204,9 @@ case class HiveTableScanExec(
203204
override lazy val canonicalized: HiveTableScanExec = {
204205
val input: AttributeSeq = relation.output
205206
HiveTableScanExec(
206-
requestedAttributes.map(normalizeExprId(_, input)),
207+
requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
207208
relation.canonicalized.asInstanceOf[CatalogRelation],
208-
partitionPruningPred.map(normalizeExprId(_, input)))(sparkSession)
209+
partitionPruningPred.map(QueryPlan.normalizeExprId(_, input)))(sparkSession)
209210
}
210211

211212
override def otherCopyArgs: Seq[AnyRef] = Seq(sparkSession)

0 commit comments

Comments
 (0)