From 1c0caf72bcb30b0b3b8c479937c3669d64a63066 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Thu, 10 Dec 2015 02:26:24 -0800
Subject: [PATCH 1/2] [SPARK-12257] Non partitioned insert into a partitioned
 Hive table doesn't fail

---
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  5 +++++
 .../spark/sql/hive/InsertIntoHiveTableSuite.scala  | 13 ++++++++++++-
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f936cf565b2bc..6c907f98c397c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -155,6 +155,11 @@ case class InsertIntoHiveTable(
     val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
     val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
 
+    // Validate that partition values are specified for partition columns.
+    if (partitionColumnNames != null && partitionColumnNames.size > 0 && partitionSpec.size == 0) {
+      throw new SparkException(ErrorMsg.NEED_PARTITION_ERROR.getMsg)
+    }
+
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 81ee9ba71beb6..91e0bc73d7e2d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -22,8 +22,9 @@ import java.io.File
 import org.apache.hadoop.hive.conf.HiveConf
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.{QueryTest, _}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -259,4 +260,14 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
 
     sql("DROP TABLE table_with_partition")
   }
+
+  test("Insert into partitioned table with no partition values") {
+    val df = Seq((1, 1), (2, 2), (3, 3)).toDF("c1", "C2")
+    df.registerTempTable("nonpartition_tab")
+    sql("CREATE TABLE partitioned_tab (x INT) PARTITIONED BY (y INT)")
+    intercept[SparkException] {
+      sql("INSERT OVERWRITE TABLE partitioned_tab SELECT c1 FROM nonpartition_tab")
+    }
+    sql("DROP TABLE partitioned_tab")
+  }
 }
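For reviewers: the guard added in PATCH 1/2 can be read in isolation as the sketch below. `checkPartitionSpec` is a hypothetical standalone helper, not code from the patch; it only mirrors how `InsertIntoHiveTable` parses the '/'-separated "partition_columns" table property and rejects an INSERT that supplies no partition spec at all.

```scala
import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.spark.SparkException

// Hypothetical distillation of the new check. Hive stores a table's partition
// column names in the "partition_columns" table property, '/'-separated
// (e.g. "y" or "dt/hour"). `partitionSpec` maps each partition column named
// in the INSERT statement to its optional (static) value.
def checkPartitionSpec(
    partitionColumns: String, // may be null for a non-partitioned table
    partitionSpec: Map[String, Option[String]]): Unit = {
  val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
  if (partitionColumnNames != null && partitionColumnNames.size > 0 && partitionSpec.size == 0) {
    // The destination table is partitioned but the INSERT names no partition:
    // fail instead of silently writing rows under the table root.
    throw new SparkException(ErrorMsg.NEED_PARTITION_ERROR.getMsg)
  }
}
```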
From f0666789dc358a77ab0f0867e60d7e09490a8236 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Thu, 10 Dec 2015 15:48:40 -0800
Subject: [PATCH 2/2] Address michael's comments

---
 .../hive/execution/InsertIntoHiveTable.scala       | 22 +++++++++----------
 .../sql/hive/InsertIntoHiveTableSuite.scala        |  6 ++---
 .../sql/hive/execution/HiveQuerySuite.scala        | 10 ++++-----
 3 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 6c907f98c397c..bea267b894429 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -23,22 +23,22 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
+import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.util.SerializableJobConf
 
 private[hive]
@@ -157,26 +157,26 @@ case class InsertIntoHiveTable(
 
     // Validate that partition values are specified for partition columns.
     if (partitionColumnNames != null && partitionColumnNames.size > 0 && partitionSpec.size == 0) {
-      throw new SparkException(ErrorMsg.NEED_PARTITION_ERROR.getMsg)
+      throw new AnalysisException(ErrorMsg.NEED_PARTITION_ERROR.getMsg)
     }
 
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
       if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+        throw new AnalysisException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
       }
 
       // Report error if dynamic partition strict mode is on but no static partition is found
       if (numStaticPartitions == 0 &&
         sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
+        throw new AnalysisException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
       }
 
       // Report error if any static partition appears after a dynamic partition
       val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
       if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
-        throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
       }
     }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 91e0bc73d7e2d..a73213809ab33 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -20,9 +20,9 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.hadoop.hive.conf.HiveConf
+
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -262,12 +262,12 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
   }
 
   test("Insert into partitioned table with no partition values") {
+    sql("DROP TABLE IF EXISTS partitioned_tab")
     val df = Seq((1, 1), (2, 2), (3, 3)).toDF("c1", "C2")
     df.registerTempTable("nonpartition_tab")
     sql("CREATE TABLE partitioned_tab (x INT) PARTITIONED BY (y INT)")
-    intercept[SparkException] {
+    intercept[AnalysisException] {
       sql("INSERT OVERWRITE TABLE partitioned_tab SELECT c1 FROM nonpartition_tab")
     }
-    sql("DROP TABLE partitioned_tab")
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 8a5acaf3e10bc..6137e32127d38 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -26,14 +26,14 @@ import scala.util.Try
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
-import org.apache.spark.{SparkException, SparkFiles}
+import org.apache.spark.sql.hive.test.TestHive._
 
 case class TestData(a: Int, b: String)
@@ -1077,7 +1077,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     sql("SET hive.exec.dynamic.partition.mode=strict")
 
     // Should throw when using strict dynamic partition mode without any static partition
-    intercept[SparkException] {
+    intercept[AnalysisException] {
       sql(
         """INSERT INTO TABLE dp_test PARTITION(dp)
           |SELECT key, value, key % 5 FROM src
@@ -1087,7 +1087,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     sql("SET hive.exec.dynamic.partition.mode=nonstrict")
 
     // Should throw when a static partition appears after a dynamic partition
-    intercept[SparkException] {
+    intercept[AnalysisException] {
       sql(
         """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1)
          |SELECT key, value, key % 5 FROM src
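End to end, the behavior this series locks in can be checked with a few lines against a Hive-enabled context. This is a reproduction sketch, not part of the patch: it assumes a Spark 1.6-era `HiveContext` bound to `sqlContext` (as in a spark-shell built with Hive support), and the table names are arbitrary.

```scala
import org.apache.spark.sql.AnalysisException
import sqlContext.implicits._

// Source data registered as a temporary table, mirroring the new unit test.
Seq((1, 1), (2, 2), (3, 3)).toDF("c1", "c2").registerTempTable("src_tab")
sqlContext.sql("CREATE TABLE part_tab (x INT) PARTITIONED BY (y INT)")

// This INSERT supplies no value for partition column `y`. Before SPARK-12257
// it ran and wrote files directly under the table directory; with the series
// applied it fails fast with Hive's NEED_PARTITION_ERROR message.
try {
  sqlContext.sql("INSERT OVERWRITE TABLE part_tab SELECT c1 FROM src_tab")
} catch {
  case e: AnalysisException => println(s"Rejected as expected: ${e.getMessage}")
}

sqlContext.sql("DROP TABLE part_tab")
```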