
Commit 92fabd3

Revert SPARK-6910 and SPARK-9027
1 parent 5d3bdf2 commit 92fabd3


9 files changed: +44 additions, -136 deletions


sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 27 additions & 31 deletions
@@ -301,9 +301,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
-      // We're converting the entire table into ParquetRelation, so predicates to Hive metastore
-      // are empty.
-      val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
+      val partitions = metastoreRelation.hiveQlPartitions.map { p =>
         val location = p.getLocation
         val values = InternalRow.fromSeq(p.getValues.zip(partitionColumnDataTypes).map {
           case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
@@ -646,6 +644,32 @@ private[hive] case class MetastoreRelation
     new Table(tTable)
   }
 
+  @transient val hiveQlPartitions: Seq[Partition] = table.getAllPartitions.map { p =>
+    val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
+    tPartition.setDbName(databaseName)
+    tPartition.setTableName(tableName)
+    tPartition.setValues(p.values)
+
+    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+    tPartition.setSd(sd)
+    sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
+
+    sd.setLocation(p.storage.location)
+    sd.setInputFormat(p.storage.inputFormat)
+    sd.setOutputFormat(p.storage.outputFormat)
+
+    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+    sd.setSerdeInfo(serdeInfo)
+    serdeInfo.setSerializationLib(p.storage.serde)
+
+    val serdeParameters = new java.util.HashMap[String, String]()
+    serdeInfo.setParameters(serdeParameters)
+    table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+
+    new Partition(hiveQlTable, tPartition)
+  }
+
   @transient override lazy val statistics: Statistics = Statistics(
     sizeInBytes = {
       val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
@@ -666,34 +690,6 @@ private[hive] case class MetastoreRelation
     }
   )
 
-  def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
-    table.getPartitions(predicates).map { p =>
-      val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
-      tPartition.setDbName(databaseName)
-      tPartition.setTableName(tableName)
-      tPartition.setValues(p.values)
-
-      val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
-      tPartition.setSd(sd)
-      sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
-
-      sd.setLocation(p.storage.location)
-      sd.setInputFormat(p.storage.inputFormat)
-      sd.setOutputFormat(p.storage.outputFormat)
-
-      val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-      sd.setSerdeInfo(serdeInfo)
-      serdeInfo.setSerializationLib(p.storage.serde)
-
-      val serdeParameters = new java.util.HashMap[String, String]()
-      serdeInfo.setParameters(serdeParameters)
-      table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
-      p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
-
-      new Partition(hiveQlTable, tPartition)
-    }
-  }
-
   /** Only compare database and tablename, not alias. */
   override def sameResult(plan: LogicalPlan): Boolean = {
     plan match {
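With this change, MetastoreRelation goes back to materialising all partition metadata eagerly in the hiveQlPartitions val, rather than asking the metastore only for partitions matching the pushed-down predicates via getHiveQlPartitions. A minimal, self-contained Scala sketch of that behavioural difference follows; MetastoreClient, Partition and Predicate are hypothetical stand-ins, not Spark or Hive classes.

object PartitionLookupSketch {
  type Predicate = Map[String, String] => Boolean

  final case class Partition(values: Map[String, String])

  final class MetastoreClient(all: Seq[Partition]) {
    // Restored behaviour: the client always hands back every partition.
    def getAllPartitions: Seq[Partition] = all

    // Reverted behaviour (SPARK-6910): the metastore evaluates the filter,
    // so only matching partitions are returned.
    def getPartitionsByFilter(pred: Predicate): Seq[Partition] =
      all.filter(p => pred(p.values))
  }

  def main(args: Array[String]): Unit = {
    val client = new MetastoreClient((1 to 4).map(i => Partition(Map("ds" -> s"2015-07-0$i"))))
    val wanted: Predicate = _.get("ds").contains("2015-07-02")

    // After the revert: fetch everything, then prune on the driver side.
    val prunedOnDriver = client.getAllPartitions.filter(p => wanted(p.values))
    // Before the revert: the predicate was pushed to the metastore.
    val prunedInMetastore = client.getPartitionsByFilter(wanted)

    assert(prunedOnDriver == prunedInMetastore)
    println(s"both paths keep ${prunedOnDriver.size} of ${client.getAllPartitions.size} partitions")
  }
}

Both paths select the same partitions; what the revert trades away is fetching less partition metadata from the metastore before pruning.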

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala

Lines changed: 0 additions & 1 deletion
@@ -27,7 +27,6 @@ import scala.reflect.ClassTag
 
 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.io.{Input, Output}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 2 additions & 2 deletions
@@ -125,7 +125,7 @@ private[hive] trait HiveStrategies {
           InterpretedPredicate.create(castedPredicate)
         }
 
-        val partitions = relation.getHiveQlPartitions(pruningPredicates).filter { part =>
+        val partitions = relation.hiveQlPartitions.filter { part =>
           val partitionValues = part.getValues
           var i = 0
           while (i < partitionValues.size()) {
@@ -213,7 +213,7 @@
           projectList,
           otherPredicates,
           identity[Seq[Expression]],
-          HiveTableScan(_, relation, pruningPredicates)(hiveContext)) :: Nil
+          HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil
       case _ =>
         Nil
     }
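Since HiveTableScan again takes a single optional predicate, the strategy folds its pruning predicates with reduceLeftOption(And). A small sketch of what that fold does, using simplified stand-ins for Catalyst's Expression and And rather than the real classes:

object ReduceLeftOptionSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class EqualTo(attr: Attr, value: Int) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  def main(args: Array[String]): Unit = {
    val none: Seq[Expr] = Nil
    val two: Seq[Expr] = Seq(EqualTo(Attr("year"), 2015), EqualTo(Attr("month"), 7))

    // An empty predicate list becomes None, i.e. "no pruning predicate".
    println(none.reduceLeftOption(And))
    // Several predicates collapse into one left-nested conjunction.
    println(two.reduceLeftOption(And))
  }
}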

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala

Lines changed: 1 addition & 10 deletions
@@ -21,7 +21,6 @@ import java.io.PrintStream
 import java.util.{Map => JMap}
 
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
-import org.apache.spark.sql.catalyst.expressions.Expression
 
 private[hive] case class HiveDatabase(
     name: String,
@@ -72,12 +71,7 @@ private[hive] case class HiveTable(
 
   def isPartitioned: Boolean = partitionColumns.nonEmpty
 
-  def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = {
-    predicates match {
-      case Nil => client.getAllPartitions(this)
-      case _ => client.getPartitionsByFilter(this, predicates)
-    }
-  }
+  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
 
   // Hive does not support backticks when passing names to the client.
   def qualifiedName: String = s"$database.$name"
@@ -138,9 +132,6 @@ private[hive] trait ClientInterface {
   /** Returns all partitions for the given table. */
   def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
 
-  /** Returns partitions filtered by predicates for the given table. */
-  def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
-
   /** Loads a static partition into an existing table. */
  def loadPartition(
       loadPath: String,

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

Lines changed: 9 additions & 12 deletions
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.hive.client
 
-import java.io.{File, PrintStream}
-import java.util.{Map => JMap}
+import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
+import java.net.URI
+import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConversions._
 import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.metastore.api.Database
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
 import org.apache.hadoop.hive.metastore.{TableType => HTableType}
+import org.apache.hadoop.hive.metastore.api
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.metadata
 import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.ql.{Driver, metadata}
+import org.apache.hadoop.hive.ql.processors._
+import org.apache.hadoop.hive.ql.Driver
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -312,13 +316,6 @@ private[hive] class ClientWrapper(
     shim.getAllPartitions(client, qlTable).map(toHivePartition)
   }
 
-  override def getPartitionsByFilter(
-      hTable: HiveTable,
-      predicates: Seq[Expression]): Seq[HivePartition] = withHiveState {
-    val qlTable = toQlTable(hTable)
-    shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition)
-  }
-
   override def listTables(dbName: String): Seq[String] = withHiveState {
     client.getAllTables(dbName)
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala

Lines changed: 1 addition & 67 deletions
@@ -31,11 +31,6 @@ import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.{Expression, AttributeReference, BinaryComparison}
-import org.apache.spark.sql.types.{StringType, IntegralType}
 
 /**
  * A shim that defines the interface between ClientWrapper and the underlying Hive library used to
@@ -66,8 +61,6 @@ private[client] sealed abstract class Shim {
 
   def getAllPartitions(hive: Hive, table: Table): Seq[Partition]
 
-  def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition]
-
   def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor
 
   def getDriverResults(driver: Driver): Seq[String]
@@ -116,7 +109,7 @@
 
 }
 
-private[client] class Shim_v0_12 extends Shim with Logging {
+private[client] class Shim_v0_12 extends Shim {
 
   private lazy val startMethod =
     findStaticMethod(
@@ -203,17 +196,6 @@ private[client] class Shim_v0_12 extends Shim with Logging {
   override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
     getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq
 
-  override def getPartitionsByFilter(
-      hive: Hive,
-      table: Table,
-      predicates: Seq[Expression]): Seq[Partition] = {
-    // getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12.
-    // See HIVE-4888.
-    logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
-      "Please use Hive 0.13 or higher.")
-    getAllPartitions(hive, table)
-  }
-
   override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
     getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]
 
@@ -285,12 +267,6 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       classOf[Hive],
       "getAllPartitionsOf",
       classOf[Table])
-  private lazy val getPartitionsByFilterMethod =
-    findMethod(
-      classOf[Hive],
-      "getPartitionsByFilter",
-      classOf[Table],
-      classOf[String])
   private lazy val getCommandProcessorMethod =
     findStaticMethod(
       classOf[CommandProcessorFactory],
@@ -312,48 +288,6 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
     getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq
 
-  override def getPartitionsByFilter(
-      hive: Hive,
-      table: Table,
-      predicates: Seq[Expression]): Seq[Partition] = {
-    // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
-    val varcharKeys = table.getPartitionKeys
-      .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME))
-      .map(col => col.getName).toSet
-
-    // Hive getPartitionsByFilter() takes a string that represents partition
-    // predicates like "str_key=\"value\" and int_key=1 ..."
-    val filter = predicates.flatMap { expr =>
-      expr match {
-        case op @ BinaryComparison(lhs, rhs) => {
-          lhs match {
-            case AttributeReference(_, _, _, _) => {
-              rhs.dataType match {
-                case _: IntegralType =>
-                  Some(lhs.prettyString + op.symbol + rhs.prettyString)
-                case _: StringType if (!varcharKeys.contains(lhs.prettyString)) =>
-                  Some(lhs.prettyString + op.symbol + "\"" + rhs.prettyString + "\"")
-                case _ => None
-              }
-            }
-            case _ => None
-          }
-        }
-        case _ => None
-      }
-    }.mkString(" and ")
-
-    val partitions =
-      if (filter.isEmpty) {
-        getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
-      } else {
-        logDebug(s"Hive metastore filter is '$filter'.")
-        getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
-      }
-
-    partitions.toSeq
-  }
-
   override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
     getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]
 
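The largest removal is Shim_v0_13's translation of Catalyst comparisons into the string filter that Hive's getPartitionsByFilter() expects, e.g. str_key="value" and int_key=1. The sketch below models only that rendering step under simplified assumptions: Attribute, BinaryComparison and the DataType objects are stand-ins rather than the Catalyst classes, and anything that cannot be expressed is dropped so the lookup stays conservative.

object MetastoreFilterStringSketch {
  sealed trait DataType
  case object IntegralType extends DataType
  case object StringType extends DataType

  final case class Attribute(name: String, dataType: DataType)
  final case class BinaryComparison(lhs: Attribute, symbol: String, rhs: String)

  /** Render the predicates Hive can evaluate into a filter string; drop the rest. */
  def toMetastoreFilter(predicates: Seq[BinaryComparison], varcharKeys: Set[String]): String =
    predicates.flatMap { p =>
      p.lhs.dataType match {
        case IntegralType =>
          Some(p.lhs.name + p.symbol + p.rhs)                  // e.g. hr=1
        case StringType if !varcharKeys.contains(p.lhs.name) =>
          Some(p.lhs.name + p.symbol + "\"" + p.rhs + "\"")    // e.g. ds="2015-07-02"
        case _ =>
          None                                                 // varchar keys can't be pushed down
      }
    }.mkString(" and ")

  def main(args: Array[String]): Unit = {
    val filter = toMetastoreFilter(
      Seq(
        BinaryComparison(Attribute("ds", StringType), "=", "2015-07-02"),
        BinaryComparison(Attribute("hr", IntegralType), "=", "1")),
      varcharKeys = Set.empty)
    println(filter) // ds="2015-07-02" and hr=1
  }
}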
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala

Lines changed: 3 additions & 4 deletions
@@ -44,7 +44,7 @@ private[hive]
 case class HiveTableScan(
     requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,
-    partitionPruningPred: Seq[Expression])(
+    partitionPruningPred: Option[Expression])(
     @transient val context: HiveContext)
   extends LeafNode {
 
@@ -56,7 +56,7 @@ case class HiveTableScan(
 
   // Bind all partition key attribute references in the partition pruning predicate for later
   // evaluation.
-  private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
+  private[this] val boundPruningPred = partitionPruningPred.map { pred =>
     require(
       pred.dataType == BooleanType,
       s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
@@ -133,8 +133,7 @@ case class HiveTableScan(
   protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
     hadoopReader.makeRDDForTable(relation.hiveQlTable)
   } else {
-    hadoopReader.makeRDDForPartitionedTable(
-      prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+    hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
  }
 
   override def output: Seq[Attribute] = attributes
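With partitionPruningPred now an Option[Expression], pruning is a map over an optional, already-bound predicate. As a rough model only (a plain function stands in for the bound Catalyst expression, and Partition is a toy type), one might write:

object PrunePartitionsSketch {
  final case class Partition(values: Map[String, String])

  // None means "no pruning predicate": keep everything.
  def prunePartitions(
      partitions: Seq[Partition],
      boundPruningPred: Option[Map[String, String] => Boolean]): Seq[Partition] =
    boundPruningPred match {
      case None       => partitions
      case Some(pred) => partitions.filter(p => pred(p.values))
    }

  def main(args: Array[String]): Unit = {
    val parts = Seq(
      Partition(Map("ds" -> "2015-07-01")),
      Partition(Map("ds" -> "2015-07-02")))

    val onlySecond: Map[String, String] => Boolean = _.get("ds").contains("2015-07-02")

    println(prunePartitions(parts, None).size)              // 2
    println(prunePartitions(parts, Some(onlySecond)).size)  // 1
  }
}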

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 0 additions & 8 deletions
@@ -20,9 +20,7 @@ package org.apache.spark.sql.hive.client
 import java.io.File
 
 import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.util.Utils
 
 /**
@@ -153,12 +151,6 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.getAllPartitions(client.getTable("default", "src_part"))
     }
 
-    test(s"$version: getPartitionsByFilter") {
-      client.getPartitionsByFilter(client.getTable("default", "src_part"), Seq(EqualTo(
-        AttributeReference("key", IntegerType, false)(NamedExpression.newExprId),
-        Literal(1))))
-    }
-
    test(s"$version: loadPartition") {
      client.loadPartition(
        emptyDir,

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -151,7 +151,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
       case p @ HiveTableScan(columns, relation, _) =>
         val columnNames = columns.map(_.name)
         val partValues = if (relation.table.isPartitioned) {
-          p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
+          p.prunePartitions(relation.hiveQlPartitions).map(_.getValues)
         } else {
           Seq.empty
         }
