From ae40a71de735b7b53e95bebc912a7bb989777720 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 14 Oct 2019 07:19:38 +0200 Subject: [PATCH 01/15] making ANSI store assignment policy as default --- .../spark/sql/catalyst/analysis/Analyzer.scala | 18 ++++++++---------- .../analysis/TableOutputResolver.scala | 11 ++++------- .../apache/spark/sql/internal/SQLConf.scala | 6 +++--- .../sql/execution/datasources/rules.scala | 10 ++-------- 4 files changed, 17 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b4d159eab4508..58bc6578d5781 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2503,9 +2503,9 @@ class Analyzer( override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { case append @ AppendData(table, query, _, isByName) if table.resolved && query.resolved && !append.outputResolved => + validateStoreAssignmentPolicy() val projection = - TableOutputResolver.resolveOutputColumns( - table.name, table.output, query, isByName, conf, storeAssignmentPolicy) + TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf) if (projection != query) { append.copy(query = projection) @@ -2515,9 +2515,9 @@ class Analyzer( case overwrite @ OverwriteByExpression(table, _, query, _, isByName) if table.resolved && query.resolved && !overwrite.outputResolved => + validateStoreAssignmentPolicy() val projection = - TableOutputResolver.resolveOutputColumns( - table.name, table.output, query, isByName, conf, storeAssignmentPolicy) + TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf) if (projection != query) { overwrite.copy(query = projection) @@ -2527,9 +2527,9 @@ class Analyzer( case overwrite @ OverwritePartitionsDynamic(table, query, _, isByName) if table.resolved && query.resolved && !overwrite.outputResolved => + validateStoreAssignmentPolicy() val projection = - TableOutputResolver.resolveOutputColumns( - table.name, table.output, query, isByName, conf, storeAssignmentPolicy) + TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf) if (projection != query) { overwrite.copy(query = projection) @@ -2539,16 +2539,14 @@ class Analyzer( } } - private def storeAssignmentPolicy: StoreAssignmentPolicy.Value = { - val policy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.STRICT) + private def validateStoreAssignmentPolicy(): Unit = { // SPARK-28730: LEGACY store assignment policy is disallowed in data source v2. - if (policy == StoreAssignmentPolicy.LEGACY) { + if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) { val configKey = SQLConf.STORE_ASSIGNMENT_POLICY.key throw new AnalysisException(s""" |"LEGACY" store assignment policy is disallowed in Spark data source V2. 
|Please set the configuration $configKey to other values.""".stripMargin) } - policy } private def commonNaturalJoinProcessing( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index e5d25547d4d55..4f33ca99c02db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -32,8 +32,7 @@ object TableOutputResolver { expected: Seq[Attribute], query: LogicalPlan, byName: Boolean, - conf: SQLConf, - storeAssignmentPolicy: StoreAssignmentPolicy.Value): LogicalPlan = { + conf: SQLConf): LogicalPlan = { if (expected.size < query.output.size) { throw new AnalysisException( @@ -47,8 +46,7 @@ object TableOutputResolver { expected.flatMap { tableAttr => query.resolve(Seq(tableAttr.name), conf.resolver) match { case Some(queryExpr) => - checkField( - tableAttr, queryExpr, byName, conf, storeAssignmentPolicy, err => errors += err) + checkField(tableAttr, queryExpr, byName, conf, err => errors += err) case None => errors += s"Cannot find data for output column '${tableAttr.name}'" None @@ -66,8 +64,7 @@ object TableOutputResolver { query.output.zip(expected).flatMap { case (queryExpr, tableAttr) => - checkField( - tableAttr, queryExpr, byName, conf, storeAssignmentPolicy, err => errors += err) + checkField(tableAttr, queryExpr, byName, conf, err => errors += err) } } @@ -88,9 +85,9 @@ object TableOutputResolver { queryExpr: NamedExpression, byName: Boolean, conf: SQLConf, - storeAssignmentPolicy: StoreAssignmentPolicy.Value, addError: String => Unit): Option[NamedExpression] = { + val storeAssignmentPolicy = conf.storeAssignmentPolicy lazy val outputField = if (tableAttr.dataType.sameType(queryExpr.dataType) && tableAttr.name == queryExpr.name && tableAttr.metadata == queryExpr.metadata) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index eebf4b6dfd396..627c9f0b2cfef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1731,7 +1731,7 @@ object SQLConf { .stringConf .transform(_.toUpperCase(Locale.ROOT)) .checkValues(StoreAssignmentPolicy.values.map(_.toString)) - .createOptional + .createWithDefault(StoreAssignmentPolicy.ANSI.toString) val ANSI_ENABLED = buildConf("spark.sql.ansi.enabled") .doc("When true, Spark tries to conform to the ANSI SQL specification: 1. 
Spark will " + @@ -2461,8 +2461,8 @@ class SQLConf extends Serializable with Logging { def partitionOverwriteMode: PartitionOverwriteMode.Value = PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE)) - def storeAssignmentPolicy: Option[StoreAssignmentPolicy.Value] = - getConf(STORE_ASSIGNMENT_POLICY).map(StoreAssignmentPolicy.withName) + def storeAssignmentPolicy: StoreAssignmentPolicy.Value = + StoreAssignmentPolicy.withName(getConf(STORE_ASSIGNMENT_POLICY)) def ansiEnabled: Boolean = getConf(ANSI_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index c92c68095db8f..50d73d9a7a2eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -190,14 +190,11 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi query } - // SPARK-28730: for V1 data source, we use the "LEGACY" as default store assignment policy. - // TODO: use ANSI store assignment policy by default in SPARK-28495. - val storeAssignmentPolicy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.LEGACY) c.copy( tableDesc = existingTable, query = Some(TableOutputResolver.resolveOutputColumns( tableDesc.qualifiedName, existingTable.schema.toAttributes, newQuery, - byName = true, conf, storeAssignmentPolicy))) + byName = true, conf))) // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity // config, and do various checks: @@ -403,11 +400,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { s"including ${staticPartCols.size} partition column(s) having constant value(s).") } - // SPARK-28730: for V1 data source, we use the "LEGACY" as default store assignment policy. - // TODO: use ANSI store assignment policy by default in SPARK-28495. 
- val storeAssignmentPolicy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.LEGACY) val newQuery = TableOutputResolver.resolveOutputColumns( - tblName, expectedColumns, insert.query, byName = false, conf, storeAssignmentPolicy) + tblName, expectedColumns, insert.query, byName = false, conf) if (normalizedPartSpec.nonEmpty) { if (normalizedPartSpec.size != partColNames.length) { throw new AnalysisException( From 6f9cfa1d00295af0fc6d14ca7445709112f54d10 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 14 Oct 2019 13:47:37 +0200 Subject: [PATCH 02/15] fix test failures --- .../spark/sql/catalyst/expressions/Cast.scala | 1 + .../org/apache/spark/sql/types/DataType.scala | 2 ++ .../org/apache/spark/sql/SQLQueryTestSuite.scala | 14 +++++++++++++- .../execution/datasources/orc/OrcSourceSuite.scala | 4 +++- .../thriftserver/ThriftServerQueryTestSuite.scala | 11 +++++++++++ .../hive/execution/HiveCompatibilitySuite.scala | 3 +++ .../spark/sql/hive/orc/HiveOrcQuerySuite.scala | 4 +++- 7 files changed, 36 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 42b471f20ff91..d71f300dd26dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -165,6 +165,7 @@ object Cast { */ def canANSIStoreAssign(from: DataType, to: DataType): Boolean = (from, to) match { case _ if from == to => true + case (NullType, _) => true case (_: NumericType, _: NumericType) => true case (_: AtomicType, StringType) => true case (_: CalendarIntervalType, StringType) => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 3a10a56f6937f..141cb8a585267 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -448,6 +448,8 @@ object DataType { fieldCompatible + case (_: NullType, _) => true + case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == STRICT => if (!Cast.canUpCast(w, r)) { addError(s"Cannot safely cast '$context': $w to $r") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index c74fa2da42afa..1302c6f06dcfc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -143,7 +143,19 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession { /** List of test cases to ignore, in lower cases. */ protected def blackList: Set[String] = Set( - "blacklist.sql" // Do NOT remove this one. It is here to test the blacklist functionality. + "blacklist.sql", // Do NOT remove this one. It is here to test the blacklist functionality. + // SPARK-28885 String value is not allowed to be stored as numeric type with + // ANSI store assignment policy. + "postgreSQL/numeric.sql", + "postgreSQL/int2.sql", + "postgreSQL/int4.sql", + "postgreSQL/int8.sql", + "postgreSQL/float4.sql", + "postgreSQL/float8.sql", + // SPARK-28885 String value is not allowed to be stored as date/timestamp type with + // ANSI store assignment policy. 
+ "postgreSQL/date.sql", + "postgreSQL/timestamp.sql" ) // Create all the test cases. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 55b361d5ac994..1e27593584786 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -346,7 +346,9 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } } - test("SPARK-23340 Empty float/double array columns raise EOFException") { + // SPARK-28885 String value is not allowed to be stored as numeric type with + // ANSI store assignment policy. + ignore("SPARK-23340 Empty float/double array columns raise EOFException") { Seq(Seq(Array.empty[Float]).toDF(), Seq(Array.empty[Double]).toDF()).foreach { df => withTempPath { path => df.write.format("orc").save(path.getCanonicalPath) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 36fcde35982cc..20f496c1a1178 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -92,6 +92,17 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { "date.sql", // SPARK-28620 "postgreSQL/float4.sql", + // SPARK-28885 String value is not allowed to be stored as numeric type with + // ANSI store assignment policy. + "postgreSQL/numeric.sql", + "postgreSQL/int2.sql", + "postgreSQL/int4.sql", + "postgreSQL/int8.sql", + "postgreSQL/float8.sql", + // SPARK-28885 String value is not allowed to be stored as date/timestamp type with + // ANSI store assignment policy. + "postgreSQL/date.sql", + "postgreSQL/timestamp.sql" // SPARK-28636 "decimalArithmeticOperations.sql", "literals.sql", diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 7a9f5c67fc693..36c19c680d165 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy /** * Runs the test cases that are included in the hive distribution. 
@@ -59,6 +60,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true) // Ensures that cross joins are enabled so that we can test them TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true) + // Ensures that the table insertion behavior is consistent with Hive + TestHive.setConf(SQLConf.STORE_ASSIGNMENT_POLICY, StoreAssignmentPolicy.LEGACY.toString) // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests // (timestamp_*) TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index 00333397e1fbb..1a6279de87d86 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -210,7 +210,9 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { } } - test("SPARK-23340 Empty float/double array columns raise EOFException") { + // SPARK-28885 String value is not allowed to be stored as numeric type with + // ANSI store assignment policy. + ignore("SPARK-23340 Empty float/double array columns raise EOFException") { withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "false") { withTable("spark_23340") { sql("CREATE TABLE spark_23340(a array<float>, b array<double>) STORED AS ORC") From ad8f5787d54d29a21ad63f0e7ba16680586fa83c Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 14 Oct 2019 14:00:17 +0200 Subject: [PATCH 03/15] fix one more test failure --- .../spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala index 25ff3544185af..498bee3b2a54f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala @@ -168,7 +168,9 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS checkNumericTypes(fileFormat, "DECIMAL(38, 2)", 2.1D) // Date/Time Types - checkDateTimeTypes(fileFormat) + // SPARK-28885 String value is not allowed to be stored as date/timestamp type with + // ANSI store assignment policy.
+ // checkDateTimeTypes(fileFormat) // String Types checkStringTypes(fileFormat, "STRING", "s1") From 758ac1fc1cd364d3e4512cdf79aa19b841f1b7f6 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 14 Oct 2019 14:13:35 +0200 Subject: [PATCH 04/15] fix failure --- .../sql/hive/thriftserver/ThriftServerQueryTestSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 20f496c1a1178..799f00a28fd49 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -102,7 +102,7 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { // SPARK-28885 String value is not allowed to be stored as date/timestamp type with // ANSI store assignment policy. "postgreSQL/date.sql", - "postgreSQL/timestamp.sql" + "postgreSQL/timestamp.sql", // SPARK-28636 "decimalArithmeticOperations.sql", "literals.sql", From 28c49e7a5d81e3bca2c21ca285e8a1ab837b3bb2 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 14 Oct 2019 14:58:14 +0200 Subject: [PATCH 05/15] fix more failures --- .../datasources/parquet/ParquetQuerySuite.scala | 16 ++++++++-------- .../spark/sql/hive/orc/HiveOrcQuerySuite.scala | 1 + 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 88b94281d88ee..f38973f7dffd1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -162,9 +162,9 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS test("SPARK-10634 timestamp written and read as INT64 - truncation") { withTable("ts") { sql("create table ts (c1 int, c2 timestamp) using parquet") - sql("insert into ts values (1, '2016-01-01 10:11:12.123456')") + sql("insert into ts values (1, timestamp'2016-01-01 10:11:12.123456')") sql("insert into ts values (2, null)") - sql("insert into ts values (3, '1965-01-01 10:11:12.123456')") + sql("insert into ts values (3, timestamp'1965-01-01 10:11:12.123456')") val expected = Seq( (1, "2016-01-01 10:11:12.123456"), (2, null), @@ -177,13 +177,13 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS withTable("ts") { withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") { sql("create table ts (c1 int, c2 timestamp) using parquet") - sql("insert into ts values (1, '2016-01-01 10:11:12.123456')") + sql("insert into ts values (1, timestamp'2016-01-01 10:11:12.123456')") sql("insert into ts values (2, null)") - sql("insert into ts values (3, '1965-01-01 10:11:12.125456')") - sql("insert into ts values (4, '1965-01-01 10:11:12.125')") - sql("insert into ts values (5, '1965-01-01 10:11:12.1')") - sql("insert into ts values (6, '1965-01-01 10:11:12.123456789')") - sql("insert into ts values (7, '0001-01-01 00:00:00.000000')") + sql("insert into ts values (3, timestamp'1965-01-01 10:11:12.125456')") + sql("insert into ts values (4, timestamp'1965-01-01 
10:11:12.125')") + sql("insert into ts values (5, timestamp'1965-01-01 10:11:12.1')") + sql("insert into ts values (6, timestamp'1965-01-01 10:11:12.123456789')") + sql("insert into ts values (7, timestamp'0001-01-01 00:00:00.000000')") val expected = Seq( (1, "2016-01-01 10:11:12.123"), (2, null), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index 1a6279de87d86..3c545c577f16d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -212,6 +212,7 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { // SPARK-28885 String value is not allowed to be stored as numeric type with // ANSI store assignment policy. + // TODO: re-enable the test case when SPARK-29462 is fixed. ignore("SPARK-23340 Empty float/double array columns raise EOFException") { withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "false") { withTable("spark_23340") { From 3d802005e3d27b9b0c5bc45c6abd8ae9fbc0321d Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 14 Oct 2019 15:30:37 +0200 Subject: [PATCH 06/15] more fixes --- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 4 ++-- .../scala/org/apache/spark/sql/sources/InsertSuite.scala | 5 ++++- .../spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 70b1db8e5f0d2..16348096013ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -150,9 +150,9 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession { Seq(3 -> "c").toDF("i", "j").write.mode("append").saveAsTable("t") checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) - Seq("c" -> 3).toDF("i", "j").write.mode("append").saveAsTable("t") + Seq(3.5 -> 3).toDF("i", "j").write.mode("append").saveAsTable("t") checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") - :: Row(null, "3") :: Nil) + :: Row(3, "3") :: Nil) Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 5e853e666be64..9e33b8aaec5d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -729,7 +729,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { spark.sessionState.catalog.createTable(newTable, false) sql("INSERT INTO TABLE test_table SELECT 1, 'a'") - sql("INSERT INTO TABLE test_table SELECT 2, null") + val msg = intercept[AnalysisException] { + sql("INSERT INTO TABLE test_table SELECT 2, null") + }.getMessage + assert(msg.contains("Cannot write nullable values to non-null column 's'")) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala index 498bee3b2a54f..6c68feab483ac 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala @@ -119,7 +119,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 BINARY) STORED AS $fileFormat") hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('1')") checkAnswer(spark.table("hive_serde"), Row("1".getBytes)) - spark.sql("INSERT INTO TABLE hive_serde values('2')") + spark.sql("INSERT INTO TABLE hive_serde values(BINARY('2'))") checkAnswer(spark.table("hive_serde"), Seq(Row("1".getBytes), Row("2".getBytes))) } } From 4bd989e43d63ab97c7f2ee8d2dd0e55910cdd0ae Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 14 Oct 2019 16:00:56 +0200 Subject: [PATCH 07/15] add migration guide --- docs/sql-migration-guide.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 8c5721340a30c..f250fec7d0689 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -23,6 +23,7 @@ license: | {:toc} ## Upgrading from Spark SQL 2.4 to 3.0 + - Since Spark 3.0, when inserting a value into a table column with a different data type, the type coercion is performed as per the ANSI SQL standard. Certain unreasonable type conversions such as converting `string` to `int` and `double` to `boolean` are disallowed. A runtime exception will be thrown if the value is out of range for the data type of the column. In Spark version 2.4 and earlier, type conversions during table insertion are allowed as long as they are a valid `Cast`. When inserting an out-of-range value into an integral field, the low-order bits of the value are inserted (the same as Java/Scala numeric type casting). For example, if 257 is inserted into a field of byte type, the result is 1. The behavior is controlled by the option `spark.sql.storeAssignmentPolicy`, with a default value of "ANSI". Setting the option to "LEGACY" restores the previous behavior. - In Spark 3.0, the deprecated methods `SQLContext.createExternalTable` and `SparkSession.createExternalTable` have been removed in favor of its replacement, `createTable`.
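A minimal spark-shell sketch of the store assignment behavior described in the migration guide entry above; this is illustrative only, not part of the patch series, and the table name `t` is made up:

    // Assumes a Spark 3.0 session with the new default spark.sql.storeAssignmentPolicy=ANSI.
    spark.sql("CREATE TABLE t (b BYTE) USING parquet")

    // Accepted: 1 is an integer literal, numeric-to-numeric store assignment is allowed,
    // and the value fits into BYTE.
    spark.sql("INSERT INTO t VALUES (1)")

    // Rejected at analysis time: under the ANSI policy a string value may no longer be
    // stored into a numeric column (SPARK-28885).
    // spark.sql("INSERT INTO t VALUES ('1')")

    // Accepted by the analyzer (int -> byte), but 257 is out of range for BYTE, so a
    // runtime exception is raised; under the LEGACY policy the low-order bits (the
    // value 1) would have been stored instead, matching Spark 2.4 behavior.
    // spark.sql("INSERT INTO t VALUES (257)")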
From 944a426713534612cd752f54eb3f68d65e6dd0a2 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 14 Oct 2019 16:21:05 +0200 Subject: [PATCH 08/15] fix more test failures --- .../org/apache/spark/sql/hive/client/VersionsSuite.scala | 2 +- .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 1c82c7e86faab..ac31557b94197 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -908,7 +908,7 @@ class VersionsSuite extends SparkFunSuite with Logging { """.stripMargin ) - val errorMsg = "data type mismatch: cannot cast decimal(2,1) to binary" + val errorMsg = "Cannot safely cast 'f0': DecimalType(2,1) to BinaryType" if (isPartitioned) { val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 4253fe2e1edcb..6d12310714164 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1828,10 +1828,10 @@ class HiveDDLSuite .write.format("hive").mode("append").saveAsTable("t") checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) - Seq("c" -> 3).toDF("i", "j") + Seq(3.5 -> 3).toDF("i", "j") .write.format("hive").mode("append").saveAsTable("t") checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") - :: Row(null, "3") :: Nil) + :: Row(3, "3") :: Nil) Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1") From 24056b96346d15918387332bddb7f0fb4b44ceab Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 14 Oct 2019 20:02:47 +0200 Subject: [PATCH 09/15] fix --- .../src/main/scala/org/apache/spark/sql/types/DataType.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 141cb8a585267..ad1d6b62ef3a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -448,8 +448,6 @@ object DataType { fieldCompatible - case (_: NullType, _) => true - case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == STRICT => if (!Cast.canUpCast(w, r)) { addError(s"Cannot safely cast '$context': $w to $r") @@ -458,6 +456,8 @@ object DataType { true } + case (_: NullType, _) if storeAssignmentPolicy == ANSI => true + case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == ANSI => if (!Cast.canANSIStoreAssign(w, r)) { addError(s"Cannot safely cast '$context': $w to $r") From b602083389bd9cd4080049bb78f0d0adfc5d07f9 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 14 Oct 2019 22:25:47 +0200 Subject: [PATCH 10/15] fix --- .../spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 799f00a28fd49..74272033432d0 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -99,6 +99,7 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { "postgreSQL/int4.sql", "postgreSQL/int8.sql", "postgreSQL/float8.sql", + "pgSQL/window_part1.sql", // SPARK-28885 String value is not allowed to be stored as date/timestamp type with // ANSI store assignment policy. "postgreSQL/date.sql", From cb70ddc0679c743c068cc947fba7520551c136e7 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 14 Oct 2019 23:11:39 +0200 Subject: [PATCH 11/15] fix more failure --- .../src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 1302c6f06dcfc..a80aa3eb40c92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -152,6 +152,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession { "postgreSQL/int8.sql", "postgreSQL/float4.sql", "postgreSQL/float8.sql", + "pgSQL/window_part1.sql", // SPARK-28885 String value is not allowed to be stored as date/timestamp type with // ANSI store assignment policy. "postgreSQL/date.sql", From e24e35d5e2cebe97e0167ef325ebf2064b780fbf Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 15 Oct 2019 00:39:41 +0200 Subject: [PATCH 12/15] Revert "fix" This reverts commit b602083389bd9cd4080049bb78f0d0adfc5d07f9. --- .../spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 74272033432d0..799f00a28fd49 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -99,7 +99,6 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite { "postgreSQL/int4.sql", "postgreSQL/int8.sql", "postgreSQL/float8.sql", - "pgSQL/window_part1.sql", // SPARK-28885 String value is not allowed to be stored as date/timestamp type with // ANSI store assignment policy. "postgreSQL/date.sql", From e7ccbac569cced6b2c3e7bd8a1976131c09d72e9 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 15 Oct 2019 00:39:46 +0200 Subject: [PATCH 13/15] Revert "fix more failure" This reverts commit cb70ddc0679c743c068cc947fba7520551c136e7. 
--- .../src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index a80aa3eb40c92..1302c6f06dcfc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -152,7 +152,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession { "postgreSQL/int8.sql", "postgreSQL/float4.sql", "postgreSQL/float8.sql", - "pgSQL/window_part1.sql", // SPARK-28885 String value is not allowed to be stored as date/timestamp type with // ANSI store assignment policy. "postgreSQL/date.sql", From 4b777365c3074a5be77e33683686f26865e644fa Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 15 Oct 2019 09:02:40 +0200 Subject: [PATCH 14/15] fix one more failure --- .../DataTypeWriteCompatibilitySuite.scala | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala index 9d6827194f004..c47332f5d9fcb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala @@ -76,6 +76,14 @@ class StrictDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBa assert(err.contains("Cannot safely cast")) } } + + test("Check NullType is incompatible with all other types") { + allNonNullTypes.foreach { t => + assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err => + assert(err.contains(s"incompatible with $t")) + } + } + } } class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBaseSuite { @@ -145,6 +153,12 @@ class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBase assert(err.contains("Cannot safely cast 'timestampToLong': TimestampType to LongType")) } } + + test("Check NullType is compatible with all other types") { + allNonNullTypes.foreach { t => + assertAllowed(NullType, t, "nulls", s"Should allow writing None to type $t") + } + } } abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { @@ -175,17 +189,9 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { private val nestedContainerTypes = Seq(ArrayType(point2, containsNull = false), MapType(StringType, point3, valueContainsNull = false)) - private val allNonNullTypes = Seq( + protected val allNonNullTypes = Seq( atomicTypes, simpleContainerTypes, nestedContainerTypes, Seq(CalendarIntervalType)).flatten - test("Check NullType is incompatible with all other types") { - allNonNullTypes.foreach { t => - assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err => - assert(err.contains(s"incompatible with $t")) - } - } - } - test("Check each type with itself") { allNonNullTypes.foreach { t => assertAllowed(t, t, "t", s"Should allow writing type to itself $t") From b9abb67cd9761e59ee62ecb77b269deb725b1185 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 15 Oct 2019 15:21:30 +0200 Subject: [PATCH 15/15] recover one test case --- .../spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff 
--git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala index 6c68feab483ac..f8ba7bf2c1a62 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala @@ -65,7 +65,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 TIMESTAMP) STORED AS $fileFormat") hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11 15:50:00')") checkAnswer(spark.table("hive_serde"), Row(Timestamp.valueOf("2019-04-11 15:50:00"))) - spark.sql("INSERT INTO TABLE hive_serde values('2019-04-12 15:50:00')") + spark.sql("INSERT INTO TABLE hive_serde values(TIMESTAMP('2019-04-12 15:50:00'))") checkAnswer( spark.table("hive_serde"), Seq(Row(Timestamp.valueOf("2019-04-11 15:50:00")), @@ -77,7 +77,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 DATE) STORED AS $fileFormat") hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11')") checkAnswer(spark.table("hive_serde"), Row(Date.valueOf("2019-04-11"))) - spark.sql("INSERT INTO TABLE hive_serde values('2019-04-12')") + spark.sql("INSERT INTO TABLE hive_serde values(TIMESTAMP('2019-04-12'))") checkAnswer( spark.table("hive_serde"), Seq(Row(Date.valueOf("2019-04-11")), Row(Date.valueOf("2019-04-12")))) @@ -170,7 +170,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS // Date/Time Types // SPARK-28885 String value is not allowed to be stored as date/timestamp type with // ANSI store assignment policy. - // checkDateTimeTypes(fileFormat) + checkDateTimeTypes(fileFormat) // String Types checkStringTypes(fileFormat, "STRING", "s1")
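The test updates in this series all follow the same pattern: with ANSI as the default store assignment policy, a bare string literal is no longer implicitly coerced to a numeric, date, timestamp, or binary column on insert, so the tests either switch to typed literals and explicit casts or are ignored/blacklisted. A rough sketch of the pattern, assuming a Spark 3.0 session and a made-up table name (illustrative only, not part of the patch series):

    spark.sql("CREATE TABLE ts_tab (c1 TIMESTAMP) USING parquet")

    // Rejected under the ANSI policy: implicit string -> timestamp store assignment
    // is no longer allowed (SPARK-28885).
    // spark.sql("INSERT INTO ts_tab VALUES ('2019-04-11 15:50:00')")

    // Accepted: a typed literal or an explicit cast, as the updated tests now use.
    spark.sql("INSERT INTO ts_tab VALUES (timestamp'2019-04-11 15:50:00')")
    spark.sql("INSERT INTO ts_tab VALUES (TIMESTAMP('2019-04-11 15:50:00'))")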