
Commit ba436c0

[SPARK-24478][SQL] Move projection and filter push down to physical conversion
This removes the v2 optimizer rule for push-down and instead pushes filters and required columns when converting to a physical plan, as suggested by marmbrus. This makes the v2 relation cleaner because the output and filters do not change in the logical plan.

A side-effect of this change is that the stats from the logical (optimized) plan no longer reflect pushed filters and projection. This is a temporary state until the planner gathers stats from the physical plan instead. An alternative to this approach is 9d3a11e.

The first commit was proposed in apache#21262. This PR replaces apache#21262.

Existing tests.

Author: Ryan Blue <[email protected]>

Closes apache#21503 from rdblue/SPARK-24478-move-push-down-to-physical-conversion.
1 parent 0ba0844 commit ba436c0
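For context, the push-down that used to run as an optimizer rule now happens while the planner converts a DataSourceV2Relation into a physical scan. The helper below is a minimal sketch of that step in terms of the public v2 reader mix-ins; the object and method names are hypothetical and are not the code added by this commit.

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{
  DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

object PushDownSketch {
  // Offer the scan's reader the translated filters and the required columns.
  // Returns the filters the reader could not handle, which must stay in the
  // physical plan as a post-scan Filter.
  def pushDown(
      reader: DataSourceReader,
      translatedFilters: Seq[Filter],
      requiredSchema: StructType): Seq[Filter] = {
    val postScanFilters = reader match {
      case r: SupportsPushDownFilters =>
        // pushFilters returns the filters the source cannot evaluate itself
        r.pushFilters(translatedFilters.toArray).toSeq
      case _ =>
        translatedFilters
    }
    reader match {
      case r: SupportsPushDownRequiredColumns => r.pruneColumns(requiredSchema)
      case _ =>
    }
    postScanFilters
  }
}

Because this runs during physical planning, the logical plan keeps the relation's full output and filters, which is the point of the change.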

File tree

10 files changed (+210, -244 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 1 addition & 1 deletion
@@ -363,7 +363,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   /**
    * Canonicalized copy of this query plan.
    */
-  protected lazy val canonicalized: PlanType = this
+  lazy val canonicalized: PlanType = this

   /**
    * Returns true when the given query plan will return the same results as this query plan.
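As a side note on the visibility change: canonicalized backs sameResult, so making it public lets code outside the plan hierarchy compare plans directly. A hypothetical snippet, assuming two analyzed plans p1 and p2:

// Equivalent to p1.sameResult(p2): both normalize away cosmetic differences
// such as expression IDs before comparing.
val equivalent = p1.canonicalized == p2.canonicalized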

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java

Lines changed: 5 additions & 0 deletions
@@ -22,6 +22,11 @@
 /**
  * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to report statistics to Spark.
+ *
+ * Statistics are reported to the optimizer before a projection or any filters are pushed to the
+ * DataSourceReader. Implementations that return more accurate statistics based on projection and
+ * filters will not improve query performance until the planner can push operators before getting
+ * stats.
  */
 @InterfaceStability.Evolving
 public interface SupportsReportStatistics extends DataSourceReader {
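A minimal sketch of a reader reporting table-level stats through this interface; the class name and the numbers are made up, only getStatistics is shown, and the reader's scan-planning methods are left abstract. Per the new javadoc note, these stats are gathered before any projection or filters are pushed.

import java.util.OptionalLong
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.types.{LongType, StructType}

abstract class ExampleStatsReader extends DataSourceReader with SupportsReportStatistics {
  override def readSchema(): StructType = new StructType().add("id", LongType)

  // Report fixed, table-level statistics; the optimizer sees these numbers
  // even if a later projection or filter would make the scan much smaller.
  override def getStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(10L * 1024 * 1024)
    override def numRows(): OptionalLong = OptionalLong.of(1000000L)
  }
}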

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 7 additions & 9 deletions
@@ -149,22 +149,20 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val cls = DataSource.lookupDataSource(source)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val source = cls.newInstance().asInstanceOf[DataSourceV2]
-      val (pathOption, tableOption) = extraOptions.get("path") match {
+      val options: Map[String, String] = extraOptions.get("path") match {
         case Some(path) if !path.contains("/") =>
           // without "/", this cannot be a full path. parse it as a table name
           val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(path)
           // ensure the database is set correctly
-          val db = ident.database.getOrElse(sparkSession.catalog.currentDatabase)
-          (None, Some(ident.copy(database = Some(db))))
-        case Some(path) =>
-          (Some(path), None)
+          (extraOptions ++ Map(
+            "database" -> ident.database.getOrElse(sparkSession.catalog.currentDatabase),
+            "table" -> ident.table)).toMap
         case _ =>
-          (None, None)
+          extraOptions.toMap
       }

-      Dataset.ofRows(sparkSession, DataSourceV2Relation(
-        source, extraOptions.toMap, pathOption, tableOption,
-        userSpecifiedSchema = userSpecifiedSchema))
+      Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
+        source, options, userSpecifiedSchema = userSpecifiedSchema))

     } else {
       // Code path for data source v1.
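With this change, a load() argument that contains no "/" is parsed as a table identifier and forwarded to the source as "database" and "table" options instead of constructor arguments on the relation. A hypothetical usage sketch, where "com.example.source" stands in for a DataSourceV2 implementation:

// "sales.events" has no "/", so it is parsed as table "events" in database
// "sales" and reaches the source via the "database" and "table" options.
val df = spark.read
  .format("com.example.source")
  .load("sales.events")

// A value containing "/" still flows through unchanged as the "path" option.
val fromPath = spark.read
  .format("com.example.source")
  .load("/data/events")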

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 7 additions & 8 deletions
@@ -224,25 +224,24 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       // save variants always match columns by name
       extraOptions.put("matchByName", "true")

-      val (pathOption, tableOption) = extraOptions.get("path") match {
+      val options: Map[String, String] = extraOptions.get("path") match {
         case Some(path) if !path.contains("/") =>
           // without "/", this cannot be a full path. parse it as a table name
           val ident = df.sparkSession.sessionState.sqlParser.parseTableIdentifier(path)
           // ensure the database is set correctly
-          val db = ident.database.getOrElse(df.sparkSession.catalog.currentDatabase)
-          (None, Some(ident.copy(database = Some(db))))
-        case Some(path) =>
-          (Some(path), None)
+          (extraOptions ++ Map(
+            "database" -> ident.database.getOrElse(df.sparkSession.catalog.currentDatabase),
+            "table" -> ident.table)).toMap
         case _ =>
-          (None, None)
+          extraOptions.toMap
       }

       val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap)
-      val relation = DataSourceV2Relation(dataSource, extraOptions.toMap, pathOption, tableOption)
+      val relation = DataSourceV2Relation.create(dataSource, options)

       val (overwrite, ifNotExists) = mode match {
         case SaveMode.Ignore =>
-          if (relation.writer(df.logicalPlan.schema, mode).isEmpty) {
+          if (relation.newWriter(df.logicalPlan.schema, mode).isEmpty) {
             return
           }
           (false, false)
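The writer side mirrors the reader: the same table-name parsing feeds DataSourceV2Relation.create, and with SaveMode.Ignore the write is skipped when the relation's newWriter call returns no writer. A hypothetical usage sketch, with "com.example.sink" standing in for a v2 sink:

import org.apache.spark.sql.SaveMode

// Parsed as database "warehouse", table "daily_metrics"; with Ignore mode the
// call returns without writing if the source declines to create a writer.
df.write
  .format("com.example.sink")
  .mode(SaveMode.Ignore)
  .save("warehouse.daily_metrics")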

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 0 additions & 2 deletions
@@ -21,7 +21,6 @@ import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2PushDown
 import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate
 import org.apache.spark.sql.internal.SQLConf

@@ -35,6 +34,5 @@ class SparkOptimizer(
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
     Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
     Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
-    Batch("Push down operators to data source scan", Once, DataSourceV2PushDown) :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
 }
