diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index c3a01e9dcd907..45a1ec5e1e849 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3652,6 +3652,12 @@ ], "sqlState" : "XXKST" }, + "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY" : { + "message" : [ + "Binary inequality column is not supported with state store. Provided schema: <schema>." + ], + "sqlState" : "XXKST" + }, "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY" : { "message" : [ "State store operation= not supported on missing column family=." diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 1887af2e814be..bb25a4c7f9f0d 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -2256,6 +2256,12 @@ Null type ordering column with name=`` at index=`` is not supp `` operation not supported with `` +### STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY + +[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error) + +Binary inequality column is not supported with state store. Provided schema: `<schema>`.
+ ### STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 959cbbaef8b02..69c9e0ed85be5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -29,7 +29,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkContext, SparkEnv} +import org.apache.spark.{SparkContext, SparkEnv, SparkUnsupportedOperationException} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.util.UnsafeRowUtils @@ -635,6 +635,15 @@ object StateStore extends Logging { storeProvider.getStore(version) } + private def disallowBinaryInequalityColumn(schema: StructType): Unit = { + if (!UnsafeRowUtils.isBinaryStable(schema)) { + throw new SparkUnsupportedOperationException( + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY", + messageParameters = Map("schema" -> schema.json) + ) + } + } + private def getStateStoreProvider( storeProviderId: StateStoreProviderId, keySchema: StructType, @@ -649,6 +658,17 @@ object StateStore extends Logging { if (storeProviderId.storeId.partitionId == PARTITION_ID_TO_CHECK_SCHEMA) { val result = schemaValidated.getOrElseUpdate(storeProviderId, { + // SPARK-47776: collation introduces the concept of binary (in)equality, which means + // in some collations we are no longer able to just compare the binary format of two + // UnsafeRows to determine equality.
For example, 'aaa' and 'AAA' can be "semantically" + // the same in a case-insensitive collation. + // State store is basically key-value storage, and most provider implementations + // rely on the fact that all the columns in the key schema support binary equality. + // We need to disallow using binary inequality columns in the key schema until we + // can support this in the majority of state store providers (or at a higher level of + // the state store). + disallowBinaryInequalityColumn(keySchema) + val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf) // regardless of configuration, we check compatibility to at least write schema file // if necessary diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala index 7ba18a8140443..a089a05469f75 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala @@ -63,6 +63,16 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { private val valueSchema65535Bytes = new StructType() .add(StructField("v" * (65535 - 87), IntegerType, nullable = true)) + private val keySchemaWithCollation = new StructType() + .add(StructField("key1", IntegerType, nullable = true)) + .add(StructField("key2", StringType("UTF8_BINARY_LCASE"), nullable = true)) + .add(StructField("key3", structSchema, nullable = true)) + + private val valueSchemaWithCollation = new StructType() + .add(StructField("value1", IntegerType, nullable = true)) + .add(StructField("value2", StringType("UTF8_BINARY_LCASE"), nullable = true)) + .add(StructField("value3", structSchema, nullable = true)) + // Checks on adding/removing (nested) field.
test("adding field to key should fail") { @@ -241,6 +251,20 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { ignoreValueSchema = true) } + test("SPARK-47776: checking for compatibility with collation change in key") { + verifyException(keySchema, valueSchema, keySchemaWithCollation, valueSchema, + ignoreValueSchema = false) + verifyException(keySchemaWithCollation, valueSchema, keySchema, valueSchema, + ignoreValueSchema = false) + } + + test("SPARK-47776: checking for compatibility with collation change in value") { + verifyException(keySchema, valueSchema, keySchema, valueSchemaWithCollation, + ignoreValueSchema = false) + verifyException(keySchema, valueSchemaWithCollation, keySchema, valueSchema, + ignoreValueSchema = false) + } + private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = { applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 7b3d899794705..504c0b334e426 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatestplus.mockito.MockitoSugar -import org.apache.spark.{SparkException, TestUtils} +import org.apache.spark.{SparkException, SparkUnsupportedOperationException, TestUtils} import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow @@ -1364,6 +1364,45 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi ) } + test("SPARK-47776: streaming aggregation having binary inequality column in the 
grouping " + + "key must be disallowed") { + val tableName = "parquet_dummy_tbl" + val collationName = "UTF8_BINARY_LCASE" + + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName (c1 STRING COLLATE $collationName) + |USING PARQUET + |""".stripMargin) + + sql(s"INSERT INTO $tableName VALUES ('aaa')") + sql(s"INSERT INTO $tableName VALUES ('AAA')") + + val df = spark.readStream.table(tableName) + .groupBy("c1") + .count() + + val query = df.writeStream + .format("memory") + .queryName("output") + .outputMode("update") + .start() + + val ex = intercept[StreamingQueryException] { + query.processAllAvailable() + } + checkError( + ex.getCause.asInstanceOf[SparkUnsupportedOperationException], + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY", + parameters = Map( + "schema" -> ".+\"type\":\"string collate UTF8_BINARY_LCASE\".+" + ), + matchPVals = true + ) + } + } + private def checkExceptionMessage(df: DataFrame): Unit = { withTempDir { outputDir => withTempDir { checkpointDir =>