diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 0d759085a7e2c..b995827d438cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -43,8 +43,8 @@ import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
 trait DataSourceScanExec extends LeafExecNode {
-  val relation: BaseRelation
-  val tableIdentifier: Option[TableIdentifier]
+  def relation: BaseRelation
+  def tableIdentifier: Option[TableIdentifier]
 
   protected val nodeNamePrefix: String = ""
 
@@ -103,7 +103,7 @@ case class RowDataSourceScanExec(
     handledFilters: Set[Filter],
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
-    override val tableIdentifier: Option[TableIdentifier])
+    tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with InputRDDCodegen {
 
   def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
@@ -164,7 +164,7 @@ case class FileSourceScanExec(
     partitionFilters: Seq[Expression],
     optionalBucketSet: Option[BitSet],
     dataFilters: Seq[Expression],
-    override val tableIdentifier: Option[TableIdentifier])
+    tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {
 
   // Note that some vals referring the file-based relation are lazy intentionally
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 137f0b87a2f3d..57bf3fbb79d2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -29,13 +29,13 @@ import org.apache.spark.sql.types.{IntegralType, LongType}
 
 trait HashJoin {
   self: SparkPlan =>
-  val leftKeys: Seq[Expression]
-  val rightKeys: Seq[Expression]
-  val joinType: JoinType
-  val buildSide: BuildSide
-  val condition: Option[Expression]
-  val left: SparkPlan
-  val right: SparkPlan
+  def leftKeys: Seq[Expression]
+  def rightKeys: Seq[Expression]
+  def joinType: JoinType
+  def buildSide: BuildSide
+  def condition: Option[Expression]
+  def left: SparkPlan
+  def right: SparkPlan
 
   override def simpleStringWithNodeId(): String = {
     val opId = ExplainUtils.getOpId(this)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index e714554f108ff..67f075f0785fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.python
 import scala.collection.JavaConverters._
 
 import org.apache.spark.TaskContext
-import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
@@ -61,7 +61,7 @@ private[spark] class BatchIterator[T](iter: Iterator[T], batchSize: Int)
  */
 case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan,
     evalType: Int)
-  extends EvalPythonExec(udfs, resultAttrs, child) {
+  extends EvalPythonExec {
 
   private val batchSize = conf.arrowMaxRecordsPerBatch
   private val sessionLocalTimeZone = conf.sessionLocalTimeZone
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index 02bfbc4949b37..b6d8e59877f17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
  * A physical plan that evaluates a [[PythonUDF]]
  */
 case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
-  extends EvalPythonExec(udfs, resultAttrs, child) {
+  extends EvalPythonExec {
 
   protected override def evaluate(
       funcs: Seq[ChainedPythonFunctions],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index a0f23e925d237..96e3bb721a822 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -57,8 +57,9 @@ import org.apache.spark.util.Utils
 * there should be always some rows buffered in the socket or Python process, so the pulling from
 * RowQueue ALWAYS happened after pushing into it.
 */
-abstract class EvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan)
-  extends UnaryExecNode {
+trait EvalPythonExec extends UnaryExecNode {
+  def udfs: Seq[PythonUDF]
+  def resultAttrs: Seq[Attribute]
 
   override def output: Seq[Attribute] = child.output ++ resultAttrs
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index f54c4b8f22066..983fe9db73824 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -84,7 +84,7 @@ case class WindowInPandasExec(
     partitionSpec: Seq[Expression],
     orderSpec: Seq[SortOrder],
     child: SparkPlan)
-  extends WindowExecBase(windowExpression, partitionSpec, orderSpec, child) {
+  extends WindowExecBase {
 
   override def output: Seq[Attribute] =
     child.output ++ windowExpression.map(_.toAttribute)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index fe91d24912222..d65c4ffbb7a24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -54,7 +54,7 @@ case class FlatMapGroupsWithStateExec(
     outputMode: OutputMode,
     timeoutConf: GroupStateTimeout,
     batchTimestampMs: Option[Long],
-    override val eventTimeWatermark: Option[Long],
+    eventTimeWatermark: Option[Long],
     child: SparkPlan
   ) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter with WatermarkSupport {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index d191f3790ffa8..1ec38f0625d98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -83,7 +83,7 @@ case class WindowExec(
     partitionSpec: Seq[Expression],
     orderSpec: Seq[SortOrder],
     child: SparkPlan)
-  extends WindowExecBase(windowExpression, partitionSpec, orderSpec, child) {
+  extends WindowExecBase {
 
   override def output: Seq[Attribute] =
     child.output ++ windowExpression.map(_.toAttribute)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
index d5d11c45f8535..10f7cf42afb02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
@@ -26,11 +26,10 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType}
 
-abstract class WindowExecBase(
-    windowExpression: Seq[NamedExpression],
-    partitionSpec: Seq[Expression],
-    orderSpec: Seq[SortOrder],
-    child: SparkPlan) extends UnaryExecNode {
+trait WindowExecBase extends UnaryExecNode {
+  def windowExpression: Seq[NamedExpression]
+  def partitionSpec: Seq[Expression]
+  def orderSpec: Seq[SortOrder]
 
   /**
    * Create the resulting projection.
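
Every hunk above applies the same refactoring: an abstract physical-plan base class that takes its inputs as constructor parameters (or a trait declaring abstract vals) becomes a trait that declares those inputs as abstract defs, so the concrete case classes no longer pass the values up to a parent constructor or mark them `override val`. The following is a minimal, self-contained Scala sketch of that before/after pattern; the names (EvalNodeBefore, EvalNodeAfter, ConcreteBefore, ConcreteAfter) are illustrative only and are not part of this patch.

// Before: the base class receives its inputs through its constructor, and the
// subclass must both declare the parameters and pass them up, overriding the
// parent's vals.
abstract class EvalNodeBefore(val name: String, val children: Seq[String])

case class ConcreteBefore(override val name: String, override val children: Seq[String])
  extends EvalNodeBefore(name, children)

// After: the trait only declares what it needs as abstract defs. The case
// class constructor parameters are public vals, so they implement the defs
// directly with no extra plumbing.
trait EvalNodeAfter {
  def name: String
  def children: Seq[String]
  def describe: String = s"$name(${children.mkString(", ")})"
}

case class ConcreteAfter(name: String, children: Seq[String]) extends EvalNodeAfter

object Demo extends App {
  // Both shapes expose the same data; the trait version drops the duplicated
  // declarations in the subclass.
  println(ConcreteAfter("node", Seq("a", "b")).describe) // prints: node(a, b)
}

Because a case class parameter is already a stable public val, it can satisfy a trait's abstract def as-is, which is why operators such as RowDataSourceScanExec and FlatMapGroupsWithStateExec can drop the `override val` qualifier in the hunks above; it can also avoid keeping a second copy of the value in a superclass constructor field when that parameter is used by the base class's own methods.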