Commit 1c0caf7

[SPARK-12257] Non partitioned insert into a partitioned Hive table doesn't fail
1 parent: d8ec081 · commit: 1c0caf7

2 files changed: +17, -1 lines changed

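The JIRA title summarizes the bug: an INSERT into a partitioned Hive table that supplies no partition values at all (neither a static PARTITION clause nor dynamic partition columns) used to be accepted instead of failing. A minimal reproduction sketch, borrowing the table names from the regression test added below and assuming a Hive-enabled sqlContext (e.g. spark-shell built with Hive support):

// Reproduction sketch (illustrative, not part of the commit); assumes a
// Hive-enabled sqlContext, e.g. spark-shell compiled with Hive support.
import sqlContext.implicits._

Seq((1, 1), (2, 2), (3, 3)).toDF("c1", "c2").registerTempTable("nonpartition_tab")
sqlContext.sql("CREATE TABLE partitioned_tab (x INT) PARTITIONED BY (y INT)")

// partitioned_tab is partitioned by y, but no partition value is supplied:
// before this commit the statement was accepted; with this commit it throws
// a SparkException carrying Hive's "need to specify partition columns" error.
sqlContext.sql("INSERT OVERWRITE TABLE partitioned_tab SELECT c1 FROM nonpartition_tab")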

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 5 additions & 0 deletions
@@ -155,6 +155,11 @@ case class InsertIntoHiveTable(
     val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
     val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
 
+    // Validate that partition values are specified for partition columns.
+    if (partitionColumnNames != null && partitionColumnNames.size > 0 && partitionSpec.size == 0) {
+      throw new SparkException(ErrorMsg.NEED_PARTITION_ERROR.getMsg)
+    }
+
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
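The new guard sits right after partitionColumnNames is derived from the table's partition_columns property: if the target table has partition columns but the INSERT carries an empty partition spec, the write now fails immediately with Hive's NEED_PARTITION_ERROR instead of proceeding. A standalone sketch of the condition (illustrative only, not the Spark source; the names and the Map[String, Option[String]] shape of the partition spec follow the surrounding code):

// Standalone sketch of the new check.
object PartitionSpecGuard {
  // partitionColumnNames: split from the table's "partition_columns" property (may be null)
  // partitionSpec: values from the INSERT's PARTITION (...) clause, if any
  def missingPartitionSpec(
      partitionColumnNames: Array[String],
      partitionSpec: Map[String, Option[String]]): Boolean =
    partitionColumnNames != null && partitionColumnNames.nonEmpty && partitionSpec.isEmpty

  def main(args: Array[String]): Unit = {
    println(missingPartitionSpec(Array("y"), Map.empty))             // true  -> reject the insert
    println(missingPartitionSpec(Array("y"), Map("y" -> Some("1")))) // false -> static partition given
    println(missingPartitionSpec(Array("y"), Map("y" -> None)))      // false -> dynamic partition given
    println(missingPartitionSpec(null, Map.empty))                   // false -> table is not partitioned
  }
}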

sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala

Lines changed: 12 additions & 1 deletion
@@ -22,8 +22,9 @@ import java.io.File
 import org.apache.hadoop.hive.conf.HiveConf
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.{QueryTest, _}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -259,4 +260,14 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
 
     sql("DROP TABLE table_with_partition")
   }
+
+  test("Insert into partitioned table with no partition values") {
+    val df = Seq((1, 1), (2, 2), (3, 3)).toDF("c1", "C2")
+    df.registerTempTable("nonpartition_tab")
+    sql("CREATE TABLE partitioned_tab (x INT) PARTITIONED BY (y INT)")
+    intercept[SparkException] {
+      sql("INSERT OVERWRITE TABLE partitioned_tab SELECT c1 FROM nonpartition_tab")
+    }
+    sql("DROP TABLE partitioned_tab")
+  }
 }
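For contrast, inserts that do name the partition still pass the new check. A sketch of the accepted forms, reusing the tables from the reproduction above (illustrative; the dynamic-partition variant additionally assumes the usual Hive dynamic-partitioning settings are enabled):

// Accepted forms (not part of the commit); same tables as above.
// Static partition value:
sqlContext.sql("INSERT OVERWRITE TABLE partitioned_tab PARTITION (y = 1) SELECT c1 FROM nonpartition_tab")

// Dynamic partition value: requires hive.exec.dynamic.partition=true and
// hive.exec.dynamic.partition.mode=nonstrict.
sqlContext.sql("SET hive.exec.dynamic.partition=true")
sqlContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
sqlContext.sql("INSERT OVERWRITE TABLE partitioned_tab PARTITION (y) SELECT c1, c2 FROM nonpartition_tab")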
