From ca460429692191d75fe077a598418e59ba4aca8e Mon Sep 17 00:00:00 2001
From: Cheolsoo Park
Date: Fri, 26 Jun 2015 21:04:52 -0700
Subject: [PATCH 1/6] Implement predicate pushdown for hive metastore catalog

---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 56 ++++++++++---------
 .../org/apache/spark/sql/hive/HiveShim.scala  | 29 ++++++++++
 .../spark/sql/hive/HiveStrategies.scala       |  4 +-
 .../sql/hive/client/ClientInterface.scala     | 10 +++-
 .../spark/sql/hive/client/ClientWrapper.scala | 18 +++---
 .../spark/sql/hive/client/HiveShim.scala      | 22 ++++++++
 .../sql/hive/execution/HiveTableScan.scala    | 10 +++-
 .../spark/sql/hive/client/VersionsSuite.scala |  4 ++
 .../sql/hive/execution/PruningSuite.scala     |  2 +-
 9 files changed, 114 insertions(+), 41 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f35ae96ee0b5..c70666805c56 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -300,7 +300,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
-      val partitions = metastoreRelation.hiveQlPartitions.map { p =>
+      val partitions = metastoreRelation.getHiveQlPartitions(None).map { p =>
         val location = p.getLocation
         val values = InternalRow.fromSeq(p.getValues.zip(partitionColumnDataTypes).map {
           case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
@@ -643,32 +643,6 @@ private[hive] case class MetastoreRelation
     new Table(tTable)
   }

-  @transient val hiveQlPartitions: Seq[Partition] = table.getAllPartitions.map { p =>
-    val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
-    tPartition.setDbName(databaseName)
-    tPartition.setTableName(tableName)
-    tPartition.setValues(p.values)
-
-    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
-    tPartition.setSd(sd)
-    sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
-
-    sd.setLocation(p.storage.location)
-    sd.setInputFormat(p.storage.inputFormat)
-    sd.setOutputFormat(p.storage.outputFormat)
-
-    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-    sd.setSerdeInfo(serdeInfo)
-    serdeInfo.setSerializationLib(p.storage.serde)
-
-    val serdeParameters = new java.util.HashMap[String, String]()
-    serdeInfo.setParameters(serdeParameters)
-    table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
-    p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
-
-    new Partition(hiveQlTable, tPartition)
-  }
-
   @transient override lazy val statistics: Statistics = Statistics(
     sizeInBytes = {
       val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
@@ -689,6 +663,34 @@ private[hive] case class MetastoreRelation
     }
   )

+  def getHiveQlPartitions(filter: Option[String]): Seq[Partition] = {
+    table.getPartitions(filter).map { p =>
+      val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
+      tPartition.setDbName(databaseName)
+      tPartition.setTableName(tableName)
+      tPartition.setValues(p.values)
+
+      val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+      tPartition.setSd(sd)
+      sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
+
+      sd.setLocation(p.storage.location)
+      sd.setInputFormat(p.storage.inputFormat)
+      sd.setOutputFormat(p.storage.outputFormat)
+
+      val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+      sd.setSerdeInfo(serdeInfo)
+      serdeInfo.setSerializationLib(p.storage.serde)
+
+      val serdeParameters = new java.util.HashMap[String, String]()
+      serdeInfo.setParameters(serdeParameters)
+      table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+      p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+
+      new Partition(hiveQlTable, tPartition)
+    }
+  }
+
   /** Only compare database and tablename, not alias. */
   override def sameResult(plan: LogicalPlan): Boolean = {
     plan match {

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index d08c59415165..ecd068358e1b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -27,6 +27,7 @@ import scala.reflect.ClassTag

 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.io.{Input, Output}
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
@@ -37,6 +38,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObject
 import org.apache.hadoop.io.Writable

 import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions.{BinaryComparison, Expression}
 import org.apache.spark.sql.types.Decimal
 import org.apache.spark.util.Utils

@@ -99,6 +101,33 @@ private[hive] object HiveShim {
     }
   }

+  def toMetastoreFilter(predicates: Seq[Expression]): Option[String] = {
+    if (predicates.nonEmpty) {
+      // Hive getPartitionsByFilter() takes a string that represents partition
+      // predicates like "str_key_1=\"value_1\" and int_key_2=value_2 ..."
+      Some(predicates.foldLeft("") {
+        (str, expr) => {
+          expr match {
+            case op @ BinaryComparison(lhs, rhs) => {
+              val hiveFriendlyExpr =
+                lhs.prettyString + op.symbol + "\"" + rhs.prettyString + "\""
+              if (str.isEmpty) {
+                s"$hiveFriendlyExpr"
+              } else {
+                s"$str and $hiveFriendlyExpr"
+              }
+            }
+            case _ => {
+              str
+            }
+          }
+        }
+      })
+    } else {
+      None
+    }
+  }
+
   /**
    * This class provides the UDF creation and also the UDF instance serialization and
   * de-serialization cross process boundary.
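Note: the filter-string format consumed by Hive's getPartitionsByFilter() is easiest to see in isolation. The sketch below is a standalone model of the fold above, for illustration only; the Comparison class and the column names are hypothetical, and at this point in the series every right-hand side is quoted, including integral values (later patches in the series refine this):

    // Standalone model of the filter-string construction in toMetastoreFilter.
    case class Comparison(lhs: String, symbol: String, rhs: String)

    def toFilterString(comparisons: Seq[Comparison]): Option[String] = {
      // Render each comparison as lhs<op>"rhs" and join the clauses with "and".
      val filter = comparisons
        .map(c => c.lhs + c.symbol + "\"" + c.rhs + "\"")
        .mkString(" and ")
      if (filter.nonEmpty) Some(filter) else None
    }

    // toFilterString(Seq(Comparison("ds", "=", "2015-06-26"), Comparison("hr", "<", "12")))
    // => Some(ds="2015-06-26" and hr<"12")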
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 452b7f0bcc74..af556787a31b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -124,7 +124,7 @@ private[hive] trait HiveStrategies {
           InterpretedPredicate.create(castedPredicate)
         }

-        val partitions = relation.hiveQlPartitions.filter { part =>
+        val partitions = relation.getHiveQlPartitions(None).filter { part =>
           val partitionValues = part.getValues
           var i = 0
           while (i < partitionValues.size()) {
@@ -212,7 +212,7 @@ private[hive] trait HiveStrategies {
           projectList,
           otherPredicates,
           identity[Seq[Expression]],
-          HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil
+          HiveTableScan(_, relation, pruningPredicates)(hiveContext)) :: Nil

       case _ =>
         Nil
     }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
index 0a1d761a52f8..5f62b6f6d9ce 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
@@ -71,7 +71,12 @@ private[hive] case class HiveTable(

   def isPartitioned: Boolean = partitionColumns.nonEmpty

-  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
+  def getPartitions(filter: Option[String]): Seq[HivePartition] = {
+    filter match {
+      case None => client.getAllPartitions(this)
+      case Some(expr) => client.getPartitionsByFilter(this, expr)
+    }
+  }

   // Hive does not support backticks when passing names to the client.
   def qualifiedName: String = s"$database.$name"
@@ -132,6 +137,9 @@ private[hive] trait ClientInterface {
   /** Returns all partitions for the given table. */
   def getAllPartitions(hTable: HiveTable): Seq[HivePartition]

+  /** Returns partitions filtered by predicates for the given table. */
+  def getPartitionsByFilter(hTable: HiveTable, filter: String): Seq[HivePartition]
+
   /** Loads a static partition into an existing table. */
   def loadPartition(
       loadPath: String,

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index cbd2bf6b5eed..8def11f267b7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.hive.client

-import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
+import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
 import java.net.URI
 import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
 import javax.annotation.concurrent.GuardedBy
@@ -28,16 +28,13 @@ import scala.collection.JavaConversions._
 import scala.language.reflectiveCalls

 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.metastore.api.Database
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
 import org.apache.hadoop.hive.metastore.{TableType => HTableType}
-import org.apache.hadoop.hive.metastore.api
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata
 import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.ql.processors._
-import org.apache.hadoop.hive.ql.Driver
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.ql.{Driver, metadata}

 import org.apache.spark.Logging
 import org.apache.spark.sql.execution.QueryExecutionException
@@ -316,6 +313,13 @@ private[hive] class ClientWrapper(
     shim.getAllPartitions(client, qlTable).map(toHivePartition)
   }

+  override def getPartitionsByFilter(
+      hTable: HiveTable,
+      filter: String): Seq[HivePartition] = withHiveState {
+    val qlTable = toQlTable(hTable)
+    shim.getPartitionsByFilter(client, qlTable, filter).map(toHivePartition)
+  }
+
   override def listTables(dbName: String): Seq[String] = withHiveState {
     client.getAllTables(dbName)
   }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 1fa9d278e2a5..ba91d31b0ca6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -61,6 +61,8 @@ private[client] sealed abstract class Shim {

   def getAllPartitions(hive: Hive, table: Table): Seq[Partition]

+  def getPartitionsByFilter(hive: Hive, table: Table, filter: String): Seq[Partition]
+
   def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor

   def getDriverResults(driver: Driver): Seq[String]
@@ -127,6 +129,12 @@ private[client] class Shim_v0_12 extends Shim {
       classOf[Hive],
       "getAllPartitionsForPruner",
       classOf[Table])
+  private lazy val getPartitionsByFilterMethod =
+    findMethod(
+      classOf[Hive],
+      "getPartitionsByFilter",
+      classOf[Table],
+      classOf[String])
   private lazy val getCommandProcessorMethod =
     findStaticMethod(
       classOf[CommandProcessorFactory],
@@ -196,6 +204,10 @@ private[client] class Shim_v0_12 extends Shim {
   override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
     getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq

+  override def getPartitionsByFilter(hive: Hive, table: Table, filter: String): Seq[Partition] =
+    getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
+      .toSeq
+
   override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
     getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]

@@ -267,6 +279,12 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       classOf[Hive],
       "getAllPartitionsOf",
       classOf[Table])
+  private lazy val getPartitionsByFilterMethod =
+    findMethod(
+      classOf[Hive],
+      "getPartitionsByFilter",
+      classOf[Table],
+      classOf[String])
   private lazy val getCommandProcessorMethod =
     findStaticMethod(
       classOf[CommandProcessorFactory],
@@ -288,6 +306,10 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
     getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq

+  override def getPartitionsByFilter(hive: Hive, table: Table, filter: String): Seq[Partition] =
+    getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
+      .toSeq
+
   override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
     getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index f4c8c9a7e8a6..40346920de98 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -43,7 +43,7 @@ private[hive]
 case class HiveTableScan(
     requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,
-    partitionPruningPred: Option[Expression])(
+    partitionPruningPred: Seq[Expression])(
     @transient val context: HiveContext)
   extends LeafNode {
@@ -53,9 +53,11 @@ case class HiveTableScan(
   // Retrieve the original attributes based on expression ID so that capitalization matches.
   val attributes = requestedAttributes.map(relation.attributeMap)

+  private[this] val metastoreFilter = HiveShim.toMetastoreFilter(partitionPruningPred)
+
   // Bind all partition key attribute references in the partition pruning predicate for later
   // evaluation.
-  private[this] val boundPruningPred = partitionPruningPred.map { pred =>
+  private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
     require(
       pred.dataType == BooleanType,
       s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
@@ -132,7 +134,9 @@ case class HiveTableScan(
   protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
     hadoopReader.makeRDDForTable(relation.hiveQlTable)
   } else {
-    hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
+    logDebug(s"Hive metastore filter is $metastoreFilter")
+    hadoopReader.makeRDDForPartitionedTable(
+      prunePartitions(relation.getHiveQlPartitions(metastoreFilter)))
   }

   override def output: Seq[Attribute] = attributes

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index d52e162acbd0..b536f86ae74a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -151,6 +151,10 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.getAllPartitions(client.getTable("default", "src_part"))
     }

+    test(s"$version: getPartitionsByFilter") {
+      client.getPartitionsByFilter(client.getTable("default", "src_part"), "key = 1")
+    }
+
     test(s"$version: loadPartition") {
       client.loadPartition(
         emptyDir,

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index de6a41ce5bfc..e0c6f34a30e0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -151,7 +151,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
       case p @ HiveTableScan(columns, relation, _) =>
         val columnNames = columns.map(_.name)
         val partValues = if (relation.table.isPartitioned) {
-          p.prunePartitions(relation.hiveQlPartitions).map(_.getValues)
+          p.prunePartitions(relation.getHiveQlPartitions(None)).map(_.getValues)
         } else {
           Seq.empty
         }

From cf7c6e52307ba04e6ee2b4c8377ae77f878c8431 Mon Sep 17 00:00:00 2001
From: Cheolsoo Park
Date: Fri, 26 Jun 2015 23:30:04 -0700
Subject: [PATCH 2/6] Check data types of expressions and push down string and
 integral types only

---
 .../org/apache/spark/sql/hive/HiveShim.scala  | 76 ++++++++++++-------
 .../sql/hive/execution/HiveTableScan.scala    |  3 +-
 2 files changed, 52 insertions(+), 27 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index ecd068358e1b..98d977abe9f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -17,31 +17,32 @@

 package org.apache.spark.sql.hive

-import java.io.{InputStream, OutputStream}
-import java.rmi.server.UID
-
-/* Implicit conversions */
-import scala.collection.JavaConversions._
-import scala.language.implicitConversions
-import scala.reflect.ClassTag
-
 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.io.{Input, Output}

+import java.io.{InputStream, OutputStream}
+import java.rmi.server.UID
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
 import org.apache.hadoop.io.Writable

 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.{BinaryComparison, Expression}
-import org.apache.spark.sql.types.Decimal
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BinaryComparison, Expression}
+import org.apache.spark.sql.types.{StringType, IntegralType, Decimal}
 import org.apache.spark.util.Utils

+/* Implicit conversions */
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
 private[hive] object HiveShim {
   // Precision and scale to pass for unlimited decimals; these are the same as the precision and
   // scale Hive 0.13 infers for BigDecimals from sources that don't specify them (e.g. UDFs)
@@ -101,28 +102,51 @@ private[hive] object HiveShim {
     }
   }

-  def toMetastoreFilter(predicates: Seq[Expression]): Option[String] = {
-    if (predicates.nonEmpty) {
-      // Hive getPartitionsByFilter() takes a string that represents partition
-      // predicates like "str_key_1=\"value_1\" and int_key_2=value_2 ..."
-      Some(predicates.foldLeft("") {
-        (str, expr) => {
-          expr match {
-            case op @ BinaryComparison(lhs, rhs) => {
-              val hiveFriendlyExpr =
-                lhs.prettyString + op.symbol + "\"" + rhs.prettyString + "\""
-              if (str.isEmpty) {
-                s"$hiveFriendlyExpr"
+  def toMetastoreFilter(
+      predicates: Seq[Expression],
+      partitionKeyTypes: Map[String, String]): Option[String] = {
+    // Hive getPartitionsByFilter() takes a string that represents partition
+    // predicates like "str_key=\"value\" and int_key=1 ..."
+    val filter = predicates.foldLeft("") {
+      (str, expr) => {
+        expr match {
+          case op @ BinaryComparison(lhs, rhs) => {
+            val cond: String =
+              lhs match {
+                case AttributeReference(_,_,_,_) => {
+                  rhs.dataType match {
+                    case _: IntegralType =>
+                      lhs.prettyString + op.symbol + "\"" + rhs.prettyString + "\""
+                    case StringType => {
+                      // hive varchar is string type in catalyst, but varchar cannot be pushed down.
+                      if (!partitionKeyTypes.getOrElse(lhs.prettyString, "").startsWith(
+                          serdeConstants.VARCHAR_TYPE_NAME)) {
+                        lhs.prettyString + op.symbol + "\"" + rhs.prettyString + "\""
+                      } else {
+                        ""
+                      }
+                    }
+                    case _ => ""
+                  }
+                }
+                case _ => ""
+              }
+            if (cond.nonEmpty) {
+              if (str.nonEmpty) {
+                s"$str and $cond"
               } else {
-                s"$str and $hiveFriendlyExpr"
+                cond
               }
-            }
-            case _ => {
+            } else {
               str
             }
           }
+          case _ => str
         }
-      })
+      }
+    }
+    if (filter.nonEmpty) {
+      Some(filter)
     } else {
       None
     }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 40346920de98..94a34b08c0d6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -53,7 +53,8 @@ case class HiveTableScan(
   // Retrieve the original attributes based on expression ID so that capitalization matches.
   val attributes = requestedAttributes.map(relation.attributeMap)

-  private[this] val metastoreFilter = HiveShim.toMetastoreFilter(partitionPruningPred)
+  private[this] val metastoreFilter = HiveShim.toMetastoreFilter(partitionPruningPred,
+    relation.hiveQlTable.getPartitionKeys.map(col => (col.getName, col.getType)).toMap)

   // Bind all partition key attribute references in the partition pruning predicate for later
   // evaluation.

From 2ff627766e491d9a28d3cf7f87e03b429b2d22cc Mon Sep 17 00:00:00 2001
From: Cheolsoo Park
Date: Sat, 27 Jun 2015 20:44:14 -0700
Subject: [PATCH 3/6] Fix stylecheck

---
 .../org/apache/spark/sql/hive/HiveShim.scala  | 22 +++++++++++++------
 .../sql/hive/execution/HiveTableScan.scala    |  4 ++--
 2 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index 98d977abe9f1..e9d6c3531665 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -22,9 +22,11 @@ import com.esotericsoftware.kryo.io.{Input, Output}

 import java.io.{InputStream, OutputStream}
 import java.rmi.server.UID
+import java.util.List

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.serde.serdeConstants
@@ -104,7 +106,13 @@ private[hive] object HiveShim {

   def toMetastoreFilter(
       predicates: Seq[Expression],
-      partitionKeyTypes: Map[String, String]): Option[String] = {
+      partitionKeys: List[FieldSchema]): Option[String] = {
+
+    val varcharKeys = partitionKeys
+      .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME))
+      .map(col => col.getName)
+      .toSet
+
     // Hive getPartitionsByFilter() takes a string that represents partition
     // predicates like "str_key=\"value\" and int_key=1 ..."
     val filter = predicates.foldLeft("") {
@@ -113,14 +121,14 @@ private[hive] object HiveShim {
           case op @ BinaryComparison(lhs, rhs) => {
             val cond: String =
               lhs match {
-                case AttributeReference(_,_,_,_) => {
+                case AttributeReference(_, _, _, _) => {
                   rhs.dataType match {
                     case _: IntegralType =>
-                      lhs.prettyString + op.symbol + "\"" + rhs.prettyString + "\""
-                    case StringType => {
-                      // hive varchar is string type in catalyst, but varchar cannot be pushed down.
-                      if (!partitionKeyTypes.getOrElse(lhs.prettyString, "").startsWith(
-                          serdeConstants.VARCHAR_TYPE_NAME)) {
+                      lhs.prettyString + op.symbol + rhs.prettyString
+                    case _: StringType => {
+                      // hive varchar is treated as string in catalyst,
+                      // but varchar cannot be pushed down.
+                      if (!varcharKeys.contains(lhs.prettyString)) {
                         lhs.prettyString + op.symbol + "\"" + rhs.prettyString + "\""
                       } else {
                         ""

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 94a34b08c0d6..ddb13706d60c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -53,8 +53,8 @@ case class HiveTableScan(
   // Retrieve the original attributes based on expression ID so that capitalization matches.
   val attributes = requestedAttributes.map(relation.attributeMap)

-  private[this] val metastoreFilter = HiveShim.toMetastoreFilter(partitionPruningPred,
-    relation.hiveQlTable.getPartitionKeys.map(col => (col.getName, col.getType)).toMap)
+  private[this] val metastoreFilter =
+    HiveShim.toMetastoreFilter(partitionPruningPred, relation.hiveQlTable.getPartitionKeys)

   // Bind all partition key attribute references in the partition pruning predicate for later
   // evaluation.

From ead139a4948d4aa5d10905cdb6fc7adbe3b12465 Mon Sep 17 00:00:00 2001
From: Cheolsoo Park
Date: Sun, 28 Jun 2015 17:03:25 -0700
Subject: [PATCH 4/6] Disable predicate pushdown when hive metastore version is
 older than 0.13

---
 .../scala/org/apache/spark/sql/hive/HiveShim.scala | 12 +++++++++++-
 .../spark/sql/hive/execution/HiveTableScan.scala   |  5 ++++-
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index e9d6c3531665..d7bd75e12e08 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -106,7 +106,17 @@ private[hive] object HiveShim {

   def toMetastoreFilter(
       predicates: Seq[Expression],
-      partitionKeys: List[FieldSchema]): Option[String] = {
+      partitionKeys: List[FieldSchema],
+      hiveMetastoreVersion: String): Option[String] = {
+
+    // Binary comparison operators such as >, <, >=, and <= started being supported by
+    // getPartitionsByFilter() in 0.13. So if the Hive metastore version is older than 0.13,
+    // no predicate is pushed down. See HIVE-4888.
+    val versionPattern = "([\\d]+\\.[\\d]+).*".r
+    hiveMetastoreVersion match {
+      case versionPattern(version) => if (version.toDouble < 0.13) return None
+      case _ => // continue
+    }

     val varcharKeys = partitionKeys
       .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME))

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index ddb13706d60c..8c31b6cd8051 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -54,7 +54,10 @@ case class HiveTableScan(
   val attributes = requestedAttributes.map(relation.attributeMap)

   private[this] val metastoreFilter =
-    HiveShim.toMetastoreFilter(partitionPruningPred, relation.hiveQlTable.getPartitionKeys)
+    HiveShim.toMetastoreFilter(
+      partitionPruningPred,
+      relation.hiveQlTable.getPartitionKeys,
+      context.hiveMetastoreVersion)

   // Bind all partition key attribute references in the partition pruning predicate for later
   // evaluation.
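Note: the version gate above is self-contained and easy to check in isolation. A minimal standalone sketch of the same regex logic follows; the function name is illustrative only, not part of the patch:

    // True when the metastore supports filter pushdown (0.13 or newer); mirrors the
    // "([\d]+\.[\d]+).*" pattern and the toDouble comparison used in patch 4.
    def supportsFilterPushdown(hiveMetastoreVersion: String): Boolean = {
      val versionPattern = "([\\d]+\\.[\\d]+).*".r
      hiveMetastoreVersion match {
        case versionPattern(version) => version.toDouble >= 0.13
        case _ => true // unparseable version strings fall through, as in the patch
      }
    }

    // supportsFilterPushdown("0.12.0") => false
    // supportsFilterPushdown("0.13.1") => true
    // supportsFilterPushdown("1.2.1")  => true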
From 095672f00ca23c5d071650f3d00657dfe20f38be Mon Sep 17 00:00:00 2001
From: Cheolsoo Park
Date: Mon, 29 Jun 2015 11:34:50 -0700
Subject: [PATCH 5/6] Push down predicates when convertMetastoreParquet=true

---
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 ++
 .../main/scala/org/apache/spark/sql/hive/HiveShim.scala  | 6 +++---
 .../scala/org/apache/spark/sql/hive/HiveStrategies.scala | 9 ++++++++-
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c70666805c56..d6f89ef6b1aa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -300,6 +300,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
+      // The entire table is converted into a ParquetRelation here, so partition pruning
+      // predicates cannot be pushed into Hive metastore.
       val partitions = metastoreRelation.getHiveQlPartitions(None).map { p =>
         val location = p.getLocation
         val values = InternalRow.fromSeq(p.getValues.zip(partitionColumnDataTypes).map {

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index d7bd75e12e08..fce6696b91b0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -109,9 +109,9 @@ private[hive] object HiveShim {
       partitionKeys: List[FieldSchema],
       hiveMetastoreVersion: String): Option[String] = {

-    // Binary comparison operators such as >, <, >=, and <= started being supported by
-    // getPartitionsByFilter() in 0.13. So if the Hive metastore version is older than 0.13,
-    // no predicate is pushed down. See HIVE-4888.
+    // Binary comparison operators have been supported in getPartitionsByFilter() since 0.13.
+    // So if the Hive metastore version is older than 0.13, predicates cannot be pushed down.
+    // See HIVE-4888.
     val versionPattern = "([\\d]+\\.[\\d]+).*".r
     hiveMetastoreVersion match {
       case versionPattern(version) => if (version.toDouble < 0.13) return None

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index af556787a31b..22ac7fc4660a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -106,6 +106,13 @@ private[hive] trait HiveStrategies {

       try {
         if (relation.hiveQlTable.isPartitioned) {
+
+          val metastoreFilter =
+            HiveShim.toMetastoreFilter(
+              pruningPredicates,
+              relation.hiveQlTable.getPartitionKeys,
+              hiveContext.hiveMetastoreVersion)
+
           val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
           // Translate the predicate so that it automatically casts the input values to the
           // correct data types during evaluation.
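Note: after this patch the HiveStrategies path prunes in two stages. The metastore filter built above is best-effort (predicates that cannot be rendered into the filter string are simply left out of it), and the client-side filter in the next hunk still re-evaluates the full pruning predicate against every partition that comes back, so a lossy pushdown stays correct. A standalone sketch of that scheme, with all names illustrative rather than from the patch:

    // Stage 1 fetches partitions, optionally narrowed by a pushed-down filter string;
    // stage 2 re-checks each returned partition exactly.
    def pruneTwoStage[P](
        fetch: Option[String] => Seq[P],
        pushedFilter: Option[String],
        predicate: P => Boolean): Seq[P] = {
      fetch(pushedFilter).filter(predicate)
    }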
@@ -124,7 +131,7 @@ private[hive] trait HiveStrategies {
           InterpretedPredicate.create(castedPredicate)
         }

-        val partitions = relation.getHiveQlPartitions(None).filter { part =>
+        val partitions = relation.getHiveQlPartitions(metastoreFilter).filter { part =>
           val partitionValues = part.getValues
           var i = 0
           while (i < partitionValues.size()) {

From 05f8e16626bfae33428119245af7ad1138d32ccd Mon Sep 17 00:00:00 2001
From: Cheolsoo Park
Date: Mon, 29 Jun 2015 13:19:11 -0700
Subject: [PATCH 6/6] Remove an empty line

---
 .../main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 22ac7fc4660a..cb1bc6d4d374 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -106,7 +106,6 @@ private[hive] trait HiveStrategies {

       try {
         if (relation.hiveQlTable.isPartitioned) {
-
           val metastoreFilter =
             HiveShim.toMetastoreFilter(
               pruningPredicates,