Commit 6fc7b2d

backport SPARK-18675

1 parent d8ef0be

2 files changed (+75, -12 lines)


sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 58 additions & 10 deletions
@@ -22,7 +22,6 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -86,14 +85,15 @@ case class InsertIntoHiveTable(
 
   val hadoopConf = sessionState.newHadoopConf()
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
   private def executionId: String = {
     val rand: Random = new Random
     val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
     "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
   }
 
-  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+  private def getStagingDir(inputPath: Path): Path = {
     val inputPathUri: URI = inputPath.toUri
     val inputPathName: String = inputPathUri.getPath
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -121,21 +121,69 @@ case class InsertIntoHiveTable(
     return dir
   }
 
-  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
-    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  private def getExternalScratchDir(extURI: URI): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
   }
 
-  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+  def getExternalTmpPath(path: Path): Path = {
+    import org.apache.spark.sql.hive.client.hive._
+
+    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+    // a common scratch directory. After the writing is finished, Hive will simply empty the table
+    // directory and move the staging directory to it.
+    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+    // moving staging directory to table directory, Hive will still empty the table directory, but
+    // will exclude the staging directory there.
+    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
+    // staging directory under the table directory for Hive prior to 1.1, the staging directory will
+    // be removed by Hive when Hive is trying to empty the table directory.
+    if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+      oldVersionExternalTempPath(path)
+    } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+      newVersionExternalTempPath(path)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  def oldVersionExternalTempPath(path: Path): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    try {
+      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+      dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+      }
+      fs.deleteOnExit(dirPath)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+
+    }
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  def newVersionExternalTempPath(path: Path): Path = {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent, hadoopConf)
+      getExtTmpPathRelTo(path.getParent)
     } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+      new Path(getExternalScratchDir(extURI), "-ext-10000")
     }
   }
 
-  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
-    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  def getExtTmpPathRelTo(path: Path): Path = {
+    new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
   }
 
   private def saveAsHiveFile(
@@ -172,7 +220,7 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
+    val tmpLocation = getExternalTmpPath(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
 
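
The hunks above hinge on one behavioral difference: Hive clients older than 1.1 stage INSERT output under hive.exec.scratchdir and later move it into the table directory, while 1.1 and 1.2 stage under the table directory itself, so getExternalTmpPath now dispatches on the metastore client version. As an illustration only (not part of this commit), the minimal Scala sketch below shows a session configuration that would exercise the old-version branch; spark.sql.hive.metastore.version, spark.sql.hive.metastore.jars, and hive.exec.scratchdir are standard settings, while the app name, scratch path, and table name are hypothetical placeholders.

// Hedged sketch: point Spark at a pre-1.1 Hive metastore so getExternalTmpPath takes the
// oldVersionExternalTempPath branch and stages under hive.exec.scratchdir.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("scratch-dir-staging-demo")                     // hypothetical app name
  .config("spark.sql.hive.metastore.version", "0.13.1")    // pre-1.1 metastore client
  .config("spark.sql.hive.metastore.jars", "maven")        // resolve matching client jars
  .config("hive.exec.scratchdir", "/tmp/hive")             // scratch dir used by the old path
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE TABLE IF NOT EXISTS demo_tbl (a INT)")   // hypothetical table
spark.sql("INSERT INTO TABLE demo_tbl SELECT 1")           // staging dir lands under the scratch dir

With a 1.1 or 1.2 client the same INSERT would instead stage under a hive.exec.stagingdir (".hive-staging") directory inside the table location, as handled by newVersionExternalTempPath.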

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 17 additions & 2 deletions
@@ -26,13 +26,15 @@ import org.apache.hadoop.mapred.TextInputFormat
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.tags.ExtendedHiveTest
@@ -45,7 +47,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
 
   private val clientBuilder = new HiveClientBuilder
   import clientBuilder.buildClient
@@ -530,5 +532,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.reset()
       assert(client.listTables("default").isEmpty)
     }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // End-To-End tests
+    ///////////////////////////////////////////////////////////////////////////
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    // TODO: add more tests.
   }
 }
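
The suite now mixes in SQLTestUtils and TestHiveSingleton, so spark and withTable are available inside the versions.foreach block. Following the TODO above, one natural further end-to-end test would cover the INSERT path that this backport changes; the sketch below is hypothetical (not part of the commit) and assumes the same mix-ins and the same placement next to the CREATE TABLE AS SELECT test, with placeholder table names.

    // Hypothetical follow-up test (not in this commit): exercises InsertIntoHiveTable's
    // external temp path handling via a plain INSERT OVERWRITE, in the same style as above.
    test(s"$version: INSERT OVERWRITE TABLE") {
      withTable("src_tbl", "dst_tbl") {
        spark.sql("CREATE TABLE src_tbl AS SELECT 1 AS a")
        spark.sql("CREATE TABLE dst_tbl (a INT)")
        spark.sql("INSERT OVERWRITE TABLE dst_tbl SELECT a FROM src_tbl")
        assert(spark.table("dst_tbl").collect().toSeq == Seq(Row(1)))
      }
    }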
