@@ -97,12 +97,23 @@ case class InsertIntoHiveTable(
9797 val inputPathUri : URI = inputPath.toUri
9898 val inputPathName : String = inputPathUri.getPath
9999 val fs : FileSystem = inputPath.getFileSystem(hadoopConf)
100- val stagingPathName : String =
100+ var stagingPathName : String =
101101 if (inputPathName.indexOf(stagingDir) == - 1 ) {
102102 new Path (inputPathName, stagingDir).toString
103103 } else {
104104 inputPathName.substring(0 , inputPathName.indexOf(stagingDir) + stagingDir.length)
105105 }
106+
107+ // SPARK-20594: The staging directory should be a child directory starts with "." to avoid
108+ // being deleted if we set hive.exec.stagingdir under the table directory.
109+ if (FileUtils .isSubDir(new Path (stagingPathName), inputPath, fs)
110+ && ! stagingPathName.stripPrefix(inputPathName).startsWith(" ." )) {
111+ logDebug(s " The staging dir ' $stagingPathName' should be a child directory starts " +
112+ s " with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
113+ s " directory. " )
114+ stagingPathName = new Path (inputPathName, " .hive-staging" ).toString
115+ }
116+
106117 val dir : Path =
107118 fs.makeQualified(
108119 new Path (stagingPathName + " _" + executionId + " -" + TaskRunner .getTaskRunnerID))
@@ -222,7 +233,7 @@ case class InsertIntoHiveTable(
222233 val externalCatalog = sparkSession.sharedState.externalCatalog
223234 val hiveVersion = externalCatalog.asInstanceOf [HiveExternalCatalog ].client.version
224235 val hadoopConf = sessionState.newHadoopConf()
225- val stagingDir = hadoopConf.get(" hive.exec.stagingdir" , " .hive-staging" ) + " /.hive-staging "
236+ val stagingDir = hadoopConf.get(" hive.exec.stagingdir" , " .hive-staging" )
226237 val scratchDir = hadoopConf.get(" hive.exec.scratchdir" , " /tmp/hive" )
227238
228239 val hiveQlTable = HiveClientImpl .toHiveTable(table)
0 commit comments