Skip to content

Commit a49b73c

Browse files
committed
Address comments.
1 parent 84bc8dd commit a49b73c

File tree

4 files changed

+18
-18
lines changed

4 files changed

+18
-18
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2064,13 +2064,13 @@ object SQLConf {
20642064
.createWithDefault(true)
20652065

20662066
val NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST =
2067-
buildConf("spark.sql.optimizer.nestedPredicatePushdown.v1sourceList")
2067+
buildConf("spark.sql.optimizer.nestedPredicatePushdown.supportedV1Sources")
20682068
.internal()
20692069
.doc("A comma-separated list of data source short names or fully qualified data source " +
20702070
"implementation class names for which Spark tries to push down predicates for nested " +
2071-
"columns and or names containing `dots` to data sources. Currently, Parquet implements " +
2071+
"columns and/or names containing `dots` to data sources. Currently, Parquet implements " +
20722072
"both optimizations while ORC only supports predicates for names containing `dots`. The " +
2073-
"other data sources don't support this feature yet.")
2073+
"other data sources don't support this feature yet. So the default value is 'parquet,orc'.")
20742074
.version("3.0.0")
20752075
.stringConf
20762076
.createWithDefault("parquet,orc")
@@ -3099,9 +3099,6 @@ class SQLConf extends Serializable with Logging {
30993099

31003100
def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)
31013101

3102-
def nestedPredicatePushdownv1SourceList: String =
3103-
getConf(NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST)
3104-
31053102
def serializerNestedSchemaPruningEnabled: Boolean =
31063103
getConf(SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED)
31073104

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -532,8 +532,7 @@ object DataSourceStrategy {
532532
* @param translatedFilterToExpr An optional map from leaf node filter expressions to its
533533
* translated [[Filter]]. The map is used for rebuilding
534534
* [[Expression]] from [[Filter]].
535-
* @param nestedPredicatePushdownEnabled Whether nested predicate pushdown is enabled. Default is
536-
* disabled.
535+
* @param nestedPredicatePushdownEnabled Whether nested predicate pushdown is enabled.
537536
* @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
538537
*/
539538
protected[sql] def translateFilterWithMapping(
@@ -572,12 +571,7 @@ object DataSourceStrategy {
572571
.map(sources.Not)
573572

574573
case other =>
575-
val pushableColumn = if (nestedPredicatePushdownEnabled) {
576-
PushableColumnAndNestedColumn
577-
} else {
578-
PushableColumnWithoutNestedColumn
579-
}
580-
val filter = translateLeafNodeFilter(other, pushableColumn)
574+
val filter = translateLeafNodeFilter(other, PushableColumn(nestedPredicatePushdownEnabled))
581575
if (filter.isDefined && translatedFilterToExpr.isDefined) {
582576
translatedFilterToExpr.get(filter.get) = predicate
583577
}
@@ -687,6 +681,16 @@ abstract class PushableColumnBase {
687681
}
688682
}
689683

684+
object PushableColumn {
685+
def apply(nestedPredicatePushdownEnabled: Boolean): PushableColumnBase = {
686+
if (nestedPredicatePushdownEnabled) {
687+
PushableColumnAndNestedColumn
688+
} else {
689+
PushableColumnWithoutNestedColumn
690+
}
691+
}
692+
}
693+
690694
object PushableColumnAndNestedColumn extends PushableColumnBase {
691695
override val nestedPredicatePushdownEnabled = true
692696
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,8 @@ object DataSourceUtils {
7878
relation match {
7979
case hs: HadoopFsRelation =>
8080
val supportedDatasources =
81-
SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST)
82-
.toLowerCase(Locale.ROOT)
83-
.split(",").map(_.trim)
81+
Utils.stringToSeq(SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST)
82+
.toLowerCase(Locale.ROOT))
8483
supportedDatasources.contains(hs.toString)
8584
case _ => false
8685
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ object FileSourceStrategy extends Strategy with Logging {
182182
DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
183183
val pushedFilters = dataFilters
184184
.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
185-
logInfo(s"Pushed Filters: " + s"${pushedFilters.mkString(",")}")
185+
logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}")
186186

187187
// Predicates with both partition keys and attributes need to be evaluated after the scan.
188188
val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)

0 commit comments

Comments
 (0)