
Commit e76f4f4

cloud-fan authored and yhuai committed
[SPARK-17051][SQL] we should use hadoopConf in InsertIntoHiveTable
## What changes were proposed in this pull request?

Hive confs in hive-site.xml will be loaded in `hadoopConf`, so we should use `hadoopConf` in `InsertIntoHiveTable` instead of `SessionState.conf`.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #14634 from cloud-fan/bug.

(cherry picked from commit eb004c6)
Signed-off-by: Yin Huai <[email protected]>
1 parent 643f161 · commit e76f4f4
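For context on why the change matters: Spark's `SQLConf` (reached via `SessionState.conf`) only contains keys set through Spark SQL itself, while the Hadoop `Configuration` built by `sessionState.newHadoopConf()` also merges resource files such as hive-site.xml. A minimal sketch of that difference, using only the standard Hadoop `Configuration` API — the resource name and conf key come from this commit, everything else is illustrative:

```scala
import org.apache.hadoop.conf.Configuration

// Illustrative stand-in for sessionState.newHadoopConf(): a Configuration
// that merges hive-site.xml on top of the Hadoop defaults.
val hadoopConf = new Configuration()
hadoopConf.addResource("hive-site.xml")

// After this commit, Hive flags are read from hadoopConf, so a value set only
// in hive-site.xml is honored instead of silently falling back to the
// hard-coded default, as the old SessionState.conf lookup did.
val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
```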

File tree: 2 files changed, 31 additions & 10 deletions


sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 3 additions & 6 deletions
@@ -147,8 +147,7 @@ case class InsertIntoHiveTable(
     val hadoopConf = sessionState.newHadoopConf()
     val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    val isCompressed =
-      sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
+    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean

     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
@@ -182,15 +181,13 @@ case class InsertIntoHiveTable(
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
-      if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
+      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
       }

       // Report error if dynamic partition strict mode is on but no static partition is found
       if (numStaticPartitions == 0 &&
-          sessionState.conf.getConfString(
-            "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
-      {
+          hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
       }
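The observable effect of the hunk above, as a hypothetical before/after comparison — the keys and defaults are taken verbatim from the diff; the hive-site.xml value is an assumed example:

```scala
// Assume hive-site.xml, and nothing else, sets:
//   hive.exec.dynamic.partition.mode = nonstrict

// Before: SQLConf never saw hive-site.xml, so this fell back to "strict"
// and dynamic-partition inserts without a static partition were rejected.
//   sessionState.conf.getConfString("hive.exec.dynamic.partition.mode", "strict")  // "strict"

// After: the Hadoop Configuration merges hive-site.xml, so the user's
// setting takes effect.
//   hadoopConf.get("hive.exec.dynamic.partition.mode", "strict")  // "nonstrict"
```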
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 28 additions & 4 deletions
@@ -26,31 +26,34 @@ import scala.util.Try
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.BeforeAndAfter

-import org.apache.spark.{SparkException, SparkFiles}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils

 case class TestData(a: Int, b: String)

 /**
  * A set of test cases expressed in Hive QL that are not covered by the tests
  * included in the hive distribution.
  */
-class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
+class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAndAfter {
   private val originalTimeZone = TimeZone.getDefault
   private val originalLocale = Locale.getDefault

   import org.apache.spark.sql.hive.test.TestHive.implicits._

   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled

+  def spark: SparkSession = sparkSession
+
   override def beforeAll() {
     super.beforeAll()
     TestHive.setCacheTables(true)
@@ -1203,6 +1206,27 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     }
     assertUnsupportedFeature { sql("DROP TEMPORARY MACRO SIGMOID") }
   }
+
+  test("dynamic partitioning is allowed when hive.exec.dynamic.partition.mode is nonstrict") {
+    val modeConfKey = "hive.exec.dynamic.partition.mode"
+    withTable("with_parts") {
+      sql("CREATE TABLE with_parts(key INT) PARTITIONED BY (p INT)")
+
+      withSQLConf(modeConfKey -> "nonstrict") {
+        sql("INSERT OVERWRITE TABLE with_parts partition(p) select 1, 2")
+        assert(spark.table("with_parts").filter($"p" === 2).collect().head == Row(1, 2))
+      }
+
+      val originalValue = spark.sparkContext.hadoopConfiguration.get(modeConfKey, "nonstrict")
+      try {
+        spark.sparkContext.hadoopConfiguration.set(modeConfKey, "nonstrict")
+        sql("INSERT OVERWRITE TABLE with_parts partition(p) select 3, 4")
+        assert(spark.table("with_parts").filter($"p" === 4).collect().head == Row(3, 4))
+      } finally {
+        spark.sparkContext.hadoopConfiguration.set(modeConfKey, originalValue)
+      }
+    }
+  }
 }

 // for SPARK-2180 test
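The new test restores the Hadoop configuration by hand with try/finally because `withSQLConf` only manages Spark's `SQLConf`, not `sparkContext.hadoopConfiguration`. A standalone sketch of that save-and-restore pattern, using only the standard Hadoop `Configuration` API — the helper name `withHadoopConf` is hypothetical, not a Spark API:

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical helper mirroring the test's try/finally: set a key, run the
// body, then restore the previous value (or unset it if it was absent).
def withHadoopConf[T](conf: Configuration, key: String, value: String)(body: => T): T = {
  val original = conf.get(key) // null when the key was not set before
  conf.set(key, value)
  try body
  finally {
    if (original == null) conf.unset(key) else conf.set(key, original)
  }
}

// Usage sketch against the test above:
// withHadoopConf(spark.sparkContext.hadoopConfiguration,
//     "hive.exec.dynamic.partition.mode", "nonstrict") {
//   sql("INSERT OVERWRITE TABLE with_parts partition(p) select 3, 4")
// }
```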
