@@ -49,12 +49,6 @@ import org.apache.spark.sql.types.StructType
4949abstract class SparkStrategy extends GenericStrategy [SparkPlan ] {
5050
5151 override protected def planLater (plan : LogicalPlan ): SparkPlan = PlanLater (plan)
52-
53- override def apply (plan : LogicalPlan ): Seq [SparkPlan ] = {
54- doApply(plan).map(sparkPlan => sparkPlan.withStats(plan.stats))
55- }
56-
57- protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ]
5852}
5953
6054case class PlanLater (plan : LogicalPlan ) extends LeafExecNode {
@@ -73,7 +67,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
7367 * Plans special cases of limit operators.
7468 */
7569 object SpecialLimits extends Strategy {
76- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
70+ override def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
7771 case ReturnAnswer (rootPlan) => rootPlan match {
7872 case Limit (IntegerLiteral (limit), Sort (order, true , child))
7973 if limit < conf.topKSortFallbackThreshold =>
@@ -215,7 +209,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
215209 hint.rightHint.exists(_.strategy.contains(SHUFFLE_REPLICATE_NL ))
216210 }
217211
218- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
212+ def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
219213
220214 // If it is an equi-join, we first look at the join hints w.r.t. the following order:
221215 // 1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
@@ -389,7 +383,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
389383 * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution ]]
390384 */
391385 object StatefulAggregationStrategy extends Strategy {
392- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
386+ override def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
393387 case _ if ! plan.isStreaming => Nil
394388
395389 case EventTimeWatermark (columnName, delay, child) =>
@@ -429,7 +423,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
429423 * Used to plan the streaming deduplicate operator.
430424 */
431425 object StreamingDeduplicationStrategy extends Strategy {
432- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
426+ override def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
433427 case Deduplicate (keys, child) if child.isStreaming =>
434428 StreamingDeduplicateExec (keys, planLater(child)) :: Nil
435429
@@ -446,7 +440,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
446440 * Limit is unsupported for streams in Update mode.
447441 */
448442 case class StreamingGlobalLimitStrategy (outputMode : OutputMode ) extends Strategy {
449- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
443+ override def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
450444 case ReturnAnswer (rootPlan) => rootPlan match {
451445 case Limit (IntegerLiteral (limit), child)
452446 if plan.isStreaming && outputMode == InternalOutputModes .Append =>
@@ -461,7 +455,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
461455 }
462456
463457 object StreamingJoinStrategy extends Strategy {
464- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = {
458+ override def apply (plan : LogicalPlan ): Seq [SparkPlan ] = {
465459 plan match {
466460 case ExtractEquiJoinKeys (joinType, leftKeys, rightKeys, condition, left, right, _)
467461 if left.isStreaming && right.isStreaming =>
@@ -482,7 +476,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
482476 * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface.
483477 */
484478 object Aggregation extends Strategy {
485- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
479+ def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
486480 case PhysicalAggregation (groupingExpressions, aggExpressions, resultExpressions, child)
487481 if aggExpressions.forall(expr => expr.isInstanceOf [AggregateExpression ]) =>
488482 val aggregateExpressions = aggExpressions.map(expr =>
@@ -544,7 +538,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
544538 }
545539
546540 object Window extends Strategy {
547- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
541+ def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
548542 case PhysicalWindow (
549543 WindowFunctionType .SQL , windowExprs, partitionSpec, orderSpec, child) =>
550544 execution.window.WindowExec (
@@ -562,7 +556,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
562556 protected lazy val singleRowRdd = sparkContext.parallelize(Seq (InternalRow ()), 1 )
563557
564558 object InMemoryScans extends Strategy {
565- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
559+ def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
566560 case PhysicalOperation (projectList, filters, mem : InMemoryRelation ) =>
567561 pruneFilterProject(
568562 projectList,
@@ -580,7 +574,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
580574 * be replaced with the real relation using the `Source` in `StreamExecution`.
581575 */
582576 object StreamingRelationStrategy extends Strategy {
583- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
577+ def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
584578 case s : StreamingRelation =>
585579 StreamingRelationExec (s.sourceName, s.output) :: Nil
586580 case s : StreamingExecutionRelation =>
@@ -596,7 +590,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
596590 * in streaming plans. Conversion for batch plans is handled by [[BasicOperators ]].
597591 */
598592 object FlatMapGroupsWithStateStrategy extends Strategy {
599- override def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
593+ override def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
600594 case FlatMapGroupsWithState (
601595 func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr, stateEnc, outputMode, _,
602596 timeout, child) =>
@@ -614,7 +608,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
614608 * Strategy to convert EvalPython logical operator to physical operator.
615609 */
616610 object PythonEvals extends Strategy {
617- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
611+ override def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
618612 case ArrowEvalPython (udfs, output, child) =>
619613 ArrowEvalPythonExec (udfs, output, planLater(child)) :: Nil
620614 case BatchEvalPython (udfs, output, child) =>
@@ -625,7 +619,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
625619 }
626620
627621 object BasicOperators extends Strategy {
628- override protected def doApply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
622+ def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
629623 case d : DataWritingCommand => DataWritingCommandExec (d, planLater(d.query)) :: Nil
630624 case r : RunnableCommand => ExecutedCommandExec (r) :: Nil
631625
0 commit comments