
Commit 2600785

parallelize tests
1 parent 0bb716b

3 files changed: 98 additions, 6 deletions

project/SparkBuild.scala

Lines changed: 88 additions & 3 deletions
@@ -430,6 +430,81 @@ object SparkBuild extends PomBuild {
       else x.settings(Seq[Setting[_]](): _*)
     } ++ Seq[Project](OldDeps.project)
   }
+
+  if (!sys.env.contains("SERIAL_SBT_TESTS")) {
+    allProjects.foreach(enable(SparkParallelTestGrouping.settings))
+  }
+}
+
+object SparkParallelTestGrouping {
+  // Settings for parallelizing tests. The basic strategy here is to run the slowest suites (or
+  // collections of suites) in their own forked JVMs, allowing us to gain parallelism within an
+  // SBT project. Here, we take a whitelisting approach where the default behavior is to run all
+  // tests sequentially in a single JVM, requiring us to manually opt in to the extra parallelism.
+  //
+  // There are a few reasons why such a whitelist approach is good:
+  //
+  // 1. Launching one JVM per suite adds significant overhead for short-running suites. In
+  //    addition to JVM startup time and JIT warmup, it appears that initialization of Derby
+  //    metastores can be very slow, so creating a fresh warehouse per suite is inefficient.
+  //
+  // 2. When parallelizing within a project we need to give each forked JVM a different tmpdir
+  //    so that the metastore warehouses do not collide. Unfortunately, it seems that there are
+  //    some tests which have an overly tight dependency on the default tmpdir, so those fragile
+  //    tests need to continue running in the default configuration (or need to be rewritten).
+  //    Fixing that problem would be a huge amount of work for limited payoff in most cases
+  //    because most test suites are short-running.
+  //
+
+  private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
+    "org.apache.spark.DistributedSuite",
+    "org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.CastSuite",
+    "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
+    "org.apache.spark.sql.hive.StatisticsSuite",
+    "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
+    "org.apache.spark.sql.hive.execution.HashAggregationQueryWithControlledFallbackSuite",
+    "org.apache.spark.sql.hive.client.VersionsSuite",
+    "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
+    "org.apache.spark.sql.SQLQueryTestSuite"
+  )
+
+  private val DEFAULT_TEST_GROUP = "default_test_group"
+
+  private def testNameToTestGroup(name: String): String = name match {
+    case _ if testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name) => name
+    case _ => DEFAULT_TEST_GROUP
+  }
+
+  lazy val settings = Seq(
+    testGrouping in Test := {
+      val tests: Seq[TestDefinition] = (definedTests in Test).value
+      val defaultForkOptions = ForkOptions(
+        bootJars = Nil,
+        javaHome = javaHome.value,
+        connectInput = connectInput.value,
+        outputStrategy = outputStrategy.value,
+        runJVMOptions = (javaOptions in Test).value,
+        workingDirectory = Some(baseDirectory.value),
+        envVars = (envVars in Test).value
+      )
+      tests.groupBy(test => testNameToTestGroup(test.name)).map { case (groupName, groupTests) =>
+        val forkOptions = {
+          if (groupName == DEFAULT_TEST_GROUP) {
+            defaultForkOptions
+          } else {
+            defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
+              Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName"))
+          }
+        }
+        new Tests.Group(
+          name = groupName,
+          tests = groupTests,
+          runPolicy = Tests.SubProcess(forkOptions))
+      }
+    }.toSeq
+  )
 }
 
 object Core {
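
The grouping logic above maps each whitelisted suite to a group named after itself and every other suite to one shared group; each non-default group is then forked into its own JVM with a dedicated java.io.tmpdir. A minimal sketch of the mapping (illustration only; testNameToTestGroup is private, and the non-whitelisted suite name is an arbitrary example):

    // Whitelisted suite: group named after the suite, run in a dedicated forked
    // JVM with tmpdir target/tmp/org.apache.spark.DistributedSuite.
    testNameToTestGroup("org.apache.spark.DistributedSuite")
    // => "org.apache.spark.DistributedSuite"

    // Any other suite: shared group, default JVM and default tmpdir.
    testNameToTestGroup("org.apache.spark.rdd.RDDSuite")
    // => "default_test_group"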
@@ -910,8 +985,14 @@ object TestSettings {
     testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
     // Enable Junit testing.
     libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test",
-    // Only allow one test at a time, even across projects, since they run in the same JVM
-    parallelExecution in Test := false,
+    // `parallelExecutionInTest` controls whether test suites belonging to the same SBT project
+    // can run in parallel with one another. It does NOT control whether tests execute in parallel
+    // within the same JVM (which is controlled by `testForkedParallel`) or whether test cases
+    // within the same suite can run in parallel (a ScalaTest runner option that is passed to the
+    // underlying runner but is not an SBT-level configuration). This needs to be `true` in order
+    // for the extra parallelism enabled by `SparkParallelTestGrouping` to take effect.
+    // The `SERIAL_SBT_TESTS` check is here so the extra parallelism can be feature-flagged.
+    parallelExecution in Test := { if (sys.env.contains("SERIAL_SBT_TESTS")) false else true },
     // Make sure the test temp directory exists.
     resourceGenerators in Test += Def.macroValueI(resourceManaged in Test map { outDir: File =>
       var dir = new File(testTempDir)
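
For orientation, the three parallelism layers that the new comment distinguishes can be sketched as follows (a sketch, not part of the commit; testForkedParallel is shown at its sbt default, and the ScalaTest argument is purely illustrative):

    // Layer 1 (set above): may SBT run a project's forked test groups concurrently?
    parallelExecution in Test := true
    // Layer 2 (left at its default): may suites inside one forked JVM run in parallel?
    testForkedParallel in Test := false
    // Layer 3 (not an SBT setting): parallelism of cases within a suite is a
    // ScalaTest runner concern, e.g. Tests.Argument(TestFrameworks.ScalaTest, "-P").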
@@ -932,7 +1013,11 @@
       }
       Seq.empty[File]
     }).value,
-    concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
+    concurrentRestrictions in Global := {
+      // The value of 8 was chosen empirically, based on experience with Jenkins flakiness.
+      if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value
+      else Seq(Tags.limit(Tags.ForkedTestGroup, 8))
+    }
   )
 
 }
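
The net effect is that build-wide test concurrency is now bounded by the number of forked test-group JVMs rather than serialized outright, and exporting SERIAL_SBT_TESTS before invoking sbt restores the old behavior. Condensed before/after (a sketch of the two settings, not additional code from the commit):

    // Before: at most one task tagged Tags.Test runs at a time, across all projects.
    concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
    // After (without SERIAL_SBT_TESTS): up to eight forked test-group JVMs at once.
    concurrentRestrictions in Global := Seq(Tags.limit(Tags.ForkedTestGroup, 8))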

sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala

Lines changed: 5 additions & 1 deletion
@@ -294,7 +294,11 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     // Get answer, but also get rid of the #1234 expression ids that show up in explain plans
     val answer = hiveResultString(df.queryExecution.executedPlan)
       .map(_.replaceAll("#\\d+", "#x")
-        .replaceAll("Location.*/sql/core/", s"Location ${notIncludedMsg}sql/core/")
+        .replaceAll(
+          // scalastyle:off
+          "Location.*/sql/core/spark-warehouse/(SQLQueryTestSuite/|SparkServiceSQLQueryTestSuite/)?",
+          s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
+          // scalastyle:on
         .replaceAll("Created By.*", s"Created By $notIncludedMsg")
         .replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
         .replaceAll("Last Access.*", s"Last Access $notIncludedMsg")

sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala

Lines changed: 5 additions & 2 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.internal.config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -37,7 +37,7 @@ trait SharedSparkSession
   with Eventually { self: Suite =>
 
   protected def sparkConf = {
-    new SparkConf()
+    val conf = new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
@@ -46,6 +46,9 @@ trait SharedSparkSession
       // this rule may potentially block testing of other optimization rules such as
      // ConstantPropagation etc.
       .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
+    conf.set(
+      StaticSQLConf.WAREHOUSE_PATH,
+      conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass().getSimpleName)
   }
 
   /**
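
Appending the suite's simple class name gives every concrete suite its own warehouse directory under the configured root, which is what lets concurrently forked suites avoid colliding on Derby metastore state. For a hypothetical suite (a sketch, not part of the commit):

    // getClass().getSimpleName is "MySuite", so sparkConf resolves the warehouse to
    // <default warehouse root>/MySuite instead of the shared spark-warehouse dir; a
    // concurrently forked OtherSuite writes to <default warehouse root>/OtherSuite.
    class MySuite extends QueryTest with SharedSparkSession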
