22 changes: 11 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -73,7 +73,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}

test("cache temp table") {
withTempTable("tempTable") {
withTempView("tempTable") {
testData.select('key).createOrReplaceTempView("tempTable")
assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
spark.catalog.cacheTable("tempTable")
@@ -97,7 +97,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}

test("cache table as select") {
withTempTable("tempTable") {
withTempView("tempTable") {
sql("CACHE TABLE tempTable AS SELECT key FROM testData")
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
spark.catalog.uncacheTable("tempTable")
@@ -227,7 +227,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}

test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
withTempTable("testCacheTable") {
withTempView("testCacheTable") {
sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
assertCached(spark.table("testCacheTable"))

@@ -244,7 +244,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}

test("CACHE TABLE tableName AS SELECT ...") {
withTempTable("testCacheTable") {
withTempView("testCacheTable") {
sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
assertCached(spark.table("testCacheTable"))

@@ -413,7 +413,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
// Set up two tables distributed in the same way. Try this with the data distributed into
// different numbers of partitions.
for (numPartitions <- 1 until 10 by 4) {
withTempTable("t1", "t2") {
withTempView("t1", "t2") {
testData.repartition(numPartitions, $"key").createOrReplaceTempView("t1")
testData2.repartition(numPartitions, $"a").createOrReplaceTempView("t2")
spark.catalog.cacheTable("t1")
@@ -435,7 +435,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}

// Distribute the tables into non-matching numbers of partitions. Need to shuffle one side.
withTempTable("t1", "t2") {
withTempView("t1", "t2") {
testData.repartition(6, $"key").createOrReplaceTempView("t1")
testData2.repartition(3, $"a").createOrReplaceTempView("t2")
spark.catalog.cacheTable("t1")
@@ -452,7 +452,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}

// One side of the join is not partitioned in the desired way. Need to shuffle one side.
withTempTable("t1", "t2") {
withTempView("t1", "t2") {
testData.repartition(6, $"value").createOrReplaceTempView("t1")
testData2.repartition(6, $"a").createOrReplaceTempView("t2")
spark.catalog.cacheTable("t1")
@@ -468,7 +468,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
spark.catalog.uncacheTable("t2")
}

withTempTable("t1", "t2") {
withTempView("t1", "t2") {
testData.repartition(6, $"value").createOrReplaceTempView("t1")
testData2.repartition(12, $"a").createOrReplaceTempView("t2")
spark.catalog.cacheTable("t1")
@@ -487,7 +487,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
// One side of the join is not partitioned in the desired way. Since the number of partitions of
// the side that is already partitioned is smaller than that of the unpartitioned side,
// we shuffle both sides.
withTempTable("t1", "t2") {
withTempView("t1", "t2") {
testData.repartition(6, $"value").createOrReplaceTempView("t1")
testData2.repartition(3, $"a").createOrReplaceTempView("t2")
spark.catalog.cacheTable("t1")
@@ -504,7 +504,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext

// repartition's column ordering is different from the group-by column ordering,
// but they use the same set of columns.
withTempTable("t1") {
withTempView("t1") {
testData.repartition(6, $"value", $"key").createOrReplaceTempView("t1")
spark.catalog.cacheTable("t1")

@@ -520,7 +520,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
// We will still shuffle because the hash code of a row depends on the column ordering.
// If we do not shuffle, we may actually partition the two tables in two totally different ways.
// See PartitioningSuite for more details.
withTempTable("t1", "t2") {
withTempView("t1", "t2") {
val df1 = testData
df1.repartition(6, $"value", $"key").createOrReplaceTempView("t1")
val df2 = testData2.select($"a", $"b".cast("string"))
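Aside: every suite in this diff swaps withTempTable for withTempView, a fixture presumably defined in SQLTestUtils (which CachedTableSuite mixes in above). A minimal sketch of what the renamed helper does, assuming a SparkSession in scope and the Spark 2.0 Catalog API; the trait wrapper is hypothetical and the real method's error handling may differ:

import org.apache.spark.sql.SparkSession

trait TempViewFixture {
  def spark: SparkSession

  // Run the test body `f`, then drop the named temporary views even if
  // `f` throws, so one test's views cannot leak into the next test.
  protected def withTempView(viewNames: String*)(f: => Unit): Unit = {
    try f finally {
      viewNames.foreach(spark.catalog.dropTempView)
    }
  }
}

The new name matches the createOrReplaceTempView calls the test bodies already make: the objects being managed are temporary views, not tables.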
@@ -60,7 +60,7 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
}

test("SPARK-16337 temporary view refresh") {
withTempTable("view_refresh") { withTempPath { (location: File) =>
withTempView("view_refresh") { withTempPath { (location: File) =>
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.parquet(location.getAbsolutePath)
34 changes: 17 additions & 17 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -805,7 +805,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("count of empty table") {
withTempTable("t") {
withTempView("t") {
Seq.empty[(Int, Int)].toDF("a", "b").createOrReplaceTempView("t")
checkAnswer(
sql("select count(a) from t"),
@@ -1671,7 +1671,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-7952: fix the equality check between boolean and numeric types") {
withTempTable("t") {
withTempView("t") {
// numeric field i, boolean field j, result of i = j, result of i <=> j
Seq[(Integer, java.lang.Boolean, java.lang.Boolean, java.lang.Boolean)](
(1, true, true, true),
@@ -1691,22 +1691,22 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-7067: order by queries for complex ExtractValue chain") {
withTempTable("t") {
withTempView("t") {
spark.read.json(sparkContext.makeRDD(
"""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""" :: Nil)).createOrReplaceTempView("t")
checkAnswer(sql("SELECT a.b FROM t ORDER BY b[0].d"), Row(Seq(Row(1))))
}
}

test("SPARK-8782: ORDER BY NULL") {
withTempTable("t") {
withTempView("t") {
Seq((1, 2), (1, 2)).toDF("a", "b").createOrReplaceTempView("t")
checkAnswer(sql("SELECT * FROM t ORDER BY NULL"), Seq(Row(1, 2), Row(1, 2)))
}
}

test("SPARK-8837: use keyword in column name") {
withTempTable("t") {
withTempView("t") {
val df = Seq(1 -> "a").toDF("count", "sort")
checkAnswer(df.filter("count > 0"), Row(1, "a"))
df.createOrReplaceTempView("t")
@@ -1820,7 +1820,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-9511: error with table starting with number") {
withTempTable("1one") {
withTempView("1one") {
sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
.toDF("num", "str")
.createOrReplaceTempView("1one")
@@ -1864,15 +1864,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-10130 type coercion for IF should have children resolved first") {
withTempTable("src") {
withTempView("src") {
Seq((1, 1), (-1, 1)).toDF("key", "value").createOrReplaceTempView("src")
checkAnswer(
sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp"), Seq(Row(1), Row(0)))
}
}

test("SPARK-10389: order by non-attribute grouping expression on Aggregate") {
withTempTable("src") {
withTempView("src") {
Seq((1, 1), (-1, 1)).toDF("key", "value").createOrReplaceTempView("src")
checkAnswer(sql("SELECT MAX(value) FROM src GROUP BY key + 1 ORDER BY key + 1"),
Seq(Row(1), Row(1)))
@@ -1976,7 +1976,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-11032: resolve having correctly") {
withTempTable("src") {
withTempView("src") {
Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("src")
checkAnswer(
sql("SELECT MIN(t.i) FROM (SELECT * FROM src WHERE i > 0) t HAVING(COUNT(1) > 0)"),
@@ -2081,7 +2081,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Row(1, 1) :: Row(1, 2) :: Row(2, 1) :: Row(2, 2) :: Row(3, 1) :: Row(3, 2) :: Nil)

// Try with a temporary view
withTempTable("nestedStructTable") {
withTempView("nestedStructTable") {
nestedStructData.createOrReplaceTempView("nestedStructTable")
checkAnswer(
sql("SELECT record.* FROM nestedStructTable"),
@@ -2104,7 +2104,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
| SELECT struct(`col$.a_`, `a.b.c.`) as `r&&b.c` FROM
| (SELECT struct(a, b) as `col$.a_`, struct(b, a) as `a.b.c.` FROM testData2) tmp
""".stripMargin)
withTempTable("specialCharacterTable") {
withTempView("specialCharacterTable") {
specialCharacterPath.createOrReplaceTempView("specialCharacterTable")
checkAnswer(
specialCharacterPath.select($"`r&&b.c`.*"),
@@ -2128,7 +2128,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("Struct Star Expansion - Name conflict") {
// Create a data set that contains a naming conflict
val nameConflict = sql("SELECT struct(a, b) as nameConflict, a as a FROM testData2")
withTempTable("nameConflict") {
withTempView("nameConflict") {
nameConflict.createOrReplaceTempView("nameConflict")
// Unqualified should resolve to table.
checkAnswer(sql("SELECT nameConflict.* FROM nameConflict"),
@@ -2149,7 +2149,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("Star Expansion - table with zero column") {
withTempTable("temp_table_no_cols") {
withTempView("temp_table_no_cols") {
val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
dfNoCols.createTempView("temp_table_no_cols")
@@ -2464,7 +2464,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("SPARK-13056: Null in map value causes NPE") {
val df = Seq(1 -> Map("abc" -> "somestring", "cba" -> null)).toDF("key", "value")
withTempTable("maptest") {
withTempView("maptest") {
df.createOrReplaceTempView("maptest")
// local optimization will bypass codegen, so we should keep the filter `key = 1`
checkAnswer(sql("SELECT value['abc'] FROM maptest where key = 1"), Row("somestring"))
@@ -2474,7 +2474,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("hash function") {
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
withTempTable("tbl") {
withTempView("tbl") {
df.createOrReplaceTempView("tbl")
checkAnswer(
df.select(hash($"i", $"j")),
@@ -2526,7 +2526,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("natural join") {
val df1 = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("k", "v1")
val df2 = Seq(("one", 1), ("two", 22), ("one", 5)).toDF("k", "v2")
withTempTable("nt1", "nt2") {
withTempView("nt1", "nt2") {
df1.createOrReplaceTempView("nt1")
df2.createOrReplaceTempView("nt2")
checkAnswer(
@@ -2554,7 +2554,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
("r2c1", "r2c2", "t2r2c3"), ("r3c1y", "r3c2", "t2r3c3")).toDF("c1", "c2", "c3")
val df3 = Seq((null, "r1c2", "t3r1c3"),
("r2c1", "r2c2", "t3r2c3"), ("r3c1y", "r3c2", "t3r3c3")).toDF("c1", "c2", "c3")
withTempTable("t1", "t2", "t3") {
withTempView("t1", "t2", "t3") {
df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
df3.createOrReplaceTempView("t3")
@@ -34,7 +34,7 @@ class StatisticsSuite extends QueryTest with SharedSQLContext {
}

test("estimates the size of limit") {
withTempTable("test") {
withTempView("test") {
Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
.createOrReplaceTempView("test")
Seq((0, 1), (1, 24), (2, 48)).foreach { case (limit, expected) =>
@@ -58,7 +58,7 @@ class StatisticsSuite extends QueryTest with SharedSQLContext {
}

test("estimates the size of a limit 0 on outer join") {
withTempTable("test") {
withTempView("test") {
Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
.createOrReplaceTempView("test")
val df1 = spark.table("test")
@@ -128,7 +128,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-15677: Queries against local relations with scalar subquery in Select list") {
withTempTable("t1", "t2") {
withTempView("t1", "t2") {
Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")

@@ -267,7 +267,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-15832: Test embedded existential predicate sub-queries") {
withTempTable("t1", "t2", "t3", "t4", "t5") {
withTempView("t1", "t2", "t3", "t4", "t5") {
Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
Seq((1, 1), (2, 2), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t3")
@@ -71,7 +71,7 @@ class PlannerSuite extends SharedSQLContext {

test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
def checkPlan(fieldTypes: Seq[DataType]): Unit = {
withTempTable("testLimit") {
withTempView("testLimit") {
val fields = fieldTypes.zipWithIndex.map {
case (dataType, index) => StructField(s"c${index}", dataType, true)
} :+ StructField("key", IntegerType, true)
@@ -131,7 +131,7 @@ class PlannerSuite extends SharedSQLContext {

test("InMemoryRelation statistics propagation") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "81920") {
withTempTable("tiny") {
withTempView("tiny") {
testData.limit(3).createOrReplaceTempView("tiny")
sql("CACHE TABLE tiny")

@@ -157,7 +157,7 @@ class PlannerSuite extends SharedSQLContext {
val df = spark.read.parquet(path)
df.createOrReplaceTempView("testPushed")

withTempTable("testPushed") {
withTempView("testPushed") {
val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan
assert(exp.toString.contains("PushedFilters: [IsNotNull(key), EqualTo(key,15)]"))
}
@@ -198,7 +198,7 @@ class PlannerSuite extends SharedSQLContext {
}

test("PartitioningCollection") {
withTempTable("normal", "small", "tiny") {
withTempView("normal", "small", "tiny") {
testData.createOrReplaceTempView("normal")
testData.limit(10).createOrReplaceTempView("small")
testData.limit(3).createOrReplaceTempView("tiny")
@@ -498,7 +498,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}

test("rename temporary table - destination table with database name") {
withTempTable("tab1") {
withTempView("tab1") {
sql(
"""
|CREATE TEMPORARY TABLE tab1
@@ -523,7 +523,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}

test("rename temporary table - destination table already exists") {
withTempTable("tab1", "tab2") {
withTempView("tab1", "tab2") {
sql(
"""
|CREATE TEMPORARY TABLE tab1
@@ -678,7 +678,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}

test("show tables") {
withTempTable("show1a", "show2b") {
withTempView("show1a", "show2b") {
sql(
"""
|CREATE TEMPORARY TABLE show1a
@@ -1079,7 +1079,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("Corrupt records: PERMISSIVE mode") {
// Test if we can query corrupt records.
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
withTempTable("jsonTable") {
withTempView("jsonTable") {
val jsonDF = spark.read.json(corruptRecords)
jsonDF.createOrReplaceTempView("jsonTable")
val schema = StructType(
@@ -1515,7 +1515,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-12057 additional corrupt records do not throw exceptions") {
// Test if we can query corrupt records.
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
withTempTable("jsonTable") {
withTempView("jsonTable") {
val schema = StructType(
StructField("_unparsed", StringType, true) ::
StructField("dummy", StringType, true) :: Nil)
@@ -1632,7 +1632,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}

test("Casting long as timestamp") {
withTempTable("jsonTable") {
withTempView("jsonTable") {
val schema = (new StructType).add("ts", TimestampType)
val jsonDF = spark.read.schema(schema).json(timestampAsLong)

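Outside the test harness, the cleanup that withTempView automates can be written out by hand. A self-contained sketch under the same Spark 2.0 API assumptions; the object name, master URL, and data are illustrative only:

import org.apache.spark.sql.SparkSession

object TempViewDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("TempViewDemo")
      .getOrCreate()
    import spark.implicits._

    // Register a temporary view, query it by name, then drop it.
    Seq(("one", 1), ("two", 2)).toDF("k", "v").createOrReplaceTempView("demo")
    try {
      spark.sql("SELECT count(*) FROM demo").show()
    } finally {
      spark.catalog.dropTempView("demo") // the step the fixture guarantees
      spark.stop()
    }
  }
}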