21 changes: 19 additions & 2 deletions .github/workflows/build_and_test.yml
@@ -676,9 +676,26 @@ jobs:
 - name: Generate TPC-DS (SF=1) table data
   if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
   run: build/sbt "sql/test:runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite"
-- name: Run TPC-DS queries
+- name: Run TPC-DS queries (Sort merge join)
   run: |
     SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite"
+  env:
+    SPARK_TPCDS_JOIN_CONF: |
+      spark.sql.autoBroadcastJoinThreshold=-1
+      spark.sql.join.preferSortMergeJoin=true
+- name: Run TPC-DS queries (Broadcast hash join)
+  run: |
+    SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite"
+  env:
+    SPARK_TPCDS_JOIN_CONF: |
+      spark.sql.autoBroadcastJoinThreshold=10485760
+- name: Run TPC-DS queries (Shuffled hash join)
+  run: |
+    SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite"
+  env:
+    SPARK_TPCDS_JOIN_CONF: |
+      spark.sql.autoBroadcastJoinThreshold=-1
+      spark.sql.join.forceApplyShuffledHashJoin=true
 - name: Upload test results to report
   if: always()
   uses: actions/upload-artifact@v2
Expand All @@ -695,7 +712,7 @@ jobs:
   docker-integration-tests:
     needs: configure-jobs
     if: needs.configure-jobs.outputs.type == 'regular'
-    name: Run docker integration tests
+    name: Run Docker integration tests
     runs-on: ubuntu-20.04
     env:
       HADOOP_PROFILE: ${{ needs.configure-jobs.outputs.hadoop }}
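Each of the three new steps runs the same suite but passes a different multi-line SPARK_TPCDS_JOIN_CONF value, which the suite parses as Java properties (see the TPCDSQueryTestSuite change below). The following is a minimal, self-contained Scala sketch of that parsing; the object name and the sample assertion are illustrative only, not part of the patch:

    import scala.collection.JavaConverters._

    // Sketch: turn a SPARK_TPCDS_JOIN_CONF-style multi-line string into
    // SQL conf key/value pairs, mirroring the Properties-based parsing below.
    object JoinConfParseDemo {
      def parse(s: String): Map[String, String] = {
        val p = new java.util.Properties()
        p.load(new java.io.StringReader(s))
        p.asScala.toMap // java.util.Properties -> immutable Map[String, String]
      }

      def main(args: Array[String]): Unit = {
        val conf = parse(
          """spark.sql.autoBroadcastJoinThreshold=-1
            |spark.sql.join.preferSortMergeJoin=true""".stripMargin)
        assert(conf("spark.sql.join.preferSortMergeJoin") == "true")
        println(conf) // Map(spark.sql.autoBroadcastJoinThreshold -> -1, ...)
      }
    }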
sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql
 import java.io.File
 import java.nio.file.{Files, Paths}
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, stringToFile}
 import org.apache.spark.sql.internal.SQLConf
@@ -100,9 +102,9 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
   private def runQuery(
       query: String,
       goldenFile: File,
-      conf: Seq[(String, String)],
-      needSort: Boolean): Unit = {
-    withSQLConf(conf: _*) {
+      conf: Map[String, String]): Unit = {
+    val shouldSortResults = sortMergeJoinConf != conf // Sort for other joins
+    withSQLConf(conf.toSeq: _*) {
       try {
         val (schema, output) = handleExceptions(getNormalizedResult(spark, query))
         val queryString = query.trim
@@ -139,7 +141,7 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
         assertResult(expectedSchema, s"Schema did not match\n$queryString") {
           schema
         }
-        if (needSort) {
+        if (shouldSortResults) {
           val expectSorted = expectedOutput.split("\n").sorted.map(_.trim)
             .mkString("\n").replaceAll("\\s+$", "")
           val outputSorted = output.sorted.map(_.trim).mkString("\n").replaceAll("\\s+$", "")
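Golden files are generated only under the sort merge join conf, so for the other two join strategies the suite compares results order-insensitively (shouldSortResults above). A small illustrative sketch of that normalization, with names of my choosing:

    // Sketch: sort and trim both sides before comparing, since row order may
    // legitimately differ between join strategies.
    def normalizeSorted(lines: Seq[String]): String =
      lines.sorted.map(_.trim).mkString("\n").replaceAll("\\s+$", "")

    val expected = normalizeSorted("b\t2\na\t1".split("\n").toSeq)
    val actual = normalizeSorted(Seq("a\t1", "b\t2"))
    assert(expected == actual) // both normalize to "a\t1\nb\t2"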
@@ -171,22 +173,36 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
     SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
     "spark.sql.join.forceApplyShuffledHashJoin" -> "true")
 
-  val joinConfSet: Set[Map[String, String]] =
-    Set(sortMergeJoinConf, broadcastHashJoinConf, shuffledHashJoinConf);
+  val allJoinConfCombinations = Seq(
+    sortMergeJoinConf, broadcastHashJoinConf, shuffledHashJoinConf)
 
+  val joinConfs: Seq[Map[String, String]] = if (regenerateGoldenFiles) {
+    require(
+      !sys.env.contains("SPARK_TPCDS_JOIN_CONF"),
+      "'SPARK_TPCDS_JOIN_CONF' cannot be set together with 'SPARK_GENERATE_GOLDEN_FILES'")
+    Seq(sortMergeJoinConf)
+  } else {
+    sys.env.get("SPARK_TPCDS_JOIN_CONF").map { s =>
+      val p = new java.util.Properties()
+      p.load(new java.io.StringReader(s))
+      Seq(p.asScala.toMap)
+    }.getOrElse(allJoinConfCombinations)
+  }
+
+  assert(joinConfs.nonEmpty)
+  joinConfs.foreach(conf => require(
+    allJoinConfCombinations.contains(conf),
+    s"Join configurations [$conf] should be one of $allJoinConfCombinations"))
+
   if (tpcdsDataPath.nonEmpty) {
     tpcdsQueries.foreach { name =>
       val queryString = resourceToString(s"tpcds/$name.sql",
         classLoader = Thread.currentThread().getContextClassLoader)
       test(name) {
         val goldenFile = new File(s"$baseResourcePath/v1_4", s"$name.sql.out")
-        System.gc() // Workaround for GitHub Actions memory limitation, see also SPARK-37368
-        runQuery(queryString, goldenFile, joinConfSet.head.toSeq, false)
-        if (!regenerateGoldenFiles) {
-          joinConfSet.tail.foreach { conf =>
-            System.gc() // SPARK-37368
-            runQuery(queryString, goldenFile, conf.toSeq, true)
-          }
-        }
Review comment (Member, Author): While I am here, I ended up refactoring this part of the code. No behavior change; everything is as it was.
+        joinConfs.foreach { conf =>
+          System.gc() // Workaround for GitHub Actions memory limitation, see also SPARK-37368
+          runQuery(queryString, goldenFile, conf)
+        }
       }
     }
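In short, the resolution above behaves as follows: regenerating golden files runs only sortMergeJoinConf and rejects a simultaneous SPARK_TPCDS_JOIN_CONF; an env-supplied conf runs alone; otherwise all three combinations run. A hedged stand-alone restatement (function and parameter names are mine, not the patch's):

    // Sketch of the joinConfs resolution rule, extracted for clarity.
    def resolveJoinConfs(
        regenerateGoldenFiles: Boolean,
        envConf: Option[Map[String, String]],
        allCombinations: Seq[Map[String, String]]): Seq[Map[String, String]] = {
      if (regenerateGoldenFiles) {
        require(envConf.isEmpty,
          "'SPARK_TPCDS_JOIN_CONF' cannot be set together with 'SPARK_GENERATE_GOLDEN_FILES'")
        Seq(allCombinations.head) // sortMergeJoinConf is listed first
      } else {
        envConf.map(Seq(_)).getOrElse(allCombinations)
      }
    }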
@@ -196,13 +212,9 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
         classLoader = Thread.currentThread().getContextClassLoader)
       test(s"$name-v2.7") {
         val goldenFile = new File(s"$baseResourcePath/v2_7", s"$name.sql.out")
-        System.gc() // SPARK-37368
-        runQuery(queryString, goldenFile, joinConfSet.head.toSeq, false)
-        if (!regenerateGoldenFiles) {
-          joinConfSet.tail.foreach { conf =>
-            System.gc() // SPARK-37368
-            runQuery(queryString, goldenFile, conf.toSeq, true)
-          }
-        }
+        joinConfs.foreach { conf =>
+          System.gc() // SPARK-37368
+          runQuery(queryString, goldenFile, conf)
+        }
       }
     }
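Usage note: to reproduce one CI leg locally, point SPARK_TPCDS_DATA at the generated SF=1 data and set SPARK_TPCDS_JOIN_CONF to exactly one of the three property blocks from the workflow above before running the same sql/testOnly command; the validation in the suite rejects any other combination. Leaving the variable unset runs all three combinations in a single JVM, which is why each runQuery call is preceded by System.gc() as a workaround for GitHub Actions memory limits (SPARK-37368).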