Skip to content

Commit 3080f99

Browse files
gatorsmilecloud-fan
authored andcommitted
[SPARK-18703][SPARK-18675][SQL][BACKPORT-2.1] CTAS for hive serde table should work for all hive versions AND Drop Staging Directories and Data Files
### What changes were proposed in this pull request? This PR is to backport #16104 and #16134. ---------- [[SPARK-18675][SQL] CTAS for hive serde table should work for all hive versions](#16104) Before hive 1.1, when inserting into a table, hive will create the staging directory under a common scratch directory. After the writing is finished, hive will simply empty the table directory and move the staging directory to it. After hive 1.1, hive will create the staging directory under the table directory, and when moving staging directory to table directory, hive will still empty the table directory, but will exclude the staging directory there. In `InsertIntoHiveTable`, we simply copy the code from hive 1.2, which means we will always create the staging directory under the table directory, no matter what the hive version is. This causes problems if the hive version is prior to 1.1, because the staging directory will be removed by hive when hive is trying to empty the table directory. This PR copies the code from hive 0.13, so that we have 2 branches to create staging directory. If hive version is prior to 1.1, we'll go to the old style branch(i.e. create the staging directory under a common scratch directory), else, go to the new style branch(i.e. create the staging directory under the table directory) ---------- [[SPARK-18703] [SQL] Drop Staging Directories and Data Files After each Insertion/CTAS of Hive serde Tables](#16134) Below are the files/directories generated for three inserts againsts a Hive table: ``` /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/._SUCCESS.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/.part-00000.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/_SUCCESS /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/part-00000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/._SUCCESS.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/.part-00000.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/_SUCCESS /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/part-00000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/._SUCCESS.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/.part-00000.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/_SUCCESS /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/part-00000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000 ``` The first 18 files are temporary. We do not drop it until the end of JVM termination. If JVM does not appropriately terminate, these temporary files/directories will not be dropped. Only the last two files are needed, as shown below. ``` /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000 ``` The temporary files/directories could accumulate a lot when we issue many inserts, since each insert generats at least six files. This could eat a lot of spaces and slow down the JVM termination. When the JVM does not terminates approprately, the files might not be dropped. This PR is to drop the created staging files and temporary data files after each insert/CTAS. ### How was this patch tested? Added test cases. Author: gatorsmile <[email protected]> Closes #16325 from gatorsmile/backport-18703&18675.
1 parent a5da8db commit 3080f99

File tree

2 files changed

+112
-13
lines changed

2 files changed

+112
-13
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 71 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import java.net.URI
2222
import java.text.SimpleDateFormat
2323
import java.util.{Date, Locale, Random}
2424

25-
import org.apache.hadoop.conf.Configuration
25+
import scala.util.control.NonFatal
26+
2627
import org.apache.hadoop.fs.{FileSystem, Path}
2728
import org.apache.hadoop.hive.common.FileUtils
2829
import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -85,15 +86,17 @@ case class InsertIntoHiveTable(
8586
def output: Seq[Attribute] = Seq.empty
8687

8788
val hadoopConf = sessionState.newHadoopConf()
89+
var createdTempDir: Option[Path] = None
8890
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
91+
val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
8992

9093
private def executionId: String = {
9194
val rand: Random = new Random
9295
val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
9396
"hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
9497
}
9598

96-
private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
99+
private def getStagingDir(inputPath: Path): Path = {
97100
val inputPathUri: URI = inputPath.toUri
98101
val inputPathName: String = inputPathUri.getPath
99102
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -111,31 +114,79 @@ case class InsertIntoHiveTable(
111114
if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
112115
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
113116
}
117+
createdTempDir = Some(dir)
114118
fs.deleteOnExit(dir)
115119
} catch {
116120
case e: IOException =>
117121
throw new RuntimeException(
118122
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
119-
120123
}
121124
return dir
122125
}
123126

124-
private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
125-
getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
127+
private def getExternalScratchDir(extURI: URI): Path = {
128+
getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
129+
}
130+
131+
def getExternalTmpPath(path: Path): Path = {
132+
import org.apache.spark.sql.hive.client.hive._
133+
134+
val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
135+
// Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
136+
// a common scratch directory. After the writing is finished, Hive will simply empty the table
137+
// directory and move the staging directory to it.
138+
// After Hive 1.1, Hive will create the staging directory under the table directory, and when
139+
// moving staging directory to table directory, Hive will still empty the table directory, but
140+
// will exclude the staging directory there.
141+
// We have to follow the Hive behavior here, to avoid troubles. For example, if we create
142+
// staging directory under the table director for Hive prior to 1.1, the staging directory will
143+
// be removed by Hive when Hive is trying to empty the table directory.
144+
if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
145+
oldVersionExternalTempPath(path)
146+
} else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
147+
newVersionExternalTempPath(path)
148+
} else {
149+
throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
150+
}
151+
}
152+
153+
// Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
154+
def oldVersionExternalTempPath(path: Path): Path = {
155+
val extURI: URI = path.toUri
156+
val scratchPath = new Path(scratchDir, executionId)
157+
var dirPath = new Path(
158+
extURI.getScheme,
159+
extURI.getAuthority,
160+
scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
161+
162+
try {
163+
val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
164+
dirPath = new Path(fs.makeQualified(dirPath).toString())
165+
166+
if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
167+
throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
168+
}
169+
createdTempDir = Some(dirPath)
170+
fs.deleteOnExit(dirPath)
171+
} catch {
172+
case e: IOException =>
173+
throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
174+
}
175+
dirPath
126176
}
127177

128-
def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
178+
// Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
179+
def newVersionExternalTempPath(path: Path): Path = {
129180
val extURI: URI = path.toUri
130181
if (extURI.getScheme == "viewfs") {
131-
getExtTmpPathRelTo(path.getParent, hadoopConf)
182+
getExtTmpPathRelTo(path.getParent)
132183
} else {
133-
new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
184+
new Path(getExternalScratchDir(extURI), "-ext-10000")
134185
}
135186
}
136187

137-
def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
138-
new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
188+
def getExtTmpPathRelTo(path: Path): Path = {
189+
new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
139190
}
140191

141192
private def saveAsHiveFile(
@@ -172,7 +223,7 @@ case class InsertIntoHiveTable(
172223
// instances within the closure, since Serializer is not serializable while TableDesc is.
173224
val tableDesc = table.tableDesc
174225
val tableLocation = table.hiveQlTable.getDataLocation
175-
val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
226+
val tmpLocation = getExternalTmpPath(tableLocation)
176227
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
177228
val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
178229

@@ -328,6 +379,15 @@ case class InsertIntoHiveTable(
328379
holdDDLTime)
329380
}
330381

382+
// Attempt to delete the staging directory and the inclusive files. If failed, the files are
383+
// expected to be dropped at the normal termination of VM since deleteOnExit is used.
384+
try {
385+
createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
386+
} catch {
387+
case NonFatal(e) =>
388+
logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
389+
}
390+
331391
// Invalidate the cache.
332392
sqlContext.sharedState.cacheManager.invalidateCache(table)
333393
sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@ import org.apache.hadoop.mapred.TextInputFormat
2626

2727
import org.apache.spark.SparkFunSuite
2828
import org.apache.spark.internal.Logging
29-
import org.apache.spark.sql.AnalysisException
29+
import org.apache.spark.sql.{AnalysisException, Row}
3030
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
3131
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
3232
import org.apache.spark.sql.catalyst.catalog._
3333
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
3434
import org.apache.spark.sql.catalyst.util.quietly
3535
import org.apache.spark.sql.hive.HiveUtils
36+
import org.apache.spark.sql.hive.test.TestHiveSingleton
37+
import org.apache.spark.sql.test.SQLTestUtils
3638
import org.apache.spark.sql.types.IntegerType
3739
import org.apache.spark.sql.types.StructType
3840
import org.apache.spark.tags.ExtendedHiveTest
@@ -45,7 +47,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
4547
* is not fully tested.
4648
*/
4749
@ExtendedHiveTest
48-
class VersionsSuite extends SparkFunSuite with Logging {
50+
class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
4951

5052
private val clientBuilder = new HiveClientBuilder
5153
import clientBuilder.buildClient
@@ -530,5 +532,42 @@ class VersionsSuite extends SparkFunSuite with Logging {
530532
client.reset()
531533
assert(client.listTables("default").isEmpty)
532534
}
535+
536+
///////////////////////////////////////////////////////////////////////////
537+
// End-To-End tests
538+
///////////////////////////////////////////////////////////////////////////
539+
540+
test(s"$version: CREATE TABLE AS SELECT") {
541+
withTable("tbl") {
542+
spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
543+
assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
544+
}
545+
}
546+
547+
test(s"$version: Delete the temporary staging directory and files after each insert") {
548+
withTempDir { tmpDir =>
549+
withTable("tab") {
550+
spark.sql(
551+
s"""
552+
|CREATE TABLE tab(c1 string)
553+
|location '${tmpDir.toURI.toString}'
554+
""".stripMargin)
555+
556+
(1 to 3).map { i =>
557+
spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
558+
}
559+
def listFiles(path: File): List[String] = {
560+
val dir = path.listFiles()
561+
val folders = dir.filter(_.isDirectory).toList
562+
val filePaths = dir.map(_.getName).toList
563+
folders.flatMap(listFiles) ++: filePaths
564+
}
565+
val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
566+
assert(listFiles(tmpDir).sorted == expectedFiles)
567+
}
568+
}
569+
}
570+
571+
// TODO: add more tests.
533572
}
534573
}

0 commit comments

Comments
 (0)