From 8097492ec6e85176d3f2947d944d849f29f86add Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Sun, 9 Feb 2020 23:19:01 +0800 Subject: [PATCH 1/8] Refine abstraction code style --- .../sql/execution/DataSourceScanExec.scala | 4 ++-- .../spark/sql/execution/ExistingRDD.scala | 2 +- .../sql/execution/SubqueryBroadcastExec.scala | 4 ++-- .../execution/basicPhysicalOperators.scala | 4 ++-- .../datasources/v2/V1FallbackWriters.scala | 12 +++++------ .../v2/WriteToDataSourceV2Exec.scala | 20 +++++++++---------- .../joins/BroadcastHashJoinExec.scala | 14 ++++++------- .../joins/ShuffledHashJoinExec.scala | 14 ++++++------- .../apache/spark/sql/execution/limit.scala | 8 ++++---- .../apache/spark/sql/execution/objects.scala | 12 +++++------ .../python/ArrowEvalPythonExec.scala | 10 ++++++---- .../python/BatchEvalPythonExec.scala | 6 ++++-- .../sql/execution/python/EvalPythonExec.scala | 8 ++++++-- .../execution/python/WindowInPandasExec.scala | 8 ++++---- .../FlatMapGroupsWithStateExec.scala | 2 +- .../streaming/statefulOperators.scala | 8 ++++---- .../execution/streaming/streamingLimits.scala | 2 +- .../sql/execution/window/WindowExec.scala | 8 ++++---- .../sql/execution/window/WindowExecBase.scala | 13 +++++++----- 19 files changed, 85 insertions(+), 74 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 0d759085a7e2c..ef7598ba5b5ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -102,7 +102,7 @@ case class RowDataSourceScanExec( filters: Set[Filter], handledFilters: Set[Filter], rdd: RDD[InternalRow], - @transient relation: BaseRelation, + @transient override val relation: BaseRelation, override val tableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with InputRDDCodegen { @@ -158,7 +158,7 @@ case class RowDataSourceScanExec( * @param tableIdentifier identifier for the table in the metastore. */ case class FileSourceScanExec( - @transient relation: HadoopFsRelation, + @transient override val relation: HadoopFsRelation, output: Seq[Attribute], requiredSchema: StructType, partitionFilters: Seq[Expression], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 1ab183fe843ff..309c52aaab69f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -57,7 +57,7 @@ case class ExternalRDD[T]( /** Physical plan node for scanning data from an RDD. 
*/ case class ExternalRDDScanExec[T]( - outputObjAttr: Attribute, + override val outputObjAttr: Attribute, rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec { override lazy val metrics = Map( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala index e9f2f6a2cdfaf..98bbfd40566fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala @@ -39,10 +39,10 @@ import org.apache.spark.util.ThreadUtils * @param child the BroadcastExchange from the build side of the join */ case class SubqueryBroadcastExec( - name: String, + override val name: String, index: Int, buildKeys: Seq[Expression], - child: SparkPlan) extends BaseSubqueryExec with UnaryExecNode { + override val child: SparkPlan) extends BaseSubqueryExec with UnaryExecNode { override lazy val metrics = Map( "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index c35c48496e1c9..c41f865329488 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -738,7 +738,7 @@ abstract class BaseSubqueryExec extends SparkPlan { /** * Physical plan for a subquery. */ -case class SubqueryExec(name: String, child: SparkPlan) +case class SubqueryExec(override val name: String, override val child: SparkPlan) extends BaseSubqueryExec with UnaryExecNode { override lazy val metrics = Map( @@ -797,7 +797,7 @@ object SubqueryExec { /** * A wrapper for reused [[BaseSubqueryExec]]. */ -case class ReusedSubqueryExec(child: BaseSubqueryExec) +case class ReusedSubqueryExec(override val child: BaseSubqueryExec) extends BaseSubqueryExec with LeafExecNode { override def name: String = child.name diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index f97300025400d..48fc7bd0a6179 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -37,9 +37,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * Rows in the output data set are appended. */ case class AppendDataExecV1( - table: SupportsWrite, - writeOptions: CaseInsensitiveStringMap, - plan: LogicalPlan) extends V1FallbackWriters { + override val table: SupportsWrite, + override val writeOptions: CaseInsensitiveStringMap, + override val plan: LogicalPlan) extends V1FallbackWriters { override protected def doExecute(): RDD[InternalRow] = { writeWithV1(newWriteBuilder().buildForV1Write()) @@ -58,10 +58,10 @@ case class AppendDataExecV1( * AlwaysTrue to delete all rows. 
*/ case class OverwriteByExpressionExecV1( - table: SupportsWrite, + override val table: SupportsWrite, deleteWhere: Array[Filter], - writeOptions: CaseInsensitiveStringMap, - plan: LogicalPlan) extends V1FallbackWriters { + override val writeOptions: CaseInsensitiveStringMap, + override val plan: LogicalPlan) extends V1FallbackWriters { private def isTruncate(filters: Array[Filter]): Boolean = { filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index e360a9e656a16..94080a7dc8ef3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -62,8 +62,8 @@ case class CreateTableAsSelectExec( catalog: TableCatalog, ident: Identifier, partitioning: Seq[Transform], - plan: LogicalPlan, - query: SparkPlan, + override val plan: LogicalPlan, + override val query: SparkPlan, properties: Map[String, String], writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends V2TableWriteExec with SupportsV1Write { @@ -119,8 +119,8 @@ case class AtomicCreateTableAsSelectExec( catalog: StagingTableCatalog, ident: Identifier, partitioning: Seq[Transform], - plan: LogicalPlan, - query: SparkPlan, + override val plan: LogicalPlan, + override val query: SparkPlan, properties: Map[String, String], writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends AtomicTableWriteExec { @@ -153,8 +153,8 @@ case class ReplaceTableAsSelectExec( catalog: TableCatalog, ident: Identifier, partitioning: Seq[Transform], - plan: LogicalPlan, - query: SparkPlan, + override val plan: LogicalPlan, + override val query: SparkPlan, properties: Map[String, String], writeOptions: CaseInsensitiveStringMap, orCreate: Boolean) extends V2TableWriteExec with SupportsV1Write { @@ -219,8 +219,8 @@ case class AtomicReplaceTableAsSelectExec( catalog: StagingTableCatalog, ident: Identifier, partitioning: Seq[Transform], - plan: LogicalPlan, - query: SparkPlan, + override val plan: LogicalPlan, + override val query: SparkPlan, properties: Map[String, String], writeOptions: CaseInsensitiveStringMap, orCreate: Boolean) extends AtomicTableWriteExec { @@ -253,7 +253,7 @@ case class AtomicReplaceTableAsSelectExec( case class AppendDataExec( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, - query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { + override val query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { override protected def doExecute(): RDD[InternalRow] = { writeWithV2(newWriteBuilder().buildForBatch()) @@ -274,7 +274,7 @@ case class OverwriteByExpressionExec( table: SupportsWrite, deleteWhere: Array[Filter], writeOptions: CaseInsensitiveStringMap, - query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { + override val query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { private def isTruncate(filters: Array[Filter]): Boolean = { filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index fd4a7897c7ad1..e4c85a308d9a8 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -37,13 +37,13 @@ import org.apache.spark.sql.types.{BooleanType, LongType} * relation is not shuffled. */ case class BroadcastHashJoinExec( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - joinType: JoinType, - buildSide: BuildSide, - condition: Option[Expression], - left: SparkPlan, - right: SparkPlan) + override val leftKeys: Seq[Expression], + override val rightKeys: Seq[Expression], + override val joinType: JoinType, + override val buildSide: BuildSide, + override val condition: Option[Expression], + override val left: SparkPlan, + override val right: SparkPlan) extends BinaryExecNode with HashJoin with CodegenSupport { override lazy val metrics = Map( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index a8361fd7dd559..4d2d9f8a802c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -32,13 +32,13 @@ import org.apache.spark.sql.execution.metric.SQLMetrics * Performs a hash join of two child relations by first shuffling the data using the join keys. */ case class ShuffledHashJoinExec( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - joinType: JoinType, - buildSide: BuildSide, - condition: Option[Expression], - left: SparkPlan, - right: SparkPlan) + override val leftKeys: Seq[Expression], + override val rightKeys: Seq[Expression], + override val joinType: JoinType, + override val buildSide: BuildSide, + override val condition: Option[Expression], + override val left: SparkPlan, + override val right: SparkPlan) extends BinaryExecNode with HashJoin { override lazy val metrics = Map( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index ddbd0a343ffcf..5f3b66d9c2347 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -41,7 +41,7 @@ trait LimitExec extends UnaryExecNode { * This operator will be used when a logical `Limit` operation is the final operator in an * logical plan, which happens when the user is collecting results back to the driver. */ -case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec { +case class CollectLimitExec(override val limit: Int, child: SparkPlan) extends LimitExec { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) @@ -71,7 +71,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec { * This operator will be used when a logical `Tail` operation is the final operator in an * logical plan, which happens when the user is collecting results back to the driver. 
*/ -case class CollectTailExec(limit: Int, child: SparkPlan) extends LimitExec { +case class CollectTailExec(override val limit: Int, child: SparkPlan) extends LimitExec { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTail(limit) @@ -142,7 +142,7 @@ trait BaseLimitExec extends LimitExec with CodegenSupport { /** * Take the first `limit` elements of each child partition, but do not collect or shuffle them. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class LocalLimitExec(override val limit: Int, child: SparkPlan) extends BaseLimitExec { override def outputOrdering: Seq[SortOrder] = child.outputOrdering @@ -152,7 +152,7 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { /** * Take the first `limit` elements of the child's single output partition. */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(override val limit: Int, child: SparkPlan) extends BaseLimitExec { override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index d05113431df41..dc3043f2b9a7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -74,7 +74,7 @@ trait ObjectConsumerExec extends UnaryExecNode { */ case class DeserializeToObjectExec( deserializer: Expression, - outputObjAttr: Attribute, + override val outputObjAttr: Attribute, child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with CodegenSupport { override def outputPartitioning: Partitioning = child.outputPartitioning @@ -182,7 +182,7 @@ object ObjectOperator { */ case class MapPartitionsExec( func: Iterator[Any] => Iterator[Any], - outputObjAttr: Attribute, + override val outputObjAttr: Attribute, child: SparkPlan) extends ObjectConsumerExec with ObjectProducerExec { @@ -263,7 +263,7 @@ case class MapPartitionsInRWithArrowExec( */ case class MapElementsExec( func: AnyRef, - outputObjAttr: Attribute, + override val outputObjAttr: Attribute, child: SparkPlan) extends ObjectConsumerExec with ObjectProducerExec with CodegenSupport { @@ -378,7 +378,7 @@ case class MapGroupsExec( valueDeserializer: Expression, groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], - outputObjAttr: Attribute, + override val outputObjAttr: Attribute, child: SparkPlan) extends UnaryExecNode with ObjectProducerExec { override def outputPartitioning: Partitioning = child.outputPartitioning @@ -444,7 +444,7 @@ case class FlatMapGroupsInRExec( valueDeserializer: Expression, groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], - outputObjAttr: Attribute, + override val outputObjAttr: Attribute, child: SparkPlan) extends UnaryExecNode with ObjectProducerExec { override def outputPartitioning: Partitioning = child.outputPartitioning @@ -586,7 +586,7 @@ case class CoGroupExec( rightGroup: Seq[Attribute], leftAttr: Seq[Attribute], rightAttr: Seq[Attribute], - outputObjAttr: Attribute, + override val outputObjAttr: Attribute, left: SparkPlan, right: SparkPlan) extends BinaryExecNode with ObjectProducerExec { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index e714554f108ff..73f5a629f0800 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.python import scala.collection.JavaConverters._ import org.apache.spark.TaskContext -import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.api.python.ChainedPythonFunctions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan @@ -59,9 +59,11 @@ private[spark] class BatchIterator[T](iter: Iterator[T], batchSize: Int) /** * A physical plan that evaluates a [[PythonUDF]]. */ -case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan, - evalType: Int) - extends EvalPythonExec(udfs, resultAttrs, child) { +case class ArrowEvalPythonExec( + override val udfs: Seq[PythonUDF], + override val resultAttrs: Seq[Attribute], + child: SparkPlan, + evalType: Int) extends EvalPythonExec { private val batchSize = conf.arrowMaxRecordsPerBatch private val sessionLocalTimeZone = conf.sessionLocalTimeZone diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala index 02bfbc4949b37..4419384278df6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala @@ -31,8 +31,10 @@ import org.apache.spark.sql.types.{StructField, StructType} /** * A physical plan that evaluates a [[PythonUDF]] */ -case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan) - extends EvalPythonExec(udfs, resultAttrs, child) { +case class BatchEvalPythonExec( + override val udfs: Seq[PythonUDF], + override val resultAttrs: Seq[Attribute], + child: SparkPlan) extends EvalPythonExec { protected override def evaluate( funcs: Seq[ChainedPythonFunctions], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index a0f23e925d237..0ffb67729d844 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -57,8 +57,12 @@ import org.apache.spark.util.Utils * there should be always some rows buffered in the socket or Python process, so the pulling from * RowQueue ALWAYS happened after pushing into it. 
*/ -abstract class EvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan) - extends UnaryExecNode { +abstract class EvalPythonExec extends UnaryExecNode { + def udfs: Seq[PythonUDF] + + def resultAttrs: Seq[Attribute] + + override def child: SparkPlan override def output: Seq[Attribute] = child.output ++ resultAttrs diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index f54c4b8f22066..1d4d8d71b000d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -80,11 +80,11 @@ import org.apache.spark.util.Utils * window. */ case class WindowInPandasExec( - windowExpression: Seq[NamedExpression], - partitionSpec: Seq[Expression], - orderSpec: Seq[SortOrder], + override val windowExpression: Seq[NamedExpression], + override val partitionSpec: Seq[Expression], + override val orderSpec: Seq[SortOrder], child: SparkPlan) - extends WindowExecBase(windowExpression, partitionSpec, orderSpec, child) { + extends WindowExecBase { override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index fe91d24912222..f337223c8a626 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -47,7 +47,7 @@ case class FlatMapGroupsWithStateExec( valueDeserializer: Expression, groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], - outputObjAttr: Attribute, + override val outputObjAttr: Attribute, stateInfo: Option[StatefulOperatorStateInfo], stateEncoder: ExpressionEncoder[Any], stateFormatVersion: Int, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 1bec924ba219a..bee7e489c4cba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -275,10 +275,10 @@ case class StateStoreRestoreExec( * For each input tuple, the key is calculated and the tuple is `put` into the [[StateStore]]. */ case class StateStoreSaveExec( - keyExpressions: Seq[Attribute], + override val keyExpressions: Seq[Attribute], stateInfo: Option[StatefulOperatorStateInfo] = None, outputMode: Option[OutputMode] = None, - eventTimeWatermark: Option[Long] = None, + override val eventTimeWatermark: Option[Long] = None, stateFormatVersion: Int, child: SparkPlan) extends UnaryExecNode with StateStoreWriter with WatermarkSupport { @@ -427,10 +427,10 @@ case class StateStoreSaveExec( /** Physical operator for executing streaming Deduplicate. 
*/ case class StreamingDeduplicateExec( - keyExpressions: Seq[Attribute], + override val keyExpressions: Seq[Attribute], child: SparkPlan, stateInfo: Option[StatefulOperatorStateInfo] = None, - eventTimeWatermark: Option[Long] = None) + override val eventTimeWatermark: Option[Long] = None) extends UnaryExecNode with StateStoreWriter with WatermarkSupport { /** Distribute by grouping attributes */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala index b19540253d7eb..61ebfd5ae11f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala @@ -105,7 +105,7 @@ case class StreamingGlobalLimitExec( * stateful operation within `child` commits all the state changes (many stateful operations * commit state changes only after the iterator is consumed). */ -case class StreamingLocalLimitExec(limit: Int, child: SparkPlan) +case class StreamingLocalLimitExec(override val limit: Int, child: SparkPlan) extends LimitExec { override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index d191f3790ffa8..f44318d16bdab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -79,11 +79,11 @@ import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, * of specialized classes: [[RowBoundOrdering]] & [[RangeBoundOrdering]]. */ case class WindowExec( - windowExpression: Seq[NamedExpression], - partitionSpec: Seq[Expression], - orderSpec: Seq[SortOrder], + override val windowExpression: Seq[NamedExpression], + override val partitionSpec: Seq[Expression], + override val orderSpec: Seq[SortOrder], child: SparkPlan) - extends WindowExecBase(windowExpression, partitionSpec, orderSpec, child) { + extends WindowExecBase { override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala index d5d11c45f8535..c5d096a2d028b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala @@ -26,11 +26,14 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType} -abstract class WindowExecBase( - windowExpression: Seq[NamedExpression], - partitionSpec: Seq[Expression], - orderSpec: Seq[SortOrder], - child: SparkPlan) extends UnaryExecNode { +abstract class WindowExecBase extends UnaryExecNode { + def windowExpression: Seq[NamedExpression] + + def partitionSpec: Seq[Expression] + + def orderSpec: Seq[SortOrder] + + override def child: SparkPlan /** * Create the resulting projection. 
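Patch 1 above standardizes the physical-operator case classes by marking every constructor parameter that implements a member declared in a parent class or trait with an explicit `override val`. A minimal, self-contained sketch of that style, using illustrative names rather than code from the Spark tree:

    // illustrative names only, not from the Spark tree
    trait LimitLikeNode {
      // the parent declares the member as an abstract val
      val limit: Int
    }

    // the concrete node restates the relationship with `override val`
    case class LocalLimitNode(override val limit: Int, child: String) extends LimitLikeNode

    object OverrideValStyleDemo {
      def main(args: Array[String]): Unit = {
        val node: LimitLikeNode = LocalLimitNode(10, "child-plan")
        println(node.limit) // prints 10
      }
    }

The `override` keyword is optional when implementing an abstract member, which is why the follow-up patches below can remove it again without changing behavior.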
From d04092f53757ea619abc3dfc8e454189e72eebbe Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Tue, 11 Feb 2020 21:39:22 +0800 Subject: [PATCH 2/8] adjust code based on more commented rules --- .../org/apache/spark/sql/execution/ExistingRDD.scala | 2 +- .../spark/sql/execution/SubqueryBroadcastExec.scala | 4 ++-- .../spark/sql/execution/basicPhysicalOperators.scala | 2 +- .../execution/datasources/v2/V1FallbackWriters.scala | 6 +++--- .../datasources/v2/WriteToDataSourceV2Exec.scala | 4 ++-- .../scala/org/apache/spark/sql/execution/limit.scala | 2 +- .../org/apache/spark/sql/execution/objects.scala | 12 ++++++------ .../sql/execution/python/ArrowEvalPythonExec.scala | 8 +++----- .../sql/execution/python/BatchEvalPythonExec.scala | 6 ++---- .../sql/execution/python/WindowInPandasExec.scala | 6 +++--- .../streaming/FlatMapGroupsWithStateExec.scala | 4 ++-- .../sql/execution/streaming/statefulOperators.scala | 4 ++-- .../sql/execution/streaming/streamingLimits.scala | 2 +- .../spark/sql/execution/window/WindowExec.scala | 6 +++--- .../spark/sql/execution/window/WindowExecBase.scala | 2 -- 15 files changed, 32 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 309c52aaab69f..1ab183fe843ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -57,7 +57,7 @@ case class ExternalRDD[T]( /** Physical plan node for scanning data from an RDD. */ case class ExternalRDDScanExec[T]( - override val outputObjAttr: Attribute, + outputObjAttr: Attribute, rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec { override lazy val metrics = Map( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala index 98bbfd40566fb..e9f2f6a2cdfaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala @@ -39,10 +39,10 @@ import org.apache.spark.util.ThreadUtils * @param child the BroadcastExchange from the build side of the join */ case class SubqueryBroadcastExec( - override val name: String, + name: String, index: Int, buildKeys: Seq[Expression], - override val child: SparkPlan) extends BaseSubqueryExec with UnaryExecNode { + child: SparkPlan) extends BaseSubqueryExec with UnaryExecNode { override lazy val metrics = Map( "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index c41f865329488..ee1794e78efd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -738,7 +738,7 @@ abstract class BaseSubqueryExec extends SparkPlan { /** * Physical plan for a subquery. 
*/ -case class SubqueryExec(override val name: String, override val child: SparkPlan) +case class SubqueryExec(name: String, child: SparkPlan) extends BaseSubqueryExec with UnaryExecNode { override lazy val metrics = Map( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index 48fc7bd0a6179..b01d894da9eff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -58,10 +58,10 @@ case class AppendDataExecV1( * AlwaysTrue to delete all rows. */ case class OverwriteByExpressionExecV1( - override val table: SupportsWrite, + table: SupportsWrite, deleteWhere: Array[Filter], - override val writeOptions: CaseInsensitiveStringMap, - override val plan: LogicalPlan) extends V1FallbackWriters { + writeOptions: CaseInsensitiveStringMap, + plan: LogicalPlan) extends V1FallbackWriters { private def isTruncate(filters: Array[Filter]): Boolean = { filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 94080a7dc8ef3..f22d23c8b9fde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -62,8 +62,8 @@ case class CreateTableAsSelectExec( catalog: TableCatalog, ident: Identifier, partitioning: Seq[Transform], - override val plan: LogicalPlan, - override val query: SparkPlan, + plan: LogicalPlan, + query: SparkPlan, properties: Map[String, String], writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends V2TableWriteExec with SupportsV1Write { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 5f3b66d9c2347..0a9b8b5890d7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -41,7 +41,7 @@ trait LimitExec extends UnaryExecNode { * This operator will be used when a logical `Limit` operation is the final operator in an * logical plan, which happens when the user is collecting results back to the driver. 
*/ -case class CollectLimitExec(override val limit: Int, child: SparkPlan) extends LimitExec { +case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index dc3043f2b9a7e..d05113431df41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -74,7 +74,7 @@ trait ObjectConsumerExec extends UnaryExecNode { */ case class DeserializeToObjectExec( deserializer: Expression, - override val outputObjAttr: Attribute, + outputObjAttr: Attribute, child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with CodegenSupport { override def outputPartitioning: Partitioning = child.outputPartitioning @@ -182,7 +182,7 @@ object ObjectOperator { */ case class MapPartitionsExec( func: Iterator[Any] => Iterator[Any], - override val outputObjAttr: Attribute, + outputObjAttr: Attribute, child: SparkPlan) extends ObjectConsumerExec with ObjectProducerExec { @@ -263,7 +263,7 @@ case class MapPartitionsInRWithArrowExec( */ case class MapElementsExec( func: AnyRef, - override val outputObjAttr: Attribute, + outputObjAttr: Attribute, child: SparkPlan) extends ObjectConsumerExec with ObjectProducerExec with CodegenSupport { @@ -378,7 +378,7 @@ case class MapGroupsExec( valueDeserializer: Expression, groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], - override val outputObjAttr: Attribute, + outputObjAttr: Attribute, child: SparkPlan) extends UnaryExecNode with ObjectProducerExec { override def outputPartitioning: Partitioning = child.outputPartitioning @@ -444,7 +444,7 @@ case class FlatMapGroupsInRExec( valueDeserializer: Expression, groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], - override val outputObjAttr: Attribute, + outputObjAttr: Attribute, child: SparkPlan) extends UnaryExecNode with ObjectProducerExec { override def outputPartitioning: Partitioning = child.outputPartitioning @@ -586,7 +586,7 @@ case class CoGroupExec( rightGroup: Seq[Attribute], leftAttr: Seq[Attribute], rightAttr: Seq[Attribute], - override val outputObjAttr: Attribute, + outputObjAttr: Attribute, left: SparkPlan, right: SparkPlan) extends BinaryExecNode with ObjectProducerExec { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index 73f5a629f0800..67f075f0785fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -59,11 +59,9 @@ private[spark] class BatchIterator[T](iter: Iterator[T], batchSize: Int) /** * A physical plan that evaluates a [[PythonUDF]]. 
*/ -case class ArrowEvalPythonExec( - override val udfs: Seq[PythonUDF], - override val resultAttrs: Seq[Attribute], - child: SparkPlan, - evalType: Int) extends EvalPythonExec { +case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan, + evalType: Int) + extends EvalPythonExec { private val batchSize = conf.arrowMaxRecordsPerBatch private val sessionLocalTimeZone = conf.sessionLocalTimeZone diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala index 4419384278df6..b6d8e59877f17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala @@ -31,10 +31,8 @@ import org.apache.spark.sql.types.{StructField, StructType} /** * A physical plan that evaluates a [[PythonUDF]] */ -case class BatchEvalPythonExec( - override val udfs: Seq[PythonUDF], - override val resultAttrs: Seq[Attribute], - child: SparkPlan) extends EvalPythonExec { +case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], child: SparkPlan) + extends EvalPythonExec { protected override def evaluate( funcs: Seq[ChainedPythonFunctions], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index 1d4d8d71b000d..983fe9db73824 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -80,9 +80,9 @@ import org.apache.spark.util.Utils * window. 
*/ case class WindowInPandasExec( - override val windowExpression: Seq[NamedExpression], - override val partitionSpec: Seq[Expression], - override val orderSpec: Seq[SortOrder], + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], child: SparkPlan) extends WindowExecBase { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index f337223c8a626..d65c4ffbb7a24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -47,14 +47,14 @@ case class FlatMapGroupsWithStateExec( valueDeserializer: Expression, groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], - override val outputObjAttr: Attribute, + outputObjAttr: Attribute, stateInfo: Option[StatefulOperatorStateInfo], stateEncoder: ExpressionEncoder[Any], stateFormatVersion: Int, outputMode: OutputMode, timeoutConf: GroupStateTimeout, batchTimestampMs: Option[Long], - override val eventTimeWatermark: Option[Long], + eventTimeWatermark: Option[Long], child: SparkPlan ) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter with WatermarkSupport { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index bee7e489c4cba..616d994e05fb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -275,10 +275,10 @@ case class StateStoreRestoreExec( * For each input tuple, the key is calculated and the tuple is `put` into the [[StateStore]]. */ case class StateStoreSaveExec( - override val keyExpressions: Seq[Attribute], + keyExpressions: Seq[Attribute], stateInfo: Option[StatefulOperatorStateInfo] = None, outputMode: Option[OutputMode] = None, - override val eventTimeWatermark: Option[Long] = None, + eventTimeWatermark: Option[Long] = None, stateFormatVersion: Int, child: SparkPlan) extends UnaryExecNode with StateStoreWriter with WatermarkSupport { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala index 61ebfd5ae11f5..b19540253d7eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala @@ -105,7 +105,7 @@ case class StreamingGlobalLimitExec( * stateful operation within `child` commits all the state changes (many stateful operations * commit state changes only after the iterator is consumed). 
*/ -case class StreamingLocalLimitExec(override val limit: Int, child: SparkPlan) +case class StreamingLocalLimitExec(limit: Int, child: SparkPlan) extends LimitExec { override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index f44318d16bdab..1ec38f0625d98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -79,9 +79,9 @@ import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, * of specialized classes: [[RowBoundOrdering]] & [[RangeBoundOrdering]]. */ case class WindowExec( - override val windowExpression: Seq[NamedExpression], - override val partitionSpec: Seq[Expression], - override val orderSpec: Seq[SortOrder], + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], child: SparkPlan) extends WindowExecBase { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala index c5d096a2d028b..f1e5f9b0c64bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala @@ -33,8 +33,6 @@ abstract class WindowExecBase extends UnaryExecNode { def orderSpec: Seq[SortOrder] - override def child: SparkPlan - /** * Create the resulting projection. * From 183eb4102784c17a91f30822c267524ecffa97b9 Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Tue, 11 Feb 2020 21:47:53 +0800 Subject: [PATCH 3/8] adjust code based on more commented rules --- .../sql/execution/basicPhysicalOperators.scala | 2 +- .../datasources/v2/V1FallbackWriters.scala | 6 +++--- .../datasources/v2/WriteToDataSourceV2Exec.scala | 16 ++++++++-------- .../org/apache/spark/sql/execution/limit.scala | 6 +++--- .../execution/streaming/statefulOperators.scala | 4 ++-- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index ee1794e78efd8..c35c48496e1c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -797,7 +797,7 @@ object SubqueryExec { /** * A wrapper for reused [[BaseSubqueryExec]]. */ -case class ReusedSubqueryExec(override val child: BaseSubqueryExec) +case class ReusedSubqueryExec(child: BaseSubqueryExec) extends BaseSubqueryExec with LeafExecNode { override def name: String = child.name diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index b01d894da9eff..f97300025400d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -37,9 +37,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * Rows in the output data set are appended. 
*/ case class AppendDataExecV1( - override val table: SupportsWrite, - override val writeOptions: CaseInsensitiveStringMap, - override val plan: LogicalPlan) extends V1FallbackWriters { + table: SupportsWrite, + writeOptions: CaseInsensitiveStringMap, + plan: LogicalPlan) extends V1FallbackWriters { override protected def doExecute(): RDD[InternalRow] = { writeWithV1(newWriteBuilder().buildForV1Write()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index f22d23c8b9fde..e360a9e656a16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -119,8 +119,8 @@ case class AtomicCreateTableAsSelectExec( catalog: StagingTableCatalog, ident: Identifier, partitioning: Seq[Transform], - override val plan: LogicalPlan, - override val query: SparkPlan, + plan: LogicalPlan, + query: SparkPlan, properties: Map[String, String], writeOptions: CaseInsensitiveStringMap, ifNotExists: Boolean) extends AtomicTableWriteExec { @@ -153,8 +153,8 @@ case class ReplaceTableAsSelectExec( catalog: TableCatalog, ident: Identifier, partitioning: Seq[Transform], - override val plan: LogicalPlan, - override val query: SparkPlan, + plan: LogicalPlan, + query: SparkPlan, properties: Map[String, String], writeOptions: CaseInsensitiveStringMap, orCreate: Boolean) extends V2TableWriteExec with SupportsV1Write { @@ -219,8 +219,8 @@ case class AtomicReplaceTableAsSelectExec( catalog: StagingTableCatalog, ident: Identifier, partitioning: Seq[Transform], - override val plan: LogicalPlan, - override val query: SparkPlan, + plan: LogicalPlan, + query: SparkPlan, properties: Map[String, String], writeOptions: CaseInsensitiveStringMap, orCreate: Boolean) extends AtomicTableWriteExec { @@ -253,7 +253,7 @@ case class AtomicReplaceTableAsSelectExec( case class AppendDataExec( table: SupportsWrite, writeOptions: CaseInsensitiveStringMap, - override val query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { + query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { override protected def doExecute(): RDD[InternalRow] = { writeWithV2(newWriteBuilder().buildForBatch()) @@ -274,7 +274,7 @@ case class OverwriteByExpressionExec( table: SupportsWrite, deleteWhere: Array[Filter], writeOptions: CaseInsensitiveStringMap, - override val query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { + query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { private def isTruncate(filters: Array[Filter]): Boolean = { filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 0a9b8b5890d7e..ddbd0a343ffcf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -71,7 +71,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec { * This operator will be used when a logical `Tail` operation is the final operator in an * logical plan, which happens when the user is collecting results back to the driver. 
*/ -case class CollectTailExec(override val limit: Int, child: SparkPlan) extends LimitExec { +case class CollectTailExec(limit: Int, child: SparkPlan) extends LimitExec { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTail(limit) @@ -142,7 +142,7 @@ trait BaseLimitExec extends LimitExec with CodegenSupport { /** * Take the first `limit` elements of each child partition, but do not collect or shuffle them. */ -case class LocalLimitExec(override val limit: Int, child: SparkPlan) extends BaseLimitExec { +case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { override def outputOrdering: Seq[SortOrder] = child.outputOrdering @@ -152,7 +152,7 @@ case class LocalLimitExec(override val limit: Int, child: SparkPlan) extends Bas /** * Take the first `limit` elements of the child's single output partition. */ -case class GlobalLimitExec(override val limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 616d994e05fb6..1bec924ba219a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -427,10 +427,10 @@ case class StateStoreSaveExec( /** Physical operator for executing streaming Deduplicate. */ case class StreamingDeduplicateExec( - override val keyExpressions: Seq[Attribute], + keyExpressions: Seq[Attribute], child: SparkPlan, stateInfo: Option[StatefulOperatorStateInfo] = None, - override val eventTimeWatermark: Option[Long] = None) + eventTimeWatermark: Option[Long] = None) extends UnaryExecNode with StateStoreWriter with WatermarkSupport { /** Distribute by grouping attributes */ From b94871be50b365aece06047b5ea5a4a9507e26df Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Tue, 11 Feb 2020 21:56:25 +0800 Subject: [PATCH 4/8] minor fix --- .../org/apache/spark/sql/execution/python/EvalPythonExec.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 0ffb67729d844..2ac2ac8458713 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -62,8 +62,6 @@ abstract class EvalPythonExec extends UnaryExecNode { def resultAttrs: Seq[Attribute] - override def child: SparkPlan - override def output: Seq[Attribute] = child.output ++ resultAttrs override def producedAttributes: AttributeSet = AttributeSet(resultAttrs) From cdb0e3e4983681219e940e9d72bba70305e6c2c9 Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Tue, 11 Feb 2020 22:00:27 +0800 Subject: [PATCH 5/8] refine HashJoin --- .../execution/joins/BroadcastHashJoinExec.scala | 14 +++++++------- .../spark/sql/execution/joins/HashJoin.scala | 14 +++++++------- .../sql/execution/joins/ShuffledHashJoinExec.scala | 14 +++++++------- 3 files changed, 21 insertions(+), 21 
deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index e4c85a308d9a8..fd4a7897c7ad1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -37,13 +37,13 @@ import org.apache.spark.sql.types.{BooleanType, LongType} * relation is not shuffled. */ case class BroadcastHashJoinExec( - override val leftKeys: Seq[Expression], - override val rightKeys: Seq[Expression], - override val joinType: JoinType, - override val buildSide: BuildSide, - override val condition: Option[Expression], - override val left: SparkPlan, - override val right: SparkPlan) + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + joinType: JoinType, + buildSide: BuildSide, + condition: Option[Expression], + left: SparkPlan, + right: SparkPlan) extends BinaryExecNode with HashJoin with CodegenSupport { override lazy val metrics = Map( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 137f0b87a2f3d..57bf3fbb79d2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -29,13 +29,13 @@ import org.apache.spark.sql.types.{IntegralType, LongType} trait HashJoin { self: SparkPlan => - val leftKeys: Seq[Expression] - val rightKeys: Seq[Expression] - val joinType: JoinType - val buildSide: BuildSide - val condition: Option[Expression] - val left: SparkPlan - val right: SparkPlan + def leftKeys: Seq[Expression] + def rightKeys: Seq[Expression] + def joinType: JoinType + def buildSide: BuildSide + def condition: Option[Expression] + def left: SparkPlan + def right: SparkPlan override def simpleStringWithNodeId(): String = { val opId = ExplainUtils.getOpId(this) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 4d2d9f8a802c2..a8361fd7dd559 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -32,13 +32,13 @@ import org.apache.spark.sql.execution.metric.SQLMetrics * Performs a hash join of two child relations by first shuffling the data using the join keys. 
*/ case class ShuffledHashJoinExec( - override val leftKeys: Seq[Expression], - override val rightKeys: Seq[Expression], - override val joinType: JoinType, - override val buildSide: BuildSide, - override val condition: Option[Expression], - override val left: SparkPlan, - override val right: SparkPlan) + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + joinType: JoinType, + buildSide: BuildSide, + condition: Option[Expression], + left: SparkPlan, + right: SparkPlan) extends BinaryExecNode with HashJoin { override lazy val metrics = Map( From d07cc1b5f279e06716771f076a661f3d0b0e1ab4 Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Wed, 12 Feb 2020 10:42:02 +0800 Subject: [PATCH 6/8] address comment --- .../org/apache/spark/sql/execution/python/EvalPythonExec.scala | 1 - .../org/apache/spark/sql/execution/window/WindowExecBase.scala | 2 -- 2 files changed, 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 2ac2ac8458713..58f6a27ccf652 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -59,7 +59,6 @@ import org.apache.spark.util.Utils */ abstract class EvalPythonExec extends UnaryExecNode { def udfs: Seq[PythonUDF] - def resultAttrs: Seq[Attribute] override def output: Seq[Attribute] = child.output ++ resultAttrs diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala index f1e5f9b0c64bb..9e9ec0c13b413 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala @@ -28,9 +28,7 @@ import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, abstract class WindowExecBase extends UnaryExecNode { def windowExpression: Seq[NamedExpression] - def partitionSpec: Seq[Expression] - def orderSpec: Seq[SortOrder] /** From 364f5a70d98260573f9314592ff8ca8a39459a02 Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Tue, 25 Feb 2020 18:01:19 +0800 Subject: [PATCH 7/8] use trait --- .../org/apache/spark/sql/execution/python/EvalPythonExec.scala | 2 +- .../org/apache/spark/sql/execution/window/WindowExecBase.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala index 58f6a27ccf652..96e3bb721a822 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala @@ -57,7 +57,7 @@ import org.apache.spark.util.Utils * there should be always some rows buffered in the socket or Python process, so the pulling from * RowQueue ALWAYS happened after pushing into it. 
*/ -abstract class EvalPythonExec extends UnaryExecNode { +trait EvalPythonExec extends UnaryExecNode { def udfs: Seq[PythonUDF] def resultAttrs: Seq[Attribute] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala index 9e9ec0c13b413..10f7cf42afb02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType} -abstract class WindowExecBase extends UnaryExecNode { +trait WindowExecBase extends UnaryExecNode { def windowExpression: Seq[NamedExpression] def partitionSpec: Seq[Expression] def orderSpec: Seq[SortOrder] From 839f7b04a766252d034e46fea6b2da3c5ed3c361 Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Tue, 25 Feb 2020 23:18:13 +0800 Subject: [PATCH 8/8] address comment --- .../spark/sql/execution/DataSourceScanExec.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index ef7598ba5b5ed..b995827d438cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -43,8 +43,8 @@ import org.apache.spark.util.Utils import org.apache.spark.util.collection.BitSet trait DataSourceScanExec extends LeafExecNode { - val relation: BaseRelation - val tableIdentifier: Option[TableIdentifier] + def relation: BaseRelation + def tableIdentifier: Option[TableIdentifier] protected val nodeNamePrefix: String = "" @@ -102,8 +102,8 @@ case class RowDataSourceScanExec( filters: Set[Filter], handledFilters: Set[Filter], rdd: RDD[InternalRow], - @transient override val relation: BaseRelation, - override val tableIdentifier: Option[TableIdentifier]) + @transient relation: BaseRelation, + tableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with InputRDDCodegen { def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput) @@ -158,13 +158,13 @@ case class RowDataSourceScanExec( * @param tableIdentifier identifier for the table in the metastore. */ case class FileSourceScanExec( - @transient override val relation: HadoopFsRelation, + @transient relation: HadoopFsRelation, output: Seq[Attribute], requiredSchema: StructType, partitionFilters: Seq[Expression], optionalBucketSet: Option[BitSet], dataFilters: Seq[Expression], - override val tableIdentifier: Option[TableIdentifier]) + tableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec { // Note that some vals referring the file-based relation are lazy intentionally
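Taken together, the series lands on the opposite convention from patch 1: shared members of physical operators are declared as abstract `def`s in a trait (EvalPythonExec, WindowExecBase, HashJoin, DataSourceScanExec), and the concrete case classes supply them as plain constructor parameters with no `override val`. A minimal, self-contained sketch of that final shape, using illustrative names rather than code from the Spark tree:

    // illustrative names only, not from the Spark tree
    trait HashJoinLike {
      // abstract defs: the contract says nothing about how the value is stored
      def joinKeys: Seq[String]
      def left: String
      def right: String
    }

    // a case class parameter (a stable val) may implement an abstract def,
    // so no `override val` is needed on any field
    case class ShuffledHashJoinNode(
        joinKeys: Seq[String],
        left: String,
        right: String) extends HashJoinLike

    object TraitDefStyleDemo {
      def main(args: Array[String]): Unit = {
        val join = ShuffledHashJoinNode(Seq("id"), "left-plan", "right-plan")
        println(join.joinKeys.mkString(", ")) // prints: id
      }
    }

Declaring the members as `def` keeps the contract minimal: an implementer may satisfy it with a constructor parameter, a `val`, or a `lazy val`, and the case class definitions stay free of repeated `override val` noise.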