
Conversation

@piaozhexiu

@marmbrus per our email conversation, I am sending a PR that implements the idea I outlined in the JIRA, FYI.

  • Analyzer captures predicates when an unresolved relation is followed by a filter expression and pushes them down into HiveMetastoreCatalog.
  • HiveMetastoreCatalog extracts partition predicates whose types are string or integral and constructs a string representation of them.
  • Hive client invokes getPartitionsByFilter(Table tbl, String filter) with the filter string (sketched below).
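
For illustration, the tail end of that pipeline is a call like the following. This is a sketch only: testTable is a hypothetical partitioned table whose partition key ds is a string, and the filter string stands in for whatever HiveMetastoreCatalog constructed.

// Sketch: testTable is hypothetical; the filter string is what step 2 produced.
val partitions = org.apache.spark.sql.hive.test.TestHive
  .catalog.client.getPartitionsByFilter(testTable, "ds='2015-06-22'")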

In addition, I now better understand the limitations of Hive's getPartitionsByFilter(Table tbl, String filter) function.

  • Good news: it works with integral types as well as strings.
  • Bad news: it only works with equality operators.

The following is where the "filtering is supported only on partition keys of type string" error comes from in Hive:

// metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
// (`operator` and `value` below are fields of the enclosing leaf node)
public String getFilterPushdownParam(Table table, int partColIndex) throws MetaException {
  boolean isIntegralSupported = doesOperatorSupportIntegral(operator);
  String colType = table.getPartitionKeys().get(partColIndex).getType();
  // Can only support partitions whose types are string, or maybe integers
  if (!colType.equals(serdeConstants.STRING_TYPE_NAME)
      && (!isIntegralSupported || !isIntegralType(colType))) {
    throw new MetaException("Filtering is supported only on partition keys of type " +
        "string" + (isIntegralSupported ? ", or integral types" : ""));
  }
  boolean isStringValue = value instanceof String;
  if (!isStringValue && (!isIntegralSupported || !(value instanceof Long))) {
    throw new MetaException("Filtering is supported only on partition keys of type " +
        "string" + (isIntegralSupported ? ", or integral types" : ""));
  }
  return isStringValue ? (String) value : Long.toString((Long) value);
}

As can be seen, one of the following conditions has to be met:

  • The partition key is of string type; or
  • The partition key is of integral type and the operator supports integral types (i.e., doesOperatorSupportIntegral() returns true).

But looking at the doesOperatorSupportIntegral() function, only the equals and not-equals operators support integral types:

// metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
private static boolean doesOperatorSupportIntegral(Operator operator) {
  // TODO: for SQL-based filtering, this could be amended if we added casts.
  return (operator == Operator.EQUALS)
      || (operator == Operator.NOTEQUALS)
      || (operator == Operator.NOTEQUALS2);
}

This is not good, because <, >, <=, and >=, which are very common in practice, are not supported.

In this PR, I restricted predicate pushdown to =. With that restriction, all the unit tests pass now.
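
A minimal sketch of that restriction, assuming a helper along these lines in HiveMetastoreCatalog (all names here are illustrative, not the actual patch):

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}

// Fold equality predicates on partition keys into a metastore filter string;
// everything else is left for Spark to evaluate after partition listing.
def equalsOnlyFilter(
    partitionKeys: Set[String],
    predicates: Seq[Expression]): Option[String] = {
  val clauses = predicates.collect {
    case EqualTo(a: AttributeReference, Literal(v, StringType))
        if partitionKeys.contains(a.name) =>
      s"${a.name} = '$v'"                 // string values are quoted
    case EqualTo(a: AttributeReference, Literal(v, IntegerType | LongType))
        if partitionKeys.contains(a.name) =>
      s"${a.name} = $v"                   // integral values are not
  }
  if (clauses.isEmpty) None else Some(clauses.mkString(" and "))
}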

@marmbrus
Contributor

ok to test

@SparkQA

SparkQA commented Jun 22, 2015

Test build #35427 has finished for PR 6921 at commit 945a882.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 22, 2015

Test build #35431 has finished for PR 6921 at commit b87e107.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@piaozhexiu
Author

[error] Failed tests:
[error]     org.apache.spark.sql.hive.ParquetDataSourceOnMetastoreSuite
[error]     org.apache.spark.sql.hive.ParquetMetastoreSuiteBase

I don't understand this. When I run the tests on my laptop, they all pass. Here are the commands that I use:

build/sbt -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Pkinesis-asl -Phive -Phive-thriftserver "test-only org.apache.spark.sql.hive.ParquetDataSourceOnMetastoreSuite"
build/sbt -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Pkinesis-asl -Phive -Phive-thriftserver "test-only org.apache.spark.sql.hive.ParquetMetastoreSuiteBase"

Any idea?

@marmbrus
Contributor

test this please

@SparkQA

SparkQA commented Jun 22, 2015

Test build #35493 has finished for PR 6921 at commit b87e107.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@piaozhexiu
Author

I am rebasing it now.

@marmbrus
Contributor

@piaozhexiu, thanks for working on this! I did some more investigation on what types of filters seem to be supported. It seems like we might be able to support >, <, <=, >= as well, as long as we encode the filter correctly.

scala> sql("CREATE TABLE test (key INT) PARTITIONED BY (i INT, s STRING)")

// works
scala> org.apache.spark.sql.hive.test.TestHive.catalog.client.getPartitionsByFilter(testTable, "s='11'")
// works
scala> org.apache.spark.sql.hive.test.TestHive.catalog.client.getPartitionsByFilter(testTable, "s>'11'")
// works
scala> org.apache.spark.sql.hive.test.TestHive.catalog.client.getPartitionsByFilter(testTable, "i=11")
// works
scala> org.apache.spark.sql.hive.test.TestHive.catalog.client.getPartitionsByFilter(testTable, "i>11")

// fails: Filtering is supported only on partition keys of type string:
scala> org.apache.spark.sql.hive.test.TestHive.catalog.client.getPartitionsByFilter(testTable, "i>'11'")

I'm curious exactly what problems you ran into, and which version of Hive you were using when you encountered them?

A few other comments on the structure of this change:

  • I think we should avoid adding the filter into the logical relation. The problem here is that this logic is running well before we have done filter pushdown, and as a result we could miss significant optimization possibilities. Instead, I'd modify the HiveTableScan, which already has a complete list of predicates that can be used to prune partitions. When we are building the scan, we can pass this expression into the logical relation (and probably remove the lazy val that enumerates all partitions completely).
  • I'd move the logic that constructs the filter expression into the HiveShim and just pass raw Catalyst Expression objects through the client interface. This way, if newer Hive versions allow more filters in the future, we can add them on a version-specific basis. The contract can be that the HiveClient does "best effort" filtering, and we will always re-evaluate all filters on the set of partitions that are returned by the client (see the sketch below).
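
For instance, the contract could look roughly like this (a sketch with made-up type names, not the actual interface):

import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical placeholder types so the sketch stands alone.
case class HiveTable(name: String)
case class HivePartition(spec: Map[String, String])

// "Best effort" contract: the client pushes whatever subset of `predicates`
// it can express for the connected Hive version, and the caller always
// re-evaluates ALL predicates against the partitions that come back.
trait ClientInterface {
  def getPartitionsByFilter(
      table: HiveTable,
      predicates: Seq[Expression]): Seq[HivePartition]
}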

What do you think?

@piaozhexiu
Author

@marmbrus thanks for your comments! Your suggestions make sense. Let me do that.

As for Hive filter types, the problems that I ran into are Spark unit test failures (hive/test). Since Spark builds against Hive 0.13, I've been looking at its source code to understand the issues. But perhaps I was using an incorrect encoding, like you said. Let me verify that.

@SparkQA

SparkQA commented Jun 23, 2015

Test build #35495 has finished for PR 6921 at commit 61d488f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 27, 2015

Test build #35891 has finished for PR 6921 at commit cc1efe5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 27, 2015

Test build #35893 has finished for PR 6921 at commit afeb1ba.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 28, 2015

Test build #35915 has finished for PR 6921 at commit 1d345d6.

  • This patch fails some tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@piaozhexiu piaozhexiu force-pushed the SPARK-6910 branch 2 times, most recently from 917c3ee to c0b82be on June 28, 2015 06:49
@SparkQA

SparkQA commented Jun 28, 2015

Test build #35924 has finished for PR 6921 at commit c0b82be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@piaozhexiu piaozhexiu changed the title [SPARK-6910] [SQL] [WIP] Support for pushing predicates down to metastore for partition pruning [SPARK-6910] [SQL] Support for pushing predicates down to metastore for partition pruning Jun 28, 2015
@piaozhexiu
Author

@marmbrus I got all the unit tests passing now. Can you please review my patch? This is no longer WIP.

I incorporated all your suggestions. A couple of things that I should mention:

  • All the BinaryComparison operators (=, <, >, <=, >=) are supported for both integral and string types.
  • In v0.13, integral types should not be encoded as strings. E.g., int_key = "1" throws an error while int_key = 1 works. In fact, this was why I was seeing many unit test failures before. Fixing it makes all unit tests pass now.
  • I haven't tested my patch with v0.12.
  • I tested my patch with a v0.11 metastore server, and I got an error. But if I encode integral types as strings (i.e., int_key = "1"), the error goes away. So this behavior differs from v0.13's. Do we need to worry about v0.11?
  • Predicates are not pushed down when spark.sql.hive.convertMetastoreParquet is set to true. This is because in HiveMetastoreCatalog, the full list of partitions is needed to convert Hive metadata into Parquet (before HiveTableScan is invoked). Please let me know if this can be optimized.
  • Hive's varchar type cannot be pushed down into the Hive metastore, so I had to remove varchar keys from the partition pruning predicates in HiveTableScan. But since Catalyst treats varchar as string, it was not possible to tell whether an expression whose Catalyst data type is string is a Hive varchar or string column. So I needed to pass the Hive partition-key schema to HiveShim in addition to the raw Catalyst Expression objects (see the sketch after this list).
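
Roughly, the HiveShim conversion looks like this (a simplified sketch with illustrative names, not the patch verbatim):

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// `varcharKeys` carries the Hive partition-key schema, because Catalyst maps
// both Hive varchar and string columns to StringType.
def toMetastoreFilter(
    varcharKeys: Set[String],
    predicates: Seq[Expression]): String = {
  def clause(a: AttributeReference, symbol: String, v: Any): Option[String] =
    a.dataType match {
      case ByteType | ShortType | IntegerType | LongType =>
        Some(s"${a.name} $symbol $v")        // v0.13 rejects quoted integrals
      case StringType if !varcharKeys.contains(a.name) =>
        Some(s"${a.name} $symbol '$v'")      // strings stay quoted
      case _ => None                         // varchar etc.: not pushed down
    }
  predicates.flatMap {
    // literal-on-left comparisons would need the operator flipped, so this
    // sketch only handles the attribute-on-left shape
    case op: BinaryComparison => (op.left, op.right) match {
      case (a: AttributeReference, Literal(v, _)) => clause(a, op.symbol, v)
      case _ => None
    }
    case _ => None
  }.mkString(" and ")
}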

Thanks!

@piaozhexiu
Author

@marmbrus While digging into the Hive commit log, I found HIVE-4888. Binary comparison operators have only been supported by getPartitionsByFilter() since Hive 0.13.

So my patch will break backward compatibility with Hive 0.12 and older. I think the best we can do is to disable predicate pushdown into the metastore if spark.sql.hive.metastore.version is lower than 0.13. Since we're already implementing this in HiveShim, the code will be isolated and can easily be removed when support for old Hive versions is dropped in the future.
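
As a sketch, the version gate could be as simple as this (the name and version parsing are illustrative):

// HIVE-4888 added binary-comparison pushdown to getPartitionsByFilter() in
// Hive 0.13, so only emit a filter string when the configured metastore is
// at least 0.13; callers fall back to fetching all partitions on None.
def filterIfSupported(metastoreVersion: String, filter: String): Option[String] =
  metastoreVersion.split("\\.").take(2).map(_.toInt) match {
    case Array(major, minor) => if (major > 0 || minor >= 13) Some(filter) else None
    case _ => None
  }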

@SparkQA

SparkQA commented Jun 29, 2015

Test build #35947 has finished for PR 6921 at commit 0ebfecc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@piaozhexiu
Author

I added a check for the Hive metastore version; if it is lower than 0.13, predicates are not pushed down.

I think I handled all the cases now. Ready for review.

@piaozhexiu
Author

One last update (hopefully). I realized that I can push down predicates even when convertMetastoreParquet is true. Basically, the same thing can be done in ParquetConversions as in HiveTableScan. I just pushed the update for this.

Here is the summary of the current status:

Case | Predicate pushdown
------- | ------------------
Integral and string types | Yes
Varchar type | No
Hive 0.13 and newer | Yes
Hive 0.12 and older | No
convertMetastoreParquet=true on partitioned table | Yes
convertMetastoreParquet=true on non-partitioned table | No

@SparkQA

SparkQA commented Jun 29, 2015

Test build #36012 has finished for PR 6921 at commit 1a013c7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 29, 2015

Test build #36026 has finished for PR 6921 at commit f6f26af.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@piaozhexiu
Author

This is weird. A couple of different tests fail randomly on each run. I wonder whether this is because the Derby DB intermittently fails during unit tests.

I'll force another run by rebasing the patch.

@SparkQA

SparkQA commented Jun 30, 2015

Test build #36044 has finished for PR 6921 at commit 05f8e16.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@piaozhexiu
Author

Closing as it is superseded by #7216.

@piaozhexiu piaozhexiu closed this Jul 3, 2015
marmbrus pushed a commit to marmbrus/spark that referenced this pull request Jul 14, 2015
[SPARK-6910] [SQL] Support for pushing predicates down to metastore for partition pruning

This PR supersedes my old one apache#6921. Since my patch has changed quite a bit, I am opening a new PR to make it easier to review.

The changes include-
* Implement `toMetastoreFilter()` function in `HiveShim` that takes `Seq[Expression]` and converts them into a filter string for Hive metastore.
 * This function matches all the `AttributeReference` + `BinaryComparisonOp` + `Integral/StringType` patterns in `Seq[Expression]` and folds them into a string.
* Change `hiveQlPartitions` field in `MetastoreRelation` to `getHiveQlPartitions()` function that takes a filter string parameter.
* Call `getHiveQlPartitions()` in `HiveTableScan` with a filter string.

But there are some cases in which predicate pushdown is disabled:

Case | Predicate pushdown
------- | -----------------------------
Hive integral and string types | Yes
Hive varchar type | No
Hive 0.13 and newer | Yes
Hive 0.12 and older | No
convertMetastoreParquet=false | Yes
convertMetastoreParquet=true | No

In case of `convertMetastoreParquet=true`, predicates are not pushed down because this conversion happens in an `Analyzer` rule (`HiveMetastoreCatalog.ParquetConversions`). At that point, `HiveTableScan` hasn't run, so predicates are not available. But reading the source code, I think it is intentional to convert the entire Hive table with all the partitions into `ParquetRelation`, because the `ParquetRelation` can then be cached and reused for any query against that table. Please correct me if I am wrong.

cc marmbrus

Author: Cheolsoo Park <[email protected]>

Closes apache#7216 from piaozhexiu/SPARK-6910-2 and squashes the following commits:

aa1490f [Cheolsoo Park] Fix ordering of imports
c212c4d [Cheolsoo Park] Incorporate review comments
5e93f9d [Cheolsoo Park] Predicate pushdown into Hive metastore
