From 99d1a0225e6361bcc97f30c5fafb35b15820d29e Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Mon, 25 Jul 2022 13:43:59 +0800
Subject: [PATCH 1/3] [SPARK-39858][SQL] Remove unnecessary AliasHelper for some rules

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++--
 .../org/apache/spark/sql/catalyst/planning/patterns.scala | 2 +-
 .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7667b4fef715c..974c25b818e4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -739,7 +739,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     }
   }
 
-  object ResolvePivot extends Rule[LogicalPlan] with AliasHelper {
+  object ResolvePivot extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
       _.containsPattern(PIVOT), ruleId) {
       case p: Pivot if !p.childrenResolved || !p.aggregates.forall(_.resolved)
@@ -2531,7 +2531,7 @@ class Analyzer(override val catalogManager: CatalogManager)
    * those in a HAVING clause or ORDER BY clause. These expressions are pushed down to the
    * underlying aggregate operator and then projected away after the original operator.
    */
-  object ResolveAggregateFunctions extends Rule[LogicalPlan] with AliasHelper {
+  object ResolveAggregateFunctions extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
       _.containsPattern(AGGREGATE), ruleId) {
       // Resolve aggregate with having clause to Filter(..., Aggregate()). Note, to avoid wrongly
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 382909d6d6f71..ab0d8404a10cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 import org.apache.spark.sql.internal.SQLConf
 
-trait OperationHelper extends AliasHelper with PredicateHelper {
+trait OperationHelper extends PredicateHelper {
   import org.apache.spark.sql.catalyst.optimizer.CollapseProject.canCollapseExpressions
 
   type ReturnType =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index f1e0e6d80c561..01b0ae451b2a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, AliasHelper, And, Attribute, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.CollapseProject
 import org.apache.spark.sql.catalyst.planning.ScanOperation
@@ -34,7 +34,7 @@ import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.{DataType, DecimalType, IntegerType, StructType}
 import org.apache.spark.sql.util.SchemaUtils._
 
-object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper with AliasHelper {
+object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   import DataSourceV2Implicits._
 
   def apply(plan: LogicalPlan): LogicalPlan = {

From 5edc5b18be41dec7d242f1ebc2482ac3e1913385 Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Mon, 25 Jul 2022 14:54:34 +0800
Subject: [PATCH 2/3] Update code

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
 .../spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala | 2 +-
 .../sql/catalyst/optimizer/MergeScalarSubqueries.scala | 2 +-
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++--
 .../apache/spark/sql/catalyst/optimizer/expressions.scala | 6 +++---
 .../org/apache/spark/sql/catalyst/planning/patterns.scala | 4 ++--
 .../expressions/ExtractPredicatesWithinOutputSetSuite.scala | 5 +----
 .../optimizer/BinaryComparisonSimplificationSuite.scala | 2 +-
 .../sql/catalyst/optimizer/BooleanSimplificationSuite.scala | 2 +-
 .../catalyst/optimizer/EliminateSubqueryAliasesSuite.scala | 3 +--
 .../catalyst/optimizer/PushFoldableIntoBranchesSuite.scala | 3 +--
 .../optimizer/RemoveRedundantAliasAndProjectSuite.scala | 2 +-
 .../sql/catalyst/optimizer/SimplifyConditionalSuite.scala | 2 +-
 .../spark/sql/execution/OptimizeMetadataOnlyQuery.scala | 2 +-
 .../org/apache/spark/sql/execution/SparkStrategies.scala | 4 +---
 .../sql/execution/adaptive/LogicalQueryStageStrategy.scala | 3 +--
 .../execution/datasources/PruneFileSourcePartitions.scala | 3 +--
 .../spark/sql/execution/datasources/v2/PushDownUtils.scala | 4 ++--
 .../dynamicpruning/PlanDynamicPruningFilters.scala | 4 ++--
 .../spark/sql/execution/python/ExtractPythonUDFs.scala | 2 +-
 .../sql/execution/datasources/FileSourceStrategySuite.scala | 4 ++--
 .../spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala | 3 +--
 22 files changed, 29 insertions(+), 39 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 974c25b818e4f..c3c036d335443 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2358,7 +2358,7 @@ class Analyzer(override val catalogManager: CatalogManager)
    *
    * Note: CTEs are handled in CTESubstitution.
    */
-  object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
+  object ResolveSubquery extends Rule[LogicalPlan] {
     /**
      * Resolve the correlated expressions in a subquery, as if the expressions live in the outer
      * plan. All resolved outer references are wrapped in an [[OuterReference]]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index 659384a507746..471f0bd554105 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -401,7 +401,7 @@ case class Cost(card: BigInt, size: BigInt) {
  *
  * Filters (2) and (3) are not implemented.
  */
-object JoinReorderDPFilters extends PredicateHelper {
+object JoinReorderDPFilters {
   /**
    * Builds join graph information to be used by the filtering strategies.
    * Currently, it builds the sets of star/non-star joins.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
index 44f3b653de75c..4369ad9f96a6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
@@ -100,7 +100,7 @@ import org.apache.spark.sql.types.DataType
 * : +- ReusedSubquery Subquery scalar-subquery#242, [id=#125]
 * +- *(1) Scan OneRowRelation[]
 */
-object MergeScalarSubqueries extends Rule[LogicalPlan] with PredicateHelper {
+object MergeScalarSubqueries extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
     plan match {
       // Subquery reuse needs to be enabled for this optimization.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 78fb8b5de8886..14d05750ef796 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -768,7 +768,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
 * safe to pushdown Filters and Projections through it. Filter pushdown is handled by another
 * rule PushDownPredicates. Once we add UNION DISTINCT, we will not be able to pushdown Projections.
 */
-object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper {
+object PushProjectionThroughUnion extends Rule[LogicalPlan] {
 
   /**
    * Maps Attributes from the left side to the corresponding Attribute on the right side.
@@ -1617,7 +1617,7 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
 * This rule improves performance of predicate pushdown for cascading joins such as:
 * Filter-Join-Join-Join. Most predicates can be pushed down in a single pass.
 */
-object PushDownPredicates extends Rule[LogicalPlan] with PredicateHelper {
+object PushDownPredicates extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
     _.containsAnyPattern(FILTER, JOIN)) {
     CombineFilters.applyLocally
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 3fc23c31ac74d..28ecd31937e91 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -109,7 +109,7 @@ object ConstantFolding extends Rule[LogicalPlan] {
 * - Using this mapping, replace occurrence of the attributes with the corresponding constant values
 *   in the AND node.
 */
-object ConstantPropagation extends Rule[LogicalPlan] with PredicateHelper {
+object ConstantPropagation extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
     _.containsAllPatterns(LITERAL, FILTER), ruleId) {
     case f: Filter =>
@@ -532,7 +532,7 @@ object SimplifyBinaryComparison
 /**
 * Simplifies conditional expressions (if / case).
 */
-object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
+object SimplifyConditionals extends Rule[LogicalPlan] {
   private def falseOrNullLiteral(e: Expression): Boolean = e match {
     case FalseLiteral => true
     case Literal(null, _) => true
@@ -617,7 +617,7 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
 /**
 * Push the foldable expression into (if / case) branches.
 */
-object PushFoldableIntoBranches extends Rule[LogicalPlan] with PredicateHelper {
+object PushFoldableIntoBranches extends Rule[LogicalPlan] {
 
   // To be conservative here: it's only a guaranteed win if all but at most only one branch
   // end up being not foldable.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index ab0d8404a10cc..4e12b811acd1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -119,7 +119,7 @@ trait OperationHelper extends PredicateHelper {
 * [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are in-lined/substituted if
 * necessary.
 */
-object PhysicalOperation extends OperationHelper with PredicateHelper {
+object PhysicalOperation extends OperationHelper {
   override protected def legacyMode: Boolean = true
 }
 
@@ -128,7 +128,7 @@ object PhysicalOperation extends OperationHelper with PredicateHelper {
 * operations even if they are non-deterministic, as long as they satisfy the
 * requirement of CollapseProject and CombineFilters.
 */
-object ScanOperation extends OperationHelper with PredicateHelper {
+object ScanOperation extends OperationHelper {
   override protected def legacyMode: Boolean = false
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExtractPredicatesWithinOutputSetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExtractPredicatesWithinOutputSetSuite.scala
index ed141ef923e0a..10f9a88c429c6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExtractPredicatesWithinOutputSetSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExtractPredicatesWithinOutputSetSuite.scala
@@ -22,10 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.types.BooleanType
 
-class ExtractPredicatesWithinOutputSetSuite
-  extends SparkFunSuite
-  with PredicateHelper
-  with PlanTest {
+class ExtractPredicatesWithinOutputSetSuite extends SparkFunSuite with PlanTest {
   private val a = AttributeReference("A", BooleanType)(exprId = ExprId(1))
   private val b = AttributeReference("B", BooleanType)(exprId = ExprId(2))
   private val c = AttributeReference("C", BooleanType)(exprId = ExprId(3))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
index 92e78a1eebe3c..0e4cd99af6953 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
-class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper {
+class BinaryComparisonSimplificationSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index 32fc2ac103301..fc2697d55f6d0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.BooleanType
 
-class BooleanSimplificationSuite extends PlanTest with ExpressionEvalHelper with PredicateHelper {
+class BooleanSimplificationSuite extends PlanTest with ExpressionEvalHelper {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala
index e876c1b8ccac1..97ef6b4a6e07e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala
@@ -20,14 +20,13 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 
-class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper {
+class EliminateSubqueryAliasesSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch("EliminateSubqueryAliases", Once, EliminateSubqueryAliases) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushFoldableIntoBranchesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushFoldableIntoBranchesSuite.scala
index e4753747fad6a..bdeb192fc1218 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushFoldableIntoBranchesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushFoldableIntoBranchesSuite.scala
@@ -32,8 +32,7 @@ import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType, Timesta
 import org.apache.spark.unsafe.types.CalendarInterval
 
 
-class PushFoldableIntoBranchesSuite
-  extends PlanTest with ExpressionEvalHelper with PredicateHelper {
+class PushFoldableIntoBranchesSuite extends PlanTest with ExpressionEvalHelper {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch("PushFoldableIntoBranches", FixedPoint(50),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
index 90c14a0fa84c7..3df4687c19cf5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.MetadataBuilder
 
-class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper {
+class RemoveRedundantAliasAndProjectSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
index b7628eef4e263..98ca727573358 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.{BooleanType, IntegerType}
 
 
-class SimplifyConditionalSuite extends PlanTest with ExpressionEvalHelper with PredicateHelper {
+class SimplifyConditionalSuite extends PlanTest with ExpressionEvalHelper {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch("SimplifyConditionals", FixedPoint(50),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index d95e86bba0528..00b1ec749d762 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -160,7 +160,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
    * A pattern that finds the partitioned table relation node inside the given plan, and returns a
    * pair of the partition attributes and the table relation node.
    */
-  object PartitionedRelation extends PredicateHelper {
+  object PartitionedRelation {
 
     def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = {
       plan match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index e6ec97b3491be..6104104c7bea4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -168,9 +168,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Supports both equi-joins and non-equi-joins.
   * Supports only inner like joins.
   */
-  object JoinSelection extends Strategy
-    with PredicateHelper
-    with JoinSelectionHelper {
+  object JoinSelection extends Strategy with JoinSelectionHelper {
     private val hintErrorHandler = conf.hintErrorHandler
 
     private def checkHintBuildSide(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala
index 18924d09999ac..e424af5343fcc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.sql.Strategy
-import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractSingleColumnNullAwareAntiJoin}
 import org.apache.spark.sql.catalyst.plans.LeftAnti
@@ -35,7 +34,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes
 * stage in case of the larger join child relation finishes before the smaller relation. Note
 * that this rule needs to be applied before regular join strategies.
 */
-object LogicalQueryStageStrategy extends Strategy with PredicateHelper {
+object LogicalQueryStageStrategy extends Strategy {
 
   private def isBroadcastStage(plan: LogicalPlan): Boolean = plan match {
     case LogicalQueryStage(_, _: BroadcastQueryStageExec) => true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index be70e18d220e5..1dffea4e1bc87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -32,8 +32,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 * statistics will be updated. And the partition filters will be kept in the filters of returned
 * logical plan.
 */
-private[sql] object PruneFileSourcePartitions
-  extends Rule[LogicalPlan] with PredicateHelper {
+private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
 
   private def rebuildPhysicalOperation(
       projects: Seq[NamedExpression],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index 5fb16aa7323ae..53a34a8f5f153 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, NamedExpression, PredicateHelper, SchemaPruning}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, NamedExpression, SchemaPruning}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.expressions.SortOrder
 import org.apache.spark.sql.connector.expressions.filter.Predicate
@@ -29,7 +29,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
 
-object PushDownUtils extends PredicateHelper {
+object PushDownUtils {
   /**
    * Pushes down filters to the data source reader
   *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
index 252565fd9077b..e4127759627af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.dynamicpruning
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, ListQuery, Literal, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, ListQuery, Literal}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.Aggregate
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.joins._
 * the fallback mechanism with subquery duplicate.
 */
 case class PlanDynamicPruningFilters(sparkSession: SparkSession)
-    extends Rule[SparkPlan] with PredicateHelper {
+    extends Rule[SparkPlan] {
 
   /**
    * Identify the shape in which keys of a given plan are broadcasted.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index de47561926e55..568fc0ae56d55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -157,7 +157,7 @@ object ExtractGroupingPythonUDFFromAggregate extends Rule[LogicalPlan] {
 * This has the limitation that the input to the Python UDF is not allowed include attributes from
 * multiple child operators.
 */
-object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
+object ExtractPythonUDFs extends Rule[LogicalPlan] {
 
   private type EvalType = Int
   private type EvalTypeChecker = EvalType => Boolean
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 49429401d51cc..4dbe619610ef7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet}
 import org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
@@ -40,7 +40,7 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
 import org.apache.spark.util.Utils
 
-class FileSourceStrategySuite extends QueryTest with SharedSparkSession with PredicateHelper {
+class FileSourceStrategySuite extends QueryTest with SharedSparkSession {
   import testImplicits._
 
   protected override def sparkConf = super.sparkConf.set("spark.default.parallelism", "1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 2ec593b95c9b6..006ef14ed73da 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -20,10 +20,9 @@ package org.apache.spark.sql.sources
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
-import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.types._
 
-class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper {
+class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
   override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
 
   // We have a very limited number of supported types at here since it is just for a

From c53ef0d1c21e09b630a03283901118df2cc5b7d2 Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Mon, 25 Jul 2022 14:59:35 +0800
Subject: [PATCH 3/3] Update code

---
 .../execution/dynamicpruning/PlanDynamicPruningFilters.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
index e4127759627af..c9ff28eb0459f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -34,8 +34,7 @@ import org.apache.spark.sql.execution.joins._
 * results of broadcast. For joins that are not planned as broadcast hash joins we keep
 * the fallback mechanism with subquery duplicate.
 */
-case class PlanDynamicPruningFilters(sparkSession: SparkSession)
-    extends Rule[SparkPlan] {
+case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[SparkPlan] {
 
   /**
    * Identify the shape in which keys of a given plan are broadcasted.
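
Background for reviewers: AliasHelper and PredicateHelper contribute only concrete utility methods (for example, splitting conjunctive predicates or in-lining aliases); they declare no abstract members, so a rule that never calls those methods loses nothing when the unused `with AliasHelper` / `with PredicateHelper` clause is dropped. That is all these three commits do. The Scala sketch below illustrates the idea with simplified stand-ins; the names HelperMixinDemo, PredicateHelperLike, SplitFilterRule and RenameAttrRule are hypothetical and are not Catalyst APIs.

// Illustrative only -- not part of the patches above. Simplified stand-ins for the
// Catalyst helper traits, showing why removing an unused mixin is behavior-preserving.
object HelperMixinDemo {

  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class And(left: Expr, right: Expr) extends Expr

  // Stand-in for a helper trait like PredicateHelper: concrete utility methods only,
  // no abstract members and no state.
  trait PredicateHelperLike {
    def splitConjuncts(e: Expr): Seq[Expr] = e match {
      case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
      case other     => Seq(other)
    }
  }

  // A "rule" that actually calls the helper: here the mixin earns its keep.
  object SplitFilterRule extends PredicateHelperLike {
    def apply(cond: Expr): Seq[Expr] = splitConjuncts(cond)
  }

  // A "rule" that never calls any helper method: mixing in PredicateHelperLike would
  // compile but add nothing, which is the situation SPARK-39858 cleans up.
  object RenameAttrRule {
    def apply(e: Expr): Expr = e match {
      case Attr(n) => Attr(n.toLowerCase)
      case other   => other
    }
  }

  def main(args: Array[String]): Unit = {
    val cond = And(Attr("a"), And(Attr("b"), Attr("c")))
    println(SplitFilterRule(cond))     // List(Attr(a), Attr(b), Attr(c))
    println(RenameAttrRule(Attr("X"))) // Attr(x)
  }
}

Because the dropped traits carry no behavior that these rules ever invoked, the change is a pure cleanup: compiled output and rule semantics stay the same, and only the class declarations get shorter.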