From 8773c039b6db39d684efa30d0ad42e49f6b4b76d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 3 Dec 2016 22:26:03 -0800 Subject: [PATCH 1/6] fix --- .../hive/execution/InsertIntoHiveTable.scala | 13 +++++++++++ .../sql/hive/InsertIntoHiveTableSuite.scala | 23 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 5f5c8e2432d6c..de14bea344743 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -22,6 +22,8 @@ import java.net.URI import java.text.SimpleDateFormat import java.util.{Date, Locale, Random} +import scala.util.control.NonFatal + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils @@ -85,6 +87,7 @@ case class InsertIntoHiveTable( def output: Seq[Attribute] = Seq.empty val hadoopConf = sessionState.newHadoopConf() + val createdTempDir = new scala.collection.mutable.ArrayBuffer[Path] val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") private def executionId: String = { @@ -111,6 +114,7 @@ case class InsertIntoHiveTable( if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") } + createdTempDir += dir fs.deleteOnExit(dir) } catch { case e: IOException => @@ -328,6 +332,15 @@ case class InsertIntoHiveTable( holdDDLTime) } + // Attempt to delete the staging directory and the inclusive files. If failed, the files are + // expected to be dropped at the normal termination of VM since deleteOnExit is used. + try { + createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) } + } catch { + case NonFatal(e) => + logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) + } + // Invalidate the cache. sqlContext.sharedState.cacheManager.invalidateCache(table) sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index e3ddaf725424d..81374d0506044 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -166,6 +166,29 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef sql("DROP TABLE tmp_table") } + test("Delete the temporary staging directory and files after each insert") { + withTable("tab") { + val tmpDir = Utils.createTempDir() + sql( + s""" + |CREATE TABLE tab(c1 string) + |location '${tmpDir.toURI.toString}' + """.stripMargin) + + (1 to 3).map { i => + sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'") + } + def listFiles(path: File): List[String] = { + val dir = path.listFiles() + val folders = dir.filter(_.isDirectory).toList + val filePaths = dir.map(_.getName).toList + filePaths ::: folders.flatMap(listFiles) + } + val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil + assert(listFiles(tmpDir).sortBy(_.toString) == expectedFiles) + } + } + test("INSERT OVERWRITE - partition IF NOT EXISTS") { withTempDir { tmpDir => val table = "table_with_partition" From 34340eee2f0fb54ab95cc92f70d8ab0c731be76f Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 4 Dec 2016 10:05:58 -0800 Subject: [PATCH 2/6] address comments. --- .../sql/hive/InsertIntoHiveTableSuite.scala | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 81374d0506044..9d829343f0114 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -167,25 +167,26 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef } test("Delete the temporary staging directory and files after each insert") { - withTable("tab") { - val tmpDir = Utils.createTempDir() - sql( - s""" - |CREATE TABLE tab(c1 string) - |location '${tmpDir.toURI.toString}' - """.stripMargin) - - (1 to 3).map { i => - sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'") - } - def listFiles(path: File): List[String] = { - val dir = path.listFiles() - val folders = dir.filter(_.isDirectory).toList - val filePaths = dir.map(_.getName).toList - filePaths ::: folders.flatMap(listFiles) + withTempDir { tmpDir => + withTable("tab") { + sql( + s""" + |CREATE TABLE tab(c1 string) + |location '${tmpDir.toURI.toString}' + """.stripMargin) + + (1 to 3).map { i => + sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'") + } + def listFiles(path: File): List[String] = { + val dir = path.listFiles() + val folders = dir.filter(_.isDirectory).toList + val filePaths = dir.map(_.getName).toList + filePaths ::: folders.flatMap(listFiles) + } + val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil + assert(listFiles(tmpDir).sortBy(_.toString) == expectedFiles) } - val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil - assert(listFiles(tmpDir).sortBy(_.toString) == expectedFiles) } } From 1ed228f90100409ef98533ecdfbeca78c58fa66d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 10 Dec 2016 10:00:23 -0800 Subject: [PATCH 3/6] address comments. --- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 4 ++-- .../org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index de14bea344743..7d6f00edf2f08 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -87,7 +87,7 @@ case class InsertIntoHiveTable( def output: Seq[Attribute] = Seq.empty val hadoopConf = sessionState.newHadoopConf() - val createdTempDir = new scala.collection.mutable.ArrayBuffer[Path] + var createdTempDir: Option[Path] = None val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") private def executionId: String = { @@ -114,7 +114,7 @@ case class InsertIntoHiveTable( if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") } - createdTempDir += dir + createdTempDir = Some(dir) fs.deleteOnExit(dir) } catch { case e: IOException => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 9d829343f0114..50a5a268ffa15 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -182,7 +182,7 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef val dir = path.listFiles() val folders = dir.filter(_.isDirectory).toList val filePaths = dir.map(_.getName).toList - filePaths ::: folders.flatMap(listFiles) + folders.flatMap(listFiles) ++: filePaths } val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil assert(listFiles(tmpDir).sortBy(_.toString) == expectedFiles) From 16c9da3f6ad33dd5c675647cbad30276262e55eb Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 14 Dec 2016 00:35:12 -0800 Subject: [PATCH 4/6] address comments. --- .../hive/execution/InsertIntoHiveTable.scala | 3 +-- .../sql/hive/InsertIntoHiveTableSuite.scala | 24 ------------------- .../spark/sql/hive/client/VersionsSuite.scala | 24 +++++++++++++++++++ 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index c7a82f028ee7b..aa858e808edf7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -120,7 +120,6 @@ case class InsertIntoHiveTable( case e: IOException => throw new RuntimeException( "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e) - } return dir } @@ -167,11 +166,11 @@ case class InsertIntoHiveTable( if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) { throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString) } + createdTempDir = Some(dirPath) fs.deleteOnExit(dirPath) } catch { case e: IOException => throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e) - } dirPath } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 50a5a268ffa15..e3ddaf725424d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -166,30 +166,6 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef sql("DROP TABLE tmp_table") } - test("Delete the temporary staging directory and files after each insert") { - withTempDir { tmpDir => - withTable("tab") { - sql( - s""" - |CREATE TABLE tab(c1 string) - |location '${tmpDir.toURI.toString}' - """.stripMargin) - - (1 to 3).map { i => - sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'") - } - def listFiles(path: File): List[String] = { - val dir = path.listFiles() - val folders = dir.filter(_.isDirectory).toList - val filePaths = dir.map(_.getName).toList - folders.flatMap(listFiles) ++: filePaths - } - val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil - assert(listFiles(tmpDir).sortBy(_.toString) == expectedFiles) - } - } - } - test("INSERT OVERWRITE - partition IF NOT EXISTS") { withTempDir { tmpDir => val table = "table_with_partition" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 9b26383a162dd..4ec023d6a91c1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -546,6 +546,30 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet } } + test(s"$version: Delete the temporary staging directory and files after each insert") { + withTempDir { tmpDir => + withTable("tab") { + spark.sql( + s""" + |CREATE TABLE tab(c1 string) + |location '${tmpDir.toURI.toString}' + """.stripMargin) + + (1 to 3).map { i => + spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'") + } + def listFiles(path: File): List[String] = { + val dir = path.listFiles() + val folders = dir.filter(_.isDirectory).toList + val filePaths = dir.map(_.getName).toList + folders.flatMap(listFiles) ++: filePaths + } + val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil + assert(listFiles(tmpDir).sortBy(_.toString) == expectedFiles) + } + } + } + // TODO: add more tests. } } From 5aca883f1c9495c4e5bab068283b4743e3b43ebd Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 14 Dec 2016 00:36:07 -0800 Subject: [PATCH 5/6] style --- .../scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 4ec023d6a91c1..505c9699b6730 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -553,7 +553,7 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet s""" |CREATE TABLE tab(c1 string) |location '${tmpDir.toURI.toString}' - """.stripMargin) + """.stripMargin) (1 to 3).map { i => spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'") From 9da395171df6ed354ac64f1fd3c55da2b53757d4 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 14 Dec 2016 06:59:49 -0800 Subject: [PATCH 6/6] address comments. --- .../scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 505c9699b6730..8dd06998ba3c8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -565,7 +565,7 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet folders.flatMap(listFiles) ++: filePaths } val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil - assert(listFiles(tmpDir).sortBy(_.toString) == expectedFiles) + assert(listFiles(tmpDir).sorted == expectedFiles) } } }