@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.dynamicpruning
1919
2020import org .apache .spark .sql .SparkSession
2121import org .apache .spark .sql .catalyst .expressions
22- import org .apache .spark .sql .catalyst .expressions .{Alias , BindReferences , DynamicPruningExpression , DynamicPruningSubquery , Expression , ListQuery , Literal , PredicateHelper }
22+ import org .apache .spark .sql .catalyst .expressions .{Alias , AttributeSeq , BindReferences , DynamicPruningExpression , DynamicPruningSubquery , Expression , ListQuery , Literal , PredicateHelper }
2323import org .apache .spark .sql .catalyst .optimizer .{BuildLeft , BuildRight }
2424import org .apache .spark .sql .catalyst .plans .logical .{Aggregate , LogicalPlan }
2525import org .apache .spark .sql .catalyst .plans .physical .BroadcastMode
@@ -40,8 +40,8 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession)
4040 /**
4141 * Identify the shape in which keys of a given plan are broadcasted.
4242 */
43- private def broadcastMode (keys : Seq [Expression ], plan : LogicalPlan ): BroadcastMode = {
44- val packedKeys = BindReferences .bindReferences(HashJoin .rewriteKeyExpr(keys), plan. output)
43+ private def broadcastMode (keys : Seq [Expression ], output : AttributeSeq ): BroadcastMode = {
44+ val packedKeys = BindReferences .bindReferences(HashJoin .rewriteKeyExpr(keys), output)
4545 HashedRelationBroadcastMode (packedKeys)
4646 }
4747
@@ -67,8 +67,8 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession)
6767 }.isDefined
6868
6969 if (canReuseExchange) {
70- val mode = broadcastMode(buildKeys, buildPlan)
7170 val executedPlan = QueryExecution .prepareExecutedPlan(sparkSession, sparkPlan)
71+ val mode = broadcastMode(buildKeys, executedPlan.output)
7272 // plan a broadcast exchange of the build side of the join
7373 val exchange = BroadcastExchangeExec (mode, executedPlan)
7474 val name = s " dynamicpruning# ${exprId.id}"
0 commit comments