From e0957b1b6f441703c758b9b83490295760531046 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 4 Jul 2014 17:30:54 +0900 Subject: [PATCH 1/3] Add column pruning for the right side of LeftSemi join. --- .../sql/catalyst/optimizer/Optimizer.scala | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index fb517e40677e..a4c772c1c822 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -52,6 +52,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] { * - Inserting Projections beneath the following operators: * - Aggregate * - Project <- Join + * - LeftSemiJoin * - Collapse adjacent projections, performing alias substitution. */ object ColumnPruning extends Rule[LogicalPlan] { @@ -67,15 +68,18 @@ object ColumnPruning extends Rule[LogicalPlan] { projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty) /** Applies a projection only when the child is producing unnecessary attributes */ - def prunedChild(c: LogicalPlan) = - if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { - Project(allReferences.filter(c.outputSet.contains).toSeq, c) - } else { - c - } + def prunedChild(c: LogicalPlan) = ColumnPruning.prunedChild(c, allReferences) Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition)) + // Eliminate unneeded attributes from right side of a LeftSemiJoin. + case Join(left, right, LeftSemi, condition) => + // Collect the list of off references required either above or to evaluate the condition. + val allReferences: Set[Attribute] = + condition.map(_.references).getOrElse(Set.empty) + + Join(left, prunedChild(right, allReferences), LeftSemi, condition) + // Combine adjacent Projects. case Project(projectList1, Project(projectList2, child)) => // Create a map of Aliases to their values from the child projection. @@ -97,6 +101,14 @@ object ColumnPruning extends Rule[LogicalPlan] { // Eliminate no-op Projects case Project(projectList, child) if child.output == projectList => child } + + /** Applies a projection only when the child is producing unnecessary attributes */ + private def prunedChild(c: LogicalPlan, allReferences: Set[Attribute]) = + if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { + Project(allReferences.filter(c.outputSet.contains).toSeq, c) + } else { + c + } } /** From 786d3a03cc7b51b56de0fb6a9b73409b05fc428d Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 5 Jul 2014 12:20:16 +0900 Subject: [PATCH 2/3] Rename method name. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a4c772c1c822..6ea2a43b40f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -68,9 +68,9 @@ object ColumnPruning extends Rule[LogicalPlan] { projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty) /** Applies a projection only when the child is producing unnecessary attributes */ - def prunedChild(c: LogicalPlan) = ColumnPruning.prunedChild(c, allReferences) + def pruneJoinChild(c: LogicalPlan) = prunedChild(c, allReferences) - Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition)) + Project(projectList, Join(pruneJoinChild(left), pruneJoinChild(right), joinType, condition)) // Eliminate unneeded attributes from right side of a LeftSemiJoin. case Join(left, right, LeftSemi, condition) => From 7677a395f2515df659f75f77f148e5d81043a4c9 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 5 Jul 2014 12:20:41 +0900 Subject: [PATCH 3/3] Update comments. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6ea2a43b40f3..48ca31e70fe7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -63,7 +63,7 @@ object ColumnPruning extends Rule[LogicalPlan] { // Eliminate unneeded attributes from either side of a Join. case Project(projectList, Join(left, right, joinType, condition)) => - // Collect the list of off references required either above or to evaluate the condition. + // Collect the list of all references required either above or to evaluate the condition. val allReferences: Set[Attribute] = projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty) @@ -74,7 +74,7 @@ object ColumnPruning extends Rule[LogicalPlan] { // Eliminate unneeded attributes from right side of a LeftSemiJoin. case Join(left, right, LeftSemi, condition) => - // Collect the list of off references required either above or to evaluate the condition. + // Collect the list of all references required to evaluate the condition. val allReferences: Set[Attribute] = condition.map(_.references).getOrElse(Set.empty)