From 6935bf5ff72815516c429edd7918181a7b7a6947 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 9 Apr 2024 16:12:29 +0900 Subject: [PATCH 1/4] [SPARK-47776][SS] Disallow binary inequality collation be used in key schema of stateful operator --- .../main/resources/error/error-classes.json | 6 +++ .../streaming/state/StateStore.scala | 29 ++++++++++++- ...StateSchemaCompatibilityCheckerSuite.scala | 24 +++++++++++ .../sql/streaming/StreamingQuerySuite.scala | 41 ++++++++++++++++++- 4 files changed, 97 insertions(+), 3 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index c3a01e9dcd907..2d1b80eeb5848 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3664,6 +3664,12 @@ ], "sqlState" : "42802" }, + "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY" : { + "message" : [ + "Binary inequality column is not supported with state store. Provided schema: ." + ], + "sqlState" : "XXKST" + }, "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST" : { "message" : [ "Static partition column is also specified in the column list." diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 959cbbaef8b02..315f3f1aa3eb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -29,14 +29,14 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkContext, SparkEnv} +import org.apache.spark.{SparkContext, SparkEnv, SparkUnsupportedOperationException} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.util.UnsafeRowUtils import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.{ThreadUtils, Utils} /** @@ -635,6 +635,20 @@ object StateStore extends Logging { storeProvider.getStore(version) } + private def disallowBinaryInequalityColumn(schema: StructType): Unit = { + schema.foreach { field => + field.dataType match { + case s: StringType => + if (!s.supportsBinaryEquality) { + throw new SparkUnsupportedOperationException( + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY", + messageParameters = Map("schema" -> schema.json) + ) + } + } + } + } + private def getStateStoreProvider( storeProviderId: StateStoreProviderId, keySchema: StructType, @@ -649,6 +663,17 @@ object StateStore extends Logging { if (storeProviderId.storeId.partitionId == PARTITION_ID_TO_CHECK_SCHEMA) { val result = schemaValidated.getOrElseUpdate(storeProviderId, { + // SPARK-47776: collation introduces the concept of binary (in)equality, which means + // in some collation we no longer be able to just compare the binary format of two + // UnsafeRows to determine equality. For example, 'aaa' and 'AAA' can be "semantically" + // same in case insensitive collation. + // State store is basically key-value storage, and the most provider implementations + // rely on the fact that all the columns in the key schema support binary equality. + // We need to disallow using binary inequality column in the key schema, before we + // could support this in majority of state store providers (or high-level of state + // store.) + disallowBinaryInequalityColumn(keySchema) + val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf) // regardless of configuration, we check compatibility to at least write schema file // if necessary diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala index 7ba18a8140443..a089a05469f75 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala @@ -63,6 +63,16 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { private val valueSchema65535Bytes = new StructType() .add(StructField("v" * (65535 - 87), IntegerType, nullable = true)) + private val keySchemaWithCollation = new StructType() + .add(StructField("key1", IntegerType, nullable = true)) + .add(StructField("key2", StringType("UTF8_BINARY_LCASE"), nullable = true)) + .add(StructField("key3", structSchema, nullable = true)) + + private val valueSchemaWithCollation = new StructType() + .add(StructField("value1", IntegerType, nullable = true)) + .add(StructField("value2", StringType("UTF8_BINARY_LCASE"), nullable = true)) + .add(StructField("value3", structSchema, nullable = true)) + // Checks on adding/removing (nested) field. test("adding field to key should fail") { @@ -241,6 +251,20 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { ignoreValueSchema = true) } + test("SPARK-47776: checking for compatibility with collation change in key") { + verifyException(keySchema, valueSchema, keySchemaWithCollation, valueSchema, + ignoreValueSchema = false) + verifyException(keySchemaWithCollation, valueSchema, keySchema, valueSchema, + ignoreValueSchema = false) + } + + test("SPARK-47776: checking for compatibility with collation change in value") { + verifyException(keySchema, valueSchema, keySchema, valueSchemaWithCollation, + ignoreValueSchema = false) + verifyException(keySchema, valueSchemaWithCollation, keySchema, valueSchema, + ignoreValueSchema = false) + } + private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = { applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 7b3d899794705..504c0b334e426 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatestplus.mockito.MockitoSugar -import org.apache.spark.{SparkException, TestUtils} +import org.apache.spark.{SparkException, SparkUnsupportedOperationException, TestUtils} import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow @@ -1364,6 +1364,45 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi ) } + test("SPARK-47776: streaming aggregation having binary inequality column in the grouping " + + "key must be disallowed") { + val tableName = "parquet_dummy_tbl" + val collationName = "UTF8_BINARY_LCASE" + + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName (c1 STRING COLLATE $collationName) + |USING PARQUET + |""".stripMargin) + + sql(s"INSERT INTO $tableName VALUES ('aaa')") + sql(s"INSERT INTO $tableName VALUES ('AAA')") + + val df = spark.readStream.table(tableName) + .groupBy("c1") + .count() + + val query = df.writeStream + .format("memory") + .queryName("output") + .outputMode("update") + .start() + + val ex = intercept[StreamingQueryException] { + query.processAllAvailable() + } + checkError( + ex.getCause.asInstanceOf[SparkUnsupportedOperationException], + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY", + parameters = Map( + "schema" -> ".+\"type\":\"string collate UTF8_BINARY_LCASE\".+" + ), + matchPVals = true + ) + } + } + private def checkExceptionMessage(df: DataFrame): Unit = { withTempDir { outputDir => withTempDir { checkpointDir => From caed7fab5ac6366a38620b159be135fe3887384c Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 9 Apr 2024 17:12:47 +0900 Subject: [PATCH 2/4] fix --- .../apache/spark/sql/execution/streaming/state/StateStore.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 315f3f1aa3eb6..00ce0d2d1715f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -645,6 +645,8 @@ object StateStore extends Logging { messageParameters = Map("schema" -> schema.json) ) } + + case _ => // no-op } } } From d91bad37ac270ba765b3fe994c14cc8ce635b50d Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 9 Apr 2024 23:12:54 +0900 Subject: [PATCH 3/4] fix --- .../main/resources/error/error-classes.json | 12 ++++++------ docs/sql-error-conditions.md | 6 ++++++ .../streaming/state/StateStore.scala | 19 ++++++------------- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 2d1b80eeb5848..45a1ec5e1e849 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3652,6 +3652,12 @@ ], "sqlState" : "XXKST" }, + "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY" : { + "message" : [ + "Binary inequality column is not supported with state store. Provided schema: ." + ], + "sqlState" : "XXKST" + }, "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY" : { "message" : [ "State store operation= not supported on missing column family=." @@ -3664,12 +3670,6 @@ ], "sqlState" : "42802" }, - "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY" : { - "message" : [ - "Binary inequality column is not supported with state store. Provided schema: ." - ], - "sqlState" : "XXKST" - }, "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST" : { "message" : [ "Static partition column is also specified in the column list." diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 1887af2e814be..bb25a4c7f9f0d 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -2256,6 +2256,12 @@ Null type ordering column with name=`` at index=`` is not supp `` operation not supported with `` +### STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY + +[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error) + +Binary inequality column is not supported with state store. Provided schema: ``. + ### STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 00ce0d2d1715f..69c9e0ed85be5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.util.UnsafeRowUtils import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.util.{ThreadUtils, Utils} /** @@ -636,18 +636,11 @@ object StateStore extends Logging { } private def disallowBinaryInequalityColumn(schema: StructType): Unit = { - schema.foreach { field => - field.dataType match { - case s: StringType => - if (!s.supportsBinaryEquality) { - throw new SparkUnsupportedOperationException( - errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY", - messageParameters = Map("schema" -> schema.json) - ) - } - - case _ => // no-op - } + if (!UnsafeRowUtils.isBinaryStable(schema)) { + throw new SparkUnsupportedOperationException( + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY", + messageParameters = Map("schema" -> schema.json) + ) } } From 07918c76bbfb5560bffabdea4bc2ee1c86d44e4a Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Wed, 10 Apr 2024 06:09:04 +0900 Subject: [PATCH 4/4] empty commit