From 260078520b4c865063453a236f5d8ec2cd230b80 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 15 Apr 2019 19:40:43 +0800
Subject: [PATCH 01/18] parallelize tests

---
 project/SparkBuild.scala                     | 91 ++++++++++++++++++-
 .../apache/spark/sql/SQLQueryTestSuite.scala |  6 +-
 .../spark/sql/test/SharedSparkSession.scala  |  7 +-
 3 files changed, 98 insertions(+), 6 deletions(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 2036dc011a14..b4c4f6c5c000 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -430,6 +430,81 @@ object SparkBuild extends PomBuild {
       else x.settings(Seq[Setting[_]](): _*)
     } ++ Seq[Project](OldDeps.project)
   }
+
+  if (!sys.env.contains("SERIAL_SBT_TESTS")) {
+    allProjects.foreach(enable(SparkParallelTestGrouping.settings))
+  }
+}
+
+object SparkParallelTestGrouping {
+  // Settings for parallelizing tests. The basic strategy here is to run the slowest suites (or
+  // collections of suites) in their own forked JVMs, allowing us to gain parallelism within a
+  // SBT project. Here, we take a whitelisting approach where the default behavior is to run all
+  // tests sequentially in a single JVM, requiring us to manually opt-in to the extra parallelism.
+  //
+  // There are a few reasons why such a whitelist approach is good:
+  //
+  //    1. Launching one JVM per suite adds significant overhead for short-running suites. In
+  //       addition to JVM startup time and JIT warmup, it appears that initialization of Derby
+  //       metastores can be very slow so creating a fresh warehouse per suite is inefficient.
+  //
+  //    2. When parallelizing within a project we need to give each forked JVM a different tmpdir
+  //       so that the metastore warehouses do not collide. Unfortunately, it seems that there are
+  //       some tests which have an overly tight dependency on the default tmpdir, so those fragile
+  //       tests need to continue re-running in the default configuration (or need to be rewritten).
+  //       Fixing that problem would be a huge amount of work for limited payoff in most cases
+  //       because most test suites are short-running.
+  //
+
+  private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
+    "org.apache.spark.DistributedSuite",
+    "org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.CastSuite",
+    "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
+    "org.apache.spark.sql.hive.StatisticsSuite",
+    "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
+    "org.apache.spark.sql.hive.execution.HashAggregationQueryWithControlledFallbackSuite",
+    "org.apache.spark.sql.hive.client.VersionsSuite",
+    "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
+    "org.apache.spark.sql.SQLQueryTestSuite"
+  )
+
+  private val DEFAULT_TEST_GROUP = "default_test_group"
+
+  private def testNameToTestGroup(name: String): String = name match {
+    case _ if testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name) => name
+    case _ => DEFAULT_TEST_GROUP
+  }
+
+  lazy val settings = Seq(
+    testGrouping in Test := {
+      val tests: Seq[TestDefinition] = (definedTests in Test).value
+      val defaultForkOptions = ForkOptions(
+        bootJars = Nil,
+        javaHome = javaHome.value,
+        connectInput = connectInput.value,
+        outputStrategy = outputStrategy.value,
+        runJVMOptions = (javaOptions in Test).value,
+        workingDirectory = Some(baseDirectory.value),
+        envVars = (envVars in Test).value
+      )
+      tests.groupBy(test => testNameToTestGroup(test.name)).map { case (groupName, groupTests) =>
+        val forkOptions = {
+          if (groupName == DEFAULT_TEST_GROUP) {
+            defaultForkOptions
+          } else {
+            defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
+              Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName"))
+          }
+        }
+        new Tests.Group(
+          name = groupName,
+          tests = groupTests,
+          runPolicy = Tests.SubProcess(forkOptions))
+      }
+    }.toSeq
+  )
+}
 
 object Core {
@@ -910,8 +985,14 @@ object TestSettings {
     testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
     // Enable Junit testing.
     libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test",
-    // Only allow one test at a time, even across projects, since they run in the same JVM
-    parallelExecution in Test := false,
+    // `parallelExecutionInTest` controls whether test suites belonging to the same SBT project
+    // can run in parallel with one another. It does NOT control whether tests execute in parallel
+    // within the same JVM (which is controlled by `testForkedParallel`) or whether test cases
+    // within the same suite can run in parallel (which is a ScalaTest runner option passed to the
+    // underlying runner and is not a SBT-level configuration). This needs to be `true` in
+    // order for the extra parallelism enabled by `SparkParallelTestGrouping` to take effect.
+    // The `SERIAL_SBT_TESTS` check is here so the extra parallelism can be feature-flagged.
+    parallelExecution in Test := { if (sys.env.contains("SERIAL_SBT_TESTS")) false else true },
     // Make sure the test temp directory exists.
     resourceGenerators in Test += Def.macroValueI(resourceManaged in Test map { outDir: File =>
       var dir = new File(testTempDir)
@@ -932,7 +1013,11 @@ object TestSettings {
       }
       Seq.empty[File]
     }).value,
-    concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
+    concurrentRestrictions in Global := {
+      // Value of '8' is empirically chosen based on experience with Jenkins flakiness.
+      if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value
+      else Seq(Tags.limit(Tags.ForkedTestGroup, 8))
+    }
   )
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index def99c820af8..9963aadcf592 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -294,7 +294,11 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     // Get answer, but also get rid of the #1234 expression ids that show up in explain plans
     val answer = hiveResultString(df.queryExecution.executedPlan)
       .map(_.replaceAll("#\\d+", "#x")
-      .replaceAll("Location.*/sql/core/", s"Location ${notIncludedMsg}sql/core/")
+      .replaceAll(
+        // scalastyle:off
+        "Location.*/sql/core/spark-warehouse/(SQLQueryTestSuite/|SparkServiceSQLQueryTestSuite/)?",
+        s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
+      // scalastyle:on
       .replaceAll("Created By.*", s"Created By $notIncludedMsg")
       .replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
       .replaceAll("Last Access.*", s"Last Access $notIncludedMsg")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index 8734639edaa6..5d0a510bd1fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.internal.config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -37,7 +37,7 @@ trait SharedSparkSession
   with Eventually { self: Suite =>
 
   protected def sparkConf = {
-    new SparkConf()
+    val conf = new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
@@ -46,6 +46,9 @@ trait SharedSparkSession
       // this rule may potentially block testing of other optimization rules such as
       // ConstantPropagation etc.
       .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
+    conf.set(
+      StaticSQLConf.WAREHOUSE_PATH,
+      conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass().getSimpleName)
   }
 
   /**

From 2e2c8f1fe2fc5ed3c2e197967d6c81ae2b717066 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 15 Apr 2019 23:01:05 +0800
Subject: [PATCH 02/18] try increasing the number of concurrent test groups

---
 project/SparkBuild.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b4c4f6c5c000..2a4ed40221e1 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -1014,9 +1014,10 @@ object TestSettings {
       Seq.empty[File]
     }).value,
     concurrentRestrictions in Global := {
-      // Value of '8' is empirically chosen based on experience with Jenkins flakiness.
+      // The number of concurrent test groups is empirically chosen based on experience
+      // with Jenkins flakiness.
       if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value
-      else Seq(Tags.limit(Tags.ForkedTestGroup, 8))
+      else Seq(Tags.limit(Tags.ForkedTestGroup, 16))
     }
   )

From fded60ec8bcfbb5bb47da3a1b114961286ae0710 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 16 Apr 2019 00:30:19 +0800
Subject: [PATCH 03/18] use 8 concurrentRestrictions again; parallelize more test suites

---
 project/SparkBuild.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 2a4ed40221e1..22694a8f5f68 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -458,6 +458,7 @@ object SparkParallelTestGrouping {
 
   private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
     "org.apache.spark.DistributedSuite",
+    "org.apache.spark.ShuffleNettySuite",
     "org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite",
     "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
     "org.apache.spark.sql.catalyst.expressions.CastSuite",
     "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
     "org.apache.spark.sql.hive.StatisticsSuite",
     "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
     "org.apache.spark.sql.hive.execution.HashAggregationQueryWithControlledFallbackSuite",
     "org.apache.spark.sql.hive.client.VersionsSuite",
     "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
+    "org.apache.spark.ml.classification.LogisticRegressionSuite",
+    "org.apache.spark.ml.classification.LinearSVCSuite",
     "org.apache.spark.sql.SQLQueryTestSuite"
   )
@@ -1017,7 +1020,7 @@ object TestSettings {
       // The number of concurrent test groups is empirically chosen based on experience
       // with Jenkins flakiness.
       if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value
-      else Seq(Tags.limit(Tags.ForkedTestGroup, 16))
+      else Seq(Tags.limit(Tags.ForkedTestGroup, 8))
     }
   )

From 475c8ef809d0ff164a6089c820b66f3ac4714036 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 16 Apr 2019 10:07:04 +0800
Subject: [PATCH 04/18] try reducing ForkedTestGroup

---
 project/SparkBuild.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 22694a8f5f68..d72af57c05e6 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -1020,7 +1020,7 @@ object TestSettings {
       // The number of concurrent test groups is empirically chosen based on experience
       // with Jenkins flakiness.
       if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value
-      else Seq(Tags.limit(Tags.ForkedTestGroup, 8))
+      else Seq(Tags.limit(Tags.ForkedTestGroup, 4))
     }
   )

From d2ac3affe6eccda69c6b09ae3dc063ff00ae648c Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 16 Apr 2019 14:47:11 +0800
Subject: [PATCH 05/18] remove some test suites

---
 project/SparkBuild.scala | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index d72af57c05e6..464c718e8c9c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -458,14 +458,11 @@ object SparkParallelTestGrouping {
 
   private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
     "org.apache.spark.DistributedSuite",
-    "org.apache.spark.ShuffleNettySuite",
     "org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite",
     "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
     "org.apache.spark.sql.catalyst.expressions.CastSuite",
     "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
-    "org.apache.spark.sql.hive.StatisticsSuite",
     "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
-    "org.apache.spark.sql.hive.execution.HashAggregationQueryWithControlledFallbackSuite",
     "org.apache.spark.sql.hive.client.VersionsSuite",
     "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
     "org.apache.spark.ml.classification.LogisticRegressionSuite",
     "org.apache.spark.ml.classification.LinearSVCSuite",
     "org.apache.spark.sql.SQLQueryTestSuite"
   )

From ef928e2d9427837c387da37456313c6993ab6cb5 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 16 Apr 2019 19:19:57 +0800
Subject: [PATCH 06/18] revise

---
 .../src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 9963aadcf592..929e2b6ddcaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -296,7 +296,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
       .map(_.replaceAll("#\\d+", "#x")
       .replaceAll(
         // scalastyle:off
-        "Location.*/sql/core/spark-warehouse/(SQLQueryTestSuite/|SparkServiceSQLQueryTestSuite/)?",
+        "Location.*/sql/core/spark-warehouse/(SQLQueryTestSuite/)?",
         s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
       // scalastyle:on
       .replaceAll("Created By.*", s"Created By $notIncludedMsg")

From 4087ac1816c992849782c17a7a940e1e3829de2a Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 16 Apr 2019 19:24:43 +0800
Subject: [PATCH 07/18] do not register ShutdownDeleteDir hook in withTempDir

---
 core/src/main/scala/org/apache/spark/util/Utils.scala    | 7 +++++--
 core/src/test/scala/org/apache/spark/SparkFunSuite.scala | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7c8648d61bfb..fe23e81b8526 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -298,9 +298,12 @@ private[spark] object Utils extends Logging {
    */
  def createTempDir(
      root: String = System.getProperty("java.io.tmpdir"),
-      namePrefix: String = "spark"): File = {
+      namePrefix: String = "spark",
+      registerShutdownDeleteDir: Boolean = true): File = {
    val dir = createDirectory(root, namePrefix)
+    if (registerShutdownDeleteDir) {
+      ShutdownHookManager.registerShutdownDeleteDir(dir)
+    }
    dir
  }

diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 34e386819667..61883e81ab26 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -113,7 +113,7 @@ abstract class SparkFunSuite
   * returns.
   */
  protected def withTempDir(f: File => Unit): Unit = {
-    val dir = Utils.createTempDir()
+    val dir = Utils.createTempDir(registerShutdownDeleteDir = false)
    try f(dir) finally {
      Utils.deleteRecursively(dir)
    }

From 87752749cf68306dc6a0ed0449fc5838b62b5c3e Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 16 Apr 2019 19:46:24 +0800
Subject: [PATCH 08/18] Revert "do not register ShutdownDeleteDir hook in withTempDir"

This reverts commit 4087ac1816c992849782c17a7a940e1e3829de2a.

---
 core/src/main/scala/org/apache/spark/util/Utils.scala    | 7 ++-----
 core/src/test/scala/org/apache/spark/SparkFunSuite.scala | 2 +-
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fe23e81b8526..7c8648d61bfb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -298,12 +298,9 @@ private[spark] object Utils extends Logging {
   */
  def createTempDir(
      root: String = System.getProperty("java.io.tmpdir"),
-      namePrefix: String = "spark",
-      registerShutdownDeleteDir: Boolean = true): File = {
+      namePrefix: String = "spark"): File = {
    val dir = createDirectory(root, namePrefix)
-    if (registerShutdownDeleteDir) {
-      ShutdownHookManager.registerShutdownDeleteDir(dir)
-    }
+    ShutdownHookManager.registerShutdownDeleteDir(dir)
    dir
  }

diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 61883e81ab26..34e386819667 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -113,7 +113,7 @@ abstract class SparkFunSuite
   * returns.
   */
  protected def withTempDir(f: File => Unit): Unit = {
-    val dir = Utils.createTempDir(registerShutdownDeleteDir = false)
+    val dir = Utils.createTempDir()
    try f(dir) finally {
      Utils.deleteRecursively(dir)
    }

From 09695085c4db08b58db9d3536895b5f062cccba2 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 16 Apr 2019 20:31:15 +0800
Subject: [PATCH 09/18] try not to delete temp dir

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 1 -
 project/SparkBuild.scala                              | 3 ++-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7c8648d61bfb..1487c332abe1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -300,7 +300,6 @@ private[spark] object Utils extends Logging {
      root: String = System.getProperty("java.io.tmpdir"),
      namePrefix: String = "spark"): File = {
    val dir = createDirectory(root, namePrefix)
-    ShutdownHookManager.registerShutdownDeleteDir(dir)
    dir
  }

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 464c718e8c9c..438ddd9c8f74 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -492,7 +492,8 @@ object SparkParallelTestGrouping {
      tests.groupBy(test => testNameToTestGroup(test.name)).map { case (groupName, groupTests) =>
        val forkOptions = {
          if (groupName == DEFAULT_TEST_GROUP) {
-            defaultForkOptions
+            defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
+              Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/default"))
          } else {
            defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
              Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName"))
          }
        }

From d40e7daacfb5a34134aaa687f4b44b238af392d8 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 16 Apr 2019 22:44:17 +0800
Subject: [PATCH 10/18] Revert "try not to delete temp dir"

This reverts commit 09695085c4db08b58db9d3536895b5f062cccba2.

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 1 +
 project/SparkBuild.scala                              | 3 +--
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1487c332abe1..7c8648d61bfb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -300,6 +300,7 @@ private[spark] object Utils extends Logging {
      root: String = System.getProperty("java.io.tmpdir"),
      namePrefix: String = "spark"): File = {
    val dir = createDirectory(root, namePrefix)
+    ShutdownHookManager.registerShutdownDeleteDir(dir)
    dir
  }

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 438ddd9c8f74..464c718e8c9c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -492,8 +492,7 @@ object SparkParallelTestGrouping {
      tests.groupBy(test => testNameToTestGroup(test.name)).map { case (groupName, groupTests) =>
        val forkOptions = {
          if (groupName == DEFAULT_TEST_GROUP) {
-            defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
-              Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/default"))
+            defaultForkOptions
          } else {
            defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
              Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName"))
          }
        }

From 22616e0407128958c8deace24fb5f71badd02ef6 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 16 Apr 2019 23:07:08 +0800
Subject: [PATCH 11/18] delete temp dir

---
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index db896b3b36ac..8d0503405e25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -33,12 +33,18 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 class SQLQuerySuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
   setupTestData()
 
+  test("remove temp paths") {
+    val dir = System.getProperty("java.io.tmpdir") + "/target/tmp/default"
+    Utils.deleteRecursively(new File(dir))
+  }
+
   test("SPARK-8010: promote numeric to string") {
     val df = Seq((1, 1)).toDF("key", "value")
     df.createOrReplaceTempView("src")

From 03ec1a23ede466b385f5d11c136272740ee61661 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 17 Apr 2019 02:45:28 +0800
Subject: [PATCH 12/18] Revert "delete temp dir"

This reverts commit 22616e0407128958c8deace24fb5f71badd02ef6.

---
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 8d0503405e25..db896b3b36ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -33,18 +33,12 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 
 class SQLQuerySuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
   setupTestData()
 
-  test("remove temp paths") {
-    val dir = System.getProperty("java.io.tmpdir") + "/target/tmp/default"
-    Utils.deleteRecursively(new File(dir))
-  }
-
   test("SPARK-8010: promote numeric to string") {
     val df = Seq((1, 1)).toDF("key", "value")
     df.createOrReplaceTempView("src")

From 025f4b7748d7ba3c77e2b3ee98e03b194b66f57f Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 17 Apr 2019 19:54:52 +0800
Subject: [PATCH 13/18] revise regex

---
 .../test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 929e2b6ddcaf..1b35d92a0d55 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -295,10 +295,8 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     val answer = hiveResultString(df.queryExecution.executedPlan)
       .map(_.replaceAll("#\\d+", "#x")
       .replaceAll(
-        // scalastyle:off
-        "Location.*/sql/core/spark-warehouse/(SQLQueryTestSuite/)?",
+        "Location.*/sql/core/spark-warehouse/SQLQueryTestSuite/",
         s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
-      // scalastyle:on
       .replaceAll("Created By.*", s"Created By $notIncludedMsg")
       .replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
       .replaceAll("Last Access.*", s"Last Access $notIncludedMsg")

From 5047e5a456da62f799d30c7330db9885fb91cbec Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 17 Apr 2019 19:56:07 +0800
Subject: [PATCH 14/18] try adding HiveExternalCatalogSuite to whitelist

---
 project/SparkBuild.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 464c718e8c9c..1728cd2d1866 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -462,6 +462,7 @@ object SparkParallelTestGrouping {
     "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
     "org.apache.spark.sql.catalyst.expressions.CastSuite",
     "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
+    "org.apache.spark.sql.hive.HiveExternalCatalogSuite",
     "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
     "org.apache.spark.sql.hive.client.VersionsSuite",
     "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
     "org.apache.spark.ml.classification.LogisticRegressionSuite",
     "org.apache.spark.ml.classification.LinearSVCSuite",
     "org.apache.spark.sql.SQLQueryTestSuite"
   )

From 920433eedb5164ff003a97ddd31f4404c1ef2985 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 17 Apr 2019 22:27:16 +0800
Subject: [PATCH 15/18] revise WAREHOUSE_PATH

---
 .../scala/org/apache/spark/sql/test/SharedSparkSession.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index 5d0a510bd1fd..ff6211b95042 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -48,7 +48,7 @@ trait SharedSparkSession
       .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
     conf.set(
       StaticSQLConf.WAREHOUSE_PATH,
-      conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass().getSimpleName)
+      conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
   }
 
   /**

From 57e18c2fbf312cd70f451f57d6e0e99a6db7135d Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 18 Apr 2019 00:50:36 +0800
Subject: [PATCH 16/18] fix

---
 .../test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 1b35d92a0d55..d1842d329cc7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -291,11 +291,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     val df = session.sql(sql)
     val schema = df.schema
     val notIncludedMsg = "[not included in comparison]"
+    val clsName = this.getClass.getCanonicalName
     // Get answer, but also get rid of the #1234 expression ids that show up in explain plans
     val answer = hiveResultString(df.queryExecution.executedPlan)
       .map(_.replaceAll("#\\d+", "#x")
       .replaceAll(
-        "Location.*/sql/core/spark-warehouse/SQLQueryTestSuite/",
+        s"Location.*/sql/core/spark-warehouse/$clsName/",
         s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
       .replaceAll("Created By.*", s"Created By $notIncludedMsg")
       .replaceAll("Created Time.*", s"Created Time $notIncludedMsg")

From 70874b7b2c60a1f1b1a4bd0b14c2a2866bc08480 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 18 Apr 2019 11:49:44 +0800
Subject: [PATCH 17/18] add org.apache.spark.sql.hive.StatisticsSuite

---
 project/SparkBuild.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 1728cd2d1866..3a410798bc2c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -463,6 +463,7 @@ object SparkParallelTestGrouping {
     "org.apache.spark.sql.catalyst.expressions.CastSuite",
     "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
     "org.apache.spark.sql.hive.HiveExternalCatalogSuite",
+    "org.apache.spark.sql.hive.StatisticsSuite",
     "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
     "org.apache.spark.sql.hive.client.VersionsSuite",
     "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
     "org.apache.spark.ml.classification.LogisticRegressionSuite",
     "org.apache.spark.ml.classification.LinearSVCSuite",
     "org.apache.spark.sql.SQLQueryTestSuite"
   )

From 4b61d0aa499256364259cbb6f2fd01b12ea9dd78 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 18 Apr 2019 19:49:27 +0800
Subject: [PATCH 18/18] add HiveClientVersions

---
 project/SparkBuild.scala                                | 1 +
 .../apache/spark/sql/hive/client/HiveClientSuite.scala  | 8 +++++---
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 3a410798bc2c..8e971974c35e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -466,6 +466,7 @@ object SparkParallelTestGrouping {
     "org.apache.spark.sql.hive.StatisticsSuite",
     "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
     "org.apache.spark.sql.hive.client.VersionsSuite",
+    "org.apache.spark.sql.hive.client.HiveClientVersions",
     "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
     "org.apache.spark.ml.classification.LogisticRegressionSuite",
     "org.apache.spark.ml.classification.LinearSVCSuite",
     "org.apache.spark.sql.SQLQueryTestSuite"
   )

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index f3d8c2ad440f..57a02aa5e0c5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType, StructType}
+import org.apache.spark.util.Utils
 
 // TODO: Refactor this to `HivePartitionFilteringSuite`
 class HiveClientSuite(version: String)
@@ -39,8 +40,9 @@ class HiveClientSuite(version: String)
   private val testPartitionCount = 3 * 5 * 4
 
   private def init(tryDirectSql: Boolean): HiveClient = {
+    val location = Some(Utils.createTempDir().toURI)
     val storageFormat = CatalogStorageFormat(
-      locationUri = None,
+      locationUri = location,
       inputFormat = None,
       outputFormat = None,
       serde = None,
@@ -54,11 +56,11 @@ class HiveClientSuite(version: String)
       new StructType().add("value", "int").add("ds", "int").add("h", "int").add("chunk", "string")
     val table = CatalogTable(
       identifier = TableIdentifier("test", Some("default")),
-      tableType = CatalogTableType.MANAGED,
+      tableType = CatalogTableType.EXTERNAL,
       schema = tableSchema,
       partitionColumnNames = Seq("ds", "h", "chunk"),
       storage = CatalogStorageFormat(
-        locationUri = None,
+        locationUri = location,
        inputFormat = Some(classOf[TextInputFormat].getName),
        outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
        serde = Some(classOf[LazySimpleSerDe].getName()),