
Commit d53f18c

cloud-fan authored and yhuai committed
[SPARK-18675][SQL] CTAS for hive serde table should work for all hive versions
## What changes were proposed in this pull request?

Before Hive 1.1, when inserting into a table, Hive will create the staging directory under a common scratch directory. After the writing is finished, Hive will simply empty the table directory and move the staging directory to it.

After Hive 1.1, Hive will create the staging directory under the table directory, and when moving the staging directory to the table directory, Hive will still empty the table directory, but will exclude the staging directory there.

In `InsertIntoHiveTable`, we simply copied the code from Hive 1.2, which means we will always create the staging directory under the table directory, no matter what the Hive version is. This causes problems if the Hive version is prior to 1.1, because the staging directory will be removed by Hive when Hive is trying to empty the table directory.

This PR copies the code from Hive 0.13, so that we have two branches for creating the staging directory: if the Hive version is prior to 1.1, we take the old-style branch (create the staging directory under a common scratch directory); otherwise, we take the new-style branch (create the staging directory under the table directory). A condensed sketch of this dispatch is shown after this message.

## How was this patch tested?

A new end-to-end test in `VersionsSuite`.

Author: Wenchen Fan <[email protected]>

Closes #16104 from cloud-fan/hive-0.13.
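For reference, the dispatch this patch adds reduces to the following (condensed from the `InsertIntoHiveTable` diff below; `v12` through `v1_2` are the version tags from `org.apache.spark.sql.hive.client.hive`):

  def getExternalTmpPath(path: Path): Path = {
    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
    if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
      oldVersionExternalTempPath(path)   // old style: under the common scratch directory
    } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
      newVersionExternalTempPath(path)   // new style: under the table directory
    } else {
      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
    }
  }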
1 parent 096f868 commit d53f18c

File tree

2 files changed: +75 additions, -12 deletions


sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 58 additions & 10 deletions
@@ -22,7 +22,6 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -86,14 +85,15 @@ case class InsertIntoHiveTable(
 
   val hadoopConf = sessionState.newHadoopConf()
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
   private def executionId: String = {
     val rand: Random = new Random
     val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
     "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
   }
 
-  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+  private def getStagingDir(inputPath: Path): Path = {
     val inputPathUri: URI = inputPath.toUri
     val inputPathName: String = inputPathUri.getPath
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -121,21 +121,69 @@ case class InsertIntoHiveTable(
     return dir
   }
 
-  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
-    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  private def getExternalScratchDir(extURI: URI): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
   }
 
-  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+  def getExternalTmpPath(path: Path): Path = {
+    import org.apache.spark.sql.hive.client.hive._
+
+    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+    // a common scratch directory. After the writing is finished, Hive will simply empty the table
+    // directory and move the staging directory to it.
+    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+    // moving staging directory to table directory, Hive will still empty the table directory, but
+    // will exclude the staging directory there.
+    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
+    // staging directory under the table directory for Hive prior to 1.1, the staging directory
+    // will be removed by Hive when Hive is trying to empty the table directory.
+    if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+      oldVersionExternalTempPath(path)
+    } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+      newVersionExternalTempPath(path)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  def oldVersionExternalTempPath(path: Path): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    try {
+      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+      dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+      }
+      fs.deleteOnExit(dirPath)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+    }
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  def newVersionExternalTempPath(path: Path): Path = {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent, hadoopConf)
+      getExtTmpPathRelTo(path.getParent)
     } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+      new Path(getExternalScratchDir(extURI), "-ext-10000")
     }
   }
 
-  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
-    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  def getExtTmpPathRelTo(path: Path): Path = {
+    new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
  }
 
   private def saveAsHiveFile(
@@ -172,7 +220,7 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
+    val tmpLocation = getExternalTmpPath(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
 
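To make the two branches concrete, here is a small standalone sketch (not part of the patch; the paths, names, and the `StagingPathShapes` object are made-up illustrations, not the exact strings Spark produces) of where each branch places the staging directory:

import org.apache.hadoop.fs.Path

// Illustrative only: shows the rough shape of the staging paths chosen by
// the two branches above. All values are hypothetical.
object StagingPathShapes {
  def main(args: Array[String]): Unit = {
    val tableDir = new Path("/user/hive/warehouse/tbl")   // hypothetical table location
    val scratchDir = new Path("/tmp/hive")                // hive.exec.scratchdir default
    val executionId = "hive_2016-12-06_12-00-00_000_42"   // made-up execution id

    // Old style (Hive < 1.1): staging directory lives under the shared scratch
    // dir, out of reach of Hive's "empty the table directory" step.
    val oldStyle = new Path(new Path(scratchDir, executionId).toString + "-1")
    println(oldStyle)   // /tmp/hive/hive_2016-12-06_12-00-00_000_42-1

    // New style (Hive >= 1.1): staging directory lives under the table dir;
    // Hive 1.1+ excludes it when emptying the table directory, older Hive does not.
    val newStyle = new Path(tableDir, ".hive-staging_" + executionId)
    println(newStyle)   // /user/hive/warehouse/tbl/.hive-staging_hive_2016-12-06_12-00-00_000_42
  }
}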

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 17 additions & 2 deletions
@@ -26,13 +26,15 @@ import org.apache.hadoop.mapred.TextInputFormat
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.tags.ExtendedHiveTest
@@ -45,7 +47,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
 
   private val clientBuilder = new HiveClientBuilder
   import clientBuilder.buildClient
@@ -532,5 +534,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.reset()
       assert(client.listTables("default").isEmpty)
     }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // End-To-End tests
+    ///////////////////////////////////////////////////////////////////////////
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    // TODO: add more tests.
   }
 }
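The new test follows the suite's per-version naming pattern (the name is built from `$version`), and the added `SQLTestUtils` and `TestHiveSingleton` mixins are what bring `withTable` and the `spark` session into scope. One plausible way to run just this suite locally (assuming the sbt-based Spark build conventions of that era; this command is not part of the patch):

build/sbt -Phive "hive/test-only *VersionsSuite"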
