diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 2036dc011a14..8e971974c35e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -430,6 +430,84 @@ object SparkBuild extends PomBuild {
       else x.settings(Seq[Setting[_]](): _*)
     } ++ Seq[Project](OldDeps.project)
   }
+
+  if (!sys.env.contains("SERIAL_SBT_TESTS")) {
+    allProjects.foreach(enable(SparkParallelTestGrouping.settings))
+  }
+}
+
+object SparkParallelTestGrouping {
+  // Settings for parallelizing tests. The basic strategy here is to run the slowest suites (or
+  // collections of suites) in their own forked JVMs, allowing us to gain parallelism within an
+  // SBT project. Here, we take a whitelisting approach where the default behavior is to run all
+  // tests sequentially in a single JVM, requiring us to manually opt in to the extra parallelism.
+  //
+  // There are a few reasons why such a whitelist approach is good:
+  //
+  //    1. Launching one JVM per suite adds significant overhead for short-running suites. In
+  //       addition to JVM startup time and JIT warmup, it appears that initialization of Derby
+  //       metastores can be very slow so creating a fresh warehouse per suite is inefficient.
+  //
+  //    2. When parallelizing within a project we need to give each forked JVM a different tmpdir
+  //       so that the metastore warehouses do not collide. Unfortunately, it seems that there are
+  //       some tests which have an overly tight dependency on the default tmpdir, so those fragile
+  //       tests need to continue re-running in the default configuration (or need to be rewritten).
+  //       Fixing that problem would be a huge amount of work for limited payoff in most cases
+  //       because most test suites are short-running.
+  //
+
+  private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
+    "org.apache.spark.DistributedSuite",
+    "org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.CastSuite",
+    "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
+    "org.apache.spark.sql.hive.HiveExternalCatalogSuite",
+    "org.apache.spark.sql.hive.StatisticsSuite",
+    "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
+    "org.apache.spark.sql.hive.client.VersionsSuite",
+    "org.apache.spark.sql.hive.client.HiveClientVersions",
+    "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
+    "org.apache.spark.ml.classification.LogisticRegressionSuite",
+    "org.apache.spark.ml.classification.LinearSVCSuite",
+    "org.apache.spark.sql.SQLQueryTestSuite"
+  )
+
+  private val DEFAULT_TEST_GROUP = "default_test_group"
+
+  private def testNameToTestGroup(name: String): String = name match {
+    case _ if testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name) => name
+    case _ => DEFAULT_TEST_GROUP
+  }
+
+  lazy val settings = Seq(
+    testGrouping in Test := {
+      val tests: Seq[TestDefinition] = (definedTests in Test).value
+      val defaultForkOptions = ForkOptions(
+        bootJars = Nil,
+        javaHome = javaHome.value,
+        connectInput = connectInput.value,
+        outputStrategy = outputStrategy.value,
+        runJVMOptions = (javaOptions in Test).value,
+        workingDirectory = Some(baseDirectory.value),
+        envVars = (envVars in Test).value
+      )
+      tests.groupBy(test => testNameToTestGroup(test.name)).map { case (groupName, groupTests) =>
+        val forkOptions = {
+          if (groupName == DEFAULT_TEST_GROUP) {
+            defaultForkOptions
+          } else {
+            defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
+              Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName"))
+          }
+        }
+        new Tests.Group(
+          name = groupName,
+          tests = groupTests,
+          runPolicy = Tests.SubProcess(forkOptions))
+      }
+    }.toSeq
+  )
 }
 
 object Core {
@@ -910,8 +988,14 @@ object TestSettings {
     testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
     // Enable Junit testing.
     libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test",
-    // Only allow one test at a time, even across projects, since they run in the same JVM
-    parallelExecution in Test := false,
+    // `parallelExecution in Test` controls whether test suites belonging to the same SBT project
+    // can run in parallel with one another. It does NOT control whether tests execute in parallel
+    // within the same JVM (which is controlled by `testForkedParallel`) or whether test cases
+    // within the same suite can run in parallel (which is a ScalaTest runner option passed to the
+    // underlying runner rather than an SBT-level configuration). This needs to be `true` in order
+    // for the extra parallelism enabled by `SparkParallelTestGrouping` to take effect.
+    // The `SERIAL_SBT_TESTS` check is here so the extra parallelism can be feature-flagged.
+    parallelExecution in Test := { if (sys.env.contains("SERIAL_SBT_TESTS")) false else true },
     // Make sure the test temp directory exists.
     resourceGenerators in Test += Def.macroValueI(resourceManaged in Test map { outDir: File =>
       var dir = new File(testTempDir)
@@ -932,7 +1016,12 @@ object TestSettings {
       }
       Seq.empty[File]
     }).value,
-    concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
+    concurrentRestrictions in Global := {
+      // The number of concurrent test groups is empirically chosen based on experience
+      // with Jenkins flakiness.
+      if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value
+      else Seq(Tags.limit(Tags.ForkedTestGroup, 4))
+    }
   )
 }
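The suite-name-to-group mapping above is easiest to see in isolation. Below is a minimal, sbt-free sketch of the same grouping rule; the two suite names not taken from the whitelist are hypothetical short-running suites, and nothing here depends on the sbt API or on Spark itself.

// Standalone sketch (no sbt dependency) of the grouping rule in SparkParallelTestGrouping.
// Only names in the dedicated set get their own group (and hence their own forked JVM with a
// private tmpdir); everything else lands in the shared default group.
object TestGroupingSketch {
  private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
    "org.apache.spark.DistributedSuite",
    "org.apache.spark.sql.SQLQueryTestSuite")

  private val DEFAULT_TEST_GROUP = "default_test_group"

  private def testNameToTestGroup(name: String): String =
    if (testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name)) name else DEFAULT_TEST_GROUP

  def main(args: Array[String]): Unit = {
    // The two *FastSuite names are made up for illustration.
    val suiteNames = Seq(
      "org.apache.spark.DistributedSuite",
      "org.apache.spark.rdd.ExampleFastSuite",
      "org.apache.spark.util.AnotherFastSuite",
      "org.apache.spark.sql.SQLQueryTestSuite")
    suiteNames.groupBy(testNameToTestGroup).foreach { case (group, members) =>
      println(s"$group -> ${members.mkString(", ")}")
    }
  }
}

Running the sketch prints one line per fork group: the default group holds the two fast suites, while each whitelisted suite forms its own single-member group, mirroring how the build assigns Tests.SubProcess fork options per group. Setting SERIAL_SBT_TESTS in the environment disables all of this and falls back to the old serial behavior.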
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index def99c820af8..d1842d329cc7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -291,10 +291,13 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     val df = session.sql(sql)
     val schema = df.schema
     val notIncludedMsg = "[not included in comparison]"
+    val clsName = this.getClass.getCanonicalName
     // Get answer, but also get rid of the #1234 expression ids that show up in explain plans
     val answer = hiveResultString(df.queryExecution.executedPlan)
       .map(_.replaceAll("#\\d+", "#x")
-      .replaceAll("Location.*/sql/core/", s"Location ${notIncludedMsg}sql/core/")
+      .replaceAll(
+        s"Location.*/sql/core/spark-warehouse/$clsName/",
+        s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
       .replaceAll("Created By.*", s"Created By $notIncludedMsg")
       .replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
       .replaceAll("Last Access.*", s"Last Access $notIncludedMsg")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index 8734639edaa6..ff6211b95042 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.internal.config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -37,7 +37,7 @@ trait SharedSparkSession
   with Eventually { self: Suite =>
 
   protected def sparkConf = {
-    new SparkConf()
+    val conf = new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
@@ -46,6 +46,9 @@ trait SharedSparkSession
       // this rule may potentially block testing of other optimization rules such as
       // ConstantPropagation etc.
       .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
+    conf.set(
+      StaticSQLConf.WAREHOUSE_PATH,
+      conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
   }
 
   /**
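The SharedSparkSession change scopes each suite's warehouse to its own directory by appending the suite's canonical class name to StaticSQLConf.WAREHOUSE_PATH, and the SQLQueryTestSuite regex change normalizes that per-suite path segment back out of the golden-file output. A minimal standalone sketch of the path-derivation idea follows; the base path is made up for illustration and there is no Spark dependency.

// Standalone sketch of per-suite warehouse isolation: each suite gets its own warehouse
// directory under the base path, so concurrently running forked JVMs never share a Derby
// metastore/warehouse location.
object WarehousePathSketch {
  def perSuiteWarehouse(baseWarehousePath: String, suiteClass: Class[_]): String =
    baseWarehousePath + "/" + suiteClass.getCanonicalName

  def main(args: Array[String]): Unit = {
    val base = "sql/core/spark-warehouse"  // illustrative base path
    // Prints "sql/core/spark-warehouse/java.lang.String"; a real suite would pass getClass.
    println(perSuiteWarehouse(base, classOf[String]))
  }
}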
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index f3d8c2ad440f..57a02aa5e0c5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType, StructType}
+import org.apache.spark.util.Utils
 
 // TODO: Refactor this to `HivePartitionFilteringSuite`
 class HiveClientSuite(version: String)
@@ -39,8 +40,9 @@ class HiveClientSuite(version: String)
   private val testPartitionCount = 3 * 5 * 4
 
   private def init(tryDirectSql: Boolean): HiveClient = {
+    val location = Some(Utils.createTempDir().toURI)
     val storageFormat = CatalogStorageFormat(
-      locationUri = None,
+      locationUri = location,
       inputFormat = None,
       outputFormat = None,
       serde = None,
@@ -54,11 +56,11 @@ class HiveClientSuite(version: String)
       new StructType().add("value", "int").add("ds", "int").add("h", "int").add("chunk", "string")
     val table = CatalogTable(
       identifier = TableIdentifier("test", Some("default")),
-      tableType = CatalogTableType.MANAGED,
+      tableType = CatalogTableType.EXTERNAL,
       schema = tableSchema,
       partitionColumnNames = Seq("ds", "h", "chunk"),
       storage = CatalogStorageFormat(
-        locationUri = None,
+        locationUri = location,
         inputFormat = Some(classOf[TextInputFormat].getName),
         outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
         serde = Some(classOf[LazySimpleSerDe].getName()),
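The HiveClientSuite change replaces the implicit default warehouse location with an explicit per-run temporary directory and marks the table EXTERNAL, so its data lives at that throwaway location instead of colliding with other JVMs on a shared warehouse path. Below is a standalone sketch of the same pattern; Utils.createTempDir is Spark-internal, so the sketch uses java.nio.file instead, and the prefix string is arbitrary.

// Standalone sketch: derive a fresh, per-run location URI for a test table so parallel
// test JVMs never write to the same default warehouse directory.
import java.net.URI
import java.nio.file.Files

object ExternalLocationSketch {
  def freshTableLocation(prefix: String): Option[URI] =
    Some(Files.createTempDirectory(prefix).toUri)

  def main(args: Array[String]): Unit = {
    // Mirrors how the suite passes `location` as locationUri to CatalogStorageFormat.
    val locationUri: Option[URI] = freshTableLocation("hive-client-suite-")
    println(s"table location: ${locationUri.get}")
  }
}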