@@ -97,50 +97,95 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
9797 """ .stripMargin)
9898 }
9999
100- private def runQuery (query : String , goldenFile : File ): Unit = {
101- val (schema, output) = handleExceptions(getNormalizedResult(spark, query))
102- val queryString = query.trim
103- val outputString = output.mkString(" \n " ).replaceAll(" \\ s+$" , " " )
104- if (regenerateGoldenFiles) {
105- val goldenOutput = {
106- s " -- Automatically generated by ${getClass.getSimpleName}\n\n " +
107- s " -- !query schema \n " +
108- schema + " \n " +
109- s " -- !query output \n " +
110- outputString +
111- " \n "
100+ private def runQuery (
101+ query : String ,
102+ goldenFile : File ,
103+ conf : Seq [(String , String )],
104+ needSort : Boolean ): Unit = {
105+ withSQLConf(conf : _* ) {
106+ try {
107+ val (schema, output) = handleExceptions(getNormalizedResult(spark, query))
108+ val queryString = query.trim
109+ val outputString = output.mkString(" \n " ).replaceAll(" \\ s+$" , " " )
110+ if (regenerateGoldenFiles) {
111+ val goldenOutput = {
112+ s " -- Automatically generated by ${getClass.getSimpleName}\n\n " +
113+ s " -- !query schema \n " +
114+ schema + " \n " +
115+ s " -- !query output \n " +
116+ outputString +
117+ " \n "
118+ }
119+ val parent = goldenFile.getParentFile
120+ if (! parent.exists()) {
121+ assert(parent.mkdirs(), " Could not create directory: " + parent)
122+ }
123+ stringToFile(goldenFile, goldenOutput)
124+ }
125+
126+ // Read back the golden file.
127+ val (expectedSchema, expectedOutput) = {
128+ val goldenOutput = fileToString(goldenFile)
129+ val segments = goldenOutput.split(" -- !query.*\n " )
130+
131+ // query has 3 segments, plus the header
132+ assert(segments.size == 3 ,
133+ s " Expected 3 blocks in result file but got ${segments.size}. " +
134+ " Try regenerate the result files." )
135+
136+ (segments(1 ).trim, segments(2 ).replaceAll(" \\ s+$" , " " ))
137+ }
138+
139+ assertResult(expectedSchema, s " Schema did not match \n $queryString" ) {
140+ schema
141+ }
142+ if (needSort) {
143+ val expectSorted = expectedOutput.split(" \n " ).sorted.map(_.trim)
144+ .mkString(" \n " ).replaceAll(" \\ s+$" , " " )
145+ val outputSorted = output.sorted.map(_.trim).mkString(" \n " ).replaceAll(" \\ s+$" , " " )
146+ assertResult(expectSorted, s " Result did not match \n $queryString" ) {
147+ outputSorted
148+ }
149+ } else {
150+ assertResult(expectedOutput, s " Result did not match \n $queryString" ) {
151+ outputString
152+ }
153+ }
154+ } catch {
155+ case e : Throwable =>
156+ val configs = conf.map {
157+ case (k, v) => s " $k= $v"
158+ }
159+ throw new Exception (s " ${e.getMessage}\n Error using configs: \n ${configs.mkString(" \n " )}" )
112160 }
113- val parent = goldenFile.getParentFile
114- if (! parent.exists()) {
115- assert(parent.mkdirs(), " Could not create directory: " + parent)
116- }
117- stringToFile(goldenFile, goldenOutput)
118161 }
162+ }
119163
120- // Read back the golden file.
121- val (expectedSchema, expectedOutput) = {
122- val goldenOutput = fileToString(goldenFile)
123- val segments = goldenOutput.split(" -- !query.*\n " )
164+ val sortMergeJoinConf = Map (
165+ SQLConf .AUTO_BROADCASTJOIN_THRESHOLD .key -> " -1" ,
166+ SQLConf .PREFER_SORTMERGEJOIN .key -> " true" )
124167
125- // query has 3 segments, plus the header
126- assert(segments.size == 3 ,
127- s " Expected 3 blocks in result file but got ${segments.size}. " +
128- " Try regenerate the result files." )
168+ val broadcastHashJoinConf = Map (SQLConf .AUTO_BROADCASTJOIN_THRESHOLD .key -> " 10485760" )
129169
130- (segments(1 ).trim, segments(2 ).replaceAll(" \\ s+$" , " " ))
131- }
170+ val shuffledHashJoinConf = Map (
171+ SQLConf .AUTO_BROADCASTJOIN_THRESHOLD .key -> " -1" ,
172+ " spark.sql.join.forceApplyShuffledHashJoin" -> " true" )
132173
133- assertResult(expectedSchema, s " Schema did not match \n $queryString" ) { schema }
134- assertResult(expectedOutput, s " Result did not match \n $queryString" ) { outputString }
135- }
174+ val joinConfSet : Set [Map [String , String ]] =
175+ Set (sortMergeJoinConf, broadcastHashJoinConf, shuffledHashJoinConf);
136176
137177 if (tpcdsDataPath.nonEmpty) {
138178 tpcdsQueries.foreach { name =>
139179 val queryString = resourceToString(s " tpcds/ $name.sql " ,
140180 classLoader = Thread .currentThread().getContextClassLoader)
141181 test(name) {
142182 val goldenFile = new File (s " $baseResourcePath/v1_4 " , s " $name.sql.out " )
143- runQuery(queryString, goldenFile)
183+ runQuery(queryString, goldenFile, joinConfSet.head.toSeq, false )
184+ if (! regenerateGoldenFiles) {
185+ joinConfSet.tail.foreach { conf =>
186+ runQuery(queryString, goldenFile, conf.toSeq, true )
187+ }
188+ }
144189 }
145190 }
146191
@@ -149,7 +194,12 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
149194 classLoader = Thread .currentThread().getContextClassLoader)
150195 test(s " $name-v2.7 " ) {
151196 val goldenFile = new File (s " $baseResourcePath/v2_7 " , s " $name.sql.out " )
152- runQuery(queryString, goldenFile)
197+ runQuery(queryString, goldenFile, joinConfSet.head.toSeq, false )
198+ if (! regenerateGoldenFiles) {
199+ joinConfSet.tail.foreach { conf =>
200+ runQuery(queryString, goldenFile, conf.toSeq, true )
201+ }
202+ }
153203 }
154204 }
155205 } else {
0 commit comments