Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/pyspark/sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ def _createForTesting(cls, sparkContext):
confusing error messages.
"""
jsc = sparkContext._jsc.sc()
jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc)
jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc, False)
return cls(sparkContext, jtestHive)

def refreshTable(self, tableName):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ class TestHiveContext(
@transient override val sparkSession: TestHiveSparkSession)
extends SQLContext(sparkSession) {

def this(sc: SparkContext) {
this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)))
/**
* If loadTestTables is false, no test tables are loaded. Note that this flag can only be true
* when running in the JVM, i.e. it needs to be false when calling from Python.
*/
def this(sc: SparkContext, loadTestTables: Boolean = true) {
this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), loadTestTables))
}

override def newSession(): TestHiveContext = {
Expand Down Expand Up @@ -103,13 +107,24 @@ class TestHiveContext(

}


/**
* A [[SparkSession]] used in [[TestHiveContext]].
*
* @param sc SparkContext
* @param warehousePath path to the Hive warehouse directory
* @param scratchDirPath scratch directory used by Hive's metastore client
* @param metastoreTemporaryConf configuration options for Hive's metastore
* @param existingSharedState optional [[TestHiveSharedState]]
* @param loadTestTables if true, load the test tables. They can only be loaded when running
* in the JVM, i.e. when calling from Python this flag has to be false.
*/
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
val warehousePath: File,
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String],
@transient private val existingSharedState: Option[TestHiveSharedState])
@transient private val existingSharedState: Option[TestHiveSharedState],
private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>

// TODO: We need to set the temp warehouse path to sc's conf.
Expand All @@ -118,13 +133,14 @@ private[hive] class TestHiveSparkSession(
// when we are creating metadataHive. This flow is not easy to follow and can introduce
// confusion when a developer is debugging an issue. We need to refactor this part
// to just set the temp warehouse path in sc's conf.
def this(sc: SparkContext) {
def this(sc: SparkContext, loadTestTables: Boolean) {
this(
sc,
Utils.createTempDir(namePrefix = "warehouse"),
TestHiveContext.makeScratchDir(),
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false),
None)
None,
loadTestTables)
}

assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
Expand All @@ -144,7 +160,7 @@ private[hive] class TestHiveSparkSession(

override def newSession(): TestHiveSparkSession = {
new TestHiveSparkSession(
sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState))
sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState), loadTestTables)
}

private var cacheTables: Boolean = false
Expand Down Expand Up @@ -204,11 +220,12 @@ private[hive] class TestHiveSparkSession(
testTables += (testTable.name -> testTable)
}

if (loadTestTables) {
// The test tables that are defined in the Hive QTestUtil.
// /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
// https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
@transient
val hiveQTestUtilTables = Seq(
val hiveQTestUtilTables: Seq[TestTable] = Seq(
TestTable("src",
"CREATE TABLE src (key INT, value STRING)".cmd,
s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
Expand Down Expand Up @@ -254,7 +271,10 @@ private[hive] class TestHiveSparkSession(
""".stripMargin)

sql(
s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}' INTO TABLE src_thrift")
s"""
|LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}'
|INTO TABLE src_thrift
""".stripMargin)
}),
TestTable("serdeins",
s"""CREATE TABLE serdeins (key INT, value STRING)
Expand Down Expand Up @@ -290,7 +310,10 @@ private[hive] class TestHiveSparkSession(
| }'
|)
""".stripMargin.cmd,
s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}' INTO TABLE episodes".cmd
s"""
|LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}'
|INTO TABLE episodes
""".stripMargin.cmd
),
// THIS TABLE IS NOT THE SAME AS THE HIVE TEST TABLE episodes_partitioned AS DYNAMIC
// PARTITIONING IS NOT YET SUPPORTED
Expand Down Expand Up @@ -363,6 +386,7 @@ private[hive] class TestHiveSparkSession(
)

hiveQTestUtilTables.foreach(registerTestTable)
}

private val loadedTables = new collection.mutable.HashSet[String]

Expand Down