Skip to content

Commit b243ff8

Browse files
fenzhu authored and GitHub Enterprise committed
[CARMEL-6674] Project fail to be collapsed (#1283)
* [CARMEL-6674]: Project fail to be collapsed
* fix checkstyle
* fix
1 parent 93f89f9 commit b243ff8

File tree

3 files changed

+20
-4
lines changed

3 files changed

+20
-4
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
234234
DeduplicateRightSideOfLeftSemiAntiJoin,
235235
RemoveNoopOperators) :+
236236
Batch("Pull out complex join condition", Once,
237-
PullOutComplexJoinCondition) :+
237+
PullOutComplexJoinCondition,
238+
CollapseProject) :+
238239
// This batch must be executed after the `RewriteSubquery` batch, which creates joins.
239240
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
240241

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PullOutComplexJoinCondition.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ object PullOutComplexJoinCondition extends Rule[LogicalPlan]
6464
}
6565
val newLeft = Project(left.output ++ leftComplexExpMap.values, left)
6666
val newRight = Project(right.output ++ rightComplexExpMap.values, right)
67-
CollapseProject(Project(j.output, j.copy(left = newLeft, right = newRight,
68-
condition = Some(newCondition))))
67+
Project(j.output, j.copy(left = newLeft, right = newRight, condition = Some(newCondition)))
6968
} else {
7069
j
7170
}

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.{GenericRow, In}
2929
import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Partial}
3030
import org.apache.spark.sql.catalyst.optimizer._
3131
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, JoinType}
32-
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
32+
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
3333
import org.apache.spark.sql.catalyst.rules.Rule
3434
import org.apache.spark.sql.catalyst.util.StringUtils
3535
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
@@ -6800,6 +6800,22 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
68006800
}
68016801
}
68026802
}
6803+
6804+
// Regression test for CARMEL-6674: the Project that PullOutComplexJoinCondition
// introduces on top of the join must end up collapsed, so the optimized plan is
// a single Project whose direct child is the Join itself.
test("CARMEL-6674: Project fail to be collapsed") {
  withTable("t1", "t2") {
    spark.sql("CREATE TABLE t1(id STRING) USING PARQUET")
    spark.sql("CREATE TABLE t2(sid DECIMAL(18,0), ct TIMESTAMP) USING PARQUET")
    // Disable broadcast joins so the join (and the pulled-out condition) survives
    // into the optimized plan in the shape this test inspects.
    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
      val optimized = spark.sql(
        """
          |SELECT ct as run_time
          |FROM t1 LEFT JOIN t2 ON id = sid
          |""".stripMargin).queryExecution.optimizedPlan
      // Expect exactly Project(Join(...)) — no stacked Projects left behind.
      val projectOverJoin = optimized match {
        case Project(_, _: Join) => true
        case _ => false
      }
      assert(projectOverJoin)
    }
  }
}
68036819
}
68046820

68056821
case class Foo(bar: Option[String])

0 commit comments

Comments (0)