-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-6910] [SQL] Support for pushing predicates down to metastore for partition pruning #7216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #36516 has finished for PR 7216 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of checking the version here, I'd see if we can't move this into the per version HiveShim and just override it in the specific version classes, just so that we can be consistent with all of the other version specific code.
|
This is looking very close! Thanks again for doing such a thorough evaluation of the differing things Hive supports here. :) |
|
Test build #37074 has finished for PR 7216 at commit
|
|
Test build #37081 has finished for PR 7216 at commit
|
|
@marmbrus Thank you for reviewing my patch. I think I addressed all your comments. Can you please take a look again? Btw, the last jenkins failures seems unrelated to my PR. In fact, jenkins build had succeeded first and then failed in the 2nd run triggered by my minor commit that fixed imports order. |
|
Test build #37092 has finished for PR 7216 at commit
|
|
Isn't this a test issue? I keep getting the following pyspark error. Looks like it is because of a minor difference in two datetimes |
|
Test build #37100 has finished for PR 7216 at commit
|
|
Test build #1054 has finished for PR 7216 at commit
|
696f4d4 to
aa1490f
Compare
|
Test build #37133 has finished for PR 7216 at commit
|
|
Thanks! I'm going to merge this to master. |
…or partition pruning This PR supersedes my old one apache#6921. Since my patch has changed quite a bit, I am opening a new PR to make it easier to review. The changes include- * Implement `toMetastoreFilter()` function in `HiveShim` that takes `Seq[Expression]` and converts them into a filter string for Hive metastore. * This functions matches all the `AttributeReference` + `BinaryComparisonOp` + `Integral/StringType` patterns in `Seq[Expression]` and fold them into a string. * Change `hiveQlPartitions` field in `MetastoreRelation` to `getHiveQlPartitions()` function that takes a filter string parameter. * Call `getHiveQlPartitions()` in `HiveTableScan` with a filter string. But there are some cases in which predicate pushdown is disabled- Case | Predicate pushdown ------- | ----------------------------- Hive integral and string types | Yes Hive varchar type | No Hive 0.13 and newer | Yes Hive 0.12 and older | No convertMetastoreParquet=false | Yes convertMetastoreParquet=true | No In case of `convertMetastoreParquet=true`, predicates are not pushed down because this conversion happens in an `Analyzer` rule (`HiveMetastoreCatalog.ParquetConversions`). At this point, `HiveTableScan` hasn't run, so predicates are not available. But reading the source code, I think it is intentional to convert the entire Hive table w/ all the partitions into `ParquetRelation` because then `ParquetRelation` can be cached and reused for any query against that table. Please correct me if I am wrong. cc marmbrus Author: Cheolsoo Park <[email protected]> Closes apache#7216 from piaozhexiu/SPARK-6910-2 and squashes the following commits: aa1490f [Cheolsoo Park] Fix ordering of imports c212c4d [Cheolsoo Park] Incorporate review comments 5e93f9d [Cheolsoo Park] Predicate pushdown into Hive metastore
|
@piaozhexiu can you close this pr since github is not closing it automatically? Thanks! |
|
Sure. Thank you for merging it! |
Revert #7216 and #7386. These patch seems to be causing quite a few test failures: ``` Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.GeneratedMethodAccessor322.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:351) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getPartitionsByFilter$1.apply(ClientWrapper.scala:320) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getPartitionsByFilter$1.apply(ClientWrapper.scala:318) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:180) at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:135) at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:172) at org.apache.spark.sql.hive.client.ClientWrapper.getPartitionsByFilter(ClientWrapper.scala:318) at org.apache.spark.sql.hive.client.HiveTable.getPartitions(ClientInterface.scala:78) at org.apache.spark.sql.hive.MetastoreRelation.getHiveQlPartitions(HiveMetastoreCatalog.scala:670) at org.apache.spark.sql.hive.execution.HiveTableScan.doExecute(HiveTableScan.scala:137) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:90) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:90) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:89) at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:164) at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:151) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48) ... 85 more Caused by: MetaException(message:Filtering is supported only on partition keys of type string) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$FilterBuilder.setError(ExpressionTree.java:185) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.getJdoFilterPushdownParam(ExpressionTree.java:452) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilterOverPartitions(ExpressionTree.java:357) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilter(ExpressionTree.java:279) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$TreeNode.generateJDOFilter(ExpressionTree.java:243) at org.apache.hadoop.hive.metastore.parser.ExpressionTree.generateJDOFilterFragment(ExpressionTree.java:590) at org.apache.hadoop.hive.metastore.ObjectStore.makeQueryFilterString(ObjectStore.java:2417) at org.apache.hadoop.hive.metastore.ObjectStore.getPartitionsViaOrmFilter(ObjectStore.java:2029) at org.apache.hadoop.hive.metastore.ObjectStore.access$500(ObjectStore.java:146) at org.apache.hadoop.hive.metastore.ObjectStore$4.getJdoResult(ObjectStore.java:2332) ``` https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Test/job/Spark-Master-Maven-with-YARN/2945/HADOOP_PROFILE=hadoop-2.4,label=centos/testReport/junit/org.apache.spark.sql.hive.execution/SortMergeCompatibilitySuite/auto_sortmerge_join_16/ Author: Michael Armbrust <[email protected]> Closes #7409 from marmbrus/revertMetastorePushdown and squashes the following commits: 92fabd3 [Michael Armbrust] Revert SPARK-6910 and SPARK-9027 5d3bdf2 [Michael Armbrust] Revert "[SPARK-9027] [SQL] Generalize metastore predicate pushdown"
This PR supersedes my old one #6921. Since my patch has changed quite a bit, I am opening a new PR to make it easier to review.
The changes include-
toMetastoreFilter()function inHiveShimthat takesSeq[Expression]and converts them into a filter string for Hive metastore.AttributeReference+BinaryComparisonOp+Integral/StringTypepatterns inSeq[Expression]and fold them into a string.hiveQlPartitionsfield inMetastoreRelationtogetHiveQlPartitions()function that takes a filter string parameter.getHiveQlPartitions()inHiveTableScanwith a filter string.But there are some cases in which predicate pushdown is disabled-
In case of
convertMetastoreParquet=true, predicates are not pushed down because this conversion happens in anAnalyzerrule (HiveMetastoreCatalog.ParquetConversions). At this point,HiveTableScanhasn't run, so predicates are not available. But reading the source code, I think it is intentional to convert the entire Hive table w/ all the partitions intoParquetRelationbecause thenParquetRelationcan be cached and reused for any query against that table. Please correct me if I am wrong.cc @marmbrus