Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,58 @@ object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
}
}

// Project [1 AS cste#13, col#12]
// +- Union
// :- Project [col#14 AS col#12]
// : +- Join LeftOuter
// : :- MetastoreRelation default, p1
// : +- MetastoreRelation default, p2
// +- Project [col#16 AS col#12]
// +- MetastoreRelation default, p3
//
// if we replace col#12 with col#16, Project [1 AS cste#13, col#12]
// change to Project [1 AS cste#13, col#16],
// Project [1 AS cste#13, col#16]
// +- Union
// :- Project [col#14 AS col#12]
// : +- Join LeftOuter
// : :- MetastoreRelation default, p1
// : +- MetastoreRelation default, p2
// +- MetastoreRelation default, p3
//
// after then apply PushProjectionThroughUnion, because the output attributes
// of a union are always equal to the left child's output,
// while col#16 does not exist in left child, it will throw a exception.
//
// if we replace col#12 with col#14, Project [1 AS cste#13, col#12]
// change to Project [1 AS cste#13, col#14],
// Project [1 AS cste#13, col#14]
// +- Union
// : +- Join LeftOuter
// : :- MetastoreRelation default, p1
// : +- MetastoreRelation default, p2
// +- Project [col#16 AS col#12]
// +- MetastoreRelation default, p3
//
// and then apply PushProjectionThroughUnion,
// the attribute(col#14) of union exists in left child, it will be ok.
//
// so here we should not do RemoveAliasOnlyProject
// when the project is the right child of the Union,
// this function is to check it.
def isUnionRightProj(p: Project, plan: LogicalPlan): Boolean = {
plan.collectFirst {
case u @ Union(children) if children.tail.exists(_ eq p)
&& children.head.isInstanceOf[Project]
&& p.projectList.exists(proj =>
children.head.asInstanceOf[Project].projectList.exists(_.exprId == proj.exprId)) => p
}.nonEmpty
}

def apply(plan: LogicalPlan): LogicalPlan = {
val aliasOnlyProject = plan.collectFirst {
case p @ Project(pList, child) if isAliasOnly(pList, child.output) => p
case p @ Project(pList, child) if isAliasOnly(pList, child.output)
&& !isUnionRightProj(p, plan) => p
}

aliasOnlyProject.map { case proj =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2011,6 +2011,28 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}


test("union all with same column") {
sql("DROP TABLE IF EXISTS p1")
sql("DROP TABLE IF EXISTS p2")
sql("DROP TABLE IF EXISTS p3")

sql("CREATE TABLE p1 (col int)" )
sql("CREATE TABLE p2 (col int)")
sql("CREATE TABLE p3 (col int)")

sql("set spark.sql.crossJoin.enabled = true")

assert(sql(
"""
|SELECT 1 as cste,col FROM
| (SELECT col as col FROM
| (SELECT p1.col as col FROM p1 LEFT JOIN p2
| UNION ALL SELECT col FROM p3 ) T1) T2
""".stripMargin).collect.isEmpty)

}

def testCommandAvailable(command: String): Boolean = {
val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
attempt.isSuccess && attempt.get == 0
Expand Down