Commit 1b7c636

WangGuangxin authored and viirya committed
[SPARK-39002][SQL] StringEndsWith/Contains support push down to Parquet
### What changes were proposed in this pull request?

Push down StringEndsWith/Contains to Parquet so that we can leverage Parquet Dictionary Filtering.

### Why are the changes needed?

Improve performance. FilterPushDownBenchmark:

```
================================================================================================
Pushdown benchmark for StringEndsWith
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
Intel(R) Core(TM) i7-1068NG7 CPU 2.30GHz
StringEndsWith filter: (value like '%10'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized                                   7666           7771         117          2.1         487.4       1.0X
Parquet Vectorized (Pushdown)                         540            554          18         29.1          34.3      14.2X
Native ORC Vectorized                                8206           8417         203          1.9         521.7       0.9X
Native ORC Vectorized (Pushdown)                     8120           8674         422          1.9         516.2       0.9X

OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
Intel(R) Core(TM) i7-1068NG7 CPU 2.30GHz
StringEndsWith filter: (value like '%1000'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized                                     7007           7122         224          2.2         445.5       1.0X
Parquet Vectorized (Pushdown)                           423            485          92         37.2          26.9      16.6X
Native ORC Vectorized                                  7368           7629         373          2.1         468.5       1.0X
Native ORC Vectorized (Pushdown)                       7998           8349         270          2.0         508.5       0.9X

OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
Intel(R) Core(TM) i7-1068NG7 CPU 2.30GHz
StringEndsWith filter: (value like '%786432'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized                                       7012           7210         238          2.2         445.8       1.0X
Parquet Vectorized (Pushdown)                             419            431          14         37.6          26.6      16.7X
Native ORC Vectorized                                    7513           7995         447          2.1         477.6       0.9X
Native ORC Vectorized (Pushdown)                         8310           8811         448          1.9         528.3       0.8X

================================================================================================
Pushdown benchmark for StringContains
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
Intel(R) Core(TM) i7-1068NG7 CPU 2.30GHz
StringContains filter: (value like '%10%'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized                                    7588           8125         328          2.1         482.4       1.0X
Parquet Vectorized (Pushdown)                         1029           1068          25         15.3          65.4       7.4X
Native ORC Vectorized                                 7803           7859          92          2.0         496.1       1.0X
Native ORC Vectorized (Pushdown)                      8944           9443         459          1.8         568.6       0.8X

OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
Intel(R) Core(TM) i7-1068NG7 CPU 2.30GHz
StringContains filter: (value like '%1000%'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized                                      7476           8343         710          2.1         475.3       1.0X
Parquet Vectorized (Pushdown)                            424            427           2         37.1          27.0      17.6X
Native ORC Vectorized                                   7503           8261         818          2.1         477.0       1.0X
Native ORC Vectorized (Pushdown)                        8124           8609         548          1.9         516.5       0.9X

OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
Intel(R) Core(TM) i7-1068NG7 CPU 2.30GHz
StringContains filter: (value like '%786432%'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized                                        7070           7274         199          2.2         449.5       1.0X
Parquet Vectorized (Pushdown)                              441            478          32         35.6          28.1      16.0X
Native ORC Vectorized                                     7564           7937         323          2.1         480.9       0.9X
Native ORC Vectorized (Pushdown)                          8623           8921         228          1.8         548.2       0.8X
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added UT

Closes #36328 from WangGuangxin/pushdown_startwith_using_dict.

Authored-by: wangguangxin.cn <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
1 parent 0d8ace5 commit 1b7c636
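
The new filters in this commit decode each Parquet `Binary` back to a UTF-8 string and test the suffix or substring (`UTF8String.fromBytes(value.getBytes).endsWith/contains`). A minimal, self-contained Java sketch of that `keep()` semantics — illustrative only, not Spark's actual `UserDefinedPredicate` classes:

```java
import java.nio.charset.StandardCharsets;

// Simplified illustration of the row-level predicate the PR pushes into
// Parquet: a value is kept only if its UTF-8 decoding ends with / contains
// the pattern. Spark uses UTF8String; plain java.lang.String stands in here.
public class StringPredicateSketch {
    static boolean keepEndsWith(byte[] value, String suffix) {
        return value != null && new String(value, StandardCharsets.UTF_8).endsWith(suffix);
    }

    static boolean keepContains(byte[] value, String sub) {
        return value != null && new String(value, StandardCharsets.UTF_8).contains(sub);
    }

    public static void main(String[] args) {
        byte[] v = "row_786432".getBytes(StandardCharsets.UTF_8);
        System.out.println(keepEndsWith(v, "786432")); // true
        System.out.println(keepContains(v, "1000"));   // false
    }
}
```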

File tree

8 files changed: +186 additions, −37 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 2 deletions

```diff
@@ -959,6 +959,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

+  val PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED =
+    buildConf("spark.sql.parquet.filterPushdown.stringPredicate")
+      .doc("If true, enables Parquet filter push-down optimization for string predicate such " +
+        "as startsWith/endsWith/contains function. This configuration only has an effect when " +
+        s"'${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is enabled.")
+      .version("3.4.0")
+      .internal()
+      .fallbackConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
+
   val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
     buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
       .doc("For IN predicate, Parquet filter will push-down a set of OR clauses if its " +
@@ -4050,8 +4059,8 @@ class SQLConf extends Serializable with Logging {

   def parquetFilterPushDownDecimal: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED)

-  def parquetFilterPushDownStringStartWith: Boolean =
-    getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
+  def parquetFilterPushDownStringPredicate: Boolean =
+    getConf(PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED)

   def parquetFilterPushDownInFilterThreshold: Int =
     getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)
```
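
The new flag is a `fallbackConf` on the legacy startsWith flag, so existing user settings keep working. To opt out of the endsWith/contains pushdown, one could disable it per session — a hypothetical snippet, assuming Spark 3.4+ where this internal flag exists:

```sql
-- Disable pushdown for all string predicates (startsWith/endsWith/contains).
SET spark.sql.parquet.filterPushdown.stringPredicate=false;

-- The parent flag still gates everything: with this one off, the string
-- predicate flag has no effect at all.
SET spark.sql.parquet.filterPushdown=true;
```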

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -251,7 +251,7 @@ class ParquetFileFormat
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
@@ -279,7 +279,7 @@ class ParquetFileFormat
       pushDownDate,
       pushDownTimestamp,
       pushDownDecimal,
-      pushDownStringStartWith,
+      pushDownStringPredicate,
       pushDownInFilterThreshold,
       isCaseSensitive,
       datetimeRebaseSpec)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 32 additions & 2 deletions

```diff
@@ -48,7 +48,7 @@ class ParquetFilters(
     pushDownDate: Boolean,
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
-    pushDownStartWith: Boolean,
+    pushDownStringPredicate: Boolean,
     pushDownInFilterThreshold: Int,
     caseSensitive: Boolean,
     datetimeRebaseSpec: RebaseSpec) {
@@ -747,7 +747,7 @@ class ParquetFilters(
       }

       case sources.StringStartsWith(name, prefix)
-          if pushDownStartWith && canMakeFilterOn(name, prefix) =>
+          if pushDownStringPredicate && canMakeFilterOn(name, prefix) =>
         Option(prefix).map { v =>
           FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
             new UserDefinedPredicate[Binary] with Serializable {
@@ -778,6 +778,36 @@ class ParquetFilters(
           )
         }

+      case sources.StringEndsWith(name, suffix)
+          if pushDownStringPredicate && canMakeFilterOn(name, suffix) =>
+        Option(suffix).map { v =>
+          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
+            new UserDefinedPredicate[Binary] with Serializable {
+              private val suffixStr = UTF8String.fromString(v)
+              override def canDrop(statistics: Statistics[Binary]): Boolean = false
+              override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
+              override def keep(value: Binary): Boolean = {
+                value != null && UTF8String.fromBytes(value.getBytes).endsWith(suffixStr)
+              }
+            }
+          )
+        }
+
+      case sources.StringContains(name, value)
+          if pushDownStringPredicate && canMakeFilterOn(name, value) =>
+        Option(value).map { v =>
+          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
+            new UserDefinedPredicate[Binary] with Serializable {
+              private val subStr = UTF8String.fromString(v)
+              override def canDrop(statistics: Statistics[Binary]): Boolean = false
+              override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
+              override def keep(value: Binary): Boolean = {
+                value != null && UTF8String.fromBytes(value.getBytes).contains(subStr)
+              }
+            }
+          )
+        }
+
       case _ => None
     }
   }
```
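
Note that `canDrop` and `inverseCanDrop` both return false in the new predicates: lexicographic min/max row-group statistics cannot bound a suffix or substring match, so the speedup comes from dictionary filtering — Parquet evaluates `keep()` once per distinct dictionary entry and can skip a row group when no entry matches. A hedged Java sketch of that idea (hypothetical helper, not Parquet's actual API):

```java
import java.util.Set;

// Sketch of why dictionary filtering pays off: with a dictionary-encoded
// column, the pushed-down predicate runs once per distinct value instead
// of once per row, and a row group whose dictionary has no match can be
// skipped without decoding any rows.
public class DictFilterSketch {
    // True when the row group can be skipped entirely: no dictionary
    // entry satisfies the endsWith predicate.
    static boolean canSkipRowGroup(Set<String> dictionary, String suffix) {
        return dictionary.stream().noneMatch(s -> s.endsWith(suffix));
    }

    public static void main(String[] args) {
        Set<String> dict = Set.of("row_10", "row_1000", "row_786432");
        System.out.println(canSkipRowGroup(dict, "99"));   // true: skip
        System.out.println(canSkipRowGroup(dict, "1000")); // false: must read
    }
}
```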

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -79,7 +79,7 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDate = sqlConf.parquetFilterPushDownDate
   private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-  private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+  private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
   private val int96RebaseModeInRead = options.int96RebaseModeInRead
@@ -221,7 +221,7 @@ case class ParquetPartitionReaderFactory(
       pushDownDate,
       pushDownTimestamp,
       pushDownDecimal,
-      pushDownStringStartWith,
+      pushDownStringPredicate,
       pushDownInFilterThreshold,
       isCaseSensitive,
       datetimeRebaseSpec)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -52,7 +52,7 @@ case class ParquetScanBuilder(
   val pushDownDate = sqlConf.parquetFilterPushDownDate
   val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
   val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-  val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+  val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   val isCaseSensitive = sqlConf.caseSensitiveAnalysis
   val parquetSchema =
@@ -62,7 +62,7 @@ case class ParquetScanBuilder(
     pushDownDate,
     pushDownTimestamp,
     pushDownDecimal,
-    pushDownStringStartWith,
+    pushDownStringPredicate,
     pushDownInFilterThreshold,
     isCaseSensitive,
     // The rebase mode doesn't matter here because the filters are used to determine
```

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -3034,7 +3034,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }

     Seq("orc", "parquet").foreach { format =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "",
+        SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
         withTempPath { dir =>
           spark.range(10).map(i => (i, i.toString)).toDF("id", "s")
             .write
```

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala

Lines changed: 32 additions & 0 deletions

```diff
@@ -242,6 +242,38 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
       }
     }

+    runBenchmark("Pushdown benchmark for StringEndsWith") {
+      withTempPath { dir =>
+        withTempTable("orcTable", "parquetTable") {
+          prepareStringDictTable(dir, numRows, 200, width)
+          Seq(
+            "value like '%10'",
+            "value like '%1000'",
+            s"value like '%${mid.toString.substring(0, mid.toString.length - 1)}'"
+          ).foreach { whereExpr =>
+            val title = s"StringEndsWith filter: ($whereExpr)"
+            filterPushDownBenchmark(numRows, title, whereExpr)
+          }
+        }
+      }
+    }
+
+    runBenchmark("Pushdown benchmark for StringContains") {
+      withTempPath { dir =>
+        withTempTable("orcTable", "parquetTable") {
+          prepareStringDictTable(dir, numRows, 200, width)
+          Seq(
+            "value like '%10%'",
+            "value like '%1000%'",
+            s"value like '%${mid.toString.substring(0, mid.toString.length - 1)}%'"
+          ).foreach { whereExpr =>
+            val title = s"StringContains filter: ($whereExpr)"
+            filterPushDownBenchmark(numRows, title, whereExpr)
+          }
+        }
+      }
+    }
+
     runBenchmark(s"Pushdown benchmark for ${DecimalType.simpleString}") {
       withTempPath { dir =>
         Seq(
```
