From 13defbbd26a2ec4806c1fc94b890f6f43068d411 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 14 Aug 2017 04:11:03 +0000 Subject: [PATCH 1/2] Clear FileSystem deleteOnExit cache when paths are successfully removed. --- .../hive/execution/InsertIntoHiveTable.scala | 9 +- .../sql/hive/execution/SQLQuerySuite.scala | 235 ++++++++++-------- 2 files changed, 143 insertions(+), 101 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index b6f4898fd1574..510c33509d8c3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -423,7 +423,14 @@ case class InsertIntoHiveTable( // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. try { - createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) } + createdTempDir.foreach { path => + val fs = path.getFileSystem(hadoopConf) + fs.delete(path, true) + // If we successfully delete the staging directory, remove it from FileSystem's cache. + if (!fs.exists(path)) { + fs.cancelDeleteOnExit(path) + } + } } catch { case NonFatal(e) => logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index a949e5e829e14..50a04598579b5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import java.util.Locale +import java.util.{Locale, Set} import com.google.common.io.Files -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.TestUtils import org.apache.spark.sql._ @@ -156,69 +156,71 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } test("SPARK-6851: Self-joined converted parquet tables") { - val orders = Seq( - Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151), - Order(3, "Swift", "MTB", 285, "2015-01-17", "John S", "Redwood City", "CA", 20151), - Order(4, "Atlas", "Hybrid", 303, "2015-01-23", "Jones S", "San Mateo", "CA", 20151), - Order(7, "Next", "MTB", 356, "2015-01-04", "Jane D", "Daly City", "CA", 20151), - Order(10, "Next", "YFlikr", 187, "2015-01-09", "John D", "Fremont", "CA", 20151), - Order(11, "Swift", "YFlikr", 187, "2015-01-23", "John D", "Hayward", "CA", 20151), - Order(2, "Next", "Hybrid", 324, "2015-02-03", "Jane D", "Daly City", "CA", 20152), - Order(5, "Next", "Street", 187, "2015-02-08", "John D", "Fremont", "CA", 20152), - Order(6, "Atlas", "Street", 154, "2015-02-09", "John D", "Pacifica", "CA", 20152), - Order(8, "Swift", "Hybrid", 485, "2015-02-19", "John S", "Redwood City", "CA", 20152), - Order(9, "Atlas", "Split", 303, "2015-02-28", "Jones S", "San Mateo", "CA", 20152)) - - val orderUpdates = Seq( - Order(1, "Atlas", "MTB", 434, "2015-01-07", "John D", "Pacifica", "CA", 20151), - Order(11, "Swift", "YFlikr", 137, 
"2015-01-23", "John D", "Hayward", "CA", 20151)) - - orders.toDF.createOrReplaceTempView("orders1") - orderUpdates.toDF.createOrReplaceTempView("orderupdates1") + withTable("orders", "orderupdates") { + val orders = Seq( + Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151), + Order(3, "Swift", "MTB", 285, "2015-01-17", "John S", "Redwood City", "CA", 20151), + Order(4, "Atlas", "Hybrid", 303, "2015-01-23", "Jones S", "San Mateo", "CA", 20151), + Order(7, "Next", "MTB", 356, "2015-01-04", "Jane D", "Daly City", "CA", 20151), + Order(10, "Next", "YFlikr", 187, "2015-01-09", "John D", "Fremont", "CA", 20151), + Order(11, "Swift", "YFlikr", 187, "2015-01-23", "John D", "Hayward", "CA", 20151), + Order(2, "Next", "Hybrid", 324, "2015-02-03", "Jane D", "Daly City", "CA", 20152), + Order(5, "Next", "Street", 187, "2015-02-08", "John D", "Fremont", "CA", 20152), + Order(6, "Atlas", "Street", 154, "2015-02-09", "John D", "Pacifica", "CA", 20152), + Order(8, "Swift", "Hybrid", 485, "2015-02-19", "John S", "Redwood City", "CA", 20152), + Order(9, "Atlas", "Split", 303, "2015-02-28", "Jones S", "San Mateo", "CA", 20152)) + + val orderUpdates = Seq( + Order(1, "Atlas", "MTB", 434, "2015-01-07", "John D", "Pacifica", "CA", 20151), + Order(11, "Swift", "YFlikr", 137, "2015-01-23", "John D", "Hayward", "CA", 20151)) + + orders.toDF.createOrReplaceTempView("orders1") + orderUpdates.toDF.createOrReplaceTempView("orderupdates1") - sql( - """CREATE TABLE orders( - | id INT, - | make String, - | type String, - | price INT, - | pdate String, - | customer String, - | city String) - |PARTITIONED BY (state STRING, month INT) - |STORED AS PARQUET - """.stripMargin) + sql( + """CREATE TABLE orders( + | id INT, + | make String, + | type String, + | price INT, + | pdate String, + | customer String, + | city String) + |PARTITIONED BY (state STRING, month INT) + |STORED AS PARQUET + """.stripMargin) - sql( - """CREATE TABLE orderupdates( - | id INT, - | make String, - | type String, - | price INT, - | pdate String, - | customer String, - | city String) - |PARTITIONED BY (state STRING, month INT) - |STORED AS PARQUET - """.stripMargin) + sql( + """CREATE TABLE orderupdates( + | id INT, + | make String, + | type String, + | price INT, + | pdate String, + | customer String, + | city String) + |PARTITIONED BY (state STRING, month INT) + |STORED AS PARQUET + """.stripMargin) - sql("set hive.exec.dynamic.partition.mode=nonstrict") - sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1") - sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1") + sql("set hive.exec.dynamic.partition.mode=nonstrict") + sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1") + sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1") - checkAnswer( - sql( - """ - |select orders.state, orders.month - |from orders - |join ( - | select distinct orders.state,orders.month - | from orders - | join orderupdates - | on orderupdates.id = orders.id) ao - | on ao.state = orders.state and ao.month = orders.month - """.stripMargin), - (1 to 6).map(_ => Row("CA", 20151))) + checkAnswer( + sql( + """ + |select orders.state, orders.month + |from orders + |join ( + | select distinct orders.state,orders.month + | from orders + | join orderupdates + | on orderupdates.id = orders.id) ao + | on ao.state = orders.state and ao.month = orders.month + """.stripMargin), + (1 to 6).map(_ => Row("CA", 20151))) + } } test("show 
functions") { @@ -389,21 +391,25 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } test("CTAS with WITH clause") { - val df = Seq((1, 1)).toDF("c1", "c2") - df.createOrReplaceTempView("table1") + withTable("with_table1") { + withTempView("table1") { + val df = Seq((1, 1)).toDF("c1", "c2") + df.createOrReplaceTempView("table1") - sql( - """ - |CREATE TABLE with_table1 AS - |WITH T AS ( - | SELECT * - | FROM table1 - |) - |SELECT * - |FROM T - """.stripMargin) - val query = sql("SELECT * FROM with_table1") - checkAnswer(query, Row(1, 1) :: Nil) + sql( + """ + |CREATE TABLE with_table1 AS + |WITH T AS ( + | SELECT * + | FROM table1 + |) + |SELECT * + |FROM T + """.stripMargin) + val query = sql("SELECT * FROM with_table1") + checkAnswer(query, Row(1, 1) :: Nil) + } + } } test("explode nested Field") { @@ -685,6 +691,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("SELECT key, value FROM ctas5 ORDER BY key, value"), sql("SELECT key, value FROM src ORDER BY key, value")) } + sql("DROP TABLE ctas1") + sql("DROP TABLE ctas2") + sql("DROP TABLE ctas3") + sql("DROP TABLE ctas4") + sql("DROP TABLE ctas5") } test("specifying the column list for CTAS") { @@ -756,40 +767,46 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } test("double nested data") { - sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil) - .toDF().createOrReplaceTempView("nested") - checkAnswer( - sql("SELECT f1.f2.f3 FROM nested"), - Row(1)) + withTable("test_ctas_1234") { + sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil) + .toDF().createOrReplaceTempView("nested") + checkAnswer( + sql("SELECT f1.f2.f3 FROM nested"), + Row(1)) - sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested") - checkAnswer( - sql("SELECT * FROM test_ctas_1234"), - sql("SELECT * FROM nested").collect().toSeq) + sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested") + checkAnswer( + sql("SELECT * FROM test_ctas_1234"), + sql("SELECT * FROM nested").collect().toSeq) - intercept[AnalysisException] { - sql("CREATE TABLE test_ctas_1234 AS SELECT * from notexists").collect() + intercept[AnalysisException] { + sql("CREATE TABLE test_ctas_1234 AS SELECT * from notexists").collect() + } } } test("test CTAS") { - sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src") - checkAnswer( - sql("SELECT key, value FROM test_ctas_123 ORDER BY key"), - sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) + withTable("test_ctas_123") { + sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src") + checkAnswer( + sql("SELECT key, value FROM test_ctas_123 ORDER BY key"), + sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) + } } test("SPARK-4825 save join to table") { - val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF() - sql("CREATE TABLE test1 (key INT, value STRING)") - testData.write.mode(SaveMode.Append).insertInto("test1") - sql("CREATE TABLE test2 (key INT, value STRING)") - testData.write.mode(SaveMode.Append).insertInto("test2") - testData.write.mode(SaveMode.Append).insertInto("test2") - sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key") - checkAnswer( - table("test"), - sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq) + withTable("test", "test1", "test2") { + val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF() + sql("CREATE TABLE test1 
(key INT, value STRING)") + testData.write.mode(SaveMode.Append).insertInto("test1") + sql("CREATE TABLE test2 (key INT, value STRING)") + testData.write.mode(SaveMode.Append).insertInto("test2") + testData.write.mode(SaveMode.Append).insertInto("test2") + sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key") + checkAnswer( + table("test"), + sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq) + } } test("SPARK-3708 Backticks aren't handled correctly is aliases") { @@ -2021,4 +2038,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { checkAnswer(table.filter($"p" === "p1\" and q=\"q1").select($"a"), Row(4)) } } + + test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed") { + withTable("test1") { + val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit") + deleteOnExitField.setAccessible(true) + + val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) + val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]] + + val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF() + sql("CREATE TABLE test1 (key INT, value STRING)") + val pathSizeToDeleteOnExit = setOfPath.size() + + (0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test1")) + + assert(setOfPath.size() == pathSizeToDeleteOnExit) + } + } } From 2b0fd74e0eebaf77f41f2e6e53830a961ff71c1e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 14 Aug 2017 23:45:17 +0000 Subject: [PATCH 2/2] Use fs.delete returned value, revert the change to old tests. --- .../hive/execution/InsertIntoHiveTable.scala | 5 +- .../sql/hive/execution/SQLQuerySuite.scala | 217 ++++++++---------- 2 files changed, 102 insertions(+), 120 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 510c33509d8c3..858f29c7ee530 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -425,9 +425,8 @@ case class InsertIntoHiveTable( try { createdTempDir.foreach { path => val fs = path.getFileSystem(hadoopConf) - fs.delete(path, true) - // If we successfully delete the staging directory, remove it from FileSystem's cache. - if (!fs.exists(path)) { + if (fs.delete(path, true)) { + // If we successfully delete the staging directory, remove it from FileSystem's cache. 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 50a04598579b5..45bbb0c674be3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -156,71 +156,69 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("SPARK-6851: Self-joined converted parquet tables") {
-    withTable("orders", "orderupdates") {
-      val orders = Seq(
-        Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151),
-        Order(3, "Swift", "MTB", 285, "2015-01-17", "John S", "Redwood City", "CA", 20151),
-        Order(4, "Atlas", "Hybrid", 303, "2015-01-23", "Jones S", "San Mateo", "CA", 20151),
-        Order(7, "Next", "MTB", 356, "2015-01-04", "Jane D", "Daly City", "CA", 20151),
-        Order(10, "Next", "YFlikr", 187, "2015-01-09", "John D", "Fremont", "CA", 20151),
-        Order(11, "Swift", "YFlikr", 187, "2015-01-23", "John D", "Hayward", "CA", 20151),
-        Order(2, "Next", "Hybrid", 324, "2015-02-03", "Jane D", "Daly City", "CA", 20152),
-        Order(5, "Next", "Street", 187, "2015-02-08", "John D", "Fremont", "CA", 20152),
-        Order(6, "Atlas", "Street", 154, "2015-02-09", "John D", "Pacifica", "CA", 20152),
-        Order(8, "Swift", "Hybrid", 485, "2015-02-19", "John S", "Redwood City", "CA", 20152),
-        Order(9, "Atlas", "Split", 303, "2015-02-28", "Jones S", "San Mateo", "CA", 20152))
-
-      val orderUpdates = Seq(
-        Order(1, "Atlas", "MTB", 434, "2015-01-07", "John D", "Pacifica", "CA", 20151),
-        Order(11, "Swift", "YFlikr", 137, "2015-01-23", "John D", "Hayward", "CA", 20151))
-
-      orders.toDF.createOrReplaceTempView("orders1")
-      orderUpdates.toDF.createOrReplaceTempView("orderupdates1")
+    val orders = Seq(
+      Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151),
+      Order(3, "Swift", "MTB", 285, "2015-01-17", "John S", "Redwood City", "CA", 20151),
+      Order(4, "Atlas", "Hybrid", 303, "2015-01-23", "Jones S", "San Mateo", "CA", 20151),
+      Order(7, "Next", "MTB", 356, "2015-01-04", "Jane D", "Daly City", "CA", 20151),
+      Order(10, "Next", "YFlikr", 187, "2015-01-09", "John D", "Fremont", "CA", 20151),
+      Order(11, "Swift", "YFlikr", 187, "2015-01-23", "John D", "Hayward", "CA", 20151),
+      Order(2, "Next", "Hybrid", 324, "2015-02-03", "Jane D", "Daly City", "CA", 20152),
+      Order(5, "Next", "Street", 187, "2015-02-08", "John D", "Fremont", "CA", 20152),
+      Order(6, "Atlas", "Street", 154, "2015-02-09", "John D", "Pacifica", "CA", 20152),
+      Order(8, "Swift", "Hybrid", 485, "2015-02-19", "John S", "Redwood City", "CA", 20152),
+      Order(9, "Atlas", "Split", 303, "2015-02-28", "Jones S", "San Mateo", "CA", 20152))
+
+    val orderUpdates = Seq(
+      Order(1, "Atlas", "MTB", 434, "2015-01-07", "John D", "Pacifica", "CA", 20151),
+      Order(11, "Swift", "YFlikr", 137, "2015-01-23", "John D", "Hayward", "CA", 20151))
+
+    orders.toDF.createOrReplaceTempView("orders1")
+    orderUpdates.toDF.createOrReplaceTempView("orderupdates1")
 
-      sql(
-        """CREATE TABLE orders(
-          | id INT,
-          | make String,
-          | type String,
-          | price INT,
-          | pdate String,
-          | customer String,
-          | city String)
-          |PARTITIONED BY (state STRING, month INT)
-          |STORED AS PARQUET
-        """.stripMargin)
+    sql(
+      """CREATE TABLE orders(
+        | id INT,
+        | make String,
+        | type String,
+        | price INT,
+        | pdate String,
+        | customer String,
+        | city String)
+        |PARTITIONED BY 
(state STRING, month INT) + |STORED AS PARQUET + """.stripMargin) - sql( - """CREATE TABLE orderupdates( - | id INT, - | make String, - | type String, - | price INT, - | pdate String, - | customer String, - | city String) - |PARTITIONED BY (state STRING, month INT) - |STORED AS PARQUET - """.stripMargin) + sql( + """CREATE TABLE orderupdates( + | id INT, + | make String, + | type String, + | price INT, + | pdate String, + | customer String, + | city String) + |PARTITIONED BY (state STRING, month INT) + |STORED AS PARQUET + """.stripMargin) - sql("set hive.exec.dynamic.partition.mode=nonstrict") - sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1") - sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1") + sql("set hive.exec.dynamic.partition.mode=nonstrict") + sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1") + sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1") - checkAnswer( - sql( - """ - |select orders.state, orders.month - |from orders - |join ( - | select distinct orders.state,orders.month - | from orders - | join orderupdates - | on orderupdates.id = orders.id) ao - | on ao.state = orders.state and ao.month = orders.month - """.stripMargin), - (1 to 6).map(_ => Row("CA", 20151))) - } + checkAnswer( + sql( + """ + |select orders.state, orders.month + |from orders + |join ( + | select distinct orders.state,orders.month + | from orders + | join orderupdates + | on orderupdates.id = orders.id) ao + | on ao.state = orders.state and ao.month = orders.month + """.stripMargin), + (1 to 6).map(_ => Row("CA", 20151))) } test("show functions") { @@ -391,25 +389,21 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } test("CTAS with WITH clause") { - withTable("with_table1") { - withTempView("table1") { - val df = Seq((1, 1)).toDF("c1", "c2") - df.createOrReplaceTempView("table1") + val df = Seq((1, 1)).toDF("c1", "c2") + df.createOrReplaceTempView("table1") - sql( - """ - |CREATE TABLE with_table1 AS - |WITH T AS ( - | SELECT * - | FROM table1 - |) - |SELECT * - |FROM T - """.stripMargin) - val query = sql("SELECT * FROM with_table1") - checkAnswer(query, Row(1, 1) :: Nil) - } - } + sql( + """ + |CREATE TABLE with_table1 AS + |WITH T AS ( + | SELECT * + | FROM table1 + |) + |SELECT * + |FROM T + """.stripMargin) + val query = sql("SELECT * FROM with_table1") + checkAnswer(query, Row(1, 1) :: Nil) } test("explode nested Field") { @@ -691,11 +685,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("SELECT key, value FROM ctas5 ORDER BY key, value"), sql("SELECT key, value FROM src ORDER BY key, value")) } - sql("DROP TABLE ctas1") - sql("DROP TABLE ctas2") - sql("DROP TABLE ctas3") - sql("DROP TABLE ctas4") - sql("DROP TABLE ctas5") } test("specifying the column list for CTAS") { @@ -767,46 +756,40 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } test("double nested data") { - withTable("test_ctas_1234") { - sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil) - .toDF().createOrReplaceTempView("nested") - checkAnswer( - sql("SELECT f1.f2.f3 FROM nested"), - Row(1)) + sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil) + .toDF().createOrReplaceTempView("nested") + checkAnswer( + sql("SELECT f1.f2.f3 FROM nested"), + Row(1)) - sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested") - checkAnswer( - sql("SELECT * FROM test_ctas_1234"), - 
sql("SELECT * FROM nested").collect().toSeq) + sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested") + checkAnswer( + sql("SELECT * FROM test_ctas_1234"), + sql("SELECT * FROM nested").collect().toSeq) - intercept[AnalysisException] { - sql("CREATE TABLE test_ctas_1234 AS SELECT * from notexists").collect() - } + intercept[AnalysisException] { + sql("CREATE TABLE test_ctas_1234 AS SELECT * from notexists").collect() } } test("test CTAS") { - withTable("test_ctas_123") { - sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src") - checkAnswer( - sql("SELECT key, value FROM test_ctas_123 ORDER BY key"), - sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) - } + sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src") + checkAnswer( + sql("SELECT key, value FROM test_ctas_123 ORDER BY key"), + sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) } test("SPARK-4825 save join to table") { - withTable("test", "test1", "test2") { - val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF() - sql("CREATE TABLE test1 (key INT, value STRING)") - testData.write.mode(SaveMode.Append).insertInto("test1") - sql("CREATE TABLE test2 (key INT, value STRING)") - testData.write.mode(SaveMode.Append).insertInto("test2") - testData.write.mode(SaveMode.Append).insertInto("test2") - sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key") - checkAnswer( - table("test"), - sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq) - } + val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF() + sql("CREATE TABLE test1 (key INT, value STRING)") + testData.write.mode(SaveMode.Append).insertInto("test1") + sql("CREATE TABLE test2 (key INT, value STRING)") + testData.write.mode(SaveMode.Append).insertInto("test2") + testData.write.mode(SaveMode.Append).insertInto("test2") + sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key") + checkAnswer( + table("test"), + sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq) } test("SPARK-3708 Backticks aren't handled correctly is aliases") { @@ -2040,7 +2023,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed") { - withTable("test1") { + withTable("test21721") { val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit") deleteOnExitField.setAccessible(true) @@ -2048,7 +2031,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]] val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF() - sql("CREATE TABLE test1 (key INT, value STRING)") + sql("CREATE TABLE test21721 (key INT, value STRING)") val pathSizeToDeleteOnExit = setOfPath.size() (0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test1"))