@@ -959,6 +959,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED =
+    buildConf("spark.sql.parquet.filterPushdown.stringPredicate")
[Review thread on spark.sql.parquet.filterPushdown.stringPredicate]

@jaceklaskowski (Contributor), Apr 23, 2022:
    Since spark.sql.parquet.filterPushdown.string.startsWith is internal, why not replace it?

Author (Contributor):
    I'm afraid existing users may already be using it.

.doc("If true, enables Parquet filter push-down optimization for string predicate such " +
"as startsWith/endsWith/contains function. This configuration only has an effect when " +
s"'${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is enabled.")
.version("3.4.0")
.internal()
.fallbackConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)

val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
.doc("For IN predicate, Parquet filter will push-down a set of OR clauses if its " +
@@ -4050,8 +4059,8 @@ class SQLConf extends Serializable with Logging {

   def parquetFilterPushDownDecimal: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED)
 
-  def parquetFilterPushDownStringStartWith: Boolean =
-    getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
+  def parquetFilterPushDownStringPredicate: Boolean =
+    getConf(PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED)
 
   def parquetFilterPushDownInFilterThreshold: Int =
     getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)
@@ -251,7 +251,7 @@ class ParquetFileFormat
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
@@ -279,7 +279,7 @@ class ParquetFileFormat
         pushDownDate,
         pushDownTimestamp,
         pushDownDecimal,
-        pushDownStringStartWith,
+        pushDownStringPredicate,
         pushDownInFilterThreshold,
         isCaseSensitive,
         datetimeRebaseSpec)
@@ -48,7 +48,7 @@ class ParquetFilters(
     pushDownDate: Boolean,
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
-    pushDownStartWith: Boolean,
+    pushDownStringPredicate: Boolean,
     pushDownInFilterThreshold: Int,
     caseSensitive: Boolean,
     datetimeRebaseSpec: RebaseSpec) {
@@ -747,7 +747,7 @@ class ParquetFilters(
       }
 
       case sources.StringStartsWith(name, prefix)
-        if pushDownStartWith && canMakeFilterOn(name, prefix) =>
+        if pushDownStringPredicate && canMakeFilterOn(name, prefix) =>
         Option(prefix).map { v =>
           FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
             new UserDefinedPredicate[Binary] with Serializable {
@@ -778,6 +778,36 @@ class ParquetFilters(
           )
       }
 
+      case sources.StringEndsWith(name, suffix)
+        if pushDownStringPredicate && canMakeFilterOn(name, suffix) =>
+        Option(suffix).map { v =>
+          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
+            new UserDefinedPredicate[Binary] with Serializable {
+              private val suffixStr = UTF8String.fromString(v)
+              override def canDrop(statistics: Statistics[Binary]): Boolean = false
+              override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
+              override def keep(value: Binary): Boolean = {
+                value != null && UTF8String.fromBytes(value.getBytes).endsWith(suffixStr)
+              }
+            }
+          )
+        }
+
+      case sources.StringContains(name, value)
+        if pushDownStringPredicate && canMakeFilterOn(name, value) =>
+        Option(value).map { v =>
+          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
+            new UserDefinedPredicate[Binary] with Serializable {
+              private val subStr = UTF8String.fromString(v)
+              override def canDrop(statistics: Statistics[Binary]): Boolean = false
+              override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
+              override def keep(value: Binary): Boolean = {
+                value != null && UTF8String.fromBytes(value.getBytes).contains(subStr)
+              }
+            }
+          )
+        }
+
       case _ => None
     }
   }
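
A note on the two new predicates above: unlike StringStartsWith, which can prune row groups from column min/max statistics, a suffix or substring match implies nothing about a lexicographically ordered min/max range, so `canDrop` and `inverseCanDrop` conservatively return `false`. The pushdown can still skip data because parquet-mr also evaluates `keep` against dictionary entries for dictionary-encoded columns, which is what the dictionary-table benchmark later in this diff exercises. A hypothetical spark-shell session to see the new filters being pushed (the path and the exact explain output are illustrative):

```scala
import spark.implicits._

spark.conf.set("spark.sql.parquet.filterPushdown.stringPredicate", "true")
spark.range(100).selectExpr("cast(id as string) as value")
  .write.mode("overwrite").parquet("/tmp/t")   // assumed scratch path

val df = spark.read.parquet("/tmp/t").filter($"value".endsWith("9"))
df.explain()
// Expect something like: PushedFilters: [IsNotNull(value), StringEndsWith(value,9)]
```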
@@ -79,7 +79,7 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDate = sqlConf.parquetFilterPushDownDate
   private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-  private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+  private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
   private val int96RebaseModeInRead = options.int96RebaseModeInRead
@@ -221,7 +221,7 @@ case class ParquetPartitionReaderFactory(
       pushDownDate,
       pushDownTimestamp,
       pushDownDecimal,
-      pushDownStringStartWith,
+      pushDownStringPredicate,
       pushDownInFilterThreshold,
       isCaseSensitive,
       datetimeRebaseSpec)
@@ -52,7 +52,7 @@ case class ParquetScanBuilder(
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetSchema =
@@ -62,7 +62,7 @@
       pushDownDate,
       pushDownTimestamp,
       pushDownDecimal,
-      pushDownStringStartWith,
+      pushDownStringPredicate,
       pushDownInFilterThreshold,
       isCaseSensitive,
       // The rebase mode doesn't matter here because the filters are used to determine
@@ -3072,7 +3072,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
 
     Seq("orc", "parquet").foreach { format =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "",
+        SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
         withTempPath { dir =>
           spark.range(10).map(i => (i, i.toString)).toDF("id", "s")
             .write
@@ -242,6 +242,38 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
       }
     }
 
+    runBenchmark("Pushdown benchmark for StringEndsWith") {
+      withTempPath { dir =>
+        withTempTable("orcTable", "parquetTable") {
+          prepareStringDictTable(dir, numRows, 200, width)
+          Seq(
+            "value like '%10'",
+            "value like '%1000'",
+            s"value like '%${mid.toString.substring(0, mid.toString.length - 1)}'"
+          ).foreach { whereExpr =>
+            val title = s"StringEndsWith filter: ($whereExpr)"
+            filterPushDownBenchmark(numRows, title, whereExpr)
+          }
+        }
+      }
+    }
+
+    runBenchmark("Pushdown benchmark for StringContains") {
+      withTempPath { dir =>
+        withTempTable("orcTable", "parquetTable") {
+          prepareStringDictTable(dir, numRows, 200, width)
+          Seq(
+            "value like '%10%'",
+            "value like '%1000%'",
+            s"value like '%${mid.toString.substring(0, mid.toString.length - 1)}%'"
+          ).foreach { whereExpr =>
+            val title = s"StringContains filter: ($whereExpr)"
+            filterPushDownBenchmark(numRows, title, whereExpr)
+          }
+        }
+      }
+    }
+
     runBenchmark(s"Pushdown benchmark for ${DecimalType.simpleString}") {
       withTempPath { dir =>
         Seq(
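
For context on the LIKE patterns used in the benchmark above: Catalyst translates a LIKE with a single leading and/or trailing `%` into the corresponding public data source filter, and those filters are what reach `ParquetFilters`. Roughly:

```scala
import org.apache.spark.sql.sources._

// value LIKE '%10'    => suffix match, newly pushed down by this change
val suffix: Filter = StringEndsWith("value", "10")
// value LIKE '%1000%' => substring match, newly pushed down by this change
val substring: Filter = StringContains("value", "1000")
// value LIKE '10%'    => prefix match, already pushed down before this change
val prefix: Filter = StringStartsWith("value", "10")
```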