
Commit 872bad1

dilipbiswal authored and gatorsmile committed
[SPARK-25267][SQL][TEST] Disable ConvertToLocalRelation in the test cases of sql/core and sql/hive

## What changes were proposed in this pull request?

In SharedSparkSession and TestHive, we need to disable the rule ConvertToLocalRelation for better test case coverage.

## How was this patch tested?

Identified the failures after excluding the "ConvertToLocalRelation" rule.

Closes #22270 from dilipbiswal/SPARK-25267-final.

Authored-by: Dilip Biswal <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit 6d7bc5a)
Signed-off-by: gatorsmile <[email protected]>
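The change itself boils down to one configuration entry: SQLConf.OPTIMIZER_EXCLUDED_RULES (the spark.sql.optimizer.excludedRules setting) lists optimizer rules to skip by their rule names. A minimal, self-contained sketch of the same idea outside the shared test traits might look like the following; the application name and sample DataFrame are illustrative only:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.internal.SQLConf

// Build a local session with ConvertToLocalRelation excluded from the optimizer,
// mirroring what SharedSparkSession and TestHive now do for their test sessions.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("exclude-convert-to-local-relation") // illustrative name
  .config(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
  .getOrCreate()

import spark.implicits._

// With the rule excluded, this small in-memory query keeps its Filter over the
// LocalRelation in the optimized plan instead of being evaluated eagerly on the
// driver, so rules such as ConstantPropagation still see a plan worth optimizing.
Seq((1, "a"), (2, "b")).toDF("id", "value").filter($"id" > 1).explain(true)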
1 parent f9b476c commit 872bad1

File tree

8 files changed (+39 / -14 lines)


mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala

Lines changed: 9 additions & 1 deletion

@@ -21,12 +21,13 @@ import java.io.File
 
 import org.scalatest.Suite
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{DebugFilesystem, SparkConf, SparkContext}
 import org.apache.spark.ml.{PredictionModel, Transformer}
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.test.TestSparkSession
 import org.apache.spark.util.Utils
@@ -36,6 +37,13 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite =>
   @transient var sc: SparkContext = _
   @transient var checkpointDir: String = _
 
+  protected override def sparkConf = {
+    new SparkConf()
+      .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
+      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+  }
+
   protected override def createSparkSession: TestSparkSession = {
     new TestSparkSession(new SparkContext("local[2]", "MLlibUnitTest", sparkConf))
   }

sql/core/src/test/resources/sql-tests/inputs/group-by-ordinal.sql

Lines changed: 3 additions & 1 deletion

@@ -38,7 +38,9 @@ select a, b, sum(b) from data group by 3;
 select a, b, sum(b) + 2 from data group by 3;
 
 -- negative case: nondeterministic expression
-select a, rand(0), sum(b) from data group by a, 2;
+select a, rand(0), sum(b)
+from
+(select /*+ REPARTITION(1) */ a, b from data) group by a, 2;
 
 -- negative case: star
 select * from data group by a, b, 1;

sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out

Lines changed: 3 additions & 1 deletion

@@ -135,7 +135,9 @@ aggregate functions are not allowed in GROUP BY, but found (sum(CAST(data.`b` AS
 
 
 -- !query 13
-select a, rand(0), sum(b) from data group by a, 2
+select a, rand(0), sum(b)
+from
+(select /*+ REPARTITION(1) */ a, b from data) group by a, 2
 -- !query 13 schema
 struct<a:int,rand(0):double,sum(b):bigint>
 -- !query 13 output
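The REPARTITION(1) hint here (and the .repartition(1) calls in the Scala suites below) is needed because rand(seed) is seeded per partition: once ConvertToLocalRelation no longer collapses these queries on the driver, the random column depends on how the rows are distributed, and the golden results only stay stable under a fixed, single-partition layout. A rough sketch of the effect, assuming a SparkSession named spark is already in scope:

import org.apache.spark.sql.functions.rand
import spark.implicits._ // assumes an existing SparkSession named `spark`

val base = Seq(1, 2, 3, 4).toDF("a")

// rand(0) is evaluated with a generator seeded by `seed + partitionIndex`, so the
// same four rows yield different random columns under different partitionings.
val onePartition   = base.repartition(1).select(rand(0)).collect()
val fourPartitions = base.repartition(4).select(rand(0)).collect()
// The two arrays generally differ; pinning the data to one partition keeps the
// expected answers in the golden file valid whether or not the plan is collapsed.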

sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -558,7 +558,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-18004 limit + aggregates") {
     withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") {
-      val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
+      val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value").repartition(1)
       val limit2Df = df.limit(2)
       checkAnswer(
         limit2Df.groupBy("id").count().select($"id"),

sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala

Lines changed: 8 additions & 6 deletions

@@ -85,14 +85,16 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     }
 
     val df5 = Seq((Seq("a", null), Seq(1, 2))).toDF("k", "v")
-    intercept[RuntimeException] {
+    val msg1 = intercept[Exception] {
       df5.select(map_from_arrays($"k", $"v")).collect
-    }
+    }.getMessage
+    assert(msg1.contains("Cannot use null as map key!"))
 
     val df6 = Seq((Seq(1, 2), Seq("a"))).toDF("k", "v")
-    intercept[RuntimeException] {
+    val msg2 = intercept[Exception] {
       df6.select(map_from_arrays($"k", $"v")).collect
-    }
+    }.getMessage
+    assert(msg2.contains("The given two arrays should have the same length"))
   }
 
   test("struct with column name") {
@@ -2377,7 +2379,7 @@
     assert(ex2.getMessage.contains(
       "The number of lambda function arguments '3' does not match"))
 
-    val ex3 = intercept[RuntimeException] {
+    val ex3 = intercept[Exception] {
       dfExample1.selectExpr("transform_keys(i, (k, v) -> v)").show()
     }
     assert(ex3.getMessage.contains("Cannot use null as map key!"))
@@ -2697,7 +2699,7 @@
 
   test("SPARK-24734: Fix containsNull of Concat for array type") {
     val df = Seq((Seq(1), Seq[Integer](null), Seq("a", "b"))).toDF("k1", "k2", "v")
-    val ex = intercept[RuntimeException] {
+    val ex = intercept[Exception] {
       df.select(map_from_arrays(concat($"k1", $"k2"), $"v")).show()
     }
     assert(ex.getMessage.contains("Cannot use null as map key"))
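The switch from intercept[RuntimeException] to intercept[Exception] follows from the same root cause: with ConvertToLocalRelation excluded, these queries are no longer evaluated eagerly during optimization, so the failure surfaces at execution time and can arrive wrapped in a SparkException, which is not a RuntimeException. Asserting on the message keeps the test meaningful either way. A hedged sketch of the pattern, reusing the suite's df5 and map_from_arrays from above:

// Catch the broadest exception type and pin the behaviour down via the error message,
// since the same failure may now be reported as an org.apache.spark.SparkException
// from the failed task rather than as the raw RuntimeException thrown on the driver.
val msg = intercept[Exception] {
  df5.select(map_from_arrays($"k", $"v")).collect()
}.getMessage
assert(msg.contains("Cannot use null as map key!"))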

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 2 additions & 3 deletions

@@ -40,6 +40,7 @@ import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext}
 import org.apache.spark.sql.test.SQLTestData.{NullInts, NullStrings, TestData2}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
+import org.apache.spark.util.random.XORShiftRandom
 
 class DataFrameSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -1729,10 +1730,8 @@
   }
 
   test("SPARK-9083: sort with non-deterministic expressions") {
-    import org.apache.spark.util.random.XORShiftRandom
-
     val seed = 33
-    val df = (1 to 100).map(Tuple1.apply).toDF("i")
+    val df = (1 to 100).map(Tuple1.apply).toDF("i").repartition(1)
     val random = new XORShiftRandom(seed)
     val expected = (1 to 100).map(_ -> random.nextDouble()).sortBy(_._2).map(_._1)
     val actual = df.sort(rand(seed)).collect().map(_.getInt(0))
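The added .repartition(1) matters here because the test computes its expected ordering with new XORShiftRandom(seed), which matches what rand(seed) produces only for rows in partition 0; at execution time rand is seeded with seed + partitionIndex. A small sketch of that assumption (XORShiftRandom is private[spark], so this only compiles inside Spark's own test code):

import org.apache.spark.util.random.XORShiftRandom

val seed = 33
// What the test's `expected` sequence assumes: the generator for partition 0.
val partition0 = new XORShiftRandom(seed)
// What rows placed in partition 1 would actually see at execution time.
val partition1 = new XORShiftRandom(seed + 1)

// Different partitions draw different sequences, so without repartition(1) the order
// produced by sorting on rand(seed) need not match the precomputed expectation.
assert(partition0.nextDouble() != partition1.nextDouble())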

sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala

Lines changed: 6 additions & 0 deletions

@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -39,6 +40,11 @@ trait SharedSparkSession
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+      // Disable ConvertToLocalRelation for better test coverage. Test cases built on
+      // LocalRelation will exercise the optimization rules better by disabling it as
+      // this rule may potentially block testing of other optimization rules such as
+      // ConstantPropagation etc.
+      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
   }
 
   /**

sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

Lines changed: 7 additions & 1 deletion

@@ -36,6 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener}
+import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.CacheTableCommand
@@ -59,7 +60,12 @@ object TestHive
         .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
         // SPARK-8910
         .set("spark.ui.enabled", "false")
-        .set("spark.unsafe.exceptionOnMemoryLeak", "true")))
+        .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+        // Disable ConvertToLocalRelation for better test coverage. Test cases built on
+        // LocalRelation will exercise the optimization rules better by disabling it as
+        // this rule may potentially block testing of other optimization rules such as
+        // ConstantPropagation etc.
+        .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)))
 
 
 case class TestHiveVersion(hiveClient: HiveClient)
