diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 3503fb90c3f8..e0a7b8e34974 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -492,7 +492,7 @@ def _createForTesting(cls, sparkContext): confusing error messages. """ jsc = sparkContext._jsc.sc() - jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc) + jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc, False) return cls(sparkContext, jtestHive) def refreshTable(self, tableName): diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index b45be0251d95..5fbf2297e7e2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -55,7 +55,8 @@ object TestHive .set("spark.sql.hive.metastore.barrierPrefixes", "org.apache.spark.sql.hive.execution.PairSerDe") // SPARK-8910 - .set("spark.ui.enabled", "false"))) + .set("spark.ui.enabled", "false")), + createTestTables = true) /** @@ -70,15 +71,21 @@ object TestHive * test cases that rely on TestHive must be serialized. */ class TestHiveContext( - @transient override val sparkSession: TestHiveSparkSession) + @transient override val sparkSession: TestHiveSparkSession, + private val createTestTables: Boolean) extends SQLContext(sparkSession) { + def this(sc: SparkContext, createTestTables: Boolean) { + this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), createTestTables), + createTestTables) + } + def this(sc: SparkContext) { - this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc))) + this(sc, createTestTables = true) } override def newSession(): TestHiveContext = { - new TestHiveContext(sparkSession.newSession()) + new TestHiveContext(sparkSession.newSession(), createTestTables) } override def sharedState: TestHiveSharedState = sparkSession.sharedState @@ -109,7 +116,8 @@ private[hive] class TestHiveSparkSession( val warehousePath: File, scratchDirPath: File, metastoreTemporaryConf: Map[String, String], - @transient private val existingSharedState: Option[TestHiveSharedState]) + @transient private val existingSharedState: Option[TestHiveSharedState], + private val createTestTables: Boolean) extends SparkSession(sc) with Logging { self => // TODO: We need to set the temp warehouse path to sc's conf. @@ -118,13 +126,14 @@ private[hive] class TestHiveSparkSession( // when we creating metadataHive. This flow is not easy to follow and can introduce // confusion when a developer is debugging an issue. We need to refactor this part // to just set the temp warehouse path in sc's conf. - def this(sc: SparkContext) { + def this(sc: SparkContext, createTestTables: Boolean) { this( sc, Utils.createTempDir(namePrefix = "warehouse"), TestHiveContext.makeScratchDir(), HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false), - None) + None, + createTestTables) } assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive") @@ -144,7 +153,8 @@ private[hive] class TestHiveSparkSession( override def newSession(): TestHiveSparkSession = { new TestHiveSparkSession( - sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState)) + sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState), + createTestTables) } private var cacheTables: Boolean = false @@ -179,8 +189,25 @@ private[hive] class TestHiveSparkSession( hiveFilesTemp.mkdir() ShutdownHookManager.registerShutdownDeleteDir(hiveFilesTemp) + lazy val inRepoTests = { + if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) { + new File("src" + File.separator + "test" + File.separator + "resources" + File.separator) + } else { + new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" + + File.separator + "resources") + } + } + def getHiveFile(path: String): File = { - new File(Thread.currentThread().getContextClassLoader.getResource(path).getFile) + // Attempt to load from class loader, fall back to old system property based for Python tests. + val resourcePath = Option(Thread.currentThread().getContextClassLoader.getResource(path)) + resourcePath.map(rp => new File(rp.getFile)).getOrElse{ + val stripped = path.replaceAll("""\.\.\/""", "").replace('/', File.separatorChar) + hiveDevHome + .map(new File(_, stripped)) + .filter(_.exists) + .getOrElse(new File(inRepoTests, stripped)) + } } val describedTable = "DESCRIBE (\\w+)".r @@ -208,7 +235,7 @@ private[hive] class TestHiveSparkSession( // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql @transient - val hiveQTestUtilTables = Seq( + lazy val hiveQTestUtilTables = Seq( TestTable("src", "CREATE TABLE src (key INT, value STRING)".cmd, s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd), @@ -362,7 +389,9 @@ private[hive] class TestHiveSparkSession( s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/json.txt")}' INTO TABLE src_json".cmd) ) - hiveQTestUtilTables.foreach(registerTestTable) + if (createTestTables) { + hiveQTestUtilTables.foreach(registerTestTable) + } private val loadedTables = new collection.mutable.HashSet[String]