Skip to content

Commit 92f3827

Browse files
chenghao-intel authored and mbautin committed
[SPARK-10327] [SQL] Cache Table is not working while subquery has alias in its project list
```scala import org.apache.spark.sql.hive.execution.HiveTableScan sql("select key, value, key + 1 from src").registerTempTable("abc") cacheTable("abc") val sparkPlan = sql( """select a.key, b.key, c.key from |abc a join abc b on a.key=b.key |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3) // failed assert(sparkPlan.collect { case e: HiveTableScan => e }.size === 0) // failed ``` The actual plan is: ``` == Parsed Logical Plan == 'Project [unresolvedalias('a.key),unresolvedalias('b.key),unresolvedalias('c.key)] 'Join Inner, Some(('a.key = 'c.key)) 'Join Inner, Some(('a.key = 'b.key)) 'UnresolvedRelation [abc], Some(a) 'UnresolvedRelation [abc], Some(b) 'UnresolvedRelation [abc], Some(c) == Analyzed Logical Plan == key: int, key: int, key: int Project [key#14,key#61,key#66] Join Inner, Some((key#14 = key#66)) Join Inner, Some((key#14 = key#61)) Subquery a Subquery abc Project [key#14,value#15,(key#14 + 1) AS _c2#16] MetastoreRelation default, src, None Subquery b Subquery abc Project [key#61,value#62,(key#61 + 1) AS _c2#58] MetastoreRelation default, src, None Subquery c Subquery abc Project [key#66,value#67,(key#66 + 1) AS _c2#63] MetastoreRelation default, src, None == Optimized Logical Plan == Project [key#14,key#61,key#66] Join Inner, Some((key#14 = key#66)) Project [key#14,key#61] Join Inner, Some((key#14 = key#61)) Project [key#14] InMemoryRelation [key#14,value#15,_c2#16], true, 10000, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc) Project [key#61] MetastoreRelation default, src, None Project [key#66] MetastoreRelation default, src, None == Physical Plan == TungstenProject [key#14,key#61,key#66] BroadcastHashJoin [key#14], [key#66], BuildRight TungstenProject [key#14,key#61] BroadcastHashJoin [key#14], [key#61], BuildRight ConvertToUnsafe InMemoryColumnarTableScan [key#14], (InMemoryRelation 
[key#14,value#15,_c2#16], true, 10000, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc)) ConvertToUnsafe HiveTableScan [key#61], (MetastoreRelation default, src, None) ConvertToUnsafe HiveTableScan [key#66], (MetastoreRelation default, src, None) ``` Author: Cheng Hao <[email protected]> Closes apache#8494 from chenghao-intel/weird_cache.
1 parent facc48e commit 92f3827

File tree

2 files changed

+28
-3
lines changed

2 files changed

+28
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,16 +135,25 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
135135
/** Args that have cleaned such that differences in expression id should not affect equality */
136136
protected lazy val cleanArgs: Seq[Any] = {
137137
val input = children.flatMap(_.output)
138+
def cleanExpression(e: Expression) = e match {
139+
case a: Alias =>
140+
// As the root of the expression, Alias will always take an arbitrary exprId, we need
141+
// to erase that for equality testing.
142+
val cleanedExprId = Alias(a.child, a.name)(ExprId(-1), a.qualifiers)
143+
BindReferences.bindReference(cleanedExprId, input, allowFailures = true)
144+
case other => BindReferences.bindReference(other, input, allowFailures = true)
145+
}
146+
138147
productIterator.map {
139148
// Children are checked using sameResult above.
140149
case tn: TreeNode[_] if containsChild(tn) => null
141-
case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
150+
case e: Expression => cleanExpression(e)
142151
case s: Option[_] => s.map {
143-
case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
152+
case e: Expression => cleanExpression(e)
144153
case other => other
145154
}
146155
case s: Seq[_] => s.map {
147-
case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
156+
case e: Expression => cleanExpression(e)
148157
case other => other
149158
}
150159
case other => other

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql
1919

20+
import org.apache.spark.sql.execution.PhysicalRDD
21+
2022
import scala.concurrent.duration._
2123
import scala.language.postfixOps
2224

@@ -336,4 +338,18 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
336338
assert((accsSize - 2) == Accumulators.originals.size)
337339
}
338340
}
341+
342+
test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") {
343+
ctx.sparkContext.parallelize((1, 1) :: (2, 2) :: Nil)
344+
.toDF("key", "value").selectExpr("key", "value", "key+1").registerTempTable("abc")
345+
ctx.cacheTable("abc")
346+
347+
val sparkPlan = sql(
348+
"""select a.key, b.key, c.key from
349+
|abc a join abc b on a.key=b.key
350+
|join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan
351+
352+
assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3)
353+
assert(sparkPlan.collect { case e: PhysicalRDD => e }.size === 0)
354+
}
339355
}

0 commit comments

Comments (0)