Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,25 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
/** Args that have cleaned such that differences in expression id should not affect equality */
protected lazy val cleanArgs: Seq[Any] = {
val input = children.flatMap(_.output)
def cleanExpression(e: Expression) = e match {
case a: Alias =>
// As the root of the expression, Alias will always take an arbitrary exprId, we need
// to erase that for equality testing.
val cleanedExprId = Alias(a.child, a.name)(ExprId(-1), a.qualifiers)
BindReferences.bindReference(cleanedExprId, input, allowFailures = true)
case other => BindReferences.bindReference(other, input, allowFailures = true)
}

productIterator.map {
// Children are checked using sameResult above.
case tn: TreeNode[_] if containsChild(tn) => null
case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
case e: Expression => cleanExpression(e)
case s: Option[_] => s.map {
case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
case e: Expression => cleanExpression(e)
case other => other
}
case s: Seq[_] => s.map {
case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
case e: Expression => cleanExpression(e)
case other => other
}
case other => other
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.execution.PhysicalRDD

import scala.concurrent.duration._
import scala.language.postfixOps

Expand Down Expand Up @@ -336,4 +338,18 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
assert((accsSize - 2) == Accumulators.originals.size)
}
}

test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") {
ctx.sparkContext.parallelize((1, 1) :: (2, 2) :: Nil)
.toDF("key", "value").selectExpr("key", "value", "key+1").registerTempTable("abc")
ctx.cacheTable("abc")

val sparkPlan = sql(
"""select a.key, b.key, c.key from
|abc a join abc b on a.key=b.key
|join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan

assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3)
assert(sparkPlan.collect { case e: PhysicalRDD => e }.size === 0)
}
}