95 changes: 92 additions & 3 deletions project/SparkBuild.scala
@@ -430,6 +430,84 @@ object SparkBuild extends PomBuild {
else x.settings(Seq[Setting[_]](): _*)
} ++ Seq[Project](OldDeps.project)
}

if (!sys.env.contains("SERIAL_SBT_TESTS")) {
allProjects.foreach(enable(SparkParallelTestGrouping.settings))
}
}

object SparkParallelTestGrouping {
// Settings for parallelizing tests. The basic strategy here is to run the slowest suites (or
// collections of suites) in their own forked JVMs, allowing us to gain parallelism within a
// SBT project. Here, we take a whitelisting approach where the default behavior is to run all
// tests sequentially in a single JVM, requiring us to manually opt-in to the extra parallelism.
//
// There are a few reasons why such a whitelist approach is good:
//
// 1. Launching one JVM per suite adds significant overhead for short-running suites. In
// addition to JVM startup time and JIT warmup, it appears that initialization of Derby
// metastores can be very slow so creating a fresh warehouse per suite is inefficient.
//
// 2. When parallelizing within a project we need to give each forked JVM a different tmpdir
gengliangwang (Member, Author) commented on Apr 15, 2019:
@srowen It is hard to successfully run test cases in parallel. See the comment here. E.g., it is possible that multiple test cases use the same table location spark-warehouse/t for a temporary table.

srowen (Member) replied:
I get it, but this won't help the Maven build and is kind of brittle. Is it really hard to just set temp dirs differently for different suites?

Can a suite run suites in ScalaTest, and parallelize suites that way?

gengliangwang (Member, Author) replied on Apr 15, 2019:
> I get it, but this won't help the Maven build and is kind of brittle.

From the Jenkins log, I can see that we are using SBT to test PRs. I can also see from the email notification that the test time of this PR is about 106 minutes, which is much better now (before these changes it took around 3.5 hours: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/104580/testReport/history/).

> Is it really hard to just set temp dirs differently for different suites?

I think fixing that problem would be a huge amount of work for limited payoff in most cases because most test suites are short-running.

> Can a suite run suites in ScalaTest, and parallelize suites that way?

Do you mean run all test suites in parallel? We can enable parallelExecution in Test (http://www.scalatest.org/user_guide/using_scalatest_with_sbt). But we still face the problem of colliding warehouse paths.
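
A minimal sketch of the alternative mentioned above, assuming a plain sbt build definition: parallelExecution is a real sbt key, but enabling it without per-suite warehouse paths would still let suites collide on the shared spark-warehouse directory.

// Hedged sketch, not what this PR does: sbt's built-in suite-level parallelism.
// Suites of the same project would run concurrently, but they would still share
// the default spark-warehouse directory and could collide on table locations.
parallelExecution in Test := true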

// so that the metastore warehouses do not collide. Unfortunately, it seems that there are
// some tests which have an overly tight dependency on the default tmpdir, so those fragile
// tests need to continue re-running in the default configuration (or need to be rewritten).
// Fixing that problem would be a huge amount of work for limited payoff in most cases
// because most test suites are short-running.
//

private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
"org.apache.spark.DistributedSuite",
"org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite",
"org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
"org.apache.spark.sql.catalyst.expressions.CastSuite",
"org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
"org.apache.spark.sql.hive.HiveExternalCatalogSuite",
"org.apache.spark.sql.hive.StatisticsSuite",
"org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
"org.apache.spark.sql.hive.client.VersionsSuite",
"org.apache.spark.sql.hive.client.HiveClientVersions",
"org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
"org.apache.spark.ml.classification.LogisticRegressionSuite",
"org.apache.spark.ml.classification.LinearSVCSuite",
"org.apache.spark.sql.SQLQueryTestSuite"
Contributor commented:
How do we come up with this list?

gengliangwang (Member, Author) replied on Apr 16, 2019:
Test suites that:

  1. have a duration > 1 min, and
  2. don't collide with other suites in the warehouse locations.

This is a preliminary list.
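
For illustration, here is how the grouping function defined below treats this list; the second suite name is hypothetical:

// Whitelisted suites map to a group named after themselves (their own forked JVM);
// everything else lands in the shared default group.
testNameToTestGroup("org.apache.spark.sql.SQLQueryTestSuite") // == "org.apache.spark.sql.SQLQueryTestSuite"
testNameToTestGroup("org.apache.spark.SomeFastSuite")         // == "default_test_group" (hypothetical suite name)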

)

private val DEFAULT_TEST_GROUP = "default_test_group"

private def testNameToTestGroup(name: String): String = name match {
case _ if testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name) => name
case _ => DEFAULT_TEST_GROUP
}

lazy val settings = Seq(
testGrouping in Test := {
val tests: Seq[TestDefinition] = (definedTests in Test).value
val defaultForkOptions = ForkOptions(
bootJars = Nil,
javaHome = javaHome.value,
connectInput = connectInput.value,
outputStrategy = outputStrategy.value,
runJVMOptions = (javaOptions in Test).value,
workingDirectory = Some(baseDirectory.value),
envVars = (envVars in Test).value
)
tests.groupBy(test => testNameToTestGroup(test.name)).map { case (groupName, groupTests) =>
val forkOptions = {
if (groupName == DEFAULT_TEST_GROUP) {
defaultForkOptions
} else {
defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName"))
}
}
new Tests.Group(
name = groupName,
tests = groupTests,
runPolicy = Tests.SubProcess(forkOptions))
}
}.toSeq
)
}

object Core {
@@ -910,8 +988,14 @@ object TestSettings {
testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
// Enable Junit testing.
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test",
// Only allow one test at a time, even across projects, since they run in the same JVM
parallelExecution in Test := false,
// `parallelExecutionInTest` controls whether test suites belonging to the same SBT project
// can run in parallel with one another. It does NOT control whether tests execute in parallel
// within the same JVM (which is controlled by `testForkedParallel`) or whether test cases
// within the same suite can run in parallel (which is a ScalaTest runner option which is passed
// to the underlying runner but is not a SBT-level configuration). This needs to be `true` in
// order for the extra parallelism enabled by `SparkParallelTestGrouping` to take effect.
// The `SERIAL_SBT_TESTS` check is here so the extra parallelism can be feature-flagged.
parallelExecution in Test := { if (sys.env.contains("SERIAL_SBT_TESTS")) false else true },
// Make sure the test temp directory exists.
resourceGenerators in Test += Def.macroValueI(resourceManaged in Test map { outDir: File =>
var dir = new File(testTempDir)
@@ -932,7 +1016,12 @@
}
Seq.empty[File]
}).value,
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
concurrentRestrictions in Global := {
// The number of concurrent test groups is empirically chosen based on experience
// with Jenkins flakiness.
if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value
else Seq(Tags.limit(Tags.ForkedTestGroup, 4))
}
)

}
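
The TestSettings comment above distinguishes several layers of parallelism. A hedged sketch of the relevant sbt keys (the key names are real sbt settings; the values are illustrative, not what Spark configures):

// Illustrative values only, not Spark's configuration.
parallelExecution in Test := true     // may this project's test groups run in parallel with one another?
testForkedParallel in Test := false   // may suites run in parallel inside a single forked JVM?
concurrentRestrictions in Global +=
  Tags.limit(Tags.ForkedTestGroup, 4) // global cap on how many forked test groups run at once

Parallelism of test cases within one suite is a ScalaTest concern (e.g. the ParallelTestExecution trait) rather than an sbt setting.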
@@ -291,10 +291,13 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
val df = session.sql(sql)
val schema = df.schema
val notIncludedMsg = "[not included in comparison]"
val clsName = this.getClass.getCanonicalName
// Get answer, but also get rid of the #1234 expression ids that show up in explain plans
val answer = hiveResultString(df.queryExecution.executedPlan)
.map(_.replaceAll("#\\d+", "#x")
.replaceAll("Location.*/sql/core/", s"Location ${notIncludedMsg}sql/core/")
.replaceAll(
s"Location.*/sql/core/spark-warehouse/$clsName/",
s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
.replaceAll("Created By.*", s"Created By $notIncludedMsg")
.replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
.replaceAll("Last Access.*", s"Last Access $notIncludedMsg")
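
A short, hypothetical illustration of what the added normalization does (the path below is made up): the per-suite warehouse segment introduced elsewhere in this PR is folded back into a stable path so the existing golden result files still match.

// Hypothetical explain-plan line, not taken from a real golden file.
val notIncludedMsg = "[not included in comparison]"
val clsName = "org.apache.spark.sql.SQLQueryTestSuite"
val raw = s"Location file:/tmp/build/sql/core/spark-warehouse/$clsName/t"
val normalized = raw.replaceAll(
  s"Location.*/sql/core/spark-warehouse/$clsName/",
  s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
// normalized == "Location [not included in comparison]sql/core/spark-warehouse/t"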
@@ -26,7 +26,7 @@ import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.internal.config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}

/**
* Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -37,7 +37,7 @@ trait SharedSparkSession
with Eventually { self: Suite =>

protected def sparkConf = {
new SparkConf()
val conf = new SparkConf()
.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
.set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
@@ -46,6 +46,9 @@ trait SharedSparkSession
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
conf.set(
StaticSQLConf.WAREHOUSE_PATH,
conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
}

/**
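
A standalone sketch of the idea behind the SharedSparkSession change above; the builder-based form and the FooSuite example are illustrative (the PR itself sets StaticSQLConf.WAREHOUSE_PATH on the test SparkConf). Each suite writes to its own warehouse directory, so two suites creating a table with the same name no longer collide.

import org.apache.spark.sql.SparkSession

// Hedged sketch, not the PR's exact code: give each suite class its own warehouse directory.
// spark.sql.warehouse.dir is a static conf, so it must be set before the session is first created.
def sessionFor(suiteClass: Class[_]): SparkSession =
  SparkSession.builder()
    .master("local[2]")
    // e.g. target/spark-warehouse/org.apache.spark.sql.FooSuite (FooSuite is hypothetical)
    .config("spark.sql.warehouse.dir", s"target/spark-warehouse/${suiteClass.getCanonicalName}")
    .getOrCreate()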
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType, StructType}
import org.apache.spark.util.Utils

// TODO: Refactor this to `HivePartitionFilteringSuite`
class HiveClientSuite(version: String)
@@ -39,8 +40,9 @@ class HiveClientSuite(version: String)
private val testPartitionCount = 3 * 5 * 4

private def init(tryDirectSql: Boolean): HiveClient = {
val location = Some(Utils.createTempDir().toURI)
val storageFormat = CatalogStorageFormat(
locationUri = None,
locationUri = location,
inputFormat = None,
outputFormat = None,
serde = None,
@@ -54,11 +56,11 @@ class HiveClientSuite(version: String)
new StructType().add("value", "int").add("ds", "int").add("h", "int").add("chunk", "string")
val table = CatalogTable(
identifier = TableIdentifier("test", Some("default")),
tableType = CatalogTableType.MANAGED,
tableType = CatalogTableType.EXTERNAL,
schema = tableSchema,
partitionColumnNames = Seq("ds", "h", "chunk"),
storage = CatalogStorageFormat(
locationUri = None,
locationUri = location,
inputFormat = Some(classOf[TextInputFormat].getName),
outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
serde = Some(classOf[LazySimpleSerDe].getName()),