
Commit c6b1a9e

marmbrus authored and rxin committed
Revert SPARK-6910 and SPARK-9027
Revert apache#7216 and apache#7386. These patches seem to be causing quite a few test failures:

```
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor322.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:351)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getPartitionsByFilter$1.apply(ClientWrapper.scala:320)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getPartitionsByFilter$1.apply(ClientWrapper.scala:318)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:180)
    at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:135)
    at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:172)
    at org.apache.spark.sql.hive.client.ClientWrapper.getPartitionsByFilter(ClientWrapper.scala:318)
    at org.apache.spark.sql.hive.client.HiveTable.getPartitions(ClientInterface.scala:78)
    at org.apache.spark.sql.hive.MetastoreRelation.getHiveQlPartitions(HiveMetastoreCatalog.scala:670)
    at org.apache.spark.sql.hive.execution.HiveTableScan.doExecute(HiveTableScan.scala:137)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:90)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:90)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:89)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:164)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:151)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 85 more
Caused by: MetaException(message:Filtering is supported only on partition keys of type string)
    at org.apache.hadoop.hive.metastore.parser.ExpressionTree$FilterBuilder.setError(ExpressionTree.java:185)
    at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.getJdoFilterPushdownParam(ExpressionTree.java:452)
    at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilterOverPartitions(ExpressionTree.java:357)
    at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilter(ExpressionTree.java:279)
    at org.apache.hadoop.hive.metastore.parser.ExpressionTree$TreeNode.generateJDOFilter(ExpressionTree.java:243)
    at org.apache.hadoop.hive.metastore.parser.ExpressionTree.generateJDOFilterFragment(ExpressionTree.java:590)
    at org.apache.hadoop.hive.metastore.ObjectStore.makeQueryFilterString(ObjectStore.java:2417)
    at org.apache.hadoop.hive.metastore.ObjectStore.getPartitionsViaOrmFilter(ObjectStore.java:2029)
    at org.apache.hadoop.hive.metastore.ObjectStore.access$500(ObjectStore.java:146)
    at org.apache.hadoop.hive.metastore.ObjectStore$4.getJdoResult(ObjectStore.java:2332)
```

https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Test/job/Spark-Master-Maven-with-YARN/2945/HADOOP_PROFILE=hadoop-2.4,label=centos/testReport/junit/org.apache.spark.sql.hive.execution/SortMergeCompatibilitySuite/auto_sortmerge_join_16/

Author: Michael Armbrust <[email protected]>

Closes apache#7409 from marmbrus/revertMetastorePushdown and squashes the following commits:

92fabd3 [Michael Armbrust] Revert SPARK-6910 and SPARK-9027
5d3bdf2 [Michael Armbrust] Revert "[SPARK-9027] [SQL] Generalize metastore predicate pushdown"
1 parent f23a721 commit c6b1a9e
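
For context on the failure mode: the reverted pushdown handed pruning predicates to the metastore via `getPartitionsByFilter`, and a metastore that falls back to the JDO/ORM filter path only accepts filters on string partition keys. Below is a hedged, minimal sketch of the kind of query that could trip this, assuming Spark 1.4-era APIs; the table and column names (`events`, `part_key`) are made up for illustration and do not come from the commit.

```scala
// Hypothetical reproduction sketch, not taken from the commit: a Hive table partitioned
// by an INT column plus a filter on that column. With the reverted patches applied, the
// filter could be pushed to the metastore and rejected with
// "MetaException: Filtering is supported only on partition keys of type string".
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object PartitionFilterRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("partition-filter-repro").setMaster("local[2]"))
    val hiveContext = new HiveContext(sc)

    // `events` and `part_key` are illustrative names only.
    hiveContext.sql(
      "CREATE TABLE IF NOT EXISTS events (value STRING) PARTITIONED BY (part_key INT)")
    hiveContext.sql("ALTER TABLE events ADD IF NOT EXISTS PARTITION (part_key = 1)")

    // The predicate on the non-string partition key is what would be pushed down.
    hiveContext.sql("SELECT * FROM events WHERE part_key = 1").show()

    sc.stop()
  }
}
```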

File tree

10 files changed (+44, -218 lines)


sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 27 additions & 31 deletions
@@ -301,9 +301,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
-      // We're converting the entire table into ParquetRelation, so predicates to Hive metastore
-      // are empty.
-      val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
+      val partitions = metastoreRelation.hiveQlPartitions.map { p =>
         val location = p.getLocation
         val values = InternalRow.fromSeq(p.getValues.zip(partitionColumnDataTypes).map {
           case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
@@ -646,6 +644,32 @@ private[hive] case class MetastoreRelation
     new Table(tTable)
   }
 
+  @transient val hiveQlPartitions: Seq[Partition] = table.getAllPartitions.map { p =>
+    val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
+    tPartition.setDbName(databaseName)
+    tPartition.setTableName(tableName)
+    tPartition.setValues(p.values)
+
+    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+    tPartition.setSd(sd)
+    sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
+
+    sd.setLocation(p.storage.location)
+    sd.setInputFormat(p.storage.inputFormat)
+    sd.setOutputFormat(p.storage.outputFormat)
+
+    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+    sd.setSerdeInfo(serdeInfo)
+    serdeInfo.setSerializationLib(p.storage.serde)
+
+    val serdeParameters = new java.util.HashMap[String, String]()
+    serdeInfo.setParameters(serdeParameters)
+    table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+
+    new Partition(hiveQlTable, tPartition)
+  }
+
   @transient override lazy val statistics: Statistics = Statistics(
     sizeInBytes = {
       val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
@@ -666,34 +690,6 @@ private[hive] case class MetastoreRelation
     }
   )
 
-  def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
-    table.getPartitions(predicates).map { p =>
-      val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
-      tPartition.setDbName(databaseName)
-      tPartition.setTableName(tableName)
-      tPartition.setValues(p.values)
-
-      val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
-      tPartition.setSd(sd)
-      sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
-
-      sd.setLocation(p.storage.location)
-      sd.setInputFormat(p.storage.inputFormat)
-      sd.setOutputFormat(p.storage.outputFormat)
-
-      val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-      sd.setSerdeInfo(serdeInfo)
-      serdeInfo.setSerializationLib(p.storage.serde)
-
-      val serdeParameters = new java.util.HashMap[String, String]()
-      serdeInfo.setParameters(serdeParameters)
-      table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
-      p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
-
-      new Partition(hiveQlTable, tPartition)
-    }
-  }
-
   /** Only compare database and tablename, not alias. */
   override def sameResult(plan: LogicalPlan): Boolean = {
     plan match {

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala

Lines changed: 0 additions & 1 deletion
@@ -27,7 +27,6 @@ import scala.reflect.ClassTag
 
 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.io.{Input, Output}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 2 additions & 2 deletions
@@ -125,7 +125,7 @@ private[hive] trait HiveStrategies {
           InterpretedPredicate.create(castedPredicate)
         }
 
-        val partitions = relation.getHiveQlPartitions(pruningPredicates).filter { part =>
+        val partitions = relation.hiveQlPartitions.filter { part =>
           val partitionValues = part.getValues
           var i = 0
           while (i < partitionValues.size()) {
@@ -213,7 +213,7 @@
           projectList,
           otherPredicates,
           identity[Seq[Expression]],
-          HiveTableScan(_, relation, pruningPredicates)(hiveContext)) :: Nil
+          HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil
       case _ =>
         Nil
     }
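
After the revert, partition pruning happens on the driver: the strategy filters the eagerly fetched `relation.hiveQlPartitions` and folds the pruning predicates back into the single `Option[Expression]` that `HiveTableScan` takes. A small illustrative sketch of that folding is below; the `ds` partition key and the literal values are hypothetical and not from the commit.

```scala
// Illustrative sketch (not part of the commit): how a Seq[Expression] of pruning
// predicates collapses into the Option[Expression] expected by HiveTableScan.
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, GreaterThanOrEqual, LessThan, Literal}
import org.apache.spark.sql.types.StringType

object CombinePredicatesSketch {
  // reduceLeftOption(And) turns Seq(p1, p2, p3) into Some(And(And(p1, p2), p3)),
  // and an empty predicate list into None.
  def combine(pruningPredicates: Seq[Expression]): Option[Expression] =
    pruningPredicates.reduceLeftOption(And)

  def main(args: Array[String]): Unit = {
    // `ds` is a hypothetical string partition key used only for illustration.
    val ds = AttributeReference("ds", StringType)()
    val predicates = Seq(
      GreaterThanOrEqual(ds, Literal("2015-07-01")),
      LessThan(ds, Literal("2015-08-01")))
    println(combine(predicates)) // prints the folded conjunction
  }
}
```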

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala

Lines changed: 1 addition & 10 deletions
@@ -21,7 +21,6 @@ import java.io.PrintStream
 import java.util.{Map => JMap}
 
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
-import org.apache.spark.sql.catalyst.expressions.Expression
 
 private[hive] case class HiveDatabase(
     name: String,
@@ -72,12 +71,7 @@ private[hive] case class HiveTable(
 
   def isPartitioned: Boolean = partitionColumns.nonEmpty
 
-  def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = {
-    predicates match {
-      case Nil => client.getAllPartitions(this)
-      case _ => client.getPartitionsByFilter(this, predicates)
-    }
-  }
+  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
 
   // Hive does not support backticks when passing names to the client.
   def qualifiedName: String = s"$database.$name"
@@ -138,9 +132,6 @@ private[hive] trait ClientInterface {
   /** Returns all partitions for the given table. */
   def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
 
-  /** Returns partitions filtered by predicates for the given table. */
-  def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
-
   /** Loads a static partition into an existing table. */
   def loadPartition(
       loadPath: String,

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

Lines changed: 9 additions & 12 deletions
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.hive.client
 
-import java.io.{File, PrintStream}
-import java.util.{Map => JMap}
+import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
+import java.net.URI
+import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConversions._
 import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.metastore.api.Database
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
 import org.apache.hadoop.hive.metastore.{TableType => HTableType}
+import org.apache.hadoop.hive.metastore.api
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.metadata
 import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.ql.{Driver, metadata}
+import org.apache.hadoop.hive.ql.processors._
+import org.apache.hadoop.hive.ql.Driver
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -312,13 +316,6 @@ private[hive] class ClientWrapper(
     shim.getAllPartitions(client, qlTable).map(toHivePartition)
   }
 
-  override def getPartitionsByFilter(
-      hTable: HiveTable,
-      predicates: Seq[Expression]): Seq[HivePartition] = withHiveState {
-    val qlTable = toQlTable(hTable)
-    shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition)
-  }
-
   override def listTables(dbName: String): Seq[String] = withHiveState {
     client.getAllTables(dbName)
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala

Lines changed: 1 addition & 71 deletions
@@ -31,11 +31,6 @@ import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{StringType, IntegralType}
 
 /**
  * A shim that defines the interface between ClientWrapper and the underlying Hive library used to
@@ -66,8 +61,6 @@ private[client] sealed abstract class Shim {
 
   def getAllPartitions(hive: Hive, table: Table): Seq[Partition]
 
-  def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition]
-
   def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor
 
   def getDriverResults(driver: Driver): Seq[String]
@@ -116,7 +109,7 @@ private[client] sealed abstract class Shim {
 
 }
 
-private[client] class Shim_v0_12 extends Shim with Logging {
+private[client] class Shim_v0_12 extends Shim {
 
   private lazy val startMethod =
     findStaticMethod(
@@ -203,17 +196,6 @@ private[client] class Shim_v0_12 extends Shim with Logging {
   override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
     getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq
 
-  override def getPartitionsByFilter(
-      hive: Hive,
-      table: Table,
-      predicates: Seq[Expression]): Seq[Partition] = {
-    // getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12.
-    // See HIVE-4888.
-    logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
-      "Please use Hive 0.13 or higher.")
-    getAllPartitions(hive, table)
-  }
-
   override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
     getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]
 
@@ -285,12 +267,6 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       classOf[Hive],
       "getAllPartitionsOf",
       classOf[Table])
-  private lazy val getPartitionsByFilterMethod =
-    findMethod(
-      classOf[Hive],
-      "getPartitionsByFilter",
-      classOf[Table],
-      classOf[String])
   private lazy val getCommandProcessorMethod =
     findStaticMethod(
       classOf[CommandProcessorFactory],
@@ -312,52 +288,6 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
     getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq
 
-  /**
-   * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
-   * a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
-   *
-   * Unsupported predicates are skipped.
-   */
-  def convertFilters(table: Table, filters: Seq[Expression]): String = {
-    // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
-    val varcharKeys = table.getPartitionKeys
-      .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME))
-      .map(col => col.getName).toSet
-
-    filters.collect {
-      case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
-        s"${a.name} ${op.symbol} $v"
-      case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
-        s"$v ${op.symbol} ${a.name}"
-
-      case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
-          if !varcharKeys.contains(a.name) =>
-        s"""${a.name} ${op.symbol} "$v""""
-      case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
-          if !varcharKeys.contains(a.name) =>
-        s""""$v" ${op.symbol} ${a.name}"""
-    }.mkString(" and ")
-  }
-
-  override def getPartitionsByFilter(
-      hive: Hive,
-      table: Table,
-      predicates: Seq[Expression]): Seq[Partition] = {
-
-    // Hive getPartitionsByFilter() takes a string that represents partition
-    // predicates like "str_key=\"value\" and int_key=1 ..."
-    val filter = convertFilters(table, predicates)
-    val partitions =
-      if (filter.isEmpty) {
-        getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
-      } else {
-        logDebug(s"Hive metastore filter is '$filter'.")
-        getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
-      }
-
-    partitions.toSeq
-  }
-
   override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
     getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]
 
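
For reference, the removed `Shim_v0_13.getPartitionsByFilter` rendered Catalyst predicates into a Hive filter string and reflectively invoked `Hive.getPartitionsByFilter(Table, String)`, which is where the MetaException above originated. Below is a hedged sketch of that underlying call, assuming Hive 0.13+ on the classpath, a reachable metastore, and a hypothetical partitioned table `default.events`; it is not code from the commit.

```scala
// Sketch only (not from the commit): the Hive-side API the removed shim method targeted.
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.Hive

object MetastoreFilterSketch {
  def main(args: Array[String]): Unit = {
    val hive = Hive.get(new HiveConf())
    val table = hive.getTable("default", "events") // hypothetical table name

    // convertFilters() produced strings of this shape: string keys quoted,
    // integral keys bare, unsupported predicates silently dropped.
    val filter = """str_key = "value" and int_key = 1"""

    // Roughly the call that surfaced MetaException("Filtering is supported only
    // on partition keys of type string") in the Jenkins runs quoted above.
    val partitions = hive.getPartitionsByFilter(table, filter)
    println(s"Matched ${partitions.size()} partition(s)")
  }
}
```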

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala

Lines changed: 3 additions & 4 deletions
@@ -44,7 +44,7 @@ private[hive]
 case class HiveTableScan(
     requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,
-    partitionPruningPred: Seq[Expression])(
+    partitionPruningPred: Option[Expression])(
     @transient val context: HiveContext)
   extends LeafNode {
 
@@ -56,7 +56,7 @@ case class HiveTableScan(
 
   // Bind all partition key attribute references in the partition pruning predicate for later
   // evaluation.
-  private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
+  private[this] val boundPruningPred = partitionPruningPred.map { pred =>
     require(
       pred.dataType == BooleanType,
       s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
@@ -133,8 +133,7 @@ case class HiveTableScan(
   protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
     hadoopReader.makeRDDForTable(relation.hiveQlTable)
   } else {
-    hadoopReader.makeRDDForPartitionedTable(
-      prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+    hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
   }
 
   override def output: Seq[Attribute] = attributes
