-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-10623] [SQL] Fixes ORC predicate push-down #8799
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,11 +31,13 @@ import org.apache.spark.sql.sources._ | |
| * and cannot be used anymore. | ||
| */ | ||
| private[orc] object OrcFilters extends Logging { | ||
| def createFilter(expr: Array[Filter]): Option[SearchArgument] = { | ||
| expr.reduceOption(And).flatMap { conjunction => | ||
| val builder = SearchArgumentFactory.newBuilder() | ||
| buildSearchArgument(conjunction, builder).map(_.build()) | ||
| } | ||
| def createFilter(filters: Array[Filter]): Option[SearchArgument] = { | ||
| for { | ||
| // Combines all filters with `And`s to produce a single conjunction predicate | ||
| conjunction <- filters.reduceOption(And) | ||
| // Then tries to build a single ORC `SearchArgument` for the conjunction predicate | ||
| builder <- buildSearchArgument(conjunction, SearchArgumentFactory.newBuilder()) | ||
| } yield builder.build() | ||
| } | ||
|
|
||
| private def buildSearchArgument(expression: Filter, builder: Builder): Option[Builder] = { | ||
|
|
@@ -102,46 +104,32 @@ private[orc] object OrcFilters extends Logging { | |
| negate <- buildSearchArgument(child, builder.startNot()) | ||
| } yield negate.end() | ||
|
|
||
| case EqualTo(attribute, value) => | ||
| Option(value) | ||
| .filter(isSearchableLiteral) | ||
| .map(builder.equals(attribute, _)) | ||
| case EqualTo(attribute, value) if isSearchableLiteral(value) => | ||
| Some(builder.startAnd().equals(attribute, value).end()) | ||
|
|
||
| case EqualNullSafe(attribute, value) => | ||
| Option(value) | ||
| .filter(isSearchableLiteral) | ||
| .map(builder.nullSafeEquals(attribute, _)) | ||
| case EqualNullSafe(attribute, value) if isSearchableLiteral(value) => | ||
| Some(builder.startAnd().nullSafeEquals(attribute, value).end()) | ||
|
|
||
| case LessThan(attribute, value) => | ||
| Option(value) | ||
| .filter(isSearchableLiteral) | ||
| .map(builder.lessThan(attribute, _)) | ||
| case LessThan(attribute, value) if isSearchableLiteral(value) => | ||
| Some(builder.startAnd().lessThan(attribute, value).end()) | ||
|
|
||
| case LessThanOrEqual(attribute, value) => | ||
| Option(value) | ||
| .filter(isSearchableLiteral) | ||
| .map(builder.lessThanEquals(attribute, _)) | ||
| case LessThanOrEqual(attribute, value) if isSearchableLiteral(value) => | ||
| Some(builder.startAnd().lessThanEquals(attribute, value).end()) | ||
|
|
||
| case GreaterThan(attribute, value) => | ||
| Option(value) | ||
| .filter(isSearchableLiteral) | ||
| .map(builder.startNot().lessThanEquals(attribute, _).end()) | ||
| case GreaterThan(attribute, value) if isSearchableLiteral(value) => | ||
| Some(builder.startNot().lessThanEquals(attribute, value).end()) | ||
|
|
||
| case GreaterThanOrEqual(attribute, value) => | ||
| Option(value) | ||
| .filter(isSearchableLiteral) | ||
| .map(builder.startNot().lessThan(attribute, _).end()) | ||
| case GreaterThanOrEqual(attribute, value) if isSearchableLiteral(value) => | ||
| Some(builder.startNot().lessThan(attribute, value).end()) | ||
|
|
||
| case IsNull(attribute) => | ||
| Some(builder.isNull(attribute)) | ||
| Some(builder.startAnd().isNull(attribute).end()) | ||
|
|
||
| case IsNotNull(attribute) => | ||
| Some(builder.startNot().isNull(attribute).end()) | ||
|
|
||
| case In(attribute, values) => | ||
| Option(values) | ||
| .filter(_.forall(isSearchableLiteral)) | ||
| .map(builder.in(attribute, _)) | ||
| case In(attribute, values) if values.forall(isSearchableLiteral) => | ||
| Some(builder.startAnd().in(attribute, values.map(_.asInstanceOf[AnyRef]): _*).end()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah ok. |
||
|
|
||
| case _ => None | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -344,4 +344,34 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { | |
| } | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-10623 Enable ORC PPD") { | ||
| withTempPath { dir => | ||
| withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { | ||
| import testImplicits._ | ||
|
|
||
| val path = dir.getCanonicalPath | ||
| sqlContext.range(10).coalesce(1).write.orc(path) | ||
| val df = sqlContext.read.orc(path) | ||
|
|
||
| def checkPredicate(pred: Column, answer: Seq[Long]): Unit = { | ||
| checkAnswer(df.where(pred), answer.map(Row(_))) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this test check if the predicate is really pushded down? Or it just check answers?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It just checks answers. It's annoying that I couldn't find a programmatical way to verify whether its pushed down or not. Checked through logs manually though. |
||
| } | ||
|
|
||
| checkPredicate('id === 5, Seq(5L)) | ||
| checkPredicate('id <=> 5, Seq(5L)) | ||
| checkPredicate('id < 5, 0L to 4L) | ||
| checkPredicate('id <= 5, 0L to 5L) | ||
| checkPredicate('id > 5, 6L to 9L) | ||
| checkPredicate('id >= 5, 5L to 9L) | ||
| checkPredicate('id.isNull, Seq.empty[Long]) | ||
| checkPredicate('id.isNotNull, 0L to 9L) | ||
| checkPredicate('id.isin(1L, 3L, 5L), Seq(1L, 3L, 5L)) | ||
| checkPredicate('id > 0 && 'id < 3, 1L to 2L) | ||
| checkPredicate('id < 1 || 'id > 8, Seq(0L, 9L)) | ||
| checkPredicate(!('id > 3), 0L to 3L) | ||
| checkPredicate(!('id > 0 && 'id < 3), Seq(0L) ++ (3L to 9L)) | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is only for better readability
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not very clear how this part works. Can we add some comments?