diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 196971a22a44..57fc1bd99be2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1211,6 +1211,14 @@ object SQLConf { .booleanConf .createWithDefault(true) + val REMOVE_REDUNDANT_PROJECTS_ENABLED = buildConf("spark.sql.execution.removeRedundantProjects") + .internal() + .doc("Whether to remove redundant project exec node based on children's output and " + + "ordering requirement.") + .version("3.1.0") + .booleanConf + .createWithDefault(true) + val STATE_STORE_PROVIDER_CLASS = buildConf("spark.sql.streaming.stateStore.providerClass") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index eecf16d43156..ace2a11ddaa9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -342,6 +342,7 @@ object QueryExecution { CoalesceBucketsInJoin(sparkSession.sessionState.conf), PlanDynamicPruningFilters(sparkSession), PlanSubqueries(sparkSession), + RemoveRedundantProjects(sparkSession.sessionState.conf), EnsureRequirements(sparkSession.sessionState.conf), ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf, sparkSession.sessionState.columnarRules), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala new file mode 100644 index 000000000000..ecb4ad0f6e8d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, PartialMerge} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.aggregate.BaseAggregateExec +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase +import org.apache.spark.sql.execution.window.WindowExec +import org.apache.spark.sql.internal.SQLConf + +/** + * Remove redundant ProjectExec node from the spark plan. A ProjectExec node is redundant when + * - It has the same output attributes and orders as its child's output and the ordering of + * the attributes is required. 
+ * - It has the same output attributes as its child's output when attribute output ordering + * is not required. + * This rule needs to be a physical rule because project nodes are useful during logical + * optimization to prune data. During physical planning, redundant project nodes can be removed + * to simplify the query plan. + */ +case class RemoveRedundantProjects(conf: SQLConf) extends Rule[SparkPlan] { + def apply(plan: SparkPlan): SparkPlan = { + if (!conf.getConf(SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED)) { + plan + } else { + removeProject(plan, true) + } + } + + private def removeProject(plan: SparkPlan, requireOrdering: Boolean): SparkPlan = { + plan match { + case p @ ProjectExec(_, child) => + if (isRedundant(p, child, requireOrdering)) { + val newPlan = removeProject(child, requireOrdering) + newPlan.setLogicalLink(child.logicalLink.get) + newPlan + } else { + p.mapChildren(removeProject(_, false)) + } + case op: TakeOrderedAndProjectExec => + op.mapChildren(removeProject(_, false)) + case a: BaseAggregateExec => + // BaseAggregateExec require specific column ordering when mode is Final or PartialMerge. + // See comments in BaseAggregateExec inputAttributes method. + val keepOrdering = a.aggregateExpressions + .exists(ae => ae.mode.equals(Final) || ae.mode.equals(PartialMerge)) + a.mapChildren(removeProject(_, keepOrdering)) + case g: GenerateExec => g.mapChildren(removeProject(_, false)) + // JoinExec ordering requirement will inherit from its parent. If there is no ProjectExec in + // its ancestors, JoinExec should require output columns to be ordered. + case o => o.mapChildren(removeProject(_, requireOrdering)) + } + } + + /** + * Check if the nullability change is positive. It catches the case when the project output + * attribute is not nullable, but the child output attribute is nullable. + */ + private def checkNullability(output: Seq[Attribute], childOutput: Seq[Attribute]): Boolean = + output.zip(childOutput).forall { case (attr1, attr2) => attr1.nullable || !attr2.nullable } + + private def isRedundant( + project: ProjectExec, + child: SparkPlan, + requireOrdering: Boolean): Boolean = { + child match { + // If a DataSourceV2ScanExec node does not support columnar, a ProjectExec node is required + // to convert the rows to UnsafeRow. See DataSourceV2Strategy for more details. 
+ case d: DataSourceV2ScanExecBase if !d.supportsColumnar => false + case _ => + if (requireOrdering) { + project.output.map(_.exprId.id) == child.output.map(_.exprId.id) && + checkNullability(project.output, child.output) + } else { + val orderedProjectOutput = project.output.sortBy(_.exprId.id) + val orderedChildOutput = child.output.sortBy(_.exprId.id) + orderedProjectOutput.map(_.exprId.id) == orderedChildOutput.map(_.exprId.id) && + checkNullability(orderedProjectOutput, orderedChildOutput) + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 43226feee21d..fb922b05b588 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -83,12 +83,14 @@ case class AdaptiveSparkPlanExec( ) } + @transient private val removeRedundantProjects = RemoveRedundantProjects(conf) @transient private val ensureRequirements = EnsureRequirements(conf) // A list of physical plan rules to be applied before creation of query stages. The physical // plan should reach a final status of query stages (i.e., no more addition or removal of // Exchange nodes) after running these rules. private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq( + removeRedundantProjects, ensureRequirements ) ++ context.session.sessionState.queryStagePrepRules diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala index 098576d72f54..6973f55e8dca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.dynamicpruning import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions -import org.apache.spark.sql.catalyst.expressions.{Alias, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, ListQuery, Literal, PredicateHelper} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, ListQuery, Literal, PredicateHelper} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode @@ -40,8 +40,8 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) /** * Identify the shape in which keys of a given plan are broadcasted. 
*/ - private def broadcastMode(keys: Seq[Expression], plan: LogicalPlan): BroadcastMode = { - val packedKeys = BindReferences.bindReferences(HashJoin.rewriteKeyExpr(keys), plan.output) + private def broadcastMode(keys: Seq[Expression], output: AttributeSeq): BroadcastMode = { + val packedKeys = BindReferences.bindReferences(HashJoin.rewriteKeyExpr(keys), output) HashedRelationBroadcastMode(packedKeys) } @@ -67,8 +67,8 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) }.isDefined if (canReuseExchange) { - val mode = broadcastMode(buildKeys, buildPlan) val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan) + val mode = broadcastMode(buildKeys, executedPlan.output) // plan a broadcast exchange of the build side of the join val exchange = BroadcastExchangeExec(mode, executedPlan) val name = s"dynamicpruning#${exprId.id}" diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index 388e916ec330..6688c39a6687 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -53,25 +53,23 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -AdaptiveSparkPlan (9) +AdaptiveSparkPlan (8) +- == Current Plan == - Sort (8) - +- Exchange (7) - +- HashAggregate (6) - +- Exchange (5) - +- HashAggregate (4) - +- Project (3) - +- Filter (2) - +- Scan parquet default.explain_temp1 (1) + Sort (7) + +- Exchange (6) + +- HashAggregate (5) + +- Exchange (4) + +- HashAggregate (3) + +- Filter (2) + +- Scan parquet default.explain_temp1 (1) +- == Initial Plan == - Sort (8) - +- Exchange (7) - +- HashAggregate (6) - +- Exchange (5) - +- HashAggregate (4) - +- Project (3) - +- Filter (2) - +- Scan parquet default.explain_temp1 (1) + Sort (7) + +- Exchange (6) + +- HashAggregate (5) + +- Exchange (4) + +- HashAggregate (3) + +- Filter (2) + +- Scan parquet default.explain_temp1 (1) (1) Scan parquet default.explain_temp1 @@ -85,37 +83,33 @@ ReadSchema: struct Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(3) Project -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(4) HashAggregate +(3) HashAggregate Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_max(val#x)] Aggregate Attributes [1]: [max#x] Results [2]: [key#x, max#x] -(5) Exchange +(4) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(6) HashAggregate +(5) HashAggregate Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [2]: [key#x, max(val#x)#x AS max(val)#x] -(7) Exchange +(6) Exchange Input [2]: [key#x, max(val)#x] Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x] -(8) Sort +(7) Sort Input [2]: [key#x, max(val)#x] Arguments: [key#x ASC NULLS FIRST], true, 0 -(9) AdaptiveSparkPlan +(8) AdaptiveSparkPlan Output [2]: [key#x, max(val)#x] Arguments: isFinalPlan=false @@ -131,25 +125,23 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -AdaptiveSparkPlan (9) +AdaptiveSparkPlan (8) +- == Current Plan == - Project (8) - +- Filter (7) - +- HashAggregate (6) - +- Exchange (5) - +- HashAggregate (4) - +- Project (3) - +- Filter (2) - +- Scan parquet default.explain_temp1 (1) + Project (7) + +- Filter (6) + +- HashAggregate (5) + +- Exchange (4) + +- HashAggregate (3) + +- Filter (2) + +- Scan parquet default.explain_temp1 (1) +- == Initial Plan 
== - Project (8) - +- Filter (7) - +- HashAggregate (6) - +- Exchange (5) - +- HashAggregate (4) - +- Project (3) - +- Filter (2) - +- Scan parquet default.explain_temp1 (1) + Project (7) + +- Filter (6) + +- HashAggregate (5) + +- Exchange (4) + +- HashAggregate (3) + +- Filter (2) + +- Scan parquet default.explain_temp1 (1) (1) Scan parquet default.explain_temp1 @@ -163,37 +155,33 @@ ReadSchema: struct Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(3) Project -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(4) HashAggregate +(3) HashAggregate Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_max(val#x)] Aggregate Attributes [1]: [max#x] Results [2]: [key#x, max#x] -(5) Exchange +(4) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(6) HashAggregate +(5) HashAggregate Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [3]: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x] -(7) Filter +(6) Filter Input [3]: [key#x, max(val)#x, max(val#x)#x] Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0)) -(8) Project +(7) Project Output [2]: [key#x, max(val)#x] Input [3]: [key#x, max(val)#x, max(val#x)#x] -(9) AdaptiveSparkPlan +(8) AdaptiveSparkPlan Output [2]: [key#x, max(val)#x] Arguments: isFinalPlan=false @@ -207,29 +195,25 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -AdaptiveSparkPlan (11) +AdaptiveSparkPlan (9) +- == Current Plan == - HashAggregate (10) - +- Exchange (9) - +- HashAggregate (8) - +- Union (7) - :- Project (3) - : +- Filter (2) - : +- Scan parquet default.explain_temp1 (1) - +- Project (6) - +- Filter (5) - +- Scan parquet default.explain_temp1 (4) + HashAggregate (8) + +- Exchange (7) + +- HashAggregate (6) + +- Union (5) + :- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- Filter (4) + +- Scan parquet default.explain_temp1 (3) +- == Initial Plan == - HashAggregate (10) - +- Exchange (9) - +- HashAggregate (8) - +- Union (7) - :- Project (3) - : +- Filter (2) - : +- Scan parquet default.explain_temp1 (1) - +- Project (6) - +- Filter (5) - +- Scan parquet default.explain_temp1 (4) + HashAggregate (8) + +- Exchange (7) + +- HashAggregate (6) + +- Union (5) + :- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- Filter (4) + +- Scan parquet default.explain_temp1 (3) (1) Scan parquet default.explain_temp1 @@ -243,46 +227,38 @@ ReadSchema: struct Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(3) Project -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(4) Scan parquet default.explain_temp1 +(3) Scan parquet default.explain_temp1 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct -(5) Filter +(4) Filter Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(6) Project -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] +(5) Union -(7) Union - -(8) HashAggregate +(6) HashAggregate Input [2]: [key#x, val#x] Keys [2]: [key#x, val#x] Functions: [] Aggregate Attributes: [] Results [2]: [key#x, val#x] -(9) Exchange +(7) Exchange Input [2]: [key#x, val#x] Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x] -(10) HashAggregate +(8) HashAggregate Input [2]: [key#x, val#x] Keys [2]: [key#x, val#x] Functions: [] Aggregate Attributes: [] Results [2]: [key#x, val#x] -(11) 
AdaptiveSparkPlan +(9) AdaptiveSparkPlan Output [2]: [key#x, val#x] Arguments: isFinalPlan=false @@ -297,25 +273,21 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -AdaptiveSparkPlan (9) +AdaptiveSparkPlan (7) +- == Current Plan == - BroadcastHashJoin Inner BuildRight (8) - :- Project (3) - : +- Filter (2) - : +- Scan parquet default.explain_temp1 (1) - +- BroadcastExchange (7) - +- Project (6) - +- Filter (5) - +- Scan parquet default.explain_temp2 (4) + BroadcastHashJoin Inner BuildRight (6) + :- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (5) + +- Filter (4) + +- Scan parquet default.explain_temp2 (3) +- == Initial Plan == - BroadcastHashJoin Inner BuildRight (8) - :- Project (3) - : +- Filter (2) - : +- Scan parquet default.explain_temp1 (1) - +- BroadcastExchange (7) - +- Project (6) - +- Filter (5) - +- Scan parquet default.explain_temp2 (4) + BroadcastHashJoin Inner BuildRight (6) + :- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (5) + +- Filter (4) + +- Scan parquet default.explain_temp2 (3) (1) Scan parquet default.explain_temp1 @@ -329,35 +301,27 @@ ReadSchema: struct Input [2]: [key#x, val#x] Condition : isnotnull(key#x) -(3) Project -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(4) Scan parquet default.explain_temp2 +(3) Scan parquet default.explain_temp2 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct -(5) Filter +(4) Filter Input [2]: [key#x, val#x] Condition : isnotnull(key#x) -(6) Project -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(7) BroadcastExchange +(5) BroadcastExchange Input [2]: [key#x, val#x] -Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x] -(8) BroadcastHashJoin +(6) BroadcastHashJoin Left keys [1]: [key#x] Right keys [1]: [key#x] Join condition: None -(9) AdaptiveSparkPlan +(7) AdaptiveSparkPlan Output [4]: [key#x, val#x, key#x, val#x] Arguments: isFinalPlan=false @@ -372,21 +336,19 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -AdaptiveSparkPlan (7) +AdaptiveSparkPlan (6) +- == Current Plan == - BroadcastHashJoin LeftOuter BuildRight (6) + BroadcastHashJoin LeftOuter BuildRight (5) :- Scan parquet default.explain_temp1 (1) - +- BroadcastExchange (5) - +- Project (4) - +- Filter (3) - +- Scan parquet default.explain_temp2 (2) + +- BroadcastExchange (4) + +- Filter (3) + +- Scan parquet default.explain_temp2 (2) +- == Initial Plan == - BroadcastHashJoin LeftOuter BuildRight (6) + BroadcastHashJoin LeftOuter BuildRight (5) :- Scan parquet default.explain_temp1 (1) - +- BroadcastExchange (5) - +- Project (4) - +- Filter (3) - +- Scan parquet default.explain_temp2 (2) + +- BroadcastExchange (4) + +- Filter (3) + +- Scan parquet default.explain_temp2 (2) (1) Scan parquet default.explain_temp1 @@ -406,20 +368,16 @@ ReadSchema: struct Input [2]: [key#x, val#x] Condition : isnotnull(key#x) -(4) Project -Output [2]: [key#x, val#x] +(4) BroadcastExchange Input [2]: [key#x, val#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x] -(5) BroadcastExchange -Input [2]: [key#x, val#x] -Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#x] - -(6) BroadcastHashJoin +(5) BroadcastHashJoin Left keys [1]: 
[key#x] Right keys [1]: [key#x] Join condition: None -(7) AdaptiveSparkPlan +(6) AdaptiveSparkPlan Output [4]: [key#x, val#x, key#x, val#x] Arguments: isFinalPlan=false @@ -439,15 +397,13 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -AdaptiveSparkPlan (4) +AdaptiveSparkPlan (3) +- == Current Plan == - Project (3) - +- Filter (2) - +- Scan parquet default.explain_temp1 (1) + Filter (2) + +- Scan parquet default.explain_temp1 (1) +- == Initial Plan == - Project (3) - +- Filter (2) - +- Scan parquet default.explain_temp1 (1) + Filter (2) + +- Scan parquet default.explain_temp1 (1) (1) Scan parquet default.explain_temp1 @@ -461,11 +417,7 @@ ReadSchema: struct Input [2]: [key#x, val#x] Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subquery#x, [id=#x])) AND (val#x > 3)) -(3) Project -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(4) AdaptiveSparkPlan +(3) AdaptiveSparkPlan Output [2]: [key#x, val#x] Arguments: isFinalPlan=false @@ -553,25 +505,21 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -AdaptiveSparkPlan (9) +AdaptiveSparkPlan (7) +- == Current Plan == - BroadcastHashJoin Inner BuildRight (8) - :- Project (3) - : +- Filter (2) - : +- Scan parquet default.explain_temp1 (1) - +- BroadcastExchange (7) - +- Project (6) - +- Filter (5) - +- Scan parquet default.explain_temp1 (4) + BroadcastHashJoin Inner BuildRight (6) + :- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (5) + +- Filter (4) + +- Scan parquet default.explain_temp1 (3) +- == Initial Plan == - BroadcastHashJoin Inner BuildRight (8) - :- Project (3) - : +- Filter (2) - : +- Scan parquet default.explain_temp1 (1) - +- BroadcastExchange (7) - +- Project (6) - +- Filter (5) - +- Scan parquet default.explain_temp1 (4) + BroadcastHashJoin Inner BuildRight (6) + :- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (5) + +- Filter (4) + +- Scan parquet default.explain_temp1 (3) (1) Scan parquet default.explain_temp1 @@ -585,35 +533,27 @@ ReadSchema: struct Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) -(3) Project -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(4) Scan parquet default.explain_temp1 +(3) Scan parquet default.explain_temp1 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct -(5) Filter +(4) Filter Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) -(6) Project -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(7) BroadcastExchange +(5) BroadcastExchange Input [2]: [key#x, val#x] -Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x] -(8) BroadcastHashJoin +(6) BroadcastHashJoin Left keys [1]: [key#x] Right keys [1]: [key#x] Join condition: None -(9) AdaptiveSparkPlan +(7) AdaptiveSparkPlan Output [4]: [key#x, val#x, key#x, val#x] Arguments: isFinalPlan=false @@ -631,37 +571,33 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -AdaptiveSparkPlan (15) +AdaptiveSparkPlan (13) +- == Current Plan == - BroadcastHashJoin Inner BuildRight (14) - :- HashAggregate (6) - : +- Exchange (5) - : +- HashAggregate (4) - : +- Project (3) - : +- Filter (2) - : +- Scan parquet default.explain_temp1 (1) - +- BroadcastExchange (13) - +- HashAggregate (12) 
- +- Exchange (11) - +- HashAggregate (10) - +- Project (9) - +- Filter (8) - +- Scan parquet default.explain_temp1 (7) + BroadcastHashJoin Inner BuildRight (12) + :- HashAggregate (5) + : +- Exchange (4) + : +- HashAggregate (3) + : +- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (11) + +- HashAggregate (10) + +- Exchange (9) + +- HashAggregate (8) + +- Filter (7) + +- Scan parquet default.explain_temp1 (6) +- == Initial Plan == - BroadcastHashJoin Inner BuildRight (14) - :- HashAggregate (6) - : +- Exchange (5) - : +- HashAggregate (4) - : +- Project (3) - : +- Filter (2) - : +- Scan parquet default.explain_temp1 (1) - +- BroadcastExchange (13) - +- HashAggregate (12) - +- Exchange (11) - +- HashAggregate (10) - +- Project (9) - +- Filter (8) - +- Scan parquet default.explain_temp1 (7) + BroadcastHashJoin Inner BuildRight (12) + :- HashAggregate (5) + : +- Exchange (4) + : +- HashAggregate (3) + : +- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (11) + +- HashAggregate (10) + +- Exchange (9) + +- HashAggregate (8) + +- Filter (7) + +- Scan parquet default.explain_temp1 (6) (1) Scan parquet default.explain_temp1 @@ -675,71 +611,63 @@ ReadSchema: struct Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) -(3) Project -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(4) HashAggregate +(3) HashAggregate Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_max(val#x)] Aggregate Attributes [1]: [max#x] Results [2]: [key#x, max#x] -(5) Exchange +(4) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(6) HashAggregate +(5) HashAggregate Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [2]: [key#x, max(val#x)#x AS max(val)#x] -(7) Scan parquet default.explain_temp1 +(6) Scan parquet default.explain_temp1 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct -(8) Filter +(7) Filter Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) -(9) Project -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(10) HashAggregate +(8) HashAggregate Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_max(val#x)] Aggregate Attributes [1]: [max#x] Results [2]: [key#x, max#x] -(11) Exchange +(9) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(12) HashAggregate +(10) HashAggregate Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [2]: [key#x, max(val#x)#x AS max(val)#x] -(13) BroadcastExchange +(11) BroadcastExchange Input [2]: [key#x, max(val)#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#x] -(14) BroadcastHashJoin +(12) BroadcastHashJoin Left keys [1]: [key#x] Right keys [1]: [key#x] Join condition: None -(15) AdaptiveSparkPlan +(13) AdaptiveSparkPlan Output [4]: [key#x, max(val)#x, key#x, max(val)#x] Arguments: isFinalPlan=false diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index 614cb2a137d0..3b98bd3a232d 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -53,15 +53,14 @@ 
EXPLAIN FORMATTED struct -- !query output == Physical Plan == -* Sort (9) -+- Exchange (8) - +- * HashAggregate (7) - +- Exchange (6) - +- * HashAggregate (5) - +- * Project (4) - +- * Filter (3) - +- * ColumnarToRow (2) - +- Scan parquet default.explain_temp1 (1) +* Sort (8) ++- Exchange (7) + +- * HashAggregate (6) + +- Exchange (5) + +- * HashAggregate (4) + +- * Filter (3) + +- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) (1) Scan parquet default.explain_temp1 @@ -78,33 +77,29 @@ Input [2]: [key#x, val#x] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(4) Project [codegen id : 1] -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(5) HashAggregate [codegen id : 1] +(4) HashAggregate [codegen id : 1] Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_max(val#x)] Aggregate Attributes [1]: [max#x] Results [2]: [key#x, max#x] -(6) Exchange +(5) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(7) HashAggregate [codegen id : 2] +(6) HashAggregate [codegen id : 2] Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [2]: [key#x, max(val#x)#x AS max(val)#x] -(8) Exchange +(7) Exchange Input [2]: [key#x, max(val)#x] Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x] -(9) Sort [codegen id : 3] +(8) Sort [codegen id : 3] Input [2]: [key#x, max(val)#x] Arguments: [key#x ASC NULLS FIRST], true, 0 @@ -120,15 +115,14 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -* Project (9) -+- * Filter (8) - +- * HashAggregate (7) - +- Exchange (6) - +- * HashAggregate (5) - +- * Project (4) - +- * Filter (3) - +- * ColumnarToRow (2) - +- Scan parquet default.explain_temp1 (1) +* Project (8) ++- * Filter (7) + +- * HashAggregate (6) + +- Exchange (5) + +- * HashAggregate (4) + +- * Filter (3) + +- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) (1) Scan parquet default.explain_temp1 @@ -145,33 +139,29 @@ Input [2]: [key#x, val#x] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(4) Project [codegen id : 1] -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(5) HashAggregate [codegen id : 1] +(4) HashAggregate [codegen id : 1] Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_max(val#x)] Aggregate Attributes [1]: [max#x] Results [2]: [key#x, max#x] -(6) Exchange +(5) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(7) HashAggregate [codegen id : 2] +(6) HashAggregate [codegen id : 2] Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [3]: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x] -(8) Filter [codegen id : 2] +(7) Filter [codegen id : 2] Input [3]: [key#x, max(val)#x, max(val#x)#x] Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0)) -(9) Project [codegen id : 2] +(8) Project [codegen id : 2] Output [2]: [key#x, max(val)#x] Input [3]: [key#x, max(val)#x, max(val#x)#x] @@ -185,18 +175,16 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -* HashAggregate (12) -+- Exchange (11) - +- * HashAggregate (10) - +- Union (9) - :- * Project (4) - : +- * Filter (3) - : +- * ColumnarToRow (2) - : +- Scan parquet default.explain_temp1 (1) - +- * Project (8) - +- * Filter (7) - +- * ColumnarToRow (6) - +- Scan parquet default.explain_temp1 (5) +* HashAggregate (10) ++- Exchange (9) + +- * 
HashAggregate (8) + +- Union (7) + :- * Filter (3) + : +- * ColumnarToRow (2) + : +- Scan parquet default.explain_temp1 (1) + +- * Filter (6) + +- * ColumnarToRow (5) + +- Scan parquet default.explain_temp1 (4) (1) Scan parquet default.explain_temp1 @@ -213,42 +201,34 @@ Input [2]: [key#x, val#x] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(4) Project [codegen id : 1] -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(5) Scan parquet default.explain_temp1 +(4) Scan parquet default.explain_temp1 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct -(6) ColumnarToRow [codegen id : 2] +(5) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] -(7) Filter [codegen id : 2] +(6) Filter [codegen id : 2] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) -(8) Project [codegen id : 2] -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(9) Union +(7) Union -(10) HashAggregate [codegen id : 3] +(8) HashAggregate [codegen id : 3] Input [2]: [key#x, val#x] Keys [2]: [key#x, val#x] Functions: [] Aggregate Attributes: [] Results [2]: [key#x, val#x] -(11) Exchange +(9) Exchange Input [2]: [key#x, val#x] Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x] -(12) HashAggregate [codegen id : 4] +(10) HashAggregate [codegen id : 4] Input [2]: [key#x, val#x] Keys [2]: [key#x, val#x] Functions: [] @@ -266,16 +246,14 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -* BroadcastHashJoin Inner BuildRight (10) -:- * Project (4) -: +- * Filter (3) -: +- * ColumnarToRow (2) -: +- Scan parquet default.explain_temp1 (1) -+- BroadcastExchange (9) - +- * Project (8) - +- * Filter (7) - +- * ColumnarToRow (6) - +- Scan parquet default.explain_temp2 (5) +* BroadcastHashJoin Inner BuildRight (8) +:- * Filter (3) +: +- * ColumnarToRow (2) +: +- Scan parquet default.explain_temp1 (1) ++- BroadcastExchange (7) + +- * Filter (6) + +- * ColumnarToRow (5) + +- Scan parquet default.explain_temp2 (4) (1) Scan parquet default.explain_temp1 @@ -292,33 +270,25 @@ Input [2]: [key#x, val#x] Input [2]: [key#x, val#x] Condition : isnotnull(key#x) -(4) Project [codegen id : 2] -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(5) Scan parquet default.explain_temp2 +(4) Scan parquet default.explain_temp2 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct -(6) ColumnarToRow [codegen id : 1] +(5) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -(7) Filter [codegen id : 1] +(6) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : isnotnull(key#x) -(8) Project [codegen id : 1] -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(9) BroadcastExchange +(7) BroadcastExchange Input [2]: [key#x, val#x] -Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x] -(10) BroadcastHashJoin [codegen id : 2] +(8) BroadcastHashJoin [codegen id : 2] Left keys [1]: [key#x] Right keys [1]: [key#x] Join condition: None @@ -334,14 +304,13 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -* BroadcastHashJoin LeftOuter BuildRight (8) +* BroadcastHashJoin LeftOuter BuildRight (7) :- * ColumnarToRow (2) : +- Scan parquet default.explain_temp1 (1) -+- 
BroadcastExchange (7) - +- * Project (6) - +- * Filter (5) - +- * ColumnarToRow (4) - +- Scan parquet default.explain_temp2 (3) ++- BroadcastExchange (6) + +- * Filter (5) + +- * ColumnarToRow (4) + +- Scan parquet default.explain_temp2 (3) (1) Scan parquet default.explain_temp1 @@ -367,15 +336,11 @@ Input [2]: [key#x, val#x] Input [2]: [key#x, val#x] Condition : isnotnull(key#x) -(6) Project [codegen id : 1] -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(7) BroadcastExchange +(6) BroadcastExchange Input [2]: [key#x, val#x] -Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x] -(8) BroadcastHashJoin [codegen id : 2] +(7) BroadcastHashJoin [codegen id : 2] Left keys [1]: [key#x] Right keys [1]: [key#x] Join condition: None @@ -396,10 +361,9 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -* Project (4) -+- * Filter (3) - +- * ColumnarToRow (2) - +- Scan parquet default.explain_temp1 (1) +* Filter (3) ++- * ColumnarToRow (2) + +- Scan parquet default.explain_temp1 (1) (1) Scan parquet default.explain_temp1 @@ -416,98 +380,94 @@ Input [2]: [key#x, val#x] Input [2]: [key#x, val#x] Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3)) -(4) Project [codegen id : 1] -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - ===== Subqueries ===== Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] -* HashAggregate (11) -+- Exchange (10) - +- * HashAggregate (9) - +- * Project (8) - +- * Filter (7) - +- * ColumnarToRow (6) - +- Scan parquet default.explain_temp2 (5) +* HashAggregate (10) ++- Exchange (9) + +- * HashAggregate (8) + +- * Project (7) + +- * Filter (6) + +- * ColumnarToRow (5) + +- Scan parquet default.explain_temp2 (4) -(5) Scan parquet default.explain_temp2 +(4) Scan parquet default.explain_temp2 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)] ReadSchema: struct -(6) ColumnarToRow [codegen id : 1] +(5) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -(7) Filter [codegen id : 1] +(6) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2)) -(8) Project [codegen id : 1] +(7) Project [codegen id : 1] Output [1]: [key#x] Input [2]: [key#x, val#x] -(9) HashAggregate [codegen id : 1] +(8) HashAggregate [codegen id : 1] Input [1]: [key#x] Keys: [] Functions [1]: [partial_max(key#x)] Aggregate Attributes [1]: [max#x] Results [1]: [max#x] -(10) Exchange +(9) Exchange Input [1]: [max#x] Arguments: SinglePartition, true, [id=#x] -(11) HashAggregate [codegen id : 2] +(10) HashAggregate [codegen id : 2] Input [1]: [max#x] Keys: [] Functions [1]: [max(key#x)] Aggregate Attributes [1]: [max(key#x)#x] Results [1]: [max(key#x)#x AS max(key)#x] -Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery#x, [id=#x] -* HashAggregate (18) -+- Exchange (17) - +- * HashAggregate (16) - +- * Project (15) - +- * Filter (14) - +- * ColumnarToRow (13) - +- Scan parquet default.explain_temp3 (12) +Subquery:2 Hosting operator id = 6 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +* HashAggregate (17) ++- Exchange (16) + +- * HashAggregate (15) + +- * Project 
(14) + +- * Filter (13) + +- * ColumnarToRow (12) + +- Scan parquet default.explain_temp3 (11) -(12) Scan parquet default.explain_temp3 +(11) Scan parquet default.explain_temp3 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp3] PushedFilters: [IsNotNull(val), GreaterThan(val,0)] ReadSchema: struct -(13) ColumnarToRow [codegen id : 1] +(12) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -(14) Filter [codegen id : 1] +(13) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) -(15) Project [codegen id : 1] +(14) Project [codegen id : 1] Output [1]: [key#x] Input [2]: [key#x, val#x] -(16) HashAggregate [codegen id : 1] +(15) HashAggregate [codegen id : 1] Input [1]: [key#x] Keys: [] Functions [1]: [partial_max(key#x)] Aggregate Attributes [1]: [max#x] Results [1]: [max#x] -(17) Exchange +(16) Exchange Input [1]: [max#x] Arguments: SinglePartition, true, [id=#x] -(18) HashAggregate [codegen id : 2] +(17) HashAggregate [codegen id : 2] Input [1]: [max#x] Keys: [] Functions [1]: [max(key#x)] @@ -721,16 +681,14 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -* BroadcastHashJoin Inner BuildRight (10) -:- * Project (4) -: +- * Filter (3) -: +- * ColumnarToRow (2) -: +- Scan parquet default.explain_temp1 (1) -+- BroadcastExchange (9) - +- * Project (8) - +- * Filter (7) - +- * ColumnarToRow (6) - +- Scan parquet default.explain_temp1 (5) +* BroadcastHashJoin Inner BuildRight (8) +:- * Filter (3) +: +- * ColumnarToRow (2) +: +- Scan parquet default.explain_temp1 (1) ++- BroadcastExchange (7) + +- * Filter (6) + +- * ColumnarToRow (5) + +- Scan parquet default.explain_temp1 (4) (1) Scan parquet default.explain_temp1 @@ -747,33 +705,25 @@ Input [2]: [key#x, val#x] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) -(4) Project [codegen id : 2] -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(5) Scan parquet default.explain_temp1 +(4) Scan parquet default.explain_temp1 Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct -(6) ColumnarToRow [codegen id : 1] +(5) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -(7) Filter [codegen id : 1] +(6) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) -(8) Project [codegen id : 1] -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(9) BroadcastExchange +(7) BroadcastExchange Input [2]: [key#x, val#x] -Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#x] -(10) BroadcastHashJoin [codegen id : 2] +(8) BroadcastHashJoin [codegen id : 2] Left keys [1]: [key#x] Right keys [1]: [key#x] Join condition: None @@ -792,17 +742,16 @@ EXPLAIN FORMATTED struct -- !query output == Physical Plan == -* BroadcastHashJoin Inner BuildRight (11) -:- * HashAggregate (7) -: +- Exchange (6) -: +- * HashAggregate (5) -: +- * Project (4) -: +- * Filter (3) -: +- * ColumnarToRow (2) -: +- Scan parquet default.explain_temp1 (1) -+- BroadcastExchange (10) - +- * HashAggregate (9) - +- ReusedExchange (8) +* BroadcastHashJoin Inner BuildRight (10) +:- * HashAggregate (6) +: +- Exchange (5) +: +- * HashAggregate (4) +: +- * Filter (3) +: +- * ColumnarToRow (2) +: +- Scan parquet default.explain_temp1 (1) 
++- BroadcastExchange (9) + +- * HashAggregate (8) + +- ReusedExchange (7) (1) Scan parquet default.explain_temp1 @@ -819,43 +768,39 @@ Input [2]: [key#x, val#x] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) -(4) Project [codegen id : 1] -Output [2]: [key#x, val#x] -Input [2]: [key#x, val#x] - -(5) HashAggregate [codegen id : 1] +(4) HashAggregate [codegen id : 1] Input [2]: [key#x, val#x] Keys [1]: [key#x] Functions [1]: [partial_max(val#x)] Aggregate Attributes [1]: [max#x] Results [2]: [key#x, max#x] -(6) Exchange +(5) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] -(7) HashAggregate [codegen id : 4] +(6) HashAggregate [codegen id : 4] Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [2]: [key#x, max(val#x)#x AS max(val)#x] -(8) ReusedExchange [Reuses operator id: 6] +(7) ReusedExchange [Reuses operator id: 5] Output [2]: [key#x, max#x] -(9) HashAggregate [codegen id : 3] +(8) HashAggregate [codegen id : 3] Input [2]: [key#x, max#x] Keys [1]: [key#x] Functions [1]: [max(val#x)] Aggregate Attributes [1]: [max(val#x)#x] Results [2]: [key#x, max(val#x)#x AS max(val)#x] -(10) BroadcastExchange +(9) BroadcastExchange Input [2]: [key#x, max(val)#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#x] -(11) BroadcastHashJoin [codegen id : 4] +(10) BroadcastHashJoin [codegen id : 4] Left keys [1]: [key#x] Right keys [1]: [key#x] Join condition: None diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala new file mode 100644 index 000000000000..bc24436c5806 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils + +class RemoveRedundantProjectsSuite extends QueryTest with SharedSparkSession with SQLTestUtils { + + private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = { + withClue(df.queryExecution) { + val plan = df.queryExecution.executedPlan + val actual = plan.collectWithSubqueries { case p: ProjectExec => p }.size + assert(actual == expected) + } + } + + private def assertProjectExec(query: String, enabled: Int, disabled: Int): Unit = { + val df = sql(query) + assertProjectExecCount(df, enabled) + val result = df.collect() + withSQLConf(SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "false") { + val df2 = sql(query) + assertProjectExecCount(df2, disabled) + checkAnswer(df2, result) + } + } + + private val tmpPath = Utils.createTempDir() + + override def beforeAll(): Unit = { + super.beforeAll() + tmpPath.delete() + val path = tmpPath.getAbsolutePath + spark.range(100).selectExpr("id % 10 as key", "cast(id * 2 as int) as a", + "cast(id * 3 as int) as b", "cast(id as string) as c", "array(id, id + 1, id + 3) as d") + .write.partitionBy("key").parquet(path) + spark.read.parquet(path).createOrReplaceTempView("testView") + } + + override def afterAll(): Unit = { + Utils.deleteRecursively(tmpPath) + super.afterAll() + } + + test("project") { + val query = "select * from testView" + assertProjectExec(query, 0, 0) + } + + test("project with filter") { + val query = "select * from testView where a > 5" + assertProjectExec(query, 0, 1) + } + + test("project with specific column ordering") { + val query = "select key, a, b, c from testView" + assertProjectExec(query, 1, 1) + } + + test("project with extra columns") { + val query = "select a, b, c, key, a from testView" + assertProjectExec(query, 1, 1) + } + + test("project with fewer columns") { + val query = "select a from testView where a > 3" + assertProjectExec(query, 1, 1) + } + + test("aggregate without ordering requirement") { + val query = "select sum(a) as sum_a, key, last(b) as last_b " + + "from (select key, a, b from testView where a > 100) group by key" + assertProjectExec(query, 0, 1) + } + + test("aggregate with ordering requirement") { + val query = "select a, sum(b) as sum_b from testView group by a" + assertProjectExec(query, 1, 1) + } + + test("join without ordering requirement") { + val query = "select t1.key, t2.key, t1.a, t2.b from (select key, a, b, c from testView)" + + " as t1 join (select key, a, b, c from testView) as t2 on t1.c > t2.c and t1.key > 10" + assertProjectExec(query, 1, 3) + } + + test("join with ordering requirement") { + val query = "select * from (select key, a, c, b from testView) as t1 join " + + "(select key, a, b, c from testView) as t2 on t1.key = t2.key where t2.a > 50" + assertProjectExec(query, 2, 2) + } + + test("window function") { + val query = "select key, b, avg(a) over (partition by key order by a " + + "rows between 1 preceding and 1 following) as avg from testView" + assertProjectExec(query, 1, 2) + } + + test("generate") { + val query = "select a, key, explode(d) from testView where a > 10" + assertProjectExec(query, 0, 1) + } + + test("subquery") { + withTempView("testData") { + val data = spark.sparkContext.parallelize((1 to 100).map(i => Row(i, i.toString))) + val schema = 
new StructType().add("key", "int").add("value", "string") + spark.createDataFrame(data, schema).createOrReplaceTempView("testData") + val query = "select key, value from testData where key in " + + "(select sum(a) from testView where a > 5 group by key)" + assertProjectExec(query, 0, 1) + } + } +}
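
---

Usage note (not part of the patch): below is a minimal, hedged sketch of how the behavior introduced by this patch can be observed from user code. It mirrors the `assertProjectExec` helper in `RemoveRedundantProjectsSuite` above, assuming Spark 3.1.0+ with this change applied. The object name, temp-path handling, and local SparkSession setup are illustrative only, and the printed `ProjectExec` counts may vary with other planner settings; AQE is disabled here so the executed plan is a plain `SparkPlan` tree that a simple `collect` can traverse.

```scala
// Illustrative sketch only (assumes Spark 3.1.0+ with this patch applied).
// It mirrors assertProjectExec in RemoveRedundantProjectsSuite: count ProjectExec
// nodes in the physical plan with spark.sql.execution.removeRedundantProjects
// enabled vs disabled.
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ProjectExec

object RemoveRedundantProjectsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("remove-redundant-projects-demo")
      // Keep AQE off so executedPlan is a plain SparkPlan tree and collect()
      // over it is enough to find ProjectExec nodes.
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()

    // Parquet-backed view, similar in shape to the one used by the test suite.
    val path = Files.createTempDirectory("rrp-demo").toFile.getAbsolutePath
    spark.range(100)
      .selectExpr("id % 10 as key", "cast(id * 2 as int) as a", "cast(id as string) as c")
      .write.mode("overwrite").parquet(path)
    spark.read.parquet(path).createOrReplaceTempView("testView")

    // "select *" with a filter: the Project on top of the Filter keeps exactly
    // the child's output attributes and order, so the new rule can drop it.
    val query = "select * from testView where a > 5"

    def projectCount(): Int = {
      val plan = spark.sql(query).queryExecution.executedPlan
      plan.collect { case p: ProjectExec => p }.size
    }

    spark.conf.set("spark.sql.execution.removeRedundantProjects", "true")
    // Typically 0 here, matching the "project with filter" test case above.
    println(s"ProjectExec count with rule enabled:  ${projectCount()}")

    spark.conf.set("spark.sql.execution.removeRedundantProjects", "false")
    // Typically 1 here: the redundant Project stays in the physical plan.
    println(s"ProjectExec count with rule disabled: ${projectCount()}")

    spark.stop()
  }
}
```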