
Commit 0d26cfc

gengliangwang and gatorsmile authored and committed
[SPARK-27460][TESTS][2.4] Running slowest test suites in their own forked JVMs for higher parallelism
## What changes were proposed in this pull request?

This is a backport of #24373, #24404 and #24434.

This patch modifies SparkBuild so that the largest / slowest test suites (or collections of suites) can run in their own forked JVMs, allowing them to be run in parallel with each other. This opt-in / whitelisting approach allows us to increase parallelism without having to fix a long tail of flakiness / brittleness issues in tests which aren't performance bottlenecks. See the comments in SparkBuild.scala for details, including a summary of why we sometimes opt to run entire groups of tests in a single forked JVM.

The time of a full pull request test run in Jenkins is reduced by around 53%:

- before the changes: 4 hr 40 min
- after the changes: 2 hr 13 min

## How was this patch tested?

Unit tests.

Closes #25861 from dongjoon-hyun/SPARK-27460.

Lead-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: gatorsmile <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 026e789 · commit 0d26cfc

17 files changed, +202 -44 lines


core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 11 additions & 7 deletions
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito.{mock, never, verify, when}
-import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config
@@ -37,20 +37,24 @@ import org.apache.spark.util.ManualClock
  */
 class ExecutorAllocationManagerSuite
   extends SparkFunSuite
-  with LocalSparkContext
-  with BeforeAndAfter {
+  with LocalSparkContext {
 
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
   private val contexts = new mutable.ListBuffer[SparkContext]()
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     contexts.clear()
   }
 
-  after {
-    contexts.foreach(_.stop())
+  override def afterEach(): Unit = {
+    try {
+      contexts.foreach(_.stop())
+    } finally {
+      super.afterEach()
+    }
   }
 
   private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
@@ -281,7 +285,7 @@ class ExecutorAllocationManagerSuite
     assert(totalRunningTasks(manager) === 0)
   }
 
-  test("cancel pending executors when no longer needed") {
+  testRetry("cancel pending executors when no longer needed") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
     post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5)))

core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala

Lines changed: 7 additions & 2 deletions
@@ -17,7 +17,10 @@
 
 package org.apache.spark
 
+import scala.concurrent.duration._
+
 import org.scalatest.Assertions
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.storage.StorageLevel
 
@@ -58,10 +61,12 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext {
   test("getRDDStorageInfo only reports on RDDs that actually persist data") {
     sc = new SparkContext("local", "test")
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-    assert(sc.getRDDStorageInfo.size === 0)
+    assert(sc.getRDDStorageInfo.length === 0)
     rdd.collect()
    sc.listenerBus.waitUntilEmpty(10000)
-    assert(sc.getRDDStorageInfo.size === 1)
+    eventually(timeout(10.seconds), interval(100.milliseconds)) {
+      assert(sc.getRDDStorageInfo.length === 1)
+    }
     assert(sc.getRDDStorageInfo.head.isCached)
     assert(sc.getRDDStorageInfo.head.memSize > 0)
     assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY)
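
For readers unfamiliar with the polling idiom being introduced above, here is a small standalone sketch (not part of this commit) of the same ScalaTest Eventually pattern. The polled flag and the background thread are made up for illustration; the imports match the ones added in the diff.

import scala.concurrent.duration._

import org.scalatest.concurrent.Eventually._

object EventuallySketch {
  // Stand-in for state that another component updates asynchronously
  // (in the suite above, the RDD storage info populated via the listener bus).
  @volatile private var storageInfoVisible = false

  def main(args: Array[String]): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(500); storageInfoVisible = true }
    }).start()

    // eventually() re-runs the block every `interval` until it stops throwing,
    // and fails once `timeout` elapses.
    eventually(timeout(10.seconds), interval(100.milliseconds)) {
      assert(storageInfoVisible, "state not yet visible to the polling thread")
    }
    println("condition observed")
  }
}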

core/src/test/scala/org/apache/spark/SparkFunSuite.scala

Lines changed: 45 additions & 1 deletion
@@ -20,7 +20,9 @@ package org.apache.spark
 // scalastyle:off
 import java.io.File
 
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
+import scala.annotation.tailrec
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, Outcome}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.AccumulatorContext
@@ -52,6 +54,7 @@ import org.apache.spark.util.AccumulatorContext
 abstract class SparkFunSuite
   extends FunSuite
   with BeforeAndAfterAll
+  with BeforeAndAfterEach
   with ThreadAudit
   with Logging {
 // scalastyle:on
@@ -87,6 +90,47 @@ abstract class SparkFunSuite
     getTestResourceFile(file).getCanonicalPath
   }
 
+  /**
+   * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to
+   * set up and tear down resources.
+   */
+  def testRetry(s: String, n: Int = 2)(body: => Unit): Unit = {
+    test(s) {
+      retry(n) {
+        body
+      }
+    }
+  }
+
+  /**
+   * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to
+   * set up and tear down resources.
+   */
+  def retry[T](n: Int)(body: => T): T = {
+    if (this.isInstanceOf[BeforeAndAfter]) {
+      throw new UnsupportedOperationException(
+        s"testRetry/retry cannot be used with ${classOf[BeforeAndAfter]}. " +
+        s"Please use ${classOf[BeforeAndAfterEach]} instead.")
+    }
+    retry0(n, n)(body)
+  }
+
+  @tailrec private final def retry0[T](n: Int, n0: Int)(body: => T): T = {
+    try body
+    catch { case e: Throwable =>
+      if (n > 0) {
+        logWarning(e.getMessage, e)
+        logInfo(s"\n\n===== RETRY #${n0 - n + 1} =====\n")
+        // Reset state before re-attempting in order so that tests which use patterns like
+        // LocalSparkContext to clean up state can work correctly when retried.
+        afterEach()
+        beforeEach()
+        retry0(n-1, n0)(body)
+      }
+      else throw e
+    }
+  }
+
   /**
    * Log the suite name and the test name before and after each test.
    *
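
As a usage note (not part of this commit): because `retry` resets state by calling `afterEach()` and then `beforeEach()` between attempts, a suite adopting `testRetry` has to keep its per-test setup in those hooks rather than in `BeforeAndAfter`'s `before { }` / `after { }` blocks, which is exactly the refactoring applied to the suites in this diff. A minimal hypothetical sketch of the intended pattern:

import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}

// Hypothetical example suite. LocalSparkContext already stops `sc` in its own afterEach,
// so recreating the context in beforeEach gives every retry attempt a fresh SparkContext.
class RetryUsageExampleSuite extends SparkFunSuite with LocalSparkContext {

  override def beforeEach(): Unit = {
    super.beforeEach()
    sc = new SparkContext("local[2]", "RetryUsageExampleSuite")
  }

  // Runs up to 1 + n times; afterEach()/beforeEach() are invoked between attempts.
  testRetry("flaky-looking assertion", n = 2) {
    val sum = sc.parallelize(1 to 100, 4).reduce(_ + _)
    assert(sum === 5050)
  }
}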

core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ import org.apache.spark.JobExecutionStatus._
 
 class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkContext {
 
-  test("basic status API usage") {
+  testRetry("basic status API usage") {
     sc = new SparkContext("local", "test", new SparkConf(false))
     val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
     val jobId: Int = eventually(timeout(10 seconds)) {

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 12 additions & 8 deletions
@@ -34,7 +34,6 @@ import org.json4s.jackson.JsonMethods._
 import org.mockito.ArgumentMatcher
 import org.mockito.Matchers.{any, argThat}
 import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
-import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
@@ -48,16 +47,21 @@ import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
 import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
 
-class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
+class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
 
   private var testDir: File = null
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     testDir = Utils.createTempDir(namePrefix = s"a b%20c+d")
   }
 
-  after {
-    Utils.deleteRecursively(testDir)
+  override def afterEach(): Unit = {
+    try {
+      Utils.deleteRecursively(testDir)
+    } finally {
+      super.afterEach()
+    }
   }
 
   /** Create a fake log file using the new log format used in Spark 1.3+ */
@@ -487,15 +491,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       provider.inSafeMode = false
       clock.setTime(10000)
 
-      eventually(timeout(1 second), interval(10 millis)) {
+      eventually(timeout(3.second), interval(10.milliseconds)) {
         provider.getConfig().keys should not contain ("HDFS State")
       }
     } finally {
       provider.stop()
     }
   }
 
-  test("provider reports error after FS leaves safe mode") {
+  testRetry("provider reports error after FS leaves safe mode") {
     testDir.delete()
     val clock = new ManualClock()
     val provider = new SafeModeTestProvider(createTestConf(), clock)
@@ -505,7 +509,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       provider.inSafeMode = false
      clock.setTime(10000)
 
-      eventually(timeout(1 second), interval(10 millis)) {
+      eventually(timeout(3.second), interval(10.milliseconds)) {
        verify(errorHandler).uncaughtException(any(), any())
      }
    } finally {

core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala

Lines changed: 4 additions & 6 deletions
@@ -19,25 +19,23 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 /**
  * Unit tests for SparkListener that require a local cluster.
  */
-class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
-  with BeforeAndAfter with BeforeAndAfterAll {
+class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext {
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     sc = new SparkContext("local-cluster[2,1,1024]", "SparkListenerSuite")
   }
 
-  test("SparkListener sends executor added message") {
+  testRetry("SparkListener sends executor added message") {
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)

project/SparkBuild.scala

Lines changed: 92 additions & 3 deletions
@@ -430,6 +430,84 @@ object SparkBuild extends PomBuild {
       else x.settings(Seq[Setting[_]](): _*)
     } ++ Seq[Project](OldDeps.project)
   }
+
+  if (!sys.env.contains("SERIAL_SBT_TESTS")) {
+    allProjects.foreach(enable(SparkParallelTestGrouping.settings))
+  }
+}
+
+object SparkParallelTestGrouping {
+  // Settings for parallelizing tests. The basic strategy here is to run the slowest suites (or
+  // collections of suites) in their own forked JVMs, allowing us to gain parallelism within a
+  // SBT project. Here, we take a whitelisting approach where the default behavior is to run all
+  // tests sequentially in a single JVM, requiring us to manually opt-in to the extra parallelism.
+  //
+  // There are a reasons why such a whitelist approach is good:
+  //
+  //    1. Launching one JVM per suite adds significant overhead for short-running suites. In
+  //       addition to JVM startup time and JIT warmup, it appears that initialization of Derby
+  //       metastores can be very slow so creating a fresh warehouse per suite is inefficient.
+  //
+  //    2. When parallelizing within a project we need to give each forked JVM a different tmpdir
+  //       so that the metastore warehouses do not collide. Unfortunately, it seems that there are
+  //       some tests which have an overly tight dependency on the default tmpdir, so those fragile
+  //       tests need to continue re-running in the default configuration (or need to be rewritten).
+  //       Fixing that problem would be a huge amount of work for limited payoff in most cases
+  //       because most test suites are short-running.
+  //
+
+  private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
+    "org.apache.spark.DistributedSuite",
+    "org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
+    "org.apache.spark.sql.catalyst.expressions.CastSuite",
+    "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
+    "org.apache.spark.sql.hive.HiveExternalCatalogSuite",
+    "org.apache.spark.sql.hive.StatisticsSuite",
+    "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
+    "org.apache.spark.sql.hive.client.VersionsSuite",
+    "org.apache.spark.sql.hive.client.HiveClientVersions",
+    "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
+    "org.apache.spark.ml.classification.LogisticRegressionSuite",
+    "org.apache.spark.ml.classification.LinearSVCSuite",
+    "org.apache.spark.sql.SQLQueryTestSuite"
+  )
+
+  private val DEFAULT_TEST_GROUP = "default_test_group"
+
+  private def testNameToTestGroup(name: String): String = name match {
+    case _ if testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name) => name
+    case _ => DEFAULT_TEST_GROUP
+  }
+
+  lazy val settings = Seq(
+    testGrouping in Test := {
+      val tests: Seq[TestDefinition] = (definedTests in Test).value
+      val defaultForkOptions = ForkOptions(
+        bootJars = Nil,
+        javaHome = javaHome.value,
+        connectInput = connectInput.value,
+        outputStrategy = outputStrategy.value,
+        runJVMOptions = (javaOptions in Test).value,
+        workingDirectory = Some(baseDirectory.value),
+        envVars = (envVars in Test).value
+      )
+      tests.groupBy(test => testNameToTestGroup(test.name)).map { case (groupName, groupTests) =>
+        val forkOptions = {
+          if (groupName == DEFAULT_TEST_GROUP) {
+            defaultForkOptions
+          } else {
+            defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
+              Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName"))
+          }
+        }
+        new Tests.Group(
          name = groupName,
          tests = groupTests,
          runPolicy = Tests.SubProcess(forkOptions))
+      }
+    }.toSeq
+  )
 }
 
 object Core {
@@ -844,8 +922,14 @@ object TestSettings {
     testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
     // Enable Junit testing.
     libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test",
-    // Only allow one test at a time, even across projects, since they run in the same JVM
-    parallelExecution in Test := false,
+    // `parallelExecutionInTest` controls whether test suites belonging to the same SBT project
+    // can run in parallel with one another. It does NOT control whether tests execute in parallel
+    // within the same JVM (which is controlled by `testForkedParallel`) or whether test cases
+    // within the same suite can run in parallel (which is a ScalaTest runner option which is passed
+    // to the underlying runner but is not a SBT-level configuration). This needs to be `true` in
+    // order for the extra parallelism enabled by `SparkParallelTestGrouping` to take effect.
+    // The `SERIAL_SBT_TESTS` check is here so the extra parallelism can be feature-flagged.
+    parallelExecution in Test := { if (sys.env.contains("SERIAL_SBT_TESTS")) false else true },
     // Make sure the test temp directory exists.
     resourceGenerators in Test += Def.macroValueI(resourceManaged in Test map { outDir: File =>
       var dir = new File(testTempDir)
@@ -866,7 +950,12 @@ object TestSettings {
       }
       Seq.empty[File]
     }).value,
-    concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
+    concurrentRestrictions in Global := {
+      // The number of concurrent test groups is empirically chosen based on experience
+      // with Jenkins flakiness.
+      if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value
+      else Seq(Tags.limit(Tags.ForkedTestGroup, 4))
+    }
   )
 
 }
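
To make the whitelist behavior concrete, here is a small standalone sketch (not part of the commit) that restates the suite-name-to-group rule and the per-group tmpdir convention in plain Scala. The two whitelisted names are taken from the set above, `org.apache.spark.rdd.RDDSuite` stands in for any non-whitelisted suite, and the literal `target/tmp` stands in for sbt's `baseDirectory.value`.

object TestGroupingSketch {
  // Subset of the whitelist above; suites listed here run in their own forked JVM.
  private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
    "org.apache.spark.DistributedSuite",
    "org.apache.spark.sql.SQLQueryTestSuite")

  private val DEFAULT_TEST_GROUP = "default_test_group"

  // Same rule as testNameToTestGroup above: a whitelisted suite becomes its own group.
  private def testNameToTestGroup(name: String): String =
    if (testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name)) name else DEFAULT_TEST_GROUP

  // Only non-default groups get a private java.io.tmpdir (mirrors the copy() branch above),
  // which is what keeps Derby metastore warehouses from colliding across forked JVMs.
  private def tmpDirFor(name: String): Option[String] = {
    val group = testNameToTestGroup(name)
    if (group == DEFAULT_TEST_GROUP) None else Some(s"target/tmp/$group")
  }

  def main(args: Array[String]): Unit = {
    println(testNameToTestGroup("org.apache.spark.DistributedSuite")) // its own group
    println(testNameToTestGroup("org.apache.spark.rdd.RDDSuite"))     // default_test_group
    println(tmpDirFor("org.apache.spark.sql.SQLQueryTestSuite"))
    // Some(target/tmp/org.apache.spark.sql.SQLQueryTestSuite)
  }
}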

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -169,7 +169,7 @@ abstract class BaseYarnClusterSuite
 
     val handle = launcher.startApplication()
     try {
-      eventually(timeout(2 minutes), interval(1 second)) {
+      eventually(timeout(3.minutes), interval(1.second)) {
        assert(handle.getState().isFinal())
      }
    } finally {

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -202,15 +202,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       .startApplication()
 
     try {
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(3.minutes), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.RUNNING)
       }
 
       handle.getAppId() should not be (null)
       handle.getAppId() should startWith ("application_")
       handle.stop()
 
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(3.minutes), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.KILLED)
       }
     } finally {

sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala

Lines changed: 4 additions & 1 deletion
@@ -267,9 +267,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     val df = session.sql(sql)
     val schema = df.schema
     val notIncludedMsg = "[not included in comparison]"
+    val clsName = this.getClass.getCanonicalName
     // Get answer, but also get rid of the #1234 expression ids that show up in explain plans
     val answer = df.queryExecution.hiveResultString().map(_.replaceAll("#\\d+", "#x")
-      .replaceAll("Location.*/sql/core/", s"Location ${notIncludedMsg}sql/core/")
+      .replaceAll(
+        s"Location.*/sql/core/spark-warehouse/$clsName/",
+        s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
      .replaceAll("Created By.*", s"Created By $notIncludedMsg")
      .replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
      .replaceAll("Last Access.*", s"Last Access $notIncludedMsg")
