Closed
@@ -61,15 +61,16 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
   RemoveLiteralFromGroupExpressions) ::
 Batch("Operator Optimizations", FixedPoint(100),
   // Operator push down
-  SetOperationPushDown,
   SamplePushDown,
   ReorderJoin,
   OuterJoinElimination,
-  PushPredicateThroughJoin,
   PushPredicateThroughProject,
+  SetOperationPushDown,
+  PushPredicateThroughJoin,
   PushPredicateThroughGenerate,
   PushPredicateThroughAggregate,
   LimitPushDown,
+  PushProjectThroughFilter,
   ColumnPruning,
   EliminateOperators,
   // Operator combine
@@ -91,6 +92,10 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
   SimplifyCasts,
   SimplifyCaseConversionExpressions,
   EliminateSerialization) ::
+// Because ColumnPruning is called after PushPredicateThroughProject, the predicate push down
+// is reversed. This batch is to ensure Filter is pushed below Project, if possible.
+Batch("Push Predicate Through Project", Once,
+  PushPredicateThroughProject) ::

Contributor: Putting this rule in a separate batch is not correct; some other filter push-down rules depend on it.

Member Author: I did not remove it from the original batch. I just added the extra batch here.

Contributor: Oh, I missed that, sorry.

 Batch("Decimal Optimizations", FixedPoint(100),
   DecimalAggregates) ::
 Batch("LocalRelation", FixedPoint(100),
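
The reasoning in the added comment (a fixed-point batch can end with a Filter sitting above a pruning Project, so a later `Once` batch runs PushPredicateThroughProject again) can be reproduced in a toy model. This is a sketch, not Catalyst: `Plan`, `Relation`, `Project`, `Filter`, `Batch`, `FixedPoint`, `Once` and the three rule functions are hypothetical stand-ins that track only column names, ignore aliases and determinism, and apply a batch's rules in the listed order until the plan stops changing.

```scala
object BatchOrderSketch {
  // Toy logical plan (hypothetical): operators only track the column names they output.
  sealed trait Plan { def output: Set[String] }
  final case class Relation(output: Set[String]) extends Plan
  final case class Project(columns: Set[String], child: Plan) extends Plan {
    def output: Set[String] = columns
  }
  final case class Filter(references: Set[String], child: Plan) extends Plan {
    def output: Set[String] = child.output
  }

  // Rewrite children first, then try the rule on the rebuilt node (like transformUp).
  def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val rebuilt: Plan = plan match {
      case r: Relation          => r
      case Project(cols, child) => Project(cols, transformUp(child)(rule))
      case Filter(refs, child)  => Filter(refs, transformUp(child)(rule))
    }
    rule.applyOrElse(rebuilt, identity[Plan])
  }

  // Toy PushPredicateThroughProject: Filter over Project becomes Project over Filter.
  def pushPredicateThroughProject(plan: Plan): Plan = transformUp(plan) {
    case Filter(refs, Project(cols, child)) => Project(cols, Filter(refs, child))
  }

  // Toy PushProjectThroughFilter: insert a pruning Project under the Filter when
  // the Filter's child outputs columns that neither the Filter nor the Project needs.
  def pushProjectThroughFilter(plan: Plan): Plan = transformUp(plan) {
    case Project(cols, Filter(refs, child)) if (child.output -- (cols ++ refs)).nonEmpty =>
      Project(cols, Filter(refs, Project(child.output.intersect(cols ++ refs), child)))
  }

  // Toy CollapseProject: with no aliases, the outer Project simply wins.
  def collapseProject(plan: Plan): Plan = transformUp(plan) {
    case Project(cols, Project(_, child)) => Project(cols, child)
  }

  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  final case class FixedPoint(maxIterations: Int) extends Strategy
  final case class Batch(name: String, strategy: Strategy, rules: List[Plan => Plan])

  // Run batches in order; inside a batch, apply the rules in the listed order
  // and repeat until nothing changes or the strategy's iteration limit is hit.
  def execute(plan: Plan, batches: List[Batch]): Plan =
    batches.foldLeft(plan) { (input, batch) =>
      var current = input
      var iteration = 0
      var changed = true
      while (changed && iteration < batch.strategy.maxIterations) {
        val next = batch.rules.foldLeft(current)((p, rule) => rule(p))
        changed = next != current
        current = next
        iteration += 1
      }
      current
    }

  val start: Plan = Filter(Set("a"), Project(Set("a", "b"), Relation(Set("a", "b", "c"))))
  val operatorOptimizations = Batch("Operator Optimizations", FixedPoint(100),
    List[Plan => Plan](pushPredicateThroughProject, pushProjectThroughFilter, collapseProject))
  val pushPredicateOnce = Batch("Push Predicate Through Project", Once,
    List[Plan => Plan](pushPredicateThroughProject))

  val afterFixedPoint: Plan = execute(start, List(operatorOptimizations))
  val afterOnce: Plan = execute(start, List(operatorOptimizations, pushPredicateOnce))
}
```

In this toy run the fixed-point batch settles on `Project(a, b) <- Filter(a) <- Project(a, b) <- Relation`: each iteration pushes the Filter down, and the pruning and collapsing rules put it back above a Project. The extra `Once` batch then leaves the Filter directly on the relation, under two adjacent Projects that the toy `collapseProject` would merge if it ran again.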
@@ -306,14 +311,28 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
 }

 /**
- * Attempts to eliminate the reading of unneeded columns from the query plan using the following
- * transformations:
+ * Attempts to eliminate the reading of unneeded columns from the query plan
+ * by pushing Project through Filter.
  *
- *  - Inserting Projections beneath the following operators:
- *   - Aggregate
- *   - Generate
- *   - Project <- Join
- *   - LeftSemiJoin
+ * Note: This rule could reverse the effects of PushPredicateThroughProject.
+ * This rule should be run before ColumnPruning for ensuring that Project can be

Contributor: I'm a little against depending too much on rule order. Sometimes we have to, when the other solutions are far too complex, but for this issue, can we try to find a more general solution?

Member Author: Yes, I have the same concern. This PR only resolves the conflicts within the current infrastructure.

In my opinion, each batch should contain a few rule sets. The order of the rule sets would not matter, but within each rule set the order of the rules would. However, this is a fundamental design change. @marmbrus @rxin might have a better idea about this.
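
The rule-set idea in the comment above is only a proposal in this thread. A minimal sketch of the execution model it describes, assuming rules are plain functions and using the hypothetical names `RuleSet`, `runSet` and `runBatch` (these are not Catalyst APIs), could look like this:

```scala
object RuleSetSketch {
  // Hypothetical: a rule is any function from a plan to a plan.
  type Rule[T] = T => T

  // Hypothetical grouping: rules inside one set run in the listed order.
  final case class RuleSet[T](name: String, rules: List[Rule[T]])

  // Apply `step` until the value stops changing or `maxIterations` is reached.
  def fixedPoint[T](start: T, maxIterations: Int)(step: T => T): T = {
    var current = start
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      val next = step(current)
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }

  // Run one set to its own fixed point, preserving the rule order inside it.
  def runSet[T](plan: T, set: RuleSet[T]): T =
    fixedPoint(plan, 100)(p => set.rules.foldLeft(p)((acc, rule) => rule(acc)))

  // Run all sets, then repeat the whole batch until it is stable. The proposal
  // says the order of `sets` should not matter; this code does not enforce it.
  def runBatch[T](plan: T, sets: List[RuleSet[T]]): T =
    fixedPoint(plan, 100)(p => sets.foldLeft(p)((acc, set) => runSet(acc, set)))
}
```

For example, a batch made of one set that trims a string and another that lower-cases it returns the same value whichever set runs first. Order independence is a property the rules placed in each set would have to satisfy; the executor only makes the ordering contract explicit.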

+ * pushed as low as possible.
+ */
+object PushProjectThroughFilter extends Rule[LogicalPlan] {

Contributor: We do not actually push the Project through the Filter; we create a new Project before the Filter to prune some columns.

As I said in another PR, we remove those Projects before the Filter.

Member Author: @davies The name of this rule is not right, but I still think this PR fixes the fundamental conflict between ColumnPruning and PushPredicateThroughProject. If we do not adopt the ideas of this PR, I can find a test case showing that the minor fix in ColumnPruning does not cover all cases.

+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case p @ Project(projectList, f: Filter)
+        if f.condition.deterministic && projectList.forall(_.deterministic) =>
+      val required = f.references ++ p.references
+      if ((f.inputSet -- required).nonEmpty) {
+        p.copy(child = f.copy(child = ColumnPruning.prunedChild(f.child, required)))
+      } else {
+        p
+      }
+  }
+}
+
+/**
+ * Attempts to eliminate the reading of unneeded columns from the query plan
  */
 object ColumnPruning extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
@@ -392,7 +411,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
   }

   /** Applies a projection only when the child is producing unnecessary attributes */
-  private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
+  def prunedChild(c: LogicalPlan, allReferences: AttributeSet): LogicalPlan =
     if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
       Project(c.output.filter(allReferences.contains), c)
     } else {
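
The diff drops `private` from `prunedChild` because the new PushProjectThroughFilter rule calls `ColumnPruning.prunedChild` from its `apply` method. The helper's decision can be checked in isolation with a toy model, assuming a child is described only by its ordered output column names; `Child`, `Pruned`, `Unchanged` and `WithProject` are hypothetical stand-ins for Catalyst's `LogicalPlan`, `AttributeSet` and `Project`.

```scala
object PrunedChildSketch {
  // Hypothetical stand-in for a child plan: just its ordered output columns.
  final case class Child(output: Seq[String])

  // Result of pruning: either the child as-is, or the child under a Project
  // that keeps only the listed columns.
  sealed trait Pruned
  final case class Unchanged(child: Child) extends Pruned
  final case class WithProject(kept: Seq[String], child: Child) extends Pruned

  // Same test as the helper in the diff: add a Project only when the child
  // outputs at least one column that is not referenced.
  def prunedChild(c: Child, allReferences: Set[String]): Pruned = {
    val outputSet = c.output.toSet
    if ((outputSet -- allReferences.filter(outputSet.contains)).nonEmpty)
      WithProject(c.output.filter(allReferences.contains), c)
    else
      Unchanged(c)
  }
}
```

In this toy version, filtering `allReferences` down to the child's columns does not change the difference: `outputSet -- allReferences` yields the same set, because removing names the child never produced has no effect.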
@@ -874,6 +893,10 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
  * that were defined in the projection.
  *
  * This heuristic is valid assuming the expression evaluation cost is minimal.
+ *
+ * Note: Because PushProjectThroughFilter could reverse the effect of PushPredicateThroughProject,
+ * PushPredicateThroughProject needs to be called before the other Predicate Push Down rules for
+ * ensuring the predicates can be pushed as low as possible.
  */
 object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
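
The doc comment above refers to definitions made in the projection: in Spark, PushPredicateThroughProject in-lines the Project's aliases into the Filter condition before moving the Filter below the Project. The cost caveat follows from that, since an aliased expression may then be evaluated both in the Filter and in the Project. A toy sketch of the rewrite, using hypothetical `Expr`, `Plan` and helper names rather than Catalyst classes, handling only the root of the plan and ignoring determinism entirely:

```scala
object AliasInliningSketch {
  // Toy expressions: column references, integer literals, addition, and ">".
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Gt(left: Expr, right: Expr) extends Expr

  // Toy plans: a Project maps each output name to the expression defining it.
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Project(outputs: Map[String, Expr], child: Plan) extends Plan
  final case class Filter(condition: Expr, child: Plan) extends Plan

  // Replace each column that the Project defines with its defining expression.
  def inlineAliases(e: Expr, defs: Map[String, Expr]): Expr = e match {
    case Col(name) => defs.getOrElse(name, e)
    case l: Lit    => l
    case Add(l, r) => Add(inlineAliases(l, defs), inlineAliases(r, defs))
    case Gt(l, r)  => Gt(inlineAliases(l, defs), inlineAliases(r, defs))
  }

  // Swap Filter and Project at the root, rewriting the condition so it refers
  // to the Project's input columns instead of its output names.
  def pushPredicateThroughProject(plan: Plan): Plan = plan match {
    case Filter(cond, Project(outputs, child)) =>
      Project(outputs, Filter(inlineAliases(cond, outputs), child))
    case other => other
  }
}
```

With `x` defined as `a + 1`, the condition `x > 10` becomes `a + 1 > 10` below the Project, so `a + 1` is computed once in the Filter and again in the Project. That duplicated work is what the doc comment assumes is cheap.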
@@ -34,6 +34,9 @@ class ColumnPruningSuite extends PlanTest {

   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch("Column pruning", FixedPoint(100),
+      PushPredicateThroughProject,
+      PushPredicateThroughJoin,
+      PushProjectThroughFilter,
       ColumnPruning,
       EliminateOperators,
       CollapseProject) :: Nil