Skip to content

Commit 2036074

Browse files
wangyumcloud-fan
authored andcommitted
[SPARK-26004][SQL] InMemoryTable support StartsWith predicate push down
## What changes were proposed in this pull request? [SPARK-24638](https://issues.apache.org/jira/browse/SPARK-24638) adds support for Parquet file `StartsWith` predicate push down. `InMemoryTable` can also support this feature. This is an example to explain how it works, Imagine that the `id` column stored as below: Partition ID | lowerBound | upperBound -- | -- | -- p1 | '1' | '9' p2 | '10' | '19' p3 | '20' | '29' p4 | '30' | '39' p5 | '40' | '49' A filter ```df.filter($"id".startsWith("2"))``` or ```id like '2%'``` then we substr lowerBound and upperBound: Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2")) -- | -- | -- p1 | '1' | '9' p2 | '1' | '1' p3 | '2' | '2' p4 | '3' | '3' p5 | '4' | '4' We can see that we only need to read `p1` and `p3`. ## How was this patch tested? unit tests and benchmark tests benchmark test result: ``` ================================================================================================ Pushdown benchmark for StringStartsWith ================================================================================================ Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU 2.90GHz StringStartsWith filter: (value like '10%'): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ InMemoryTable Vectorized 12068 / 14198 1.3 767.3 1.0X InMemoryTable Vectorized (Pushdown) 5457 / 8662 2.9 347.0 2.2X Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU 2.90GHz StringStartsWith filter: (value like '1000%'): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ InMemoryTable Vectorized 5246 / 5355 3.0 333.5 1.0X InMemoryTable Vectorized (Pushdown) 2185 / 2346 7.2 138.9 2.4X Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU 2.90GHz StringStartsWith filter: (value like '786432%'): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ InMemoryTable Vectorized 5112 / 5312 3.1 325.0 1.0X InMemoryTable Vectorized (Pushdown) 2292 / 2522 6.9 145.7 2.2X ``` Closes #23004 from wangyum/SPARK-26004. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 5ebb4b5 commit 2036074

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ case class InMemoryTableScanExec(
194194
}
195195

196196
// Returned filter predicate should return false iff it is impossible for the input expression
197-
// to evaluate to `true' based on statistics collected about this partition batch.
197+
// to evaluate to `true` based on statistics collected about this partition batch.
198198
@transient lazy val buildFilter: PartialFunction[Expression, Expression] = {
199199
case And(lhs: Expression, rhs: Expression)
200200
if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
@@ -237,6 +237,34 @@ case class InMemoryTableScanExec(
237237
if list.forall(ExtractableLiteral.unapply(_).isDefined) && list.nonEmpty =>
238238
list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
239239
l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
240+
241+
// This is an example to explain how it works, imagine that the id column stored as follows:
242+
// __________________________________________
243+
// | Partition ID | lowerBound | upperBound |
244+
// |--------------|------------|------------|
245+
// | p1 | '1' | '9' |
246+
// | p2 | '10' | '19' |
247+
// | p3 | '20' | '29' |
248+
// | p4 | '30' | '39' |
249+
// | p5 | '40' | '49' |
250+
// |______________|____________|____________|
251+
//
252+
// A filter: df.filter($"id".startsWith("2")).
253+
// In this case it substr lowerBound and upperBound:
254+
// ________________________________________________________________________________________
255+
// | Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2")) |
256+
// |--------------|-----------------------------------|-----------------------------------|
257+
// | p1 | '1' | '9' |
258+
// | p2 | '1' | '1' |
259+
// | p3 | '2' | '2' |
260+
// | p4 | '3' | '3' |
261+
// | p5 | '4' | '4' |
262+
// |______________|___________________________________|___________________________________|
263+
//
264+
// We can see that we only need to read p1 and p3.
265+
case StartsWith(a: AttributeReference, ExtractableLiteral(l)) =>
266+
statsFor(a).lowerBound.substr(0, Length(l)) <= l &&
267+
l <= statsFor(a).upperBound.substr(0, Length(l))
240268
}
241269

242270
lazy val partitionFilters: Seq[Expression] = {

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,15 @@ class PartitionBatchPruningSuite
170170
}
171171
}
172172

173+
// Support `StartsWith` predicate
174+
checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE s like '18%'", 1, 1)(
175+
180 to 189
176+
)
177+
checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE s like '%'", 5, 11)(
178+
100 to 200
179+
)
180+
checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE '18%' like s", 5, 11)(Seq())
181+
173182
// With disable IN_MEMORY_PARTITION_PRUNING option
174183
test("disable IN_MEMORY_PARTITION_PRUNING") {
175184
spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, false)

0 commit comments

Comments
 (0)