InsertIntoHiveTable.scala
@@ -24,8 +24,8 @@ import java.util
import java.util.{Date, Random}

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -55,7 +55,10 @@ case class InsertIntoHiveTable(

def output: Seq[Attribute] = Seq.empty

val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
val hadoopConf = sessionState.newHadoopConf()
[Review comment from the PR author: #15744 needs to be backported too.]

var createdTempDir: Option[Path] = None
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")

private def executionId: String = {
val rand: Random = new Random
@@ -64,7 +67,7 @@ case class InsertIntoHiveTable(
return executionId
}

private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
private def getStagingDir(inputPath: Path): Path = {
val inputPathUri: URI = inputPath.toUri
val inputPathName: String = inputPathUri.getPath
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -82,32 +85,80 @@ case class InsertIntoHiveTable(
if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
}
createdTempDir = Some(dir)
fs.deleteOnExit(dir)
}
catch {
case e: IOException =>
throw new RuntimeException(
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)

}
return dir
}

private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
private def getExternalScratchDir(extURI: URI): Path = {
getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
}

def getExternalTmpPath(path: Path): Path = {
import org.apache.spark.sql.hive.client.hive._

val hiveVersion = client.version
// Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
// a common scratch directory. After the writing is finished, Hive will simply empty the table
// directory and move the staging directory to it.
// After Hive 1.1, Hive will create the staging directory under the table directory, and when
// moving staging directory to table directory, Hive will still empty the table directory, but
// will exclude the staging directory there.
// We have to follow the Hive behavior here, to avoid trouble. For example, if we create the
// staging directory under the table directory for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive tries to empty the table directory.
if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
oldVersionExternalTempPath(path)
} else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
newVersionExternalTempPath(path)
} else {
throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
}
}

def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
// Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
def oldVersionExternalTempPath(path: Path): Path = {
val extURI: URI = path.toUri
val scratchPath = new Path(scratchDir, executionId)
var dirPath = new Path(
extURI.getScheme,
extURI.getAuthority,
scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())

try {
val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
dirPath = new Path(fs.makeQualified(dirPath).toString())

if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
}
createdTempDir = Some(dirPath)
fs.deleteOnExit(dirPath)
} catch {
case e: IOException =>
throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
}
dirPath
}

// Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
def newVersionExternalTempPath(path: Path): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
getExtTmpPathRelTo(path.getParent, hadoopConf)
getExtTmpPathRelTo(path.getParent)
} else {
new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
new Path(getExternalScratchDir(extURI), "-ext-10000")
}
}

def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
def getExtTmpPathRelTo(path: Path): Path = {
new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
}

private def saveAsHiveFile(
@@ -144,8 +195,7 @@ case class InsertIntoHiveTable(
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
val hadoopConf = sessionState.newHadoopConf()
val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
val tmpLocation = getExternalTmpPath(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean

@@ -293,6 +343,15 @@ case class InsertIntoHiveTable(
holdDDLTime)
}

// Attempt to delete the staging directory and the files it contains. If this fails, the files
// are expected to be removed when the VM terminates normally, since deleteOnExit is used.
try {
createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
} catch {
case NonFatal(e) =>
logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
}

// Invalidate the cache.
sqlContext.sharedState.cacheManager.invalidateCache(table)
sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
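Taken together, the InsertIntoHiveTable changes above boil down to one pattern: pick the staging location based on the Hive version (a common scratch directory before Hive 1.1, a directory under the table directory from 1.1 on), register deleteOnExit as a fallback, and eagerly delete the tracked directory once the insert has committed. The following is a minimal, self-contained sketch of that pattern, not the Spark code itself: it uses java.io.File in place of Hadoop's FileSystem API, and the names HiveStagingSketch, chooseStagingDir, and cleanupStagingDir are illustrative, not Spark or Hive APIs.

import java.io.File

import scala.util.control.NonFatal

object HiveStagingSketch {

  // Mirrors `createdTempDir`: remembers the staging directory created for one insert.
  private var createdTempDir: Option[File] = None

  /** Pick and create the staging directory appropriate for the given Hive version. */
  def chooseStagingDir(tableDir: File, scratchDir: File, hiveVersion: String): File = {
    val stageUnderTableDir = hiveVersion match {
      case "0.12" | "0.13" | "0.14" | "1.0" => false // pre-1.1: use the common scratch dir
      case "1.1" | "1.2" => true                      // 1.1+: stage under the table dir
      case other =>
        throw new IllegalStateException("Unsupported Hive version: " + other)
    }
    val base = if (stageUnderTableDir) tableDir else scratchDir
    val dir = new File(base, ".hive-staging_" + System.nanoTime())
    if (!dir.mkdirs() && !dir.isDirectory) {
      throw new IllegalStateException("Cannot create staging directory: " + dir)
    }
    dir.deleteOnExit() // fallback: removed at JVM exit if eager deletion fails
    createdTempDir = Some(dir)
    dir
  }

  /** Eagerly delete the tracked staging directory once the insert has committed. */
  def cleanupStagingDir(): Unit = {
    try {
      createdTempDir.foreach(deleteRecursively)
    } catch {
      case NonFatal(e) =>
        // Leave the directory for deleteOnExit to remove at normal VM termination.
        Console.err.println("Unable to delete staging directory eagerly: " + e.getMessage)
    } finally {
      createdTempDir = None
    }
  }

  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}

In this sketch, an insert would call chooseStagingDir before writing its output files and cleanupStagingDir right after the final move into the table directory, which is where the new createdTempDir.foreach cleanup call sits in the diff above.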
VersionsSuite.scala
@@ -28,13 +28,15 @@ import org.apache.hadoop.util.VersionInfo

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.{MutableURLClassLoader, Utils}
@@ -46,7 +48,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
* is not fully tested.
*/
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with Logging {
class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {

private val sparkConf = new SparkConf()

@@ -531,5 +533,42 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.reset()
assert(client.listTables("default").isEmpty)
}

///////////////////////////////////////////////////////////////////////////
// End-To-End tests
///////////////////////////////////////////////////////////////////////////

test(s"$version: CREATE TABLE AS SELECT") {
withTable("tbl") {
spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
}
}

test(s"$version: Delete the temporary staging directory and files after each insert") {
withTempDir { tmpDir =>
withTable("tab") {
spark.sql(
s"""
|CREATE TABLE tab(c1 string)
|location '${tmpDir.toURI.toString}'
""".stripMargin)

(1 to 3).map { i =>
spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
}
def listFiles(path: File): List[String] = {
val dir = path.listFiles()
val folders = dir.filter(_.isDirectory).toList
val filePaths = dir.map(_.getName).toList
folders.flatMap(listFiles) ++: filePaths
}
val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
assert(listFiles(tmpDir).sorted == expectedFiles)
}
}
}

// TODO: add more tests.
}
}
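The new end-to-end test pins the exact listing of the table location after three overwriting inserts (".part-00000.crc" and "part-00000" only). A looser, standalone variant of the same check — walk the table directory and assert that no ".hive-staging*" entries survive — could look like the sketch below; collectNames and assertNoStagingLeftovers are hypothetical helpers, not part of VersionsSuite.

import java.io.File

object StagingCleanupCheck {

  /** Recursively collect the names of all files and directories under `dir`. */
  def collectNames(dir: File): List[String] = {
    val entries = Option(dir.listFiles()).map(_.toList).getOrElse(Nil)
    entries.flatMap { e =>
      if (e.isDirectory) e.getName :: collectNames(e) else List(e.getName)
    }
  }

  /** Fail if any entry with the default staging prefix ".hive-staging" is still present. */
  def assertNoStagingLeftovers(tableDir: File): Unit = {
    val leftovers = collectNames(tableDir).filter(_.startsWith(".hive-staging"))
    assert(leftovers.isEmpty, s"Leftover staging entries under $tableDir: $leftovers")
  }
}

Run against the tmpDir used in the test above, this reports any directory matching the default hive.exec.stagingdir prefix that an insert failed to clean up.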