
Commit 7d09bad

Use SQLConfEntry in HiveContext
1 parent 49f6213 commit 7d09bad

7 files changed: +103 −35 lines changed


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 18 additions & 0 deletions
@@ -157,6 +157,24 @@ private[spark] object SQLConf {
       }
       _v
     }, _.toString, doc, isPublic)
+
+  def seqConf[T](
+      key: String,
+      valueConverter: String => T,
+      defaultValue: Option[Seq[T]] = None,
+      doc: String = "",
+      isPublic: Boolean = true): SQLConfEntry[Seq[T]] = {
+    SQLConfEntry(
+      key, defaultValue, _.split(",").map(valueConverter), _.mkString(","), doc, isPublic)
+  }
+
+  def stringSeqConf(
+      key: String,
+      defaultValue: Option[Seq[String]] = None,
+      doc: String = "",
+      isPublic: Boolean = true): SQLConfEntry[Seq[String]] = {
+    seqConf(key, s => s, defaultValue, doc, isPublic)
+  }
 }

 import SQLConfEntry._
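
For orientation: the new seqConf/stringSeqConf helpers store a Seq[T] as a comma separated string and parse it back on read. A minimal usage sketch, assuming access to the private[sql] SQLConfEntry factory and a SQLConf instance (as in the test suite below); the key name and default are illustrative only:

    // Hypothetical entry; key, default value, and doc are for illustration.
    val SAMPLE_PREFIXES = SQLConfEntry.stringSeqConf(
      "spark.sql.example.prefixes",
      defaultValue = Some(Seq("com.example.a")),
      doc = "Example of a comma separated list exposed as a Seq[String].")

    val conf = new SQLConf
    conf.setConf(SAMPLE_PREFIXES, Seq("x", "y"))                    // stored internally as "x,y"
    assert(conf.getConfString("spark.sql.example.prefixes") === "x,y")
    assert(conf.getConf(SAMPLE_PREFIXES, Nil) === Seq("x", "y"))    // parsed back into a Seq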

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 2 deletions
@@ -21,8 +21,6 @@ import java.beans.Introspector
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicReference

-import org.apache.spark.sql.SQLConf.SQLConfEntry
-
 import scala.collection.JavaConversions._
 import scala.collection.immutable
 import scala.language.implicitConversions
@@ -33,6 +31,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.expressions._

sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala

Lines changed: 15 additions & 0 deletions
@@ -132,4 +132,19 @@ class SQLConfEntrySuite extends SparkFunSuite {
     }
     assert(e.getMessage === s"The value of $key should be one of a, b, c, but was d")
   }
+
+  test("stringSeqConf") {
+    val key = "spark.sql.SQLConfEntrySuite.stringSeq"
+    val confEntry = SQLConfEntry.stringSeqConf("spark.sql.SQLConfEntrySuite.stringSeq",
+      defaultValue = Some(Nil))
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c"))
+
+    conf.setConf(confEntry, Seq("a", "b", "c", "d"))
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d"))
+
+    conf.setConfString(key, "a,b,c,d,e")
+    assert(conf.getConfString(key, "a,b,c") === "a,b,c,d,e")
+    assert(conf.getConfString(key) === "a,b,c,d,e")
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
+  }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 58 additions & 22 deletions
@@ -21,16 +21,13 @@ import java.io.File
 import java.net.{URL, URLClassLoader}
 import java.sql.Timestamp

-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.spark.sql.SQLConf.SQLConfEntry
-import org.apache.spark.sql.catalyst.ParserDialect
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 import scala.language.implicitConversions

 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution
@@ -40,6 +37,9 @@ import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
+import org.apache.spark.sql.SQLConf.SQLConfEntry
+import org.apache.spark.sql.SQLConf.SQLConfEntry._
+import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand}
@@ -70,13 +70,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

   import HiveContext._

+  println("create HiveContext")
+
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
    * SerDe.
    */
-  protected[sql] def convertMetastoreParquet: Boolean =
-    getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
+  protected[sql] def convertMetastoreParquet: Boolean = getConf(CONVERT_METASTORE_PARQUET)

   /**
    * When true, also tries to merge possibly different but compatible Parquet schemas in different
@@ -85,7 +86,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true.
    */
   protected[sql] def convertMetastoreParquetWithSchemaMerging: Boolean =
-    getConf("spark.sql.hive.convertMetastoreParquet.mergeSchema", "false") == "true"
+    getConf(CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)

   /**
    * When true, a table created by a Hive CTAS statement (no USING clause) will be
@@ -99,8 +100,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format
    *   and no SerDe is specified (no ROW FORMAT SERDE clause).
    */
-  protected[sql] def convertCTAS: Boolean =
-    getConf("spark.sql.hive.convertCTAS", "false").toBoolean
+  protected[sql] def convertCTAS: Boolean = getConf(CONVERT_CTAS)

   /**
    * The version of the hive client that will be used to communicate with the metastore. Note that
@@ -118,8 +118,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    *   option is only valid when using the execution version of Hive.
    * - maven - download the correct version of hive on demand from maven.
    */
-  protected[hive] def hiveMetastoreJars: String =
-    getConf(HIVE_METASTORE_JARS, "builtin")
+  protected[hive] def hiveMetastoreJars: String = getConf(HIVE_METASTORE_JARS)

   /**
    * A comma separated list of class prefixes that should be loaded using the classloader that
@@ -129,26 +128,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    *   custom appenders that are used by log4j.
    */
   protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
-    getConf("spark.sql.hive.metastore.sharedPrefixes", jdbcPrefixes)
-      .split(",").filterNot(_ == "")
-
-  private def jdbcPrefixes = Seq(
-    "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc").mkString(",")
+    getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "")

   /**
    * A comma separated list of class prefixes that should explicitly be reloaded for each version
    * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a
    * prefix that typically would be shared (i.e. org.apache.spark.*)
    */
   protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
-    getConf("spark.sql.hive.metastore.barrierPrefixes", "")
-      .split(",").filterNot(_ == "")
+    getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "")

   /*
    * hive thrift server use background spark sql thread pool to execute sql queries
    */
-  protected[hive] def hiveThriftServerAsync: Boolean =
-    getConf("spark.sql.hive.thriftServer.async", "true").toBoolean
+  protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)

   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
@@ -525,7 +518,50 @@ private[hive] object HiveContext {
   val hiveExecutionVersion: String = "0.13.1"

   val HIVE_METASTORE_VERSION: String = "spark.sql.hive.metastore.version"
-  val HIVE_METASTORE_JARS: String = "spark.sql.hive.metastore.jars"
+  val HIVE_METASTORE_JARS = stringConf("spark.sql.hive.metastore.jars",
+    defaultValue = Some("builtin"),
+    doc = "Location of the jars that should be used to instantiate the HiveMetastoreClient. This" +
+      " property can be one of three options: " +
+      "1. \"builtin\" Use Hive 0.13.1, which is bundled with the Spark assembly jar when " +
+      "<code>-Phive</code> is enabled. When this option is chosen, " +
+      "spark.sql.hive.metastore.version must be either <code>0.13.1</code> or not defined. " +
+      "2. \"maven\" Use Hive jars of specified version downloaded from Maven repositories." +
+      "3. A classpath in the standard format for both Hive and Hadoop.")
+
+  val CONVERT_METASTORE_PARQUET = booleanConf("spark.sql.hive.convertMetastoreParquet",
+    defaultValue = Some(true),
+    doc = "When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
+      "the built in support.")
+
+  val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING = booleanConf(
+    "spark.sql.hive.convertMetastoreParquet.mergeSchema",
+    defaultValue = Some(false),
+    doc = "TODO")
+
+  val CONVERT_CTAS = booleanConf("spark.sql.hive.convertCTAS",
+    defaultValue = Some(false),
+    doc = "TODO")
+
+  val HIVE_METASTORE_SHARED_PREFIXES = stringSeqConf("spark.sql.hive.metastore.sharedPrefixes",
+    defaultValue = Some(jdbcPrefixes),
+    doc = "A comma separated list of class prefixes that should be loaded using the classloader " +
+      "that is shared between Spark SQL and a specific version of Hive. An example of classes " +
+      "that should be shared is JDBC drivers that are needed to talk to the metastore. Other " +
+      "classes that need to be shared are those that interact with classes that are already " +
+      "shared. For example, custom appenders that are used by log4j.")
+
+  private def jdbcPrefixes = Seq(
+    "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
+
+  val HIVE_METASTORE_BARRIER_PREFIXES = stringSeqConf("spark.sql.hive.metastore.barrierPrefixes",
+    defaultValue = Some(Seq()),
+    doc = "A comma separated list of class prefixes that should explicitly be reloaded for each " +
+      "version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are " +
+      "declared in a prefix that typically would be shared (i.e. <code>org.apache.spark.*</code>).")
+
+  val HIVE_THRIFT_SERVER_ASYNC = booleanConf("spark.sql.hive.thriftServer.async",
+    defaultValue = Some(true),
+    doc = "TODO")

   /** Constructs a configuration for hive, where the metastore is located in a temp directory. */
   def newTemporaryConfiguration(): Map[String, String] = {
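
The effect of the HiveContext changes is that each setting is now read through a typed SQLConfEntry instead of a raw string key with inline parsing. A rough before/after sketch using the CONVERT_CTAS entry defined above, assuming code inside the org.apache.spark.sql.hive package (as in the test suites below) and an existing HiveContext named hiveContext:

    // Before this commit: string key, string default, manual conversion.
    val oldStyle: Boolean =
      hiveContext.getConf("spark.sql.hive.convertCTAS", "false").toBoolean

    // After this commit: the entry bundles the key, the Boolean converter, and the default.
    val newStyle: Boolean = hiveContext.getConf(HiveContext.CONVERT_CTAS)

    // The raw key string remains available when a plain string is needed, e.g. SET statements.
    val rawKey: String = HiveContext.CONVERT_CTAS.key  // "spark.sql.hive.convertCTAS"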

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -563,7 +563,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA

   test("scan a parquet table created through a CTAS statement") {
     withSQLConf(
-      "spark.sql.hive.convertMetastoreParquet" -> "true",
+      HiveContext.CONVERT_METASTORE_PARQUET.key -> "true",
       SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "true") {

       withTempTable("jt") {
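
Tests that go through withSQLConf still pass plain string pairs, which is what the entry's key field provides. A hypothetical variant of the same pattern (the table name is illustrative only):

    withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
      // Within this block, parquet metastore tables are read through the Hive SerDe path.
      sql("SELECT * FROM some_parquet_table").collect()
    }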

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 8 additions & 8 deletions
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.hive.{HiveQLDialect, MetastoreRelation}
+import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation}
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
@@ -191,9 +191,9 @@ class SQLQuerySuite extends QueryTest {
       }
     }

-    val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
+    val originalConf = convertCTAS

-    setConf("spark.sql.hive.convertCTAS", "true")
+    setConf(HiveContext.CONVERT_CTAS, true)

     sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
     sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
@@ -235,7 +235,7 @@ class SQLQuerySuite extends QueryTest {
     checkRelation("ctas1", false)
     sql("DROP TABLE ctas1")

-    setConf("spark.sql.hive.convertCTAS", originalConf)
+    setConf(HiveContext.CONVERT_CTAS, originalConf)
   }

   test("SQL Dialect Switching") {
@@ -348,7 +348,7 @@ class SQLQuerySuite extends QueryTest {
       "MANAGED_TABLE"
     )

-    val default = getConf("spark.sql.hive.convertMetastoreParquet", "true")
+    val default = convertMetastoreParquet
     // use the Hive SerDe for parquet tables
     sql("set spark.sql.hive.convertMetastoreParquet = false")
     checkAnswer(
@@ -603,8 +603,8 @@ class SQLQuerySuite extends QueryTest {
     // generates an invalid query plan.
     val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
     read.json(rdd).registerTempTable("data")
-    val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
-    setConf("spark.sql.hive.convertCTAS", "false")
+    val originalConf = convertCTAS
+    setConf(HiveContext.CONVERT_CTAS, false)

     sql("CREATE TABLE explodeTest (key bigInt)")
     table("explodeTest").queryExecution.analyzed match {
@@ -621,7 +621,7 @@ class SQLQuerySuite extends QueryTest {

     sql("DROP TABLE explodeTest")
     dropTempTable("data")
-    setConf("spark.sql.hive.convertCTAS", originalConf)
+    setConf(HiveContext.CONVERT_CTAS, originalConf)
   }

   test("sanity test for SPARK-6618") {

sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala

Lines changed: 2 additions & 2 deletions
@@ -153,7 +153,7 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
     read.json(rdd2).registerTempTable("jt_array")

-    setConf("spark.sql.hive.convertMetastoreParquet", "true")
+    setConf(HiveContext.CONVERT_METASTORE_PARQUET, true)
   }

   override def afterAll(): Unit = {
@@ -164,7 +164,7 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
     sql("DROP TABLE normal_parquet")
     sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS jt_array")
-    setConf("spark.sql.hive.convertMetastoreParquet", "false")
+    setConf(HiveContext.CONVERT_METASTORE_PARQUET, false)
   }

   test(s"conversion is working") {
