Skip to content

Commit cffc080

Browse files
ueshin authored and cloud-fan committed
[SPARK-15915][SQL] Logical plans should use subqueries eliminated plan when override sameResult.
## What changes were proposed in this pull request?

This PR is a backport of #13638 for `branch-1.6`.

## How was this patch tested?

Added the same test as #13638, modified for `branch-1.6`.

Author: Takuya UESHIN <[email protected]>

Closes #13668 from ueshin/issues/SPARK-15915_1.6.
1 parent 2f3e327 commit cffc080

File tree

5 files changed

+32
-14
lines changed

5 files changed

+32
-14
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.catalyst.plans.logical
1919

2020
import org.apache.spark.sql.Row
21+
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
2122
import org.apache.spark.sql.catalyst.expressions.Attribute
2223
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, analysis}
2324
import org.apache.spark.sql.types.{StructField, StructType}
@@ -56,10 +57,12 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
5657

5758
override protected def stringArgs = Iterator(output)
5859

59-
override def sameResult(plan: LogicalPlan): Boolean = plan match {
60-
case LocalRelation(otherOutput, otherData) =>
61-
otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
62-
case _ => false
60+
override def sameResult(plan: LogicalPlan): Boolean = {
61+
EliminateSubQueries(plan) match {
62+
case LocalRelation(otherOutput, otherData) =>
63+
otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
64+
case _ => false
65+
}
6366
}
6467

6568
override lazy val statistics =

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
1919

2020
import org.apache.spark.rdd.RDD
2121
import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
22-
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
22+
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, MultiInstanceRelation}
2323
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
2424
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
2525
import org.apache.spark.sql.sources.{HadoopFsRelation, BaseRelation}
@@ -79,9 +79,11 @@ private[sql] case class LogicalRDD(
7979
override def newInstance(): LogicalRDD.this.type =
8080
LogicalRDD(output.map(_.newInstance()), rdd)(sqlContext).asInstanceOf[this.type]
8181

82-
override def sameResult(plan: LogicalPlan): Boolean = plan match {
83-
case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id
84-
case _ => false
82+
override def sameResult(plan: LogicalPlan): Boolean = {
83+
EliminateSubQueries(plan) match {
84+
case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id
85+
case _ => false
86+
}
8587
}
8688

8789
@transient override lazy val statistics: Statistics = Statistics(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.spark.sql.execution.datasources
1818

19-
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
19+
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, MultiInstanceRelation}
2020
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
2121
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
2222
import org.apache.spark.sql.sources.BaseRelation
@@ -57,9 +57,11 @@ case class LogicalRelation(
5757
com.google.common.base.Objects.hashCode(relation, output)
5858
}
5959

60-
override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
61-
case LogicalRelation(otherRelation, _) => relation == otherRelation
62-
case _ => false
60+
override def sameResult(otherPlan: LogicalPlan): Boolean = {
61+
EliminateSubQueries(otherPlan) match {
62+
case LogicalRelation(otherRelation, _) => relation == otherRelation
63+
case _ => false
64+
}
6365
}
6466

6567
// When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -506,4 +506,15 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
506506
sqlContext.uncacheTable("t2")
507507
}
508508
}
509+
510+
test("SPARK-15915 Logical plans should use subqueries eliminated plan when override sameResult") {
511+
val localRelation = sqlContext.createDataset(Seq(1, 2, 3)).toDF()
512+
localRelation.registerTempTable("localRelation")
513+
514+
sqlContext.cacheTable("localRelation")
515+
assert(
516+
localRelation.queryExecution.withCachedData.collect {
517+
case i: InMemoryRelation => i
518+
}.size == 1)
519+
}
509520
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.metadata._
3131
import org.apache.hadoop.hive.ql.plan.TableDesc
3232

3333
import org.apache.spark.Logging
34-
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
34+
import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries, MultiInstanceRelation, OverrideCatalog}
3535
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
3636
import org.apache.spark.sql.catalyst.expressions._
3737
import org.apache.spark.sql.catalyst.plans.logical
@@ -832,7 +832,7 @@ private[hive] case class MetastoreRelation
832832

833833
/** Only compare database and tablename, not alias. */
834834
override def sameResult(plan: LogicalPlan): Boolean = {
835-
plan match {
835+
EliminateSubQueries(plan) match {
836836
case mr: MetastoreRelation =>
837837
mr.databaseName == databaseName && mr.tableName == tableName
838838
case _ => false

0 commit comments

Comments
 (0)