@@ -369,6 +369,8 @@ case class InsertIntoTable(
if (table.output.isEmpty) {
None
} else {
// Note: The parser (visitPartitionSpec in AstBuilder) already turns
// keys in partition to their lowercase forms.
val staticPartCols = partition.filter(_._2.isDefined).keySet
Some(table.output.filterNot(a => staticPartCols.contains(a.name)))
}
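
For illustration, a minimal standalone sketch (hypothetical values, not part of the patch) of how the lowercased partition spec feeds the static-partition filter above:

// INSERT INTO t PARTITION (DS = '1', hr) ... arrives with lowercase keys,
// because visitPartitionSpec in AstBuilder lowercases them:
val partition: Map[String, Option[String]] = Map("ds" -> Some("1"), "hr" -> None)
// Only keys bound to a value are static; "hr" remains dynamic.
val staticPartCols: Set[String] = partition.filter(_._2.isDefined).keySet  // Set("ds")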
24 changes: 6 additions & 18 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -245,29 +245,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
if (partitioningColumns.isDefined) {
throw new AnalysisException(
"insertInto() can't be used together with partitionBy(). " +
"Partition columns are defined by the table into which is being inserted."
"Partition columns have already be defined for the table. " +
"It is not necessary to use partitionBy()."
)
}

val partitions = normalizedParCols.map(_.map(col => col -> Option.empty[String]).toMap)
val overwrite = mode == SaveMode.Overwrite

// A partitioned relation's schema can be different from the input logicalPlan, since
// partition columns are all moved after data columns. We Project to adjust the ordering.
// TODO: this belongs to the analyzer.
val input = normalizedParCols.map { parCols =>
val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr =>
parCols.contains(attr.name)
}
Project(inputDataCols ++ inputPartCols, df.logicalPlan)
}.getOrElse(df.logicalPlan)

df.sparkSession.sessionState.executePlan(
InsertIntoTable(
UnresolvedRelation(tableIdent),
partitions.getOrElse(Map.empty[String, Option[String]]),
input,
overwrite,
table = UnresolvedRelation(tableIdent),
partition = Map.empty[String, Option[String]],
child = df.logicalPlan,
overwrite = mode == SaveMode.Overwrite,
ifNotExists = false)).toRdd
}
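
A usage sketch of the behavior behind the new message (the table name t, its partitioning, and the DataFrame are assumed for illustration): insertInto() takes its partitioning from the existing table, so combining it with partitionBy() fails analysis.

import org.apache.spark.sql.SaveMode
// Assumes a SparkSession `spark` and an existing table `t` partitioned by column `b`.
val data = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c")
data.write.mode(SaveMode.Append).insertInto("t")                     // OK: partitioning comes from t
// data.write.mode(SaveMode.Append).partitionBy("b").insertInto("t") // throws AnalysisException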

@@ -435,7 +435,7 @@ case class DataSource(
// If we are appending to a table that already exists, make sure the partitioning matches
// up. If we fail to load the table for whatever reason, ignore the check.
if (mode == SaveMode.Append) {
val existingColumns = Try {
val existingPartitionColumns = Try {
Contributor Author: Rename the variable since it is for partition columns, not all columns.

resolveRelation()
.asInstanceOf[HadoopFsRelation]
.location
@@ -444,13 +444,14 @@
.fieldNames
.toSeq
}.getOrElse(Seq.empty[String])
// TODO: Case sensitivity.
val sameColumns =
existingColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase)
if (existingColumns.size > 0 && !sameColumns) {
existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
if (existingPartitionColumns.size > 0 && !sameColumns) {
throw new AnalysisException(
s"""Requested partitioning does not match existing partitioning.
|Existing partitioning columns:
| ${existingColumns.mkString(", ")}
| ${existingPartitionColumns.mkString(", ")}
|Requested partitioning columns:
| ${partitionColumns.mkString(", ")}
|""".stripMargin)
@@ -67,7 +67,7 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
* table. It also does data type casting and field renaming, to make sure that the columns to be
* inserted have the correct data type and fields have the correct names.
*/
private[sql] object PreprocessTableInsertion extends Rule[LogicalPlan] {
private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
private def preprocess(
insert: InsertIntoTable,
tblName: String,
Expand All @@ -84,7 +84,13 @@ private[sql] object PreprocessTableInsertion extends Rule[LogicalPlan] {
if (insert.partition.nonEmpty) {
// the query's partitioning must match the table's partitioning
// this is set for queries like: insert into ... partition (one = "a", two = <expr>)
if (insert.partition.keySet != partColNames.toSet) {
val samePartitionColumns =
if (conf.caseSensitiveAnalysis) {
insert.partition.keySet == partColNames.toSet
} else {
insert.partition.keySet.map(_.toLowerCase) == partColNames.map(_.toLowerCase).toSet
}
Contributor Author: This is just a best effort to respect the case sensitivity setting.

Contributor Author: Partitioning columns are special since we use them to create directories in the file system. Once directories are created, the case sensitivity setting will not affect anything.

Contributor Author: Actually, since the parser already turns column names in partition to their lowercase form, this change does not really affect anything when we use case-insensitive resolution.

if (!samePartitionColumns) {
throw new AnalysisException(
s"""
|Requested partitioning does not match the table $tblName:
@@ -94,7 +100,8 @@ private[sql] object PreprocessTableInsertion extends Rule[LogicalPlan] {
}
expectedColumns.map(castAndRenameChildOutput(insert, _)).getOrElse(insert)
} else {
// All partition columns are dynamic because this InsertIntoTable had no partitioning
// All partition columns are dynamic because the InsertIntoTable command does
// not explicitly specify partitioning columns.
expectedColumns.map(castAndRenameChildOutput(insert, _)).getOrElse(insert)
.copy(partition = partColNames.map(_ -> None).toMap)
}
@@ -111,7 +111,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
lazy val analyzer: Analyzer = {
new Analyzer(catalog, conf) {
override val extendedResolutionRules =
PreprocessTableInsertion ::
PreprocessTableInsertion(conf) ::
new FindDataSourceTable(sparkSession) ::
DataSourceAnalysis ::
(if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
@@ -1337,8 +1337,24 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(sql("select * from partitionedTable").collect().size == 1)
// Inserts new data successfully when partition columns are correctly specified in
// partitionBy(...).
df.write.mode("append").partitionBy("a", "b").saveAsTable("partitionedTable")
assert(sql("select * from partitionedTable").collect().size == 2)
// TODO: Right now, partition columns are always treated in a case-insensitive way.
// See the write method in DataSource.scala.
Seq((4, 5, 6)).toDF("a", "B", "c")
.write
.mode("append")
.partitionBy("a", "B")
.saveAsTable("partitionedTable")

Seq((7, 8, 9)).toDF("a", "b", "c")
.write
.mode("append")
.partitionBy("a", "b")
.saveAsTable("partitionedTable")

checkAnswer(
sql("select a, b, c from partitionedTable"),
Row(1, 2, 3) :: Row(4, 5, 6) :: Row(7, 8, 9) :: Nil
)
}
}
}
@@ -65,7 +65,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
catalog.ParquetConversions ::
catalog.OrcConversions ::
catalog.CreateTables ::
PreprocessTableInsertion ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis ::
(if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)

@@ -372,4 +372,24 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
assert(!logical.resolved, "Should not resolve: missing partition data")
}
}

testPartitionedTable(
"SPARK-16036: better error message when insert into a table with mismatch schema") {
tableName =>
val e = intercept[AnalysisException] {
sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
}
assert(e.message.contains("the number of columns are different"))
}

testPartitionedTable(
"SPARK-16037: INSERT statement should match columns by position") {
tableName =>
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
sql(s"INSERT INTO TABLE $tableName SELECT 1, 2 AS c, 3 AS b")
checkAnswer(sql(s"SELECT a, b, c FROM $tableName"), Row(1, 2, 3))
sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 2, 3")
checkAnswer(sql(s"SELECT a, b, c FROM $tableName"), Row(1, 2, 3))
}
}
}
@@ -1033,6 +1033,41 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
sql("SELECT * FROM boom").queryExecution.analyzed
}

test("SPARK-3810: PreprocessTableInsertion static partitioning support") {
val analyzedPlan = {
loadTestTable("srcpart")
sql("DROP TABLE IF EXISTS withparts")
sql("CREATE TABLE withparts LIKE srcpart")
sql("INSERT INTO TABLE withparts PARTITION(ds='1', hr='2') SELECT key, value FROM src")
.queryExecution.analyzed
}

assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
analyzedPlan.collect {
case _: Project => ()
}.size
}
}

test("SPARK-3810: PreprocessTableInsertion dynamic partitioning support") {
val analyzedPlan = {
loadTestTable("srcpart")
sql("DROP TABLE IF EXISTS withparts")
sql("CREATE TABLE withparts LIKE srcpart")
sql("SET hive.exec.dynamic.partition.mode=nonstrict")

sql("CREATE TABLE IF NOT EXISTS withparts LIKE srcpart")
sql("INSERT INTO TABLE withparts PARTITION(ds, hr) SELECT key, value, '1', '2' FROM src")
.queryExecution.analyzed
}

assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
analyzedPlan.collect {
case _: Project => ()
}.size
}
}
Contributor Author: Add these two tests back.


test("parse HQL set commands") {
// Adapted from its SQL counterpart.
val testKey = "spark.sql.key.usedfortestonly"
@@ -1684,36 +1684,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
)
}
}

test("SPARK-16036: better error message when insert into a table with mismatch schema") {
withTable("hive_table", "datasource_table") {
sql("CREATE TABLE hive_table(a INT) PARTITIONED BY (b INT, c INT)")
sql("CREATE TABLE datasource_table(a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)")
val e1 = intercept[AnalysisException] {
sql("INSERT INTO TABLE hive_table PARTITION(b=1, c=2) SELECT 1, 2, 3")
}
assert(e1.message.contains("the number of columns are different"))
val e2 = intercept[AnalysisException] {
sql("INSERT INTO TABLE datasource_table PARTITION(b=1, c=2) SELECT 1, 2, 3")
}
assert(e2.message.contains("the number of columns are different"))
}
}

test("SPARK-16037: INSERT statement should match columns by position") {
withTable("hive_table", "datasource_table") {
sql("CREATE TABLE hive_table(a INT) PARTITIONED BY (b INT, c INT)")
sql("CREATE TABLE datasource_table(a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)")

withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
sql("INSERT INTO TABLE hive_table SELECT 1, 2 AS c, 3 AS b")
checkAnswer(sql("SELECT a, b, c FROM hive_table"), Row(1, 2, 3))
sql("INSERT OVERWRITE TABLE hive_table SELECT 1, 2, 3")
checkAnswer(sql("SELECT a, b, c FROM hive_table"), Row(1, 2, 3))
}

sql("INSERT INTO TABLE datasource_table SELECT 1, 2 AS c, 3 AS b")
checkAnswer(sql("SELECT a, b, c FROM datasource_table"), Row(1, 2, 3))
}
}
}