Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,11 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
stagingDir)
}

private def getStagingDir(
private[hive] def getStagingDir(
inputPath: Path,
hadoopConf: Configuration,
stagingDir: String): Path = {
val inputPathUri: URI = inputPath.toUri
val inputPathName: String = inputPathUri.getPath
val inputPathName: String = inputPath.toString
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
var stagingPathName: String =
if (inputPathName.indexOf(stagingDir) == -1) {
Expand All @@ -228,7 +227,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
// staging directory needs to avoid being deleted when users set hive.exec.stagingdir
// under the table directory.
if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
!stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
!stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) {
logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
"with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
"directory.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package org.apache.spark.sql.hive

import java.io.File

import org.scalatest.BeforeAndAfter
import org.apache.hadoop.fs.Path
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.SparkException
import org.apache.spark.sql.{QueryTest, _}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
Expand All @@ -36,7 +38,7 @@ case class TestData(key: Int, value: String)
case class ThreeCloumntable(key: Int, value: String, key1: String)

class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
with SQLTestUtils {
with SQLTestUtils with PrivateMethodTester {
import spark.implicits._

override lazy val testData = spark.sparkContext.parallelize(
Expand Down Expand Up @@ -550,6 +552,32 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
}
}

test("SPARK-27552: hive.exec.stagingdir is invalid on Windows OS") {
val conf = spark.sessionState.newHadoopConf()
val inputPath = new Path("/tmp/b/c")
var stagingDir = "tmp/b"
val saveHiveFile = InsertIntoHiveTable(null, Map.empty, null, false, false, null)
val getStagingDir = PrivateMethod[Path]('getStagingDir)
var path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir)
assert(path.toString.indexOf("/tmp/b_hive_") != -1)

stagingDir = "tmp/b/c"
path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir)
assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1)

stagingDir = "d/e"
path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir)
assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1)

stagingDir = ".d/e"
path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir)
assert(path.toString.indexOf("/tmp/b/c/.d/e_hive_") != -1)

stagingDir = "/tmp/c/"
path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir)
assert(path.toString.indexOf("/tmp/c_hive_") != -1)
}

test("insert overwrite to dir from hive metastore table") {
withTempDir { dir =>
val path = dir.toURI.getPath
Expand Down