InsertIntoHiveTable.scala
@@ -23,22 +23,22 @@ import scala.collection.JavaConverters._

 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
+import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
-import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types.DataType
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.util.SerializableJobConf
 
 private[hive]
@@ -155,23 +155,28 @@ case class InsertIntoHiveTable(
     val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
     val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
 
+    // Validate that partition values are specified for partition columns.
+    if (partitionColumnNames != null && partitionColumnNames.size > 0 && partitionSpec.size == 0) {
+      throw new AnalysisException(ErrorMsg.NEED_PARTITION_ERROR.getMsg)
+    }
+
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
       if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+        throw new AnalysisException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
       }
 
       // Report error if dynamic partition strict mode is on but no static partition is found
       if (numStaticPartitions == 0 &&
         sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
+        throw new AnalysisException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
       }
 
       // Report error if any static partition appears after a dynamic partition
       val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
       if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
-        throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
       }
     }
 
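Reviewer note: the `isDynamic.init.zip(isDynamic.tail)` check pairs each partition column's dynamic flag with its successor's, so any `(true, false)` pair means a static value follows a dynamic one. A minimal sketch of statements that would trip each check, assuming a hypothetical table `t` partitioned by `(p1 STRING, p2 STRING)` and a Hive-enabled `sql(...)` entry point:

```scala
// Assumed setup: CREATE TABLE t (key INT) PARTITIONED BY (p1 STRING, p2 STRING)

// Partitioned table with no partition spec at all: NEED_PARTITION_ERROR
sql("INSERT OVERWRITE TABLE t SELECT 1")

// All-dynamic spec while hive.exec.dynamic.partition=false: DYNAMIC_PARTITION_DISABLED;
// with dynamic partitioning enabled but mode=strict and no static value:
// DYNAMIC_PARTITION_STRICT_MODE
sql("INSERT OVERWRITE TABLE t PARTITION (p1, p2) SELECT 1, 'a', 'b'")

// Static value after a dynamic column: PARTITION_DYN_STA_ORDER
sql("INSERT OVERWRITE TABLE t PARTITION (p1, p2 = 'b') SELECT 1, 'a'")
```

The first check is new in this patch; the other three previously surfaced as SparkException and now fail analysis with AnalysisException instead.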
InsertIntoHiveTableSuite.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.hadoop.hive.conf.HiveConf
+
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql._
 import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.{QueryTest, _}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -259,4 +260,14 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter {

sql("DROP TABLE table_with_partition")
}

test("Insert into partitioned table with no partition values") {
sql("DROP TABLE IF EXISTS partitioned_tab")
val df = Seq((1, 1), (2, 2), (3, 3)).toDF("c1", "C2")
df.registerTempTable("nonpartition_tab")
sql("CREATE TABLE partitioned_tab (x INT) PARTITIONED BY (y INT)")
intercept[AnalysisException] {
sql("INSERT OVERWRITE TABLE partitioned_tab SELECT c1 FROM nonpartition_tab")
}
}
}
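For contrast, a sketch of an insert that passes the new check, using the same hypothetical tables as the test above: once a static partition value is supplied, the partition spec is non-empty and NEED_PARTITION_ERROR no longer applies.

```scala
// Non-empty partition spec, so the new validation passes.
sql("INSERT OVERWRITE TABLE partitioned_tab PARTITION (y = 1) SELECT c1 FROM nonpartition_tab")
```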
HiveQuerySuite.scala
@@ -26,14 +26,14 @@ import scala.util.Try
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin
 import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
-import org.apache.spark.{SparkException, SparkFiles}
-import org.apache.spark.sql.hive.test.TestHive._
 
 case class TestData(a: Int, b: String)
 
@@ -1077,7 +1077,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
sql("SET hive.exec.dynamic.partition.mode=strict")

// Should throw when using strict dynamic partition mode without any static partition
intercept[SparkException] {
intercept[AnalysisException] {
sql(
"""INSERT INTO TABLE dp_test PARTITION(dp)
|SELECT key, value, key % 5 FROM src
@@ -1087,7 +1087,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
sql("SET hive.exec.dynamic.partition.mode=nonstrict")

// Should throw when a static partition appears after a dynamic partition
intercept[SparkException] {
intercept[AnalysisException] {
sql(
"""INSERT INTO TABLE dp_test PARTITION(dp, sp = 1)
|SELECT key, value, key % 5 FROM src