@@ -187,6 +187,21 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val ENABLE_HIVE_TIMESTAMP_TYPE_PARTITION_PRUNING =
buildConf("spark.sql.hive.metastore.partition.pruning.timestamps.enabled")
.internal()
.doc("When true, predicates for columns of type timestamp are pushed to hive metastore.")
.booleanConf
.createWithDefault(false)

val ENABLE_HIVE_FRACTIONAL_TYPES_PARTITION_PRUNING =
buildConf("spark.sql.hive.metastore.partition.pruning.fractionals.enabled")
.internal()
.doc("When true, predicates for columns of type fractional (double, float, decimal) " +
"are pushed to hive metastore.")
.booleanConf
.createWithDefault(false)

val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
buildConf("spark.sql.statistics.fallBackToHdfs")
.doc("If the table statistics are not available from table metadata enable fall back to hdfs." +
@@ -1222,6 +1237,12 @@ class SQLConf extends Serializable with Logging {
def advancedPartitionPredicatePushdownEnabled: Boolean =
getConf(ADVANCED_PARTITION_PREDICATE_PUSHDOWN)

def pruneTimestampPartitionColumns: Boolean =
getConf(ENABLE_HIVE_TIMESTAMP_TYPE_PARTITION_PRUNING)

def pruneFractionalPartitionColumns: Boolean =
getConf(ENABLE_HIVE_FRACTIONAL_TYPES_PARTITION_PRUNING)

def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)

def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
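For reference, a minimal sketch (not part of the change) of opting in to the behaviour controlled by the two flags defined above; both default to false, so timestamp and fractional predicates keep being evaluated client-side unless a user enables them. The spark value is assumed to be an active SparkSession, e.g. the spark-shell binding.

// Sketch only: both configs are internal and off by default; enabling them lets
// the Hive shim push timestamp and fractional partition predicates to the metastore.
spark.conf.set("spark.sql.hive.metastore.partition.pruning.timestamps.enabled", "true")
spark.conf.set("spark.sql.hive.metastore.partition.pruning.fractionals.enabled", "true")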
@@ -20,16 +20,15 @@ package org.apache.spark.sql.hive.client
import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
import java.lang.reflect.{InvocationTargetException, Method, Modifier}
import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Locale, Map => JMap, Set => JSet}
import java.util.{Locale, ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.{EnvironmentContext, Function => HiveFunction, FunctionType}
import org.apache.hadoop.hive.metastore.api.{EnvironmentContext, FunctionType, Function => HiveFunction}
import org.apache.hadoop.hive.metastore.api.{MetaException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.io.AcidUtils
@@ -38,15 +37,16 @@ import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegralType, StringType}
import org.apache.spark.sql.types.{FractionalType, IntegralType, StringType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

@@ -598,9 +598,17 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
}

object ExtractableLiteral {
val pruneTimestamps = SQLConf.get.pruneTimestampPartitionColumns
val pruneFractionals = SQLConf.get.pruneFractionalPartitionColumns

def unapply(expr: Expression): Option[String] = expr match {
case Literal(value, _: IntegralType) => Some(value.toString)
case Literal(value, _: StringType) => Some(quoteStringLiteral(value.toString))
case Literal(value, _: FractionalType) if pruneFractionals => Some(value.toString)
// Timestamp must be converted to yyyy-mm-dd hh:mm:ss[.fffffffff] format before
// it can be used for partition pruning
case Literal(value: SQLTimestamp, _: TimestampType) if pruneTimestamps =>
Some(s"'${DateTimeUtils.timestampToString(value, DateTimeUtils.TimeZoneUTC)}'")
case _ => None
}
}
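A rough sketch of the rendering the new timestamp case performs, using the same helper the shim calls above; the example value is borrowed from the FiltersSuite case further down.

// Sketch only: a SQLTimestamp is a Long counting microseconds since the epoch.
// 812505600000000L is 1995-10-01 00:00:00 UTC, so the pushed filter fragment becomes
// the quoted string '1995-10-01 00:00:00'.
val micros: Long = 812505600000000L
val rendered = DateTimeUtils.timestampToString(micros, DateTimeUtils.TimeZoneUTC)
// rendered == "1995-10-01 00:00:00"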
@@ -641,6 +649,10 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
.map(col => col.getName).toSet
if (varcharKeys.nonEmpty) {
logDebug(s"Following table columns will be ignored in " +
s"partition pruning because their type is varchar: $varcharKeys")
}

def unapply(attr: Attribute): Option[String] = {
if (varcharKeys.contains(attr.name)) {
@@ -687,7 +699,10 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
case _ => None
}

filters.flatMap(convert).mkString(" and ")
val result = filters.flatMap(convert).mkString(" and ")
logDebug(s"Conversion of $filters for metastore partition pruning resulted in $result")

result
}

private def quoteStringLiteral(str: String): String = {
@@ -714,7 +729,6 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
if (filter.isEmpty) {
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
} else {
logDebug(s"Hive metastore filter is '$filter'.")
val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
// We should get this config value from the metaStore. otherwise hit SPARK-18681.
// To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by:
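A short sketch of the end-to-end conversion, modeled on the FiltersSuite tests below; shim, testTable and the a(...) attribute helper are the suite's fixtures and are assumed here rather than constructed.

// Sketch only: with the timestamp flag enabled, a predicate on a timestamp partition
// column becomes a quoted metastore filter string; with it disabled, the conjunct is
// dropped and the resulting filter is empty.
val filters = (a("timestampcol", TimestampType) === Literal(812505600000000L, TimestampType)) :: Nil
val metastoreFilter = shim.convertFilters(testTable, filters)
// metastoreFilter == "timestampcol = '1995-10-01 00:00:00'" when the flag is enabled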
@@ -17,11 +17,12 @@

package org.apache.spark.sql.hive.client

import java.sql.Timestamp
import java.time.Instant
import java.util.Collections

import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.serde.serdeConstants

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -60,7 +61,7 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
"1 = intcol")

filterTest("int and string filter",
(Literal(1) === a("intcol", IntegerType)) :: (Literal("a") === a("strcol", IntegerType)) :: Nil,
(Literal(1) === a("intcol", IntegerType)) :: (Literal("a") === a("strcol", StringType)) :: Nil,
"1 = intcol and \"a\" = strcol")

filterTest("skip varchar",
@@ -72,9 +73,19 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
(Literal("p2\" and q=\"q2") === a("stringcol", StringType)) :: Nil,
"""stringcol = 'p1" and q="q1' and 'p2" and q="q2' = stringcol""")

filterTest("timestamp partition columns must be mapped to yyyy-mm-dd hh:mm:ss[.fffffffff] format",
(a("timestampcol", TimestampType) === Literal(812505600000000L, TimestampType)) :: Nil,
"timestampcol = '1995-10-01 00:00:00'")

filterTest("decimal filter",
(Literal(50D) === a("deccol", DecimalType(2, 0))) :: Nil,
"50.0 = deccol")

private def filterTest(name: String, filters: Seq[Expression], result: String) = {
test(name) {
withSQLConf(SQLConf.ADVANCED_PARTITION_PREDICATE_PUSHDOWN.key -> "true") {
withSQLConf(SQLConf.ADVANCED_PARTITION_PREDICATE_PUSHDOWN.key -> "true",
SQLConf.ENABLE_HIVE_FRACTIONAL_TYPES_PARTITION_PRUNING.key -> "true",
SQLConf.ENABLE_HIVE_TIMESTAMP_TYPE_PARTITION_PRUNING.key -> "true") {
val converted = shim.convertFilters(testTable, filters)
if (converted != result) {
fail(s"Expected ${filters.mkString(",")} to convert to '$result' but got '$converted'")
@@ -100,5 +111,40 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
}
}

test("turn on/off HIVE_FRACTIONAL_TYPES_PARTITION_PRUNING") {
import org.apache.spark.sql.catalyst.dsl.expressions._
Seq(true, false).foreach { enabled =>
withSQLConf(SQLConf.ENABLE_HIVE_FRACTIONAL_TYPES_PARTITION_PRUNING.key -> enabled.toString) {
val filters =
(Literal(1.0F) === a("floatcol", FloatType) ||
Literal(2.0D) === a("doublecol", DoubleType) ||
Literal(BigDecimal(3.0D)) === a("deccol", DecimalType(10, 0))) :: Nil
val converted = shim.convertFilters(testTable, filters)
if (enabled) {
assert(converted == "((1.0 = floatcol or 2.0 = doublecol) or 3.0 = deccol)")
} else {
assert(converted.isEmpty)
}
}
}
}

test("turn on/off HIVE_TIMESTAMP_PARTITION_PRUNING") {
import org.apache.spark.sql.catalyst.dsl.expressions._
val october23rd = Instant.parse("1984-10-23T00:00:00.00Z")
Seq(true, false).foreach { enabled =>
withSQLConf(SQLConf.ENABLE_HIVE_TIMESTAMP_TYPE_PARTITION_PRUNING.key -> enabled.toString) {
val filters = (Literal(new Timestamp(october23rd.toEpochMilli))
=== a("tcol", TimestampType)) :: Nil
val converted = shim.convertFilters(testTable, filters)
if (enabled) {
assert(converted == "'1984-10-23 00:00:00' = tcol")
} else {
assert(converted.isEmpty)
}
}
}
}

private def a(name: String, dataType: DataType) = AttributeReference(name, dataType)()
}
@@ -24,6 +24,6 @@ import org.scalatest.Suite
class HiveClientSuites extends Suite with HiveClientVersions {
override def nestedSuites: IndexedSeq[Suite] = {
// Hive 0.12 does not provide the partition filtering API we call
versions.filterNot(_ == "0.12").map(new HiveClientSuite(_))
versions.filterNot(_ == "0.12").map(new HivePartitionFilteringSuite(_))
}
}