@@ -107,6 +107,7 @@ case class AdaptiveSparkPlanExec(
107107    //  `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
108108    //  around this case.
109109    EnsureRequirements (optimizeOutRepartition =  requiredDistribution.isDefined),
110+     ValidateSparkPlan ,
110111    RemoveRedundantSorts ,
111112    DisableUnnecessaryBucketedScan 
112113  ) ++  context.session.sessionState.queryStagePrepRules
@@ -295,16 +296,19 @@ case class AdaptiveSparkPlanExec(
295296        //  plans are updated, we can clear the query stage list because at this point the two plans
296297        //  are semantically and physically in sync again.
297298        val  logicalPlan  =  replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
298-         val  (newPhysicalPlan, newLogicalPlan) =  reOptimize(logicalPlan)
299-         val  origCost  =  costEvaluator.evaluateCost(currentPhysicalPlan)
300-         val  newCost  =  costEvaluator.evaluateCost(newPhysicalPlan)
301-         if  (newCost <  origCost || 
299+         val  afterReOptimize  =  reOptimize(logicalPlan)
300+         if  (afterReOptimize.isDefined) {
301+           val  (newPhysicalPlan, newLogicalPlan) =  afterReOptimize.get
302+           val  origCost  =  costEvaluator.evaluateCost(currentPhysicalPlan)
303+           val  newCost  =  costEvaluator.evaluateCost(newPhysicalPlan)
304+           if  (newCost <  origCost || 
302305            (newCost ==  origCost &&  currentPhysicalPlan !=  newPhysicalPlan)) {
303-           logOnLevel(s " Plan changed from  $currentPhysicalPlan to  $newPhysicalPlan" )
304-           cleanUpTempTags(newPhysicalPlan)
305-           currentPhysicalPlan =  newPhysicalPlan
306-           currentLogicalPlan =  newLogicalPlan
307-           stagesToReplace =  Seq .empty[QueryStageExec ]
306+             logOnLevel(s " Plan changed from  $currentPhysicalPlan to  $newPhysicalPlan" )
307+             cleanUpTempTags(newPhysicalPlan)
308+             currentPhysicalPlan =  newPhysicalPlan
309+             currentLogicalPlan =  newLogicalPlan
310+             stagesToReplace =  Seq .empty[QueryStageExec ]
311+           }
308312        }
309313        //  Now that some stages have finished, we can try creating new stages.
310314        result =  createQueryStages(currentPhysicalPlan)
@@ -637,29 +641,35 @@ case class AdaptiveSparkPlanExec(
637641  /**  
638642   * Re-optimize and run physical planning on the current logical plan based on the latest stats. 
639643   */  
640-   private  def  reOptimize (logicalPlan : LogicalPlan ):  (SparkPlan , LogicalPlan ) =  {
641-     logicalPlan.invalidateStatsCache()
642-     val  optimized  =  optimizer.execute(logicalPlan)
643-     val  sparkPlan  =  context.session.sessionState.planner.plan(ReturnAnswer (optimized)).next()
644-     val  newPlan  =  applyPhysicalRules(
645-       sparkPlan,
646-       preprocessingRules ++  queryStagePreparationRules,
647-       Some ((planChangeLogger, " AQE Replanning"  )))
648- 
649-     //  When both enabling AQE and DPP, `PlanAdaptiveDynamicPruningFilters` rule will
650-     //  add the `BroadcastExchangeExec` node manually in the DPP subquery,
651-     //  not through `EnsureRequirements` rule. Therefore, when the DPP subquery is complicated
652-     //  and need to be re-optimized, AQE also need to manually insert the `BroadcastExchangeExec`
653-     //  node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
654-     //  Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan
655-     //  is already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
656-     val  finalPlan  =  currentPhysicalPlan match  {
657-       case  b : BroadcastExchangeLike 
658-         if  (! newPlan.isInstanceOf [BroadcastExchangeLike ]) =>  b.withNewChildren(Seq (newPlan))
659-       case  _ =>  newPlan
660-     }
644+   private  def  reOptimize (logicalPlan : LogicalPlan ):  Option [(SparkPlan , LogicalPlan )] =  {
645+     try  {
646+       logicalPlan.invalidateStatsCache()
647+       val  optimized  =  optimizer.execute(logicalPlan)
648+       val  sparkPlan  =  context.session.sessionState.planner.plan(ReturnAnswer (optimized)).next()
649+       val  newPlan  =  applyPhysicalRules(
650+         sparkPlan,
651+         preprocessingRules ++  queryStagePreparationRules,
652+         Some ((planChangeLogger, " AQE Replanning"  )))
653+ 
654+       //  When both enabling AQE and DPP, `PlanAdaptiveDynamicPruningFilters` rule will
655+       //  add the `BroadcastExchangeExec` node manually in the DPP subquery,
656+       //  not through `EnsureRequirements` rule. Therefore, when the DPP subquery is complicated
657+       //  and need to be re-optimized, AQE also need to manually insert the `BroadcastExchangeExec`
658+       //  node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
659+       //  Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan is
660+       //  already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
661+       val  finalPlan  =  currentPhysicalPlan match  {
662+         case  b : BroadcastExchangeLike 
663+           if  (! newPlan.isInstanceOf [BroadcastExchangeLike ]) =>  b.withNewChildren(Seq (newPlan))
664+         case  _ =>  newPlan
665+       }
661666
662-     (finalPlan, optimized)
667+       Some ((finalPlan, optimized))
668+     } catch  {
669+       case  e : InvalidAQEPlanException [_] => 
670+         logOnLevel(s " Re-optimize -  ${e.getMessage()}: \n ${e.plan}" )
671+         None 
672+     }
663673  }
664674
665675  /**  
0 commit comments