
Commit 027b265

backport SPARK-18675
1 parent b2abb8a

2 files changed: +75 -12 lines


sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 58 additions & 10 deletions
@@ -25,7 +25,6 @@ import java.util.{Date, Random}
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -57,6 +56,7 @@ case class InsertIntoHiveTable(
 
   val hadoopConf = sessionState.newHadoopConf()
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
   private def executionId: String = {
     val rand: Random = new Random
@@ -65,7 +65,7 @@ case class InsertIntoHiveTable(
     return executionId
   }
 
-  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+  private def getStagingDir(inputPath: Path): Path = {
     val inputPathUri: URI = inputPath.toUri
     val inputPathName: String = inputPathUri.getPath
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -94,21 +94,69 @@ case class InsertIntoHiveTable(
     return dir
   }
 
-  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
-    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  private def getExternalScratchDir(extURI: URI): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
   }
 
-  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+  def getExternalTmpPath(path: Path): Path = {
+    import org.apache.spark.sql.hive.client.hive._
+
+    val hiveVersion = client.version
+    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+    // a common scratch directory. After the writing is finished, Hive will simply empty the table
+    // directory and move the staging directory to it.
+    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+    // moving the staging directory to the table directory, Hive will still empty the table
+    // directory, but will exclude the staging directory there.
+    // We have to follow the Hive behavior here, to avoid trouble. For example, if we create the
+    // staging directory under the table directory for Hive prior to 1.1, the staging directory
+    // will be removed by Hive when Hive is trying to empty the table directory.
+    if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+      oldVersionExternalTempPath(path)
+    } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+      newVersionExternalTempPath(path)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  def oldVersionExternalTempPath(path: Path): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    try {
+      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+      dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+      }
+      fs.deleteOnExit(dirPath)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+    }
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  def newVersionExternalTempPath(path: Path): Path = {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent, hadoopConf)
+      getExtTmpPathRelTo(path.getParent)
     } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+      new Path(getExternalScratchDir(extURI), "-ext-10000")
     }
   }
 
-  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
-    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  def getExtTmpPathRelTo(path: Path): Path = {
+    new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
   }
 
   private def saveAsHiveFile(
@@ -145,7 +193,7 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
+    val tmpLocation = getExternalTmpPath(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
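
To make the version dispatch above concrete, here is a minimal, self-contained Scala sketch (not the committed code) of where the staging directory ends up for old versus new Hive clients. The HiveVersion stand-ins, helper names, and example paths are illustrative; the real code works with org.apache.spark.sql.hive.client.hive versions and Hadoop Path objects:

object StagingDirSketch {
  sealed trait HiveVersion
  case object PreHive11 extends HiveVersion   // stands in for v12, v13, v14, v1_0
  case object Hive11Plus extends HiveVersion  // stands in for v1_1, v1_2

  val scratchDir = "/tmp/hive"       // default of hive.exec.scratchdir
  val stagingDir = ".hive-staging"   // default of hive.exec.stagingdir

  // Mirrors getExternalTmpPath: pre-1.1 Hive empties the whole table directory
  // on load, so the staging dir must live outside it, under the scratch dir;
  // Hive 1.1+ excludes ".hive-staging*" when emptying, so it can live inside.
  def externalTmpPath(tableDir: String, version: HiveVersion, executionId: String): String =
    version match {
      case PreHive11  => s"$scratchDir/$executionId"
      case Hive11Plus => s"$tableDir/${stagingDir}_$executionId/-ext-10000"
    }

  def main(args: Array[String]): Unit = {
    println(externalTmpPath("/warehouse/db/tbl", PreHive11, "hive_2016_12_01_0001"))
    println(externalTmpPath("/warehouse/db/tbl", Hive11Plus, "hive_2016_12_01_0001"))
  }
}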

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 17 additions & 2 deletions
@@ -28,13 +28,15 @@ import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
@@ -46,7 +48,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
 
   private val sparkConf = new SparkConf()
 
@@ -531,5 +533,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.reset()
       assert(client.listTables("default").isEmpty)
     }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // End-To-End tests
+    ///////////////////////////////////////////////////////////////////////////
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    // TODO: add more tests.
   }
 }
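
The new test runs once per Hive client version that VersionsSuite exercises. As a hedged sketch of reproducing the same end-to-end path outside the suite, one can pin the metastore client version on a Hive-enabled session and run the same CTAS; the version and jars values below are illustrative choices, not what the suite itself does:

import org.apache.spark.sql.{Row, SparkSession}

object CtasSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .enableHiveSupport()
      // Pin the Hive metastore client version (illustrative value).
      .config("spark.sql.hive.metastore.version", "1.2.1")
      // Resolve the matching client jars from Maven (illustrative choice).
      .config("spark.sql.hive.metastore.jars", "maven")
      .getOrCreate()

    spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
    // Same assertion as the new test: the CTAS result reads back as Row(1).
    assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
    spark.sql("DROP TABLE tbl")
    spark.stop()
  }
}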
