Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -3652,6 +3652,12 @@
],
"sqlState" : "XXKST"
},
"STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY" : {
"message" : [
"Binary inequality column is not supported with state store. Provided schema: <schema>."
],
"sqlState" : "XXKST"
},
"STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY" : {
"message" : [
"State store operation=<operationType> not supported on missing column family=<colFamilyName>."
Expand Down
6 changes: 6 additions & 0 deletions docs/sql-error-conditions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2256,6 +2256,12 @@ Null type ordering column with name=`<fieldName>` at index=`<index>` is not supp

`<operationType>` operation not supported with `<entity>`

### STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY

[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)

Binary inequality column is not supported with state store. Provided schema: `<schema>`.

### STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.{SparkContext, SparkEnv, SparkUnsupportedOperationException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
Expand Down Expand Up @@ -635,6 +635,15 @@ object StateStore extends Logging {
storeProvider.getStore(version)
}

private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use following utility function:

  def isBinaryStable(dataType: DataType): Boolean = !dataType.existsRecursively {
    case st: StringType => !CollationFactory.fetchCollation(st.collationId).supportsBinaryEquality
    case _ => false
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done!

if (!UnsafeRowUtils.isBinaryStable(schema)) {
throw new SparkUnsupportedOperationException(
errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY",
messageParameters = Map("schema" -> schema.json)
)
}
}

private def getStateStoreProvider(
storeProviderId: StateStoreProviderId,
keySchema: StructType,
Expand All @@ -649,6 +658,17 @@ object StateStore extends Logging {

if (storeProviderId.storeId.partitionId == PARTITION_ID_TO_CHECK_SCHEMA) {
val result = schemaValidated.getOrElseUpdate(storeProviderId, {
// SPARK-47776: collation introduces the concept of binary (in)equality, which means
// in some collation we no longer be able to just compare the binary format of two
// UnsafeRows to determine equality. For example, 'aaa' and 'AAA' can be "semantically"
// same in case insensitive collation.
// State store is basically key-value storage, and the most provider implementations
// rely on the fact that all the columns in the key schema support binary equality.
// We need to disallow using binary inequality column in the key schema, before we
// could support this in majority of state store providers (or high-level of state
// store.)
disallowBinaryInequalityColumn(keySchema)

val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf)
// regardless of configuration, we check compatibility to at least write schema file
// if necessary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
private val valueSchema65535Bytes = new StructType()
.add(StructField("v" * (65535 - 87), IntegerType, nullable = true))

private val keySchemaWithCollation = new StructType()
.add(StructField("key1", IntegerType, nullable = true))
.add(StructField("key2", StringType("UTF8_BINARY_LCASE"), nullable = true))
.add(StructField("key3", structSchema, nullable = true))

private val valueSchemaWithCollation = new StructType()
.add(StructField("value1", IntegerType, nullable = true))
.add(StructField("value2", StringType("UTF8_BINARY_LCASE"), nullable = true))
.add(StructField("value3", structSchema, nullable = true))

// Checks on adding/removing (nested) field.

test("adding field to key should fail") {
Expand Down Expand Up @@ -241,6 +251,20 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
ignoreValueSchema = true)
}

test("SPARK-47776: checking for compatibility with collation change in key") {
verifyException(keySchema, valueSchema, keySchemaWithCollation, valueSchema,
ignoreValueSchema = false)
verifyException(keySchemaWithCollation, valueSchema, keySchema, valueSchema,
ignoreValueSchema = false)
}

test("SPARK-47776: checking for compatibility with collation change in value") {
verifyException(keySchema, valueSchema, keySchema, valueSchemaWithCollation,
ignoreValueSchema = false)
verifyException(keySchema, valueSchemaWithCollation, keySchema, valueSchema,
ignoreValueSchema = false)
}

private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = {
applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatestplus.mockito.MockitoSugar

import org.apache.spark.{SparkException, TestUtils}
import org.apache.spark.{SparkException, SparkUnsupportedOperationException, TestUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -1364,6 +1364,45 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
)
}

test("SPARK-47776: streaming aggregation having binary inequality column in the grouping " +
"key must be disallowed") {
val tableName = "parquet_dummy_tbl"
val collationName = "UTF8_BINARY_LCASE"

withTable(tableName) {
sql(
s"""
|CREATE TABLE $tableName (c1 STRING COLLATE $collationName)
|USING PARQUET
|""".stripMargin)

sql(s"INSERT INTO $tableName VALUES ('aaa')")
sql(s"INSERT INTO $tableName VALUES ('AAA')")

val df = spark.readStream.table(tableName)
.groupBy("c1")
.count()

val query = df.writeStream
.format("memory")
.queryName("output")
.outputMode("update")
.start()

val ex = intercept[StreamingQueryException] {
query.processAllAvailable()
}
checkError(
ex.getCause.asInstanceOf[SparkUnsupportedOperationException],
errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY",
parameters = Map(
"schema" -> ".+\"type\":\"string collate UTF8_BINARY_LCASE\".+"
),
matchPVals = true
)
}
}

private def checkExceptionMessage(df: DataFrame): Unit = {
withTempDir { outputDir =>
withTempDir { checkpointDir =>
Expand Down