Commit 4ebabc7

a new fix
1 parent 03633e3 commit 4ebabc7

3 files changed (+21, -22 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 1 addition & 19 deletions
@@ -23,24 +23,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 
-/**
- * A pattern that matches any number of project if fields is deterministic
- * or child is LeafNode of project on top of another relational operator.
- */
-object ProjectOperation extends PredicateHelper {
-  type ReturnType = (Seq[NamedExpression], LogicalPlan)
-
-  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
-    case Project(fields, child) if fields.forall(_.deterministic) =>
-      Some((fields, child))
-
-    case p @ Project(fields, child: LeafNode) if p.references.nonEmpty =>
-      Some((fields, child))
-
-    case _ => None
-  }
-}
-
 /**
  * A pattern that matches any number of project or filter operations on top of another relational
  * operator. All filter operators are collected and their conditions are broken up and returned
@@ -73,7 +55,7 @@ object PhysicalOperation extends PredicateHelper {
   private def collectProjectsAndFilters(plan: LogicalPlan):
     (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression]) =
     plan match {
-      case ProjectOperation(fields, child) =>
+      case Project(fields, child) if fields.forall(_.deterministic) =>
         val (_, filters, other, aliases) = collectProjectsAndFilters(child)
         val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
         (Some(substitutedFields), filters, other, collectAliases(substitutedFields))
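
With ProjectOperation gone, PhysicalOperation collapses a Project only when every field is deterministic, so a projection containing a non-deterministic expression now stops the collapse and is left for the scan strategies below to handle. As a minimal sketch of what a caller of the pattern sees (the describeScan helper is illustrative only, not part of the patch):

import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative helper: destructure a plan with the deterministic-only pattern.
// After this change, `projects` and `filters` are never pulled through a
// non-deterministic Project, because such a Project no longer matches inside
// collectProjectsAndFilters.
def describeScan(plan: LogicalPlan): String = plan match {
  case PhysicalOperation(projects, filters, leaf) =>
    s"${projects.size} projected columns, ${filters.size} pushed filters, over ${leaf.nodeName}"
  case other =>
    s"no project/filter chain extracted from ${other.nodeName}" // defensive fallback
}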

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 10 additions & 1 deletion
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.SparkPlan
 
@@ -51,6 +51,15 @@ import org.apache.spark.sql.execution.SparkPlan
  */
 object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case p @ Project(fields, child)
+        if !fields.forall(_.deterministic) && p.references.nonEmpty =>
+      collectFileSource(Project(child.output.filter(p.references.contains), child))
+        .map(p => execution.ProjectExec(fields, p)).toList
+
+    case _ => collectFileSource(plan)
+  }
+
+  private def collectFileSource(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(projects, filters,
       l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
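
For reference, a short end-to-end sketch of the case the new branch targets, assuming a local session; the Parquet path and column names are made up for illustration. A projection containing rand() is non-deterministic, so the expectation under this change is a ProjectExec above a column-pruned file scan rather than a single collapsed scan:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("nondeterministic-project-demo")
  .getOrCreate()
import spark.implicits._

// Write a small Parquet table, then project a non-deterministic expression over it.
Seq((1, "a"), (2, "b")).toDF("id", "name")
  .write.mode("overwrite").parquet("/tmp/nondeterministic_demo")

val df = spark.read.parquet("/tmp/nondeterministic_demo")
  .select($"id", rand().as("r"))  // rand() makes the Project non-deterministic

// Expected physical plan: ProjectExec [id, rand(...)] over a FileScan that
// reads only `id`; the `name` column is pruned by the new strategy case.
df.explain()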

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 10 additions & 2 deletions
@@ -26,8 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan,
-  ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
@@ -239,6 +238,15 @@ private[hive] trait HiveStrategies {
    */
   object HiveTableScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case p @ Project(fields, child)
+          if !fields.forall(_.deterministic) && p.references.nonEmpty =>
+        collectHiveTableSource(Project(child.output.filter(p.references.contains), child))
+          .map(p => ProjectExec(fields, p)).toList
+
+      case _ => collectHiveTableSource(plan)
+    }
+
+    private def collectHiveTableSource(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) =>
         // Filter out all predicates that only deal with partition keys, these are given to the
         // hive table scan operator to be used for partition pruning.
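
HiveTableScans receives the same split: the non-deterministic Project is planned as ProjectExec on top of whatever collectHiveTableSource produces for the column-pruned child. A hedged sketch, assuming a Hive-enabled Spark build and a hypothetical Hive table demo_db.events with an id column:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()  // requires a Hive-enabled Spark build
  .getOrCreate()

// `demo_db.events` is a hypothetical Hive table with at least an `id` column.
spark.table("demo_db.events")
  .select(col("id"), rand().as("r"))
  .explain()  // expected: ProjectExec [id, rand(...)] over a HiveTableScan reading only `id`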
