@@ -194,7 +194,7 @@ case class InMemoryTableScanExec(
}

// Returned filter predicate should return false iff it is impossible for the input expression
// to evaluate to `true' based on statistics collected about this partition batch.
// to evaluate to `true` based on statistics collected about this partition batch.
@transient lazy val buildFilter: PartialFunction[Expression, Expression] = {
case And(lhs: Expression, rhs: Expression)
if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
@@ -237,6 +237,34 @@ case class InMemoryTableScanExec(
if list.forall(ExtractableLiteral.unapply(_).isDefined) && list.nonEmpty =>
list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)

// As an example of how this works, imagine that the id column is stored as follows:
// __________________________________________
// | Partition ID | lowerBound | upperBound |
// |--------------|------------|------------|
// | p1 | '1' | '9' |
// | p2 | '10' | '19' |
// | p3 | '20' | '29' |
// | p4 | '30' | '39' |
// | p5 | '40' | '49' |
// |______________|____________|____________|
//
// Given the filter df.filter($"id".startsWith("2")), the lowerBound and upperBound of each
// partition are truncated with substr to the length of the filter literal:
// ________________________________________________________________________________________
// | Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2")) |
// |--------------|-----------------------------------|-----------------------------------|
// | p1 | '1' | '9' |
// | p2 | '1' | '1' |
// | p3 | '2' | '2' |
// | p4 | '3' | '3' |
// | p5 | '4' | '4' |
// |______________|___________________________________|___________________________________|
//
// We can see that we only need to read p1 and p3.
case StartsWith(a: AttributeReference, ExtractableLiteral(l)) =>
Contributor:
can you add some comment to explain it?

Member Author:
Added to pr description.

Member:
Can you add the comment in the line 240, too?

Member Author:
@maropu Done

statsFor(a).lowerBound.substr(0, Length(l)) <= l &&
l <= statsFor(a).upperBound.substr(0, Length(l))
}

lazy val partitionFilters: Seq[Expression] = {
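A brief editor's sketch (not part of this patch), to make the table in the new comment concrete: the same bound check can be replayed on plain Scala strings, showing why only p1 and p3 survive the startsWith("2") filter. The object and helper names below (StartsWithPruningSketch, mightMatchPrefix) are hypothetical, not Spark APIs.

// Editor's sketch: replays the bound check from buildFilter's StartsWith case on plain strings.
object StartsWithPruningSketch {
  // Roughly mirrors: lowerBound.substr(0, Length(l)) <= l && l <= upperBound.substr(0, Length(l))
  def mightMatchPrefix(prefix: String, lowerBound: String, upperBound: String): Boolean = {
    val lower = lowerBound.take(prefix.length)
    val upper = upperBound.take(prefix.length)
    lower.compareTo(prefix) <= 0 && prefix.compareTo(upper) <= 0
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      "p1" -> ("1", "9"),
      "p2" -> ("10", "19"),
      "p3" -> ("20", "29"),
      "p4" -> ("30", "39"),
      "p5" -> ("40", "49"))
    partitions.foreach { case (id, (lo, hi)) =>
      println(s"$id keep=${mightMatchPrefix("2", lo, hi)}")   // prints keep=true only for p1 and p3
    }
  }
}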
@@ -170,6 +170,15 @@ class PartitionBatchPruningSuite
}
}

// Support `StartsWith` predicate
checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE s like '18%'", 1, 1)(
180 to 189
)
checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE s like '%'", 5, 11)(
100 to 200
)
checkBatchPruning("SELECT CAST(s AS INT) FROM pruningStringData WHERE '18%' like s", 5, 11)(Seq())

// With the IN_MEMORY_PARTITION_PRUNING option disabled
test("disable IN_MEMORY_PARTITION_PRUNING") {
spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, false)