6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -893,6 +893,12 @@
],
"sqlState" : "42K02"
},
"DATA_SOURCE_TABLE_SCHEMA_MISMATCH" : {
"message" : [
"The schema of the data source table <tableSchema> does not match the actual schema <actualSchema>. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema."
],
"sqlState" : "42K03"
},
"DATETIME_OVERFLOW" : {
"message" : [
"Datetime operation overflow: <operation>."
6 changes: 6 additions & 0 deletions docs/sql-error-conditions.md
@@ -492,6 +492,12 @@ Data source '`<provider>`' not found. Please make sure the data source is registered.

Failed to find the data source: `<provider>`. Please find packages at `https://spark.apache.org/third-party-projects.html`.

### DATA_SOURCE_TABLE_SCHEMA_MISMATCH

[SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

The schema of the data source table `<tableSchema>` does not match the actual schema `<actualSchema>`. If you are using the DataFrameReader.schema API or creating a table, avoid specifying the schema.

### DATETIME_OVERFLOW

[SQLSTATE: 22008](sql-error-conditions-sqlstates.html#class-22-data-exception)
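
For illustration, a sketch (not part of the diff) of how the new DATA_SOURCE_TABLE_SCHEMA_MISMATCH condition behaves, based on the tests added later in this PR. The test provider WritableDataSourceSupportsExternalMetadata always reports the schema (i INT, j INT), so declaring a different schema at CREATE TABLE time is rejected, while declaring the matching schema (or, per the message, none at all) is accepted:

    // Illustrative only; assumes a test SparkSession as in the suites below.
    val provider = classOf[WritableDataSourceSupportsExternalMetadata].getName

    // Declared schema disagrees with what the source reports:
    // fails with DATA_SOURCE_TABLE_SCHEMA_MISMATCH.
    sql(s"CREATE TABLE t1 (x INT, y INT) USING $provider")

    // Declaring the schema the source actually reports passes the new check.
    sql(s"CREATE TABLE t2 (i INT, j INT) USING $provider")
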
@@ -926,8 +926,8 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
unsupportedTableOperationError(table.name(), "either micro-batch or continuous scan")
}

- def unsupportedAppendInBatchModeError(table: Table): Throwable = {
-   unsupportedTableOperationError(table.name(), "append in batch mode")
+ def unsupportedAppendInBatchModeError(name: String): Throwable = {
+   unsupportedTableOperationError(name, "append in batch mode")
Member:
When we change src/main, you don't need to use [TESTS], @allisonwang-db.

Contributor Author:
I see! Thanks for letting me know!

}

def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = {
@@ -3924,4 +3924,13 @@
errorClass = "NESTED_EXECUTE_IMMEDIATE",
messageParameters = Map("sqlString" -> toSQLStmt(queryString)))
}

def dataSourceTableSchemaMismatchError(
tableSchema: StructType, actualSchema: StructType): Throwable = {
new AnalysisException(
errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
messageParameters = Map(
"tableSchema" -> toSQLType(tableSchema),
"actualSchema" -> toSQLType(actualSchema)))
}
}
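
For context on the helper above: the V2SessionCatalog change below raises it when the schema reported by the data source does not match the declared one. A rough sketch of what that comparison tolerates, with illustrative schemas (both helpers are internal to Spark's sql packages): nullability differences are ignored, and CHAR/VARCHAR columns in the reported schema are normalized to STRING before comparing.

    import org.apache.spark.sql.catalyst.util.CharVarcharUtils
    import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructType}

    // Schema the data source reports for the freshly created table.
    val reported = new StructType().add("i", IntegerType).add("j", StringType)
    // Schema declared in CREATE TABLE; only nullability differs.
    val declared = new StructType().add("i", IntegerType, nullable = false).add("j", StringType)

    val normalized = CharVarcharUtils.replaceCharVarcharWithStringInSchema(reported)
    DataType.equalsIgnoreNullability(normalized, declared)  // true: nullability is ignored
    // A different column name or type is a mismatch and triggers the error above.
    DataType.equalsIgnoreNullability(
      normalized, new StructType().add("x", IntegerType).add("y", StringType))  // false
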
@@ -47,7 +47,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
// TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a
// a logical plan for streaming write.
case AppendData(r: DataSourceV2Relation, _, _, _, _, _) if !supportsBatchWrite(r.table) =>
- throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.table)
+ throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.name)

case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _, _)
if !r.table.supports(BATCH_WRITE) || !r.table.supports(OVERWRITE_DYNAMIC) =>
@@ -27,6 +27,7 @@ import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
+ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.catalyst.util.TypeUtils._
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
@@ -36,7 +37,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.connector.V1Function
- import org.apache.spark.sql.types.StructType
+ import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._

@@ -232,7 +233,21 @@ class V2SessionCatalog(catalog: SessionCatalog)
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}

- loadTable(ident)
+ val table = loadTable(ident)

// Check if the schema of the created table matches the given schema.
// TODO: move this check in loadTable to match the behavior with
// existing file data sources.
if (schema.nonEmpty) {
val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
table.columns().asSchema)
if (!DataType.equalsIgnoreNullability(tableSchema, schema)) {
Contributor:
Can we check it in loadTable, in case the data source is a bit random and only returns the wrong schema the second time?

Contributor Author:
I tried it, but it looks like loadTable is used in many other places, such as in tableExists. If we check the schema there, it would fail commands other than CREATE TABLE (e.g. DROP TABLE t would fail because the schema in catalogTable does not match the table schema).

throw QueryCompilationErrors.dataSourceTableSchemaMismatchError(
table.columns().asSchema, schema)
}
}

table
}

private def toOptions(properties: Map[String, String]): Map[String, String] = {
@@ -153,7 +153,7 @@ private [connector] trait SessionCatalogTest[T <: Table, Catalog <: TestV2Sessio
spark.sessionState.catalogManager.catalog(name)
}

- protected val v2Format: String = classOf[FakeV2Provider].getName
+ protected val v2Format: String = classOf[FakeV2ProviderWithCustomSchema].getName

protected val catalogClassName: String = classOf[InMemoryTableSessionCatalog].getName

@@ -54,7 +54,7 @@ abstract class DataSourceV2SQLSuite
with DeleteFromTests with DatasourceV2SQLBase with StatsEstimationTestBase
with AdaptiveSparkPlanHelper {

- protected val v2Source = classOf[FakeV2Provider].getName
+ protected val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName
override protected val v2Format = v2Source

protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
@@ -698,7 +698,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
}

test("SPARK-46043: create table in SQL with path option") {
- val cls = classOf[SupportsExternalMetadataDataSource]
+ val cls = classOf[WritableDataSourceSupportsExternalMetadata]
withTempDir { dir =>
val path = s"${dir.getCanonicalPath}/test"
Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path)
@@ -725,8 +725,141 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
}
}

test("SPARK-46272: create table - schema mismatch") {
withTable("test") {
val cls = classOf[WritableDataSourceSupportsExternalMetadata]
checkError(
exception = intercept[AnalysisException] {
sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName}")
},
errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
parameters = Map(
"tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
"actualSchema" -> "\"STRUCT<x: INT, y: INT>\""))
}
}

test("SPARK-46272: create table as select") {
val cls = classOf[WritableDataSourceSupportsExternalMetadata]
withTable("test") {
sql(
s"""
|CREATE TABLE test USING ${cls.getName}
|AS VALUES (0, 1), (1, 2) t(i, j)
|""".stripMargin)
checkAnswer(sql("SELECT * FROM test"), Seq((0, 1), (1, 2)).toDF("i", "j"))
sql(
s"""
|CREATE OR REPLACE TABLE test USING ${cls.getName}
|AS VALUES (2, 3), (4, 5) t(i, j)
|""".stripMargin)
checkAnswer(sql("SELECT * FROM test"), Seq((2, 3), (4, 5)).toDF("i", "j"))
sql(
s"""
|CREATE TABLE IF NOT EXISTS test USING ${cls.getName}
|AS VALUES (3, 4), (4, 5)
|""".stripMargin)
checkAnswer(sql("SELECT * FROM test"), Seq((2, 3), (4, 5)).toDF("i", "j"))
}
}

test("SPARK-46272: create table as select - schema name mismatch") {
val cls = classOf[WritableDataSourceSupportsExternalMetadata]
withTable("test") {
checkError(
exception = intercept[AnalysisException] {
sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1), (1, 2)")
},
errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
parameters = Map(
"tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
"actualSchema" -> "\"STRUCT<col1: INT, col2: INT>\""))
}
}

test("SPARK-46272: create table as select - column type mismatch") {
val cls = classOf[WritableDataSourceSupportsExternalMetadata]
withTable("test") {
checkError(
exception = intercept[AnalysisException] {
sql(
s"""
|CREATE TABLE test USING ${cls.getName}
|AS VALUES ('a', 'b'), ('c', 'd') t(i, j)
|""".stripMargin)
},
errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
parameters = Map(
"tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
"actualSchema" -> "\"STRUCT<i: STRING, j: STRING>\""))
}
}

test("SPARK-46272: create or replace table as select with path options") {
val cls = classOf[CustomSchemaAndPartitioningDataSource]
withTempDir { dir =>
val path = s"${dir.getCanonicalPath}/test"
Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path)
withTable("test") {
sql(
s"""
|CREATE TABLE test USING ${cls.getName}
|OPTIONS (PATH '$path')
|AS VALUES (0, 1)
|""".stripMargin)
checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), Row(1, 2)))
// Check the data currently in the path location.
checkAnswer(
spark.read.format("csv").load(path),
Seq(Row("0", "1"), Row("0", "1"), Row("1", "2")))
// Replace the table with new data.
sql(
s"""
|CREATE OR REPLACE TABLE test USING ${cls.getName}
|OPTIONS (PATH '$path')
|AS VALUES (2, 3)
|""".stripMargin)
checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), Row(1, 2), Row(2, 3)))
// Replace the table without the path options.
sql(
s"""
|CREATE OR REPLACE TABLE test USING ${cls.getName}
|AS VALUES (3, 4)
|""".stripMargin)
checkAnswer(sql("SELECT * FROM test"), Seq(Row(3, 4)))
}
}
}

test("SPARK-46272: create table as select with incompatible data sources") {
// CTAS with data sources that do not support external metadata.
withTable("test") {
val cls = classOf[SimpleDataSourceV2]
checkError(
exception = intercept[SparkUnsupportedOperationException] {
sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1)")
},
errorClass = "CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
parameters = Map(
"tableName" -> "`default`.`test`",
"provider" -> "org.apache.spark.sql.connector.SimpleDataSourceV2"))
}
// CTAS with data sources that do not support batch write.
withTable("test") {
val cls = classOf[SchemaRequiredDataSource]
checkError(
exception = intercept[AnalysisException] {
sql(s"CREATE TABLE test USING ${cls.getName} AS SELECT * FROM VALUES (0, 1)")
},
errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
parameters = Map(
"tableName" -> "`spark_catalog`.`default`.`test`",
"operation" -> "append in batch mode"))
}
}

test("SPARK-46273: insert into") {
- val cls = classOf[SupportsExternalMetadataDataSource]
+ val cls = classOf[CustomSchemaAndPartitioningDataSource]
withTable("test") {
sql(
s"""
@@ -766,7 +899,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
}

test("SPARK-46273: insert overwrite") {
- val cls = classOf[SupportsExternalMetadataDataSource]
+ val cls = classOf[CustomSchemaAndPartitioningDataSource]
withTable("test") {
sql(
s"""
@@ -788,7 +921,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
}

test("SPARK-46273: insert into with partition") {
- val cls = classOf[SupportsExternalMetadataDataSource]
+ val cls = classOf[CustomSchemaAndPartitioningDataSource]
withTable("test") {
sql(s"CREATE TABLE test(x INT, y INT) USING ${cls.getName} PARTITIONED BY (x, y)")
sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
@@ -818,7 +951,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
}

test("SPARK-46273: insert overwrite with partition") {
- val cls = classOf[SupportsExternalMetadataDataSource]
+ val cls = classOf[CustomSchemaAndPartitioningDataSource]
withTable("test") {
sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName} PARTITIONED BY (x, y)")
sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
@@ -1334,9 +1467,18 @@ class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
}
}

class SupportsExternalMetadataDataSource extends SimpleWritableDataSource {
/**
* A writable data source that supports external metadata with a fixed schema (i int, j int).
*/
class WritableDataSourceSupportsExternalMetadata extends SimpleWritableDataSource {
override def supportsExternalMetadata(): Boolean = true
}

/**
* A writable data source that supports external metadata with
* user-specified schema and partitioning.
*/
class CustomSchemaAndPartitioningDataSource extends WritableDataSourceSupportsExternalMetadata {
class TestTable(
schema: StructType,
partitioning: Array[Transform],
@@ -61,3 +61,25 @@ class FakeV2Provider extends TableProvider {
object FakeV2Provider {
val schema: StructType = new StructType().add("i", "int").add("j", "int")
}

class FakeV2ProviderWithCustomSchema extends FakeV2Provider {
class FakeTable(
schema: StructType,
partitioning: Array[Transform],
options: CaseInsensitiveStringMap) extends SimpleBatchTable {
override def schema(): StructType = schema

override def partitioning(): Array[Transform] = partitioning

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MyScanBuilder()
}
}

override def getTable(
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
new FakeTable(schema, partitioning, new CaseInsensitiveStringMap(properties))
}
}
@@ -380,7 +380,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
}

class CatalogSupportingInMemoryTableProvider
- extends FakeV2Provider
+ extends FakeV2ProviderWithCustomSchema
with SupportsCatalogOptions {

override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = {
@@ -249,7 +249,7 @@ private object InMemoryV1Provider {
}

class InMemoryV1Provider
- extends FakeV2Provider
+ extends FakeV2ProviderWithCustomSchema
with DataSourceRegister
with CreatableRelationProvider {
override def getTable(options: CaseInsensitiveStringMap): Table = {
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
- import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableSessionCatalog}
+ import org.apache.spark.sql.connector.{FakeV2Provider, FakeV2ProviderWithCustomSchema, InMemoryTableSessionCatalog}
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
Expand Down Expand Up @@ -204,7 +204,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
}

test("write: write to table with default session catalog") {
- val v2Source = classOf[FakeV2Provider].getName
+ val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName
spark.conf.set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key,
classOf[InMemoryTableSessionCatalog].getName)
