@@ -658,7 +658,7 @@ case class AdaptiveSparkPlanExec(
     // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
     // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan is
     // already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
-    val finalPlan = currentPhysicalPlan match {
+    val finalPlan = inputPlan match {
       case b: BroadcastExchangeLike
         if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
       case _ => newPlan
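The one-line change above swaps the plan the pattern match inspects: the re-wrapping guard must look at the plan AQE started from (`inputPlan`), not the possibly already re-optimized `currentPhysicalPlan`. The following is a minimal, self-contained sketch of that guard using toy case classes — `Scan`, `BroadcastExchange`, and `finalPlan` are illustrative stand-ins, not Spark's real `AdaptiveSparkPlanExec` APIs:

```scala
object FinalPlanSketch {
  // Toy plan nodes standing in for Spark's SparkPlan / BroadcastExchangeLike.
  sealed trait Plan
  case class Scan(name: String) extends Plan
  case class BroadcastExchange(child: Plan) extends Plan

  // If the input plan was a broadcast exchange but the re-planned subquery is
  // not, re-wrap it so downstream broadcast execution still sees an exchange;
  // if the new plan already broadcasts, avoid wrapping it twice.
  def finalPlan(inputPlan: Plan, newPlan: Plan): Plan = inputPlan match {
    case BroadcastExchange(_) if !newPlan.isInstanceOf[BroadcastExchange] =>
      BroadcastExchange(newPlan)
    case _ => newPlan
  }

  def main(args: Array[String]): Unit = {
    // The broadcast wrapper survives re-planning...
    assert(finalPlan(BroadcastExchange(Scan("t")), Scan("t2")) ==
      BroadcastExchange(Scan("t2")))
    // ...and is not doubled when the new plan already broadcasts.
    assert(finalPlan(BroadcastExchange(Scan("t")), BroadcastExchange(Scan("t2"))) ==
      BroadcastExchange(Scan("t2")))
    println("ok")
  }
}
```

In the real code the wrapper is preserved via `b.withNewChildren(Seq(newPlan))` rather than constructed fresh; the sketch only models the match condition that the fix changes.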
@@ -1597,6 +1597,25 @@ class DynamicPartitionPruningV1SuiteAEOff extends DynamicPartitionPruningV1Suite
class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
with EnableAdaptiveExecutionSuite {

test("SPARK-39447: Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast") {
dongjoon-hyun (Member) commented:

Do you mean this test case doesn't currently fail in branch-3.2 because we don't have SPARK-39551, @ulysses-you?

ulysses-you (Contributor, Author) replied:

@dongjoon-hyun Sorry, I wasn't clear. I meant that some tests failed in this PR because we are missing #36953 in branch-3.2. That said, this fix must work together with that PR.

val df = sql(
"""
|WITH empty_result AS (
| SELECT * FROM fact_stats WHERE product_id < 0
|)
|SELECT *
|FROM (SELECT /*+ SHUFFLE_MERGE(fact_sk) */ empty_result.store_id
| FROM fact_sk
| JOIN empty_result
| ON fact_sk.product_id = empty_result.product_id) t2
| JOIN empty_result
| ON t2.store_id = empty_result.store_id
""".stripMargin)

checkPartitionPruningPredicate(df, false, false)
checkAnswer(df, Nil)
}

test("SPARK-37995: PlanAdaptiveDynamicPruningFilters should use prepareExecutedPlan " +
"rather than createSparkPlan to re-plan subquery") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",