diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f936cf565b2bc..bdf711799f065 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,24 +17,33 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
 import java.util
+import java.util.{Date, Random}
 
-import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+
+import scala.util.control.NonFatal
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.types.DataType
@@ -54,6 +63,63 @@ case class InsertIntoHiveTable(
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val catalog = sc.catalog
 
+  @transient var createdTempDir: Option[Path] = None
+  val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
+    val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+    executionId
+  }
+
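+  // Creates the staging directory (hive.exec.stagingdir, ".hive-staging" by default) under the
+  // destination path, reusing an existing staging prefix when one is already present. The
+  // directory is registered with deleteOnExit and recorded in createdTempDir so it can also be
+  // removed explicitly once the insert completes.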
+  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+    val inputPathUri: URI = inputPath.toUri
+    val inputPathName: String = inputPathUri.getPath
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    val stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    try {
+      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+      }
+      createdTempDir = Some(dir)
+      fs.deleteOnExit(dir)
+    }
+    catch {
+      case e: IOException =>
+        throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+    }
+    return dir
+  }
+
+  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  }
+
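+  // Returns the temporary "-ext-10000" output location for this insert (Hive's convention).
+  // For viewfs destinations the staging directory is kept relative to the destination's parent
+  // (renames across viewfs mount points are not supported); other schemes use an external
+  // scratch dir derived from the path's scheme and authority.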
+  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      getExtTmpPathRelTo(path.getParent, hadoopConf)
+    } else {
+      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+    }
+  }
+
+  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
+    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  }
+
   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
     serializer.initialize(null, tableDesc.getProperties)
@@ -129,7 +195,9 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+    val jobConf = new JobConf(sc.hiveconf)
+    val tmpLocation = getExternalTmpPath(tableLocation, jobConf)
+
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = sc.hiveconf.getBoolean(
       ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
@@ -175,7 +243,6 @@ case class InsertIntoHiveTable(
       }
     }
 
-    val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableJobConf(jobConf)
 
     // When speculation is on and output committer class name contains "Direct", we should warn
@@ -260,6 +327,15 @@ case class InsertIntoHiveTable(
         holdDDLTime)
     }
 
+    // Attempt to delete the staging directory and the files under it. If this fails, the files
+    // are expected to be dropped at normal JVM termination, since deleteOnExit is registered.
+    try {
+      createdTempDir.foreach { path => path.getFileSystem(jobConf).delete(path, true) }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+
     // Invalidate the cache.
     sqlContext.cacheManager.invalidateCache(table)
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 502b240f3650f..09a10672e232b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,14 +20,17 @@ package org.apache.spark.sql.hive.client
 import java.io.File
 
 import org.apache.hadoop.util.VersionInfo
-
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.Utils
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
 
 /**
  * A simple set of tests that call the methods of a hive ClientInterface, loading different version
@@ -36,7 +39,7 @@ import org.apache.spark.util.Utils
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
 
   // In order to speed up test execution during development or in Jenkins, you can specify the path
   // of an existing Ivy cache:
@@ -216,5 +219,36 @@ class VersionsSuite extends SparkFunSuite with Logging {
         "as 'COMPACT' WITH DEFERRED REBUILD")
       client.reset()
     }
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(sqlContext.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    test(s"$version: Delete the temporary staging directory and files after each insert") {
+      withTempDir { tmpDir =>
+        withTable("tab", "tbl") {
+          sqlContext.sql(
+            s"""
+               |CREATE TABLE tab(c1 string)
+               |location '${tmpDir.toURI.toString}'
+             """.stripMargin)
+
+          sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+          sqlContext.sql("INSERT OVERWRITE TABLE tab SELECT * FROM tbl")
+
+          def listFiles(path: File): List[String] = {
+            val dir = path.listFiles()
+            val folders = dir.filter(_.isDirectory).toList
+            val filePaths = dir.map(_.getName).toList
+            folders.flatMap(listFiles) ++: filePaths
+          }
+          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+          assert(listFiles(tmpDir).sorted == expectedFiles)
+        }
+      }
+    }
   }
 }