diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 209bd26a3fc6..b92389a93b86 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -676,9 +676,26 @@ jobs: - name: Generate TPC-DS (SF=1) table data if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' run: build/sbt "sql/test:runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite" - - name: Run TPC-DS queries + - name: Run TPC-DS queries (Sort merge join) run: | SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite" + env: + SPARK_TPCDS_JOIN_CONF: | + spark.sql.autoBroadcastJoinThreshold=-1 + spark.sql.join.preferSortMergeJoin=true + - name: Run TPC-DS queries (Broadcast hash join) + run: | + SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite" + env: + SPARK_TPCDS_JOIN_CONF: | + spark.sql.autoBroadcastJoinThreshold=10485760 + - name: Run TPC-DS queries (Shuffled hash join) + run: | + SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite" + env: + SPARK_TPCDS_JOIN_CONF: | + spark.sql.autoBroadcastJoinThreshold=-1 + spark.sql.join.forceApplyShuffledHashJoin=true - name: Upload test results to report if: always() uses: actions/upload-artifact@v2 @@ -695,7 +712,7 @@ jobs: docker-integration-tests: needs: configure-jobs if: needs.configure-jobs.outputs.type == 'regular' - name: Run docker integration tests + name: Run Docker integration tests runs-on: ubuntu-20.04 env: HADOOP_PROFILE: ${{ needs.configure-jobs.outputs.hadoop }} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala index d776915f3cdd..9ac5fb6d0387 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql import java.io.File import java.nio.file.{Files, Paths} +import scala.collection.JavaConverters._ + import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, stringToFile} import org.apache.spark.sql.internal.SQLConf @@ -100,9 +102,9 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp private def runQuery( query: String, goldenFile: File, - conf: Seq[(String, String)], - needSort: Boolean): Unit = { - withSQLConf(conf: _*) { + conf: Map[String, String]): Unit = { + val shouldSortResults = sortMergeJoinConf != conf // Sort for other joins + withSQLConf(conf.toSeq: _*) { try { val (schema, output) = handleExceptions(getNormalizedResult(spark, query)) val queryString = query.trim @@ -139,7 +141,7 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp assertResult(expectedSchema, s"Schema did not match\n$queryString") { schema } - if (needSort) { + if (shouldSortResults) { val expectSorted = expectedOutput.split("\n").sorted.map(_.trim) .mkString("\n").replaceAll("\\s+$", "") val outputSorted = output.sorted.map(_.trim).mkString("\n").replaceAll("\\s+$", "") @@ -171,8 +173,26 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", "spark.sql.join.forceApplyShuffledHashJoin" -> "true") - val joinConfSet: Set[Map[String, String]] = - Set(sortMergeJoinConf, broadcastHashJoinConf, shuffledHashJoinConf); + val allJoinConfCombinations = Seq( + sortMergeJoinConf, broadcastHashJoinConf, shuffledHashJoinConf) + + val joinConfs: Seq[Map[String, String]] = if (regenerateGoldenFiles) { + require( + !sys.env.contains("SPARK_TPCDS_JOIN_CONF"), + "'SPARK_TPCDS_JOIN_CONF' cannot be set together with 'SPARK_GENERATE_GOLDEN_FILES'") + Seq(sortMergeJoinConf) + } else { + sys.env.get("SPARK_TPCDS_JOIN_CONF").map { s => + val p = new java.util.Properties() + p.load(new java.io.StringReader(s)) + Seq(p.asScala.toMap) + }.getOrElse(allJoinConfCombinations) + } + + assert(joinConfs.nonEmpty) + joinConfs.foreach(conf => require( + allJoinConfCombinations.contains(conf), + s"Join configurations [$conf] should be one of $allJoinConfCombinations")) if (tpcdsDataPath.nonEmpty) { tpcdsQueries.foreach { name => @@ -180,13 +200,9 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp classLoader = Thread.currentThread().getContextClassLoader) test(name) { val goldenFile = new File(s"$baseResourcePath/v1_4", s"$name.sql.out") - System.gc() // Workaround for GitHub Actions memory limitation, see also SPARK-37368 - runQuery(queryString, goldenFile, joinConfSet.head.toSeq, false) - if (!regenerateGoldenFiles) { - joinConfSet.tail.foreach { conf => - System.gc() // SPARK-37368 - runQuery(queryString, goldenFile, conf.toSeq, true) - } + joinConfs.foreach { conf => + System.gc() // Workaround for GitHub Actions memory limitation, see also SPARK-37368 + runQuery(queryString, goldenFile, conf) } } } @@ -196,13 +212,9 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp classLoader = Thread.currentThread().getContextClassLoader) test(s"$name-v2.7") { val goldenFile = new File(s"$baseResourcePath/v2_7", s"$name.sql.out") - System.gc() // SPARK-37368 - runQuery(queryString, goldenFile, joinConfSet.head.toSeq, false) - if (!regenerateGoldenFiles) { - joinConfSet.tail.foreach { conf => - System.gc() // SPARK-37368 - runQuery(queryString, goldenFile, conf.toSeq, true) - } + joinConfs.foreach { conf => + System.gc() // SPARK-37368 + runQuery(queryString, goldenFile, conf) } } }