From ac65375a64c2a8a2fe019dc0e2c031f413df74b8 Mon Sep 17 00:00:00 2001
From: Mingjie Tang
Date: Tue, 8 Nov 2016 16:41:32 -0800
Subject: [PATCH 1/7] SPARK-18372

---
 .../hive/execution/InsertIntoHiveTable.scala  | 73 +++++++++++++++++--
 .../sql/hive/InsertIntoHiveTableSuite.scala   |  2 +
 2 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f936cf565b2bc..aca47dba00069 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,24 +17,31 @@
 package org.apache.spark.sql.hive.execution
 
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
 import java.util
+import java.util.{Date, Random}
 
-import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
 
+import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.types.DataType
@@ -54,6 +61,61 @@ case class InsertIntoHiveTable(
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val catalog = sc.catalog
 
+  val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
+    val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+    return executionId
+  }
+
+  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+    val inputPathUri: URI = inputPath.toUri
+    val inputPathName: String = inputPathUri.getPath
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    val stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    try {
+      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+      }
+      fs.deleteOnExit(dir)
+    }
+    catch {
+      case e: IOException =>
+        throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+
+    }
+    return dir
+  }
+
+  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  }
+
+  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      getExtTmpPathRelTo(path.getParent, hadoopConf)
+    } else {
+      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+    }
+  }
+
+  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
+    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  }
+
   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
     serializer.initialize(null, tableDesc.getProperties)
@@ -129,7 +191,9 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+    val jobConf = new JobConf(sc.hiveconf)
+    val tmpLocation = getExternalTmpPath(tableLocation, jobConf)
+
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = sc.hiveconf.getBoolean(
       ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
@@ -175,7 +239,6 @@ case class InsertIntoHiveTable(
       }
     }
 
-    val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableJobConf(jobConf)
 
     // When speculation is on and output committer class name contains "Direct", we should warn

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 81ee9ba71beb6..559667c948811 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -76,6 +76,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
       sql("SELECT * FROM createAndInsertTest"),
       testData.collect().toSeq
     )
+
+
   }
 
   test("Double create fails when allowExisting = false") {
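
Note on patch 1: the insert now stages its output under the table's own location instead of in Hive's shared scratch directory (the removed hiveContext.getExternalTmpPath call). The staging directory name is built from hive.exec.stagingdir plus a timestamped execution id and the task-runner id. Below is a minimal standalone Scala sketch of that naming scheme; the ".hive-staging" default and the trailing "-1" task id are illustrative assumptions, not values taken from the patch:

    import java.text.SimpleDateFormat
    import java.util.{Date, Random}

    object StagingPathSketch {
      // Mirrors executionId above: a timestamp plus a random long keeps
      // concurrent inserts from colliding on the same staging directory.
      private def executionId: String = {
        val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
        "hive_" + format.format(new Date) + "_" + Math.abs(new Random().nextLong)
      }

      // Mirrors getStagingDir minus the FileSystem calls: reuse an existing
      // staging prefix if the path already contains one, otherwise nest the
      // staging dir under the table location.
      def stagingPathFor(tableLocation: String, stagingDir: String = ".hive-staging"): String = {
        val base =
          if (tableLocation.indexOf(stagingDir) == -1) tableLocation + "/" + stagingDir
          else tableLocation.substring(0, tableLocation.indexOf(stagingDir) + stagingDir.length)
        base + "_" + executionId + "-1" // "-1" stands in for TaskRunner.getTaskRunnerID
      }

      def main(args: Array[String]): Unit =
        // Prints something like:
        // /warehouse/mytable/.hive-staging_hive_2016-11-08_16-41-32_123_456789-1
        println(stagingPathFor("/warehouse/mytable"))
    }
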
From e537239c659f1e3a28e81822e954677c6e65945c Mon Sep 17 00:00:00 2001
From: Mingjie Tang
Date: Mon, 19 Dec 2016 16:03:13 -0800
Subject: [PATCH 2/7] [SPARK-18703][Backport][Branch-1.6.x] Drop Staging
 Directories and Data Files After each Insertion/CTAS of Hive serde Tables
 (#16134)

---
 .../sql/hive/execution/InsertIntoHiveTable.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index aca47dba00069..c5638fbc8b2a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 
+import scala.util.control.NonFatal
+
 import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -61,6 +63,7 @@ case class InsertIntoHiveTable(
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val catalog = sc.catalog
 
+  @transient var createdTempDir: Option[Path] = None
   val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
 
   private def executionId: String = {
@@ -88,6 +91,7 @@ case class InsertIntoHiveTable(
       if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
       }
+      createdTempDir = Some(dir)
       fs.deleteOnExit(dir)
     }
     catch {
@@ -323,6 +327,15 @@ case class InsertIntoHiveTable(
         holdDDLTime)
     }
 
+    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
+    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
+    try {
+      createdTempDir.foreach { path => path.getFileSystem(jobConf).delete(path, true) }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+
     // Invalidate the cache.
     sqlContext.cacheManager.invalidateCache(table)
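
Note on patch 2: deleteOnExit only removes the staging directory when the JVM shuts down, so in a long-lived SparkContext the per-insert staging directories would pile up until exit. The patch therefore records the directory it created (createdTempDir) and deletes it recursively as soon as the rows have been loaded, keeping deleteOnExit as the fallback and downgrading a failed delete to a warning. A rough standalone sketch of the same pattern, with println standing in for Spark's logWarning from the Logging trait:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    import scala.util.control.NonFatal

    class StagingCleanupSketch(hadoopConf: Configuration) {
      private var createdTempDir: Option[Path] = None

      // Called right after the staging dir is created: remember it and
      // register the JVM-shutdown fallback.
      def remember(dir: Path): Unit = {
        createdTempDir = Some(dir)
        dir.getFileSystem(hadoopConf).deleteOnExit(dir)
      }

      // Called once the insert has finished: best-effort eager delete; a
      // failure is only logged because the data itself was already moved
      // out of the staging directory.
      def cleanup(): Unit =
        try {
          createdTempDir.foreach(dir => dir.getFileSystem(hadoopConf).delete(dir, true))
        } catch {
          case NonFatal(e) => println("Unable to delete staging directory: " + e)
        }
    }
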
From 8648a4675b5b5a653e67aedb058f9f04a051890e Mon Sep 17 00:00:00 2001
From: Mingjie Tang
Date: Sun, 1 Jan 2017 23:33:29 -0800
Subject: [PATCH 3/7] backport the testbase from 16339

---
 .../sql/hive/InsertIntoHiveTableSuite.scala   |  2 -
 .../spark/sql/hive/client/VersionsSuite.scala | 40 +++++++++++++++++--
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 559667c948811..81ee9ba71beb6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -76,8 +76,6 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
       sql("SELECT * FROM createAndInsertTest"),
       testData.collect().toSeq
     )
-
-
   }
 
   test("Double create fails when allowExisting = false") {

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 502b240f3650f..3a578ac088b17 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,14 +20,16 @@ package org.apache.spark.sql.hive.client
 import java.io.File
 
 import org.apache.hadoop.util.VersionInfo
-
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.Utils
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
 
 /**
  * A simple set of tests that call the methods of a hive ClientInterface, loading different version
@@ -34,9 +36,9 @@
  * of hive from maven central. These tests are simple in that they are mostly just testing to make
  * sure that reflective calls are not throwing NoSuchMethod error, but the actually functionality
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
 
   // In order to speed up test execution during development or in Jenkins, you can specify the path
   // of an existing Ivy cache:
@@ -216,5 +218,37 @@ class VersionsSuite extends SparkFunSuite with Logging {
         "as 'COMPACT' WITH DEFERRED REBUILD")
       client.reset()
     }
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(sqlContext.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    test(s"$version: Delete the temporary staging directory and files after each insert") {
+      withTempDir { tmpDir =>
+        withTable("tab", "tbl") {
+          sqlContext.sql(
+            s"""
+              |CREATE TABLE tab(c1 string)
+               |location '${tmpDir.toURI.toString}'
+             """.stripMargin)
+
+          sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+
+          sqlContext.sql(s"INSERT OVERWRITE TABLE tab SELECT * from tbl ")
+
+          def listFiles(path: File): List[String] = {
+            val dir = path.listFiles()
+            val folders = dir.filter(_.isDirectory).toList
+            val filePaths = dir.map(_.getName).toList
+            folders.flatMap(listFiles) ++: filePaths
+          }
+          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+          assert(listFiles(tmpDir).sorted == expectedFiles)
+        }
+      }
+    }
   }
 }
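
Note on patch 3: the new VersionsSuite test detects a leaked staging directory by listing everything under the table location. listFiles returns the names of every entry, recursing into subdirectories and listing directory names too, so a surviving ".hive-staging_hive_..." directory would appear next to the expected "part-00000" and ".part-00000.crc" entries and fail the equality assertion. The helper extracted into a runnable form (the main wrapper is added here for illustration only):

    import java.io.File

    object ListFilesSketch {
      // Names of all entries under `path`; directories are both listed by
      // name and recursed into, so a leftover staging directory shows up
      // directly in the result.
      def listFiles(path: File): List[String] = {
        val entries = path.listFiles()
        val folders = entries.filter(_.isDirectory).toList
        val names = entries.map(_.getName).toList
        folders.flatMap(listFiles) ++: names
      }

      def main(args: Array[String]): Unit =
        listFiles(new File(args(0))).sorted.foreach(println)
    }
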
From 881c96bbf59714baf9f474b16fc44f88722a1dd4 Mon Sep 17 00:00:00 2001
From: Mingjie Tang
Date: Mon, 2 Jan 2017 16:13:15 -0800
Subject: [PATCH 4/7] fix based on tao's comment

---
 .../spark/sql/hive/execution/InsertIntoHiveTable.scala   | 2 +-
 .../org/apache/spark/sql/hive/client/VersionsSuite.scala | 7 ++++---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index c5638fbc8b2a2..a0de5ec7e3779 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -70,7 +70,7 @@ case class InsertIntoHiveTable(
     val rand: Random = new Random
     val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
     val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
-    return executionId
+     executionId
   }
 
   private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 3a578ac088b17..004f0c09be00f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -25,12 +25,13 @@ import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{Logging, SparkFunSuite}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
+
 /**
  * A simple set of tests that call the methods of a hive ClientInterface, loading different version
 * of hive from maven central. These tests are simple in that they are mostly just testing to make
@@ -235,8 +236,8 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingle
               |location '${tmpDir.toURI.toString}'
             """.stripMargin)
 
-          sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
-
+          import sqlContext.implicits._
+          Seq(Tuple1("a")).toDF("value").registerTempTable("tbl")
           sqlContext.sql(s"INSERT OVERWRITE TABLE tab SELECT * from tbl ")
 
           def listFiles(path: File): List[String] = {

From 15da7a83c8599a0d6d28f5a0bce8a3132033867c Mon Sep 17 00:00:00 2001
From: Mingjie Tang
Date: Mon, 2 Jan 2017 22:01:52 -0800
Subject: [PATCH 5/7] Based on Xiao Li's review

---
 .../scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 004f0c09be00f..6262e10c19d90 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{Logging, SparkFunSuite}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.test.SQLTestUtils

From 4f26b287dc9fa94c618747023d1dee84804c94a8 Mon Sep 17 00:00:00 2001
From: Mingjie Tang
Date: Thu, 5 Jan 2017 00:59:25 -0800
Subject: [PATCH 6/7] fix indent and repartition the DF to meet the test case

---
 .../spark/sql/hive/execution/InsertIntoHiveTable.scala   | 2 +-
 .../org/apache/spark/sql/hive/client/VersionsSuite.scala | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index a0de5ec7e3779..bdf711799f065 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -70,7 +70,7 @@ case class InsertIntoHiveTable(
     val rand: Random = new Random
     val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
     val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
-     executionId
+    executionId
  }
 
   private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 6262e10c19d90..3fbd570878792 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -228,16 +228,16 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingle
     }
 
     test(s"$version: Delete the temporary staging directory and files after each insert") {
+      import sqlContext.implicits._
       withTempDir { tmpDir =>
         withTable("tab", "tbl") {
           sqlContext.sql(
             s"""
-              |CREATE TABLE tab(c1 string)
+               |CREATE TABLE tab(c1 string)
               |location '${tmpDir.toURI.toString}'
             """.stripMargin)
 
-          import sqlContext.implicits._
-          Seq(Tuple1("a")).toDF("value").registerTempTable("tbl")
+          Seq(Tuple1("a")).toDF("value").repartition(1).registerTempTable("tbl")
           sqlContext.sql(s"INSERT OVERWRITE TABLE tab SELECT * from tbl ")
 
           def listFiles(path: File): List[String] = {
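
Note on patch 6: the assertion pins the table down to exactly two entries, part-00000 and its .part-00000.crc checksum, but a DataFrame built from a local Seq may be spread over several partitions (one per default-parallelism slot), in which case the INSERT OVERWRITE could write part-00000, part-00001, and so on and break the expected file list. The two variants from patches 4 and 6 side by side:

    // Patch 4: tbl may have more than one partition, so the later
    // INSERT OVERWRITE can emit several part files.
    Seq(Tuple1("a")).toDF("value").registerTempTable("tbl")

    // Patch 6: tbl has exactly one partition, so the INSERT writes a
    // single part-00000.
    Seq(Tuple1("a")).toDF("value").repartition(1).registerTempTable("tbl")

Patch 7 below then drops the DataFrame route entirely and goes back to CREATE TABLE ... AS SELECT, which evidently also yields a single output file here.
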
From ab5e36974b17a0e4533f5af15d61fb7c8d9aeaf0 Mon Sep 17 00:00:00 2001
From: Mingjie Tang
Date: Thu, 5 Jan 2017 22:55:04 -0800
Subject: [PATCH 7/7] revert to create table for testing

---
 .../scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 3fbd570878792..09a10672e232b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -228,7 +228,6 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingle
     }
 
     test(s"$version: Delete the temporary staging directory and files after each insert") {
-      import sqlContext.implicits._
       withTempDir { tmpDir =>
         withTable("tab", "tbl") {
           sqlContext.sql(
@@ -237,7 +236,7 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingle
               |location '${tmpDir.toURI.toString}'
             """.stripMargin)
 
-          Seq(Tuple1("a")).toDF("value").repartition(1).registerTempTable("tbl")
+          sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
           sqlContext.sql(s"INSERT OVERWRITE TABLE tab SELECT * from tbl ")
 
           def listFiles(path: File): List[String] = {