Skip to content

Commit 37f2d96

Browse files
committed
[SPARK-9027] [SQL] Generalize metastore predicate pushdown
Add support for pushing down metastore filters that are in different orders and add some unit tests. Author: Michael Armbrust <[email protected]> Closes apache#7386 from marmbrus/metastoreFilters and squashes the following commits: 05a4524 [Michael Armbrust] [SPARK-9027][SQL] Generalize metastore predicate pushdown
1 parent 59d820a commit 37f2d96

File tree

2 files changed

+107
-25
lines changed

2 files changed

+107
-25
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
3434
import org.apache.hadoop.hive.serde.serdeConstants
3535

3636
import org.apache.spark.Logging
37-
import org.apache.spark.sql.catalyst.expressions.{Expression, AttributeReference, BinaryComparison}
37+
import org.apache.spark.sql.catalyst.expressions._
3838
import org.apache.spark.sql.types.{StringType, IntegralType}
3939

4040
/**
@@ -312,37 +312,41 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
312312
override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
313313
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq
314314

315-
override def getPartitionsByFilter(
316-
hive: Hive,
317-
table: Table,
318-
predicates: Seq[Expression]): Seq[Partition] = {
315+
/**
316+
* Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
317+
* a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
318+
*
319+
* Unsupported predicates are skipped.
320+
*/
321+
def convertFilters(table: Table, filters: Seq[Expression]): String = {
319322
// hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
320323
val varcharKeys = table.getPartitionKeys
321324
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME))
322325
.map(col => col.getName).toSet
323326

324-
// Hive getPartitionsByFilter() takes a string that represents partition
325-
// predicates like "str_key=\"value\" and int_key=1 ..."
326-
val filter = predicates.flatMap { expr =>
327-
expr match {
328-
case op @ BinaryComparison(lhs, rhs) => {
329-
lhs match {
330-
case AttributeReference(_, _, _, _) => {
331-
rhs.dataType match {
332-
case _: IntegralType =>
333-
Some(lhs.prettyString + op.symbol + rhs.prettyString)
334-
case _: StringType if (!varcharKeys.contains(lhs.prettyString)) =>
335-
Some(lhs.prettyString + op.symbol + "\"" + rhs.prettyString + "\"")
336-
case _ => None
337-
}
338-
}
339-
case _ => None
340-
}
341-
}
342-
case _ => None
343-
}
327+
filters.collect {
328+
case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
329+
s"${a.name} ${op.symbol} $v"
330+
case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
331+
s"$v ${op.symbol} ${a.name}"
332+
333+
case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
334+
if !varcharKeys.contains(a.name) =>
335+
s"""${a.name} ${op.symbol} "$v""""
336+
case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
337+
if !varcharKeys.contains(a.name) =>
338+
s""""$v" ${op.symbol} ${a.name}"""
344339
}.mkString(" and ")
340+
}
341+
342+
override def getPartitionsByFilter(
343+
hive: Hive,
344+
table: Table,
345+
predicates: Seq[Expression]): Seq[Partition] = {
345346

347+
// Hive getPartitionsByFilter() takes a string that represents partition
348+
// predicates like "str_key=\"value\" and int_key=1 ..."
349+
val filter = convertFilters(table, predicates)
346350
val partitions =
347351
if (filter.isEmpty) {
348352
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hive.client
19+
20+
import scala.collection.JavaConversions._
21+
22+
import org.apache.hadoop.hive.metastore.api.FieldSchema
23+
import org.apache.hadoop.hive.serde.serdeConstants
24+
25+
import org.apache.spark.{Logging, SparkFunSuite}
26+
import org.apache.spark.sql.catalyst.dsl.expressions._
27+
import org.apache.spark.sql.catalyst.expressions._
28+
import org.apache.spark.sql.types._
29+
30+
/**
31+
* A set of tests for the filter conversion logic used when pushing partition pruning into the
32+
* metastore
33+
*/
34+
/**
 * A set of tests for the filter conversion logic used when pushing partition pruning into the
 * metastore. Each case feeds catalyst predicates to [[Shim_v0_13.convertFilters]] and checks
 * the exact filter string handed to Hive.
 */
class FiltersSuite extends SparkFunSuite with Logging {
  private val shim = new Shim_v0_13

  // A table with a single varchar partition column, used to verify that varchar keys are
  // never pushed down (hive varchar can't be evaluated by getPartitionsByFilter).
  private val testTable = new org.apache.hadoop.hive.ql.metadata.Table("default", "test")
  private val varCharCol = new FieldSchema()
  varCharCol.setName("varchar")
  varCharCol.setType(serdeConstants.VARCHAR_TYPE_NAME)
  testTable.setPartCols(varCharCol :: Nil)

  filterTest("string filter",
    (a("stringcol", StringType) > Literal("test")) :: Nil,
    "stringcol > \"test\"")

  filterTest("string filter backwards",
    (Literal("test") > a("stringcol", StringType)) :: Nil,
    "\"test\" > stringcol")

  filterTest("int filter",
    (a("intcol", IntegerType) === Literal(1)) :: Nil,
    "intcol = 1")

  filterTest("int filter backwards",
    (Literal(1) === a("intcol", IntegerType)) :: Nil,
    "1 = intcol")

  // Note: strcol is compared against a string literal, so it is declared StringType
  // (the original declared it IntegerType by mistake; conversion keys off the Literal's
  // type, so the expected output is unchanged).
  filterTest("int and string filter",
    (Literal(1) === a("intcol", IntegerType)) :: (Literal("a") === a("strcol", StringType)) :: Nil,
    "1 = intcol and \"a\" = strcol")

  filterTest("skip varchar",
    (Literal("") === a("varchar", StringType)) :: Nil,
    "")

  /** Registers one test that converts `filters` and compares against the expected `result`. */
  private def filterTest(name: String, filters: Seq[Expression], result: String) = {
    test(name) {
      val converted = shim.convertFilters(testTable, filters)
      if (converted != result) {
        fail(
          s"Expected filters ${filters.mkString(",")} to convert to '$result' but got '$converted'")
      }
    }
  }

  /** Shorthand for building an unresolved-free attribute reference of the given type. */
  private def a(name: String, dataType: DataType) = AttributeReference(name, dataType)()
}

0 commit comments

Comments (0)