From f011d7c060853ed950436f7cd99a839ca6d4f776 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 5 Jul 2022 17:37:56 +0800 Subject: [PATCH] Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast --- .../adaptive/AdaptiveSparkPlanExec.scala | 2 +- .../sql/DynamicPartitionPruningSuite.scala | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index e6c8be1397e1d..7aeb1c3432918 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -658,7 +658,7 @@ case class AdaptiveSparkPlanExec( // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery. // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan is // already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule. - val finalPlan = currentPhysicalPlan match { + val finalPlan = inputPlan match { case b: BroadcastExchangeLike if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan)) case _ => newPlan diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index 89749e7de003c..911767177746a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -1597,6 +1597,25 @@ class DynamicPartitionPruningV1SuiteAEOff extends DynamicPartitionPruningV1Suite class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite with EnableAdaptiveExecutionSuite { + test("SPARK-39447: Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast") { + val df = sql( + """ + |WITH empty_result AS ( + | SELECT * FROM fact_stats WHERE product_id < 0 + |) + |SELECT * + |FROM (SELECT /*+ SHUFFLE_MERGE(fact_sk) */ empty_result.store_id + | FROM fact_sk + | JOIN empty_result + | ON fact_sk.product_id = empty_result.product_id) t2 + | JOIN empty_result + | ON t2.store_id = empty_result.store_id + """.stripMargin) + + checkPartitionPruningPredicate(df, false, false) + checkAnswer(df, Nil) + } + test("SPARK-37995: PlanAdaptiveDynamicPruningFilters should use prepareExecutedPlan " + "rather than createSparkPlan to re-plan subquery") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",