Skip to content

Commit 5feeff0

Browse files
committed
Improve column pruning.
1 parent 70bcdef commit 5feeff0

File tree

1 file changed

+11
-5
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer

1 file changed

+11
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ import org.apache.spark.sql.catalyst.types._
2525

2626
object Optimizer extends RuleExecutor[LogicalPlan] {
2727
val batches =
28-
Batch("ConstantFolding", Once,
28+
Batch("ConstantFolding", FixedPoint(100),
2929
NullPropagation,
3030
ConstantFolding,
3131
BooleanSimplification,
3232
SimplifyFilters,
3333
SimplifyCasts) ::
34-
Batch("Filter Pushdown", Once,
34+
Batch("Filter Pushdown", FixedPoint(100),
3535
CombineFilters,
3636
PushPredicateThroughProject,
3737
PushPredicateThroughInnerJoin,
@@ -49,24 +49,27 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
4949
*/
5050
object ColumnPruning extends Rule[LogicalPlan] {
5151
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
52+
// Eliminate attributes that are not needed to calculate the specified aggregates.
5253
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
53-
// Project away references that are not needed to calculate the required aggregates.
5454
a.copy(child = Project(a.references.toSeq, child))
5555

56+
// Eliminate unneeded attributes from either side of a Join.
5657
case Project(projectList, Join(left, right, joinType, condition)) =>
5758
// Collect the list of off references required either above or to evaluate the condition.
5859
val allReferences: Set[Attribute] =
5960
projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
60-
/** Applies a projection when the child is producing unnecessary attributes */
61+
62+
/** Applies a projection only when the child is producing unnecessary attributes */
6163
def prunedChild(c: LogicalPlan) =
62-
if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
64+
if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
6365
Project(allReferences.filter(c.outputSet.contains).toSeq, c)
6466
} else {
6567
c
6668
}
6769

6870
Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))
6971

72+
// Combine adjacent Projects.
7073
case Project(projectList1, Project(projectList2, child)) =>
7174
// Create a map of Aliases to their values from the child projection.
7275
// e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
@@ -83,6 +86,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
8386
}).asInstanceOf[Seq[NamedExpression]]
8487

8588
Project(substitutedProjection, child)
89+
90+
// Eliminate no-op Projects
91+
case Project(projectList, child) if(child.output == projectList) => child
8692
}
8793
}
8894

0 commit comments

Comments
 (0)