```diff
@@ -64,10 +64,17 @@ object PhysicalOperation extends PredicateHelper {
       val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
       (Some(substitutedFields), filters, other, collectAliases(substitutedFields))
 
-    case Filter(condition, child) if condition.deterministic =>
+    case Filter(condition, child) =>
       val (fields, filters, other, aliases) = collectProjectsAndFilters(child)
-      val substitutedCondition = substitute(aliases)(condition)
-      (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
+
+      // Deterministic parts of filter condition placed before non-deterministic predicates could
+      // be pushed down safely.
+      val (pushDown, rest) =
```
Contributor:
After thinking about it more, I think it's not safe to do so. `collectProjectsAndFilters` should return all deterministic projects and filters above a scan node, and the returned filter conditions are not only used for filter pushdown but are also treated as the complete set of filters above that scan node. So the `rest` conditions here won't get executed.

cc @liancheng to confirm this.

Contributor Author:
@cloud-fan Thanks for your comment! I did search the codebase, though, and found the returned filters are only used for predicate pushdown or partition pruning; in both cases it should be safe for us to drop the `rest` conditions. Thank you!

Contributor:
Can you write a test for this? The logic in `DataSourceStrategy` shows that, when we get a scan node with projects and filters upon it, we rebuild the Project and Filter (with the project lists and filter conditions merged) and wrap the scan node with them. So any filter condition that isn't returned by `collectProjectsAndFilters` won't get executed.

@liancheng (Contributor), Aug 2, 2016:
I also think that silently dropping nondeterministic filters can be dangerous. Maybe we should just return all operators beneath the top-most nondeterministic filter as the bottom operator?

For example, say we have a plan tree like this:

```
Project a, b
 Filter a > 1
  Filter b < 3
   Filter RAND(42) > 0.5
    Filter c < 2
     TableScan t
```

We should return the following result:

```scala
(
  // Project list
  Seq(a, b),

  // Deterministic filters
  Seq(b < 3, a > 1),

  // The top-most nondeterministic filter with all operators beneath
  Filter(RAND(42) > 0.5,
    Filter(c < 2,
      TableScan(t)))
)
```
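This proposed contract can be sketched with hypothetical, simplified plan classes (toy stand-ins, not Catalyst's actual `LogicalPlan` nodes): walk down through deterministic filters, collecting their conditions, and stop at the first nondeterministic filter, returning it, together with everything beneath it, as the bottom operator:

```scala
// Toy plan AST; names and shapes are illustrative only.
sealed trait Plan
case class TableScan(table: String) extends Plan
case class Fltr(condition: String, deterministic: Boolean, child: Plan) extends Plan
case class Proj(fields: Seq[String], child: Plan) extends Plan

object CollectDemo {
  // Returns (project list, deterministic filter conditions bottom-up, bottom plan).
  def collect(plan: Plan): (Seq[String], Seq[String], Plan) = plan match {
    case Proj(fields, child) =>
      val (_, filters, bottom) = collect(child)
      (fields, filters, bottom)
    case Fltr(cond, true, child) =>
      val (fields, filters, bottom) = collect(child)
      (fields, filters :+ cond, bottom)
    case other =>
      // A nondeterministic filter (or the scan itself) terminates the walk;
      // it is returned whole, so its condition still gets executed.
      (Nil, Nil, other)
  }

  // The example tree from the comment above.
  val plan: Plan =
    Proj(Seq("a", "b"),
      Fltr("a > 1", deterministic = true,
        Fltr("b < 3", deterministic = true,
          Fltr("RAND(42) > 0.5", deterministic = false,
            Fltr("c < 2", deterministic = true,
              TableScan("t"))))))

  val (fields, filters, bottom) = collect(plan)
}
```

Running `collect` on the example tree yields the project list `Seq(a, b)`, the deterministic filters in bottom-up order `Seq(b < 3, a > 1)`, and the nondeterministic filter subtree as the bottom operator, matching the result described above.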

Contributor Author:
Thank you @cloud-fan for pointing that out; I realize my previous thinking was wrong. I fully agree with @liancheng's improvement idea. I will update the related code and add new test cases tomorrow.

Contributor:
After an offline discussion with @liancheng, we think it would be better to have a wrapper node for scans (table scan or file scan) that can also hold the project list and filter conditions. Then in the optimizer we can improve the `ColumnPruning` and filter-pushdown rules to push down into this wrapper node. After that we don't need `PhysicalOperation` anymore, and the planner can match on the wrapper node directly.
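The shape of such a rewrite can be sketched with toy classes (a hypothetical rule on a simplified AST, not the actual optimizer API): a filter-pushdown-style rule merges a deterministic `Filter` into the wrapper node beneath it, so the planner can later match on the wrapper alone.

```scala
// Toy logical plan; Scanner mirrors the wrapper node proposed in the thread.
sealed trait LogicalPlan
case class Relation(name: String) extends LogicalPlan
case class Scanner(
    projectionList: Seq[String],
    filters: Seq[String],
    child: LogicalPlan) extends LogicalPlan
case class Filter(condition: String, deterministic: Boolean, child: LogicalPlan)
  extends LogicalPlan

object PushIntoScanner {
  // Merge a deterministic Filter into the Scanner directly beneath it;
  // nondeterministic filters (and everything else) are left untouched.
  def apply(plan: LogicalPlan): LogicalPlan = plan match {
    case Filter(cond, true, s: Scanner) => s.copy(filters = s.filters :+ cond)
    case other                          => other
  }
}
```

For example, `PushIntoScanner(Filter("a > 1", true, Scanner(Seq("a"), Seq("b < 3"), Relation("t"))))` folds the condition into the wrapper's `filters` list, while a nondeterministic filter stays where it is.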

@jiangxb1987 (Contributor Author), Aug 3, 2016:
@cloud-fan Do you mean something like adding the following to `basicLogicalOperators`:

```scala
case class Scanner(
    projectionList: Seq[NamedExpression],
    filters: Seq[Expression],
    child: LogicalPlan)
  extends UnaryNode
```

and passing that to the planner instead of applying `PhysicalOperation`?

I'm willing to take on this work. Thanks!

Contributor:
yup, thanks!

Contributor Author:
@cloud-fan I can now insert a `Scanner` operator over `CatalogRelation` in the Optimizer, but I noticed a relation may also look like `l: LogicalRelation(relation: CatalogRelation, _, _)`. In that case we can't inspect the `LogicalRelation` class, because it lives in the spark-sql module while the Optimizer is in spark-catalyst, so we are not able to determine whether a `Scanner` should be added. I don't think we want to add a `Scanner` over every `BaseRelation`.

The diff hunk continues:

```diff
+          splitConjunctivePredicates(condition).span(_.deterministic)
+
+      val substitutedFilters = pushDown.map(substitute(aliases)(_))
+
+      (fields, filters ++ substitutedFilters, other, aliases)
 
     case BroadcastHint(child) =>
       collectProjectsAndFilters(child)
```
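The `span(_.deterministic)` call keeps only the prefix of conjuncts before the first non-deterministic one, which is exactly what the thread above turns on: any deterministic conjunct appearing after a non-deterministic one lands in `rest` and is dropped. A minimal, self-contained sketch with a toy predicate type (not Catalyst's `Expression` class) shows the behavior:

```scala
// Toy stand-in for a Catalyst expression: a name plus a determinism flag.
case class Pred(name: String, deterministic: Boolean)

object SpanDemo {
  // The conjuncts of the filter condition, in order.
  val conjuncts = Seq(
    Pred("a > 1", deterministic = true),
    Pred("b < 3", deterministic = true),
    Pred("RAND(42) > 0.5", deterministic = false),
    Pred("c < 2", deterministic = true))

  // span splits at the FIRST element failing the predicate, so the
  // deterministic "c < 2" ends up in `rest` because it follows RAND(42) > 0.5.
  val (pushDown, rest) = conjuncts.span(_.deterministic)
}
```

Here `pushDown` contains `a > 1` and `b < 3`, while `rest` contains both `RAND(42) > 0.5` and `c < 2`; silently discarding `rest`, as the reviewers point out, would lose those conditions entirely.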