@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
3434import org .apache .hadoop .hive .serde .serdeConstants
3535
3636import org .apache .spark .Logging
37- import org .apache .spark .sql .catalyst .expressions ._
37+ import org .apache .spark .sql .catalyst .expressions .{ Expression , AttributeReference , BinaryComparison }
3838import org .apache .spark .sql .types .{StringType , IntegralType }
3939
4040/**
@@ -312,41 +312,37 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
312312 override def getAllPartitions (hive : Hive , table : Table ): Seq [Partition ] =
313313 getAllPartitionsMethod.invoke(hive, table).asInstanceOf [JSet [Partition ]].toSeq
314314
315- /**
316- * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
317- * a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
318- *
319- * Unsupported predicates are skipped.
320- */
321- def convertFilters (table : Table , filters : Seq [Expression ]): String = {
322- // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
323- val varcharKeys = table.getPartitionKeys
324- .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME ))
325- .map(col => col.getName).toSet
326-
327- filters.collect {
328- case op @ BinaryComparison (a : Attribute , Literal (v, _ : IntegralType )) =>
329- s " ${a.name} ${op.symbol} $v"
330- case op @ BinaryComparison (Literal (v, _ : IntegralType ), a : Attribute ) =>
331- s " $v ${op.symbol} ${a.name}"
332-
333- case op @ BinaryComparison (a : Attribute , Literal (v, _ : StringType ))
334- if ! varcharKeys.contains(a.name) =>
335- s """ ${a.name} ${op.symbol} " $v" """
336- case op @ BinaryComparison (Literal (v, _ : StringType ), a : Attribute )
337- if ! varcharKeys.contains(a.name) =>
338- s """ " $v" ${op.symbol} ${a.name}"""
339- }.mkString(" and " )
340- }
341-
342315 override def getPartitionsByFilter (
343316 hive : Hive ,
344317 table : Table ,
345318 predicates : Seq [Expression ]): Seq [Partition ] = {
319+ // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
320+ val varcharKeys = table.getPartitionKeys
321+ .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME ))
322+ .map(col => col.getName).toSet
346323
347324 // Hive getPartitionsByFilter() takes a string that represents partition
348325 // predicates like "str_key=\"value\" and int_key=1 ..."
349- val filter = convertFilters(table, predicates)
326+ val filter = predicates.flatMap { expr =>
327+ expr match {
328+ case op @ BinaryComparison (lhs, rhs) => {
329+ lhs match {
330+ case AttributeReference (_, _, _, _) => {
331+ rhs.dataType match {
332+ case _ : IntegralType =>
333+ Some (lhs.prettyString + op.symbol + rhs.prettyString)
334+ case _ : StringType if (! varcharKeys.contains(lhs.prettyString)) =>
335+ Some (lhs.prettyString + op.symbol + " \" " + rhs.prettyString + " \" " )
336+ case _ => None
337+ }
338+ }
339+ case _ => None
340+ }
341+ }
342+ case _ => None
343+ }
344+ }.mkString(" and " )
345+
350346 val partitions =
351347 if (filter.isEmpty) {
352348 getAllPartitionsMethod.invoke(hive, table).asInstanceOf [JSet [Partition ]]
0 commit comments