32 commits
3f36b20
classify syntax errors for pgsql, mysql and sqlserver
milastdbx Sep 20, 2024
35100c2
Merge branch 'apache:master' into syntaxErrorsJdbc
ivanjevtic-db Oct 2, 2024
a6a9f93
Add isRuntime arg to classifyException signature
ivanjevtic-db Oct 4, 2024
2b8926d
Merge branch 'syntaxErrorsJdbc' of github.com:ivanjevtic-db/spark int…
ivanjevtic-db Oct 4, 2024
a31f363
Merge branch 'apache:master' into syntaxErrorsJdbc
ivanjevtic-db Oct 4, 2024
e205788
Finish revert
ivanjevtic-db Oct 4, 2024
1d54e5a
Merge branch 'syntaxErrorsJdbc' of github.com:ivanjevtic-db/spark int…
ivanjevtic-db Oct 4, 2024
bd1fb3f
Remove unused import
ivanjevtic-db Oct 4, 2024
c68ab5b
Remove deprecated function from subclass
ivanjevtic-db Oct 4, 2024
a65b3c4
Remove unused import
ivanjevtic-db Oct 4, 2024
17632b1
Fix comments
ivanjevtic-db Oct 4, 2024
977792c
Fix wrong override
ivanjevtic-db Oct 7, 2024
935d8f3
Merge branch 'Change-classify-exception-function-signature' into synt…
ivanjevtic-db Oct 7, 2024
685c212
Classify syntax errors for pgsql, mysql, sqlserver
ivanjevtic-db Oct 7, 2024
21734fd
Resolve conflicts
ivanjevtic-db Oct 7, 2024
0793158
Fix error-conditions messages
ivanjevtic-db Oct 15, 2024
5260e92
Merge remote-tracking branch 'origin/master' into syntax-errors-jdbc
ivanjevtic-db Oct 15, 2024
6ec9aa3
Fix syntax and schema errors, Add tests
ivanjevtic-db Oct 20, 2024
894fab6
Merge branch 'master' into syntax-errors-jdbc
ivanjevtic-db Oct 20, 2024
19cbc07
Remove unused import
ivanjevtic-db Oct 20, 2024
c587b07
Merge branch 'syntax-errors-jdbc' of github.com:ivanjevtic-db/spark i…
ivanjevtic-db Oct 20, 2024
1edcb97
Remove unused import
ivanjevtic-db Oct 20, 2024
0db4dc6
Fix parameters in tests
ivanjevtic-db Oct 21, 2024
387563f
Use checkError with matchPVals=true
ivanjevtic-db Oct 21, 2024
737d1db
Match parameters to query
ivanjevtic-db Oct 22, 2024
8b9ef47
Fix checkError to check for value too
ivanjevtic-db Oct 23, 2024
96c59ac
Remove commented tests
ivanjevtic-db Oct 23, 2024
d3d224f
Fix
ivanjevtic-db Oct 31, 2024
94b64da
Remove failed test
ivanjevtic-db Oct 31, 2024
c848332
Fix error-conditions.json
ivanjevtic-db Oct 31, 2024
1c19246
Sort by name
ivanjevtic-db Oct 31, 2024
170e194
Remove sync changes from error-conditions.json
ivanjevtic-db Oct 31, 2024
15 changes: 15 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -1392,6 +1392,16 @@
"Drop the namespace <namespace>."
]
},
"EXECUTE_QUERY" : {
"message" : [
"Execution of the query: <query>."
]
},
"GET_SCHEMA" : {
"message" : [
"Schema fetching for an external query or a table: <query>."
]
},
"GET_TABLES" : {
"message" : [
"Get tables from the namespace: <namespace>."
@@ -1422,6 +1432,11 @@
"Rename the table <oldName> to <newName>."
]
},
"SYNTAX_ERROR" : {
"message" : [
"Compilation of the query for an external DBMS: <query>."
]
},
"TABLE_EXISTS" : {
"message" : [
"Check that the table <tableName> exists."
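For context, the new entries above are sub-conditions of the existing FAILED_JDBC error condition, so the user-facing message combines the top-level FAILED_JDBC template with the sub-condition text. A minimal sketch of how the GET_SCHEMA classification could be observed, assuming the standard SparkThrowable accessors, an active SparkSession `spark`, and an illustrative `jdbcUrl` (this sketch is not part of the PR):

// Hedged sketch: inspect the condition and message parameters of the
// AnalysisException produced when schema resolution fails.
val e = intercept[org.apache.spark.sql.AnalysisException] {
  spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", "SELECT * FROM non_existent_table") // no such table
    .load()
}
assert(e.getCondition == "FAILED_JDBC.GET_SCHEMA")
assert(e.getMessageParameters.get("url") == jdbcUrl)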
@@ -134,6 +134,27 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
assert(df3.collect().length == 3)
}

test("SPARK-49730: get_schema error classification") {
checkErrorMatchPVals(
exception = intercept[AnalysisException] {
val schema = StructType(
Seq(StructField("id", IntegerType, true)))

spark.read
.format("jdbc")
.schema(schema)
.option("url", jdbcUrl)
.option("query", "SELECT * FROM non_existent_table")
.load()
},
condition = "FAILED_JDBC.GET_SCHEMA",
parameters = Map(
"url" -> jdbcUrl,
"query" ->
"SELECT \\* FROM \\(SELECT \\* FROM non_existent_table\\) SPARK_GEN_SUBQ_\\d+ WHERE 1=0")
)
}

test("SPARK-47994: SQLServer does not support 1 or 0 as boolean type in CASE WHEN filter") {
val df = sql(
s"""
@@ -169,6 +169,42 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
}
}

test("SPARK-49730: get_schema error classification") {
checkErrorMatchPVals(
exception = intercept[AnalysisException] {
val schema = StructType(
Seq(StructField("id", IntegerType, true)))

spark.read
.format("jdbc")
.schema(schema)
.option("url", jdbcUrl)
.option("query", "SELECT * FROM non_existent_table")
.load()
},
condition = "FAILED_JDBC.GET_SCHEMA",
parameters = Map(
"url" -> jdbcUrl,
"query" ->
"SELECT \\* FROM \\(SELECT \\* FROM non_existent_table\\) SPARK_GEN_SUBQ_\\d+ WHERE 1=0")
)
}

test("SPARK-49730: create_table error classification") {
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE mysql.new_table (i INT) TBLPROPERTIES('a'='1')")
}

checkErrorMatchPVals(
exception = e,
condition = "FAILED_JDBC.CREATE_TABLE",
parameters = Map(
"url" -> "jdbc:.*",
"tableName" -> s"`new_table`"
)
)
}

override def testDatetime(tbl: String): Unit = {
val df1 = sql(s"SELECT name FROM $tbl WHERE " +
"dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")
@@ -230,6 +230,42 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
}
}

test("SPARK-49730: get_schema error classification") {
checkErrorMatchPVals(
exception = intercept[AnalysisException] {
val schema = StructType(
Seq(StructField("id", IntegerType, true)))

spark.read
.format("jdbc")
.schema(schema)
.option("url", jdbcUrl)
.option("query", "SELECT * FROM non_existent_table")
.load()
},
condition = "FAILED_JDBC.GET_SCHEMA",
parameters = Map(
"url" -> jdbcUrl,
"query" ->
"SELECT \\* FROM \\(SELECT \\* FROM non_existent_table\\) SPARK_GEN_SUBQ_\\d+ WHERE 1=0")
)
}

test("SPARK-49730: create_table error classification") {
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE postgresql.new_table (i INT) TBLPROPERTIES('a'='1')")
}

checkErrorMatchPVals(
exception = e,
condition = "FAILED_JDBC.CREATE_TABLE",
parameters = Map(
"url" -> "jdbc:.*",
"tableName" -> s"`new_table`"
)
)
}

override def testDatetime(tbl: String): Unit = {
val df1 = sql(s"SELECT name FROM $tbl WHERE " +
"dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc.v2

import org.apache.logging.log4j.Level

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sample, Sort}
@@ -91,7 +92,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite
def testCreateTableWithProperty(tbl: String): Unit = {}

private def checkErrorFailedJDBC(
e: AnalysisException,
e: Throwable with SparkThrowable,
condition: String,
tbl: String): Unit = {
checkErrorMatchPVals(
@@ -249,16 +250,6 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite
}
}

test("CREATE TABLE with table property") {
withTable(s"$catalogName.new_table") {
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
}
checkErrorFailedJDBC(e, "FAILED_JDBC.CREATE_TABLE", "new_table")
testCreateTableWithProperty(s"$catalogName.new_table")
}
}

def supportsIndex: Boolean = false

def supportListIndexes: Boolean = false
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -352,7 +352,7 @@ abstract class SparkFunSuite
exp => {
val parm = parameters.getOrElse(exp._1,
throw new IllegalArgumentException("Missing parameter" + exp._1))
if (!exp._2.matches(parm)) {
if (!exp._2.matches(parm) && exp._2 != parm) {
ivanjevtic-db (Contributor, PR author) commented:
Since I am using the checkErrorMatchPVals function, I should do one of the following:

  • keep this change, so that a parameter passes the check when it either matches the expected regex or equals the expected value exactly, or
  • wrap each exact-match expectation in an anchored regex, e.g. "parameter" -> ("^" + Pattern.quote(value) + "$").

I chose checkErrorMatchPVals in the first place because some parameters can only be checked with a regex, while for others I know the exact value.

throw new IllegalArgumentException("For parameter '" + exp._1 + "' value '" + exp._2 +
"' does not match: " + parm)
}
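A minimal sketch of the second option from the comment above, assuming a small test-side helper (the helper name `exact` is illustrative, not part of this PR, and `e` stands for the intercepted exception from one of the new tests):

import java.util.regex.Pattern

// Hypothetical helper: turn an exact expected value into an anchored regex,
// so it can be passed to checkErrorMatchPVals alongside genuine patterns.
def exact(value: String): String = "^" + Pattern.quote(value) + "$"

checkErrorMatchPVals(
  exception = e,
  condition = "FAILED_JDBC.GET_SCHEMA",
  parameters = Map(
    "url" -> exact(jdbcUrl), // exact value, safely quoted
    "query" -> "SELECT \\* FROM .* WHERE 1=0")) // genuine regex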
@@ -1661,6 +1661,26 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with CompilationErrors
"operation" -> operation))
}

def jdbcGeneratedQuerySyntaxError(url: String, query: String): Throwable = {
new AnalysisException(
errorClass = "FAILED_JDBC.SYNTAX_ERROR",
messageParameters = Map(
"query" -> query,
"url" -> url
)
)
}

def jdbcGeneratedQueryGetSchemaError(url: String, query: String): Throwable = {
new AnalysisException(
errorClass = "FAILED_JDBC.GET_SCHEMA",
messageParameters = Map(
"query" -> query,
"url" -> url
)
)
}

def schemaNotSpecifiedForSchemaRelationProviderError(className: String): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1132",
@@ -921,6 +921,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionErrors
"jdbcQueryString" -> jdbcQueryString))
}

def jdbcGeneratedQueryExecutionError(url: String, query: String): Throwable = {
new SparkRuntimeException(
errorClass = "FAILED_JDBC.EXECUTE_QUERY",
messageParameters = Map(
"query" -> query,
"url" -> url
)
)
}

def missingJdbcTableNameAndQueryError(
jdbcTableName: String, jdbcQueryString: String): SparkIllegalArgumentException = {
new SparkIllegalArgumentException(
@@ -240,7 +240,18 @@ private[sql] object JDBCRelation extends Logging {
* @return resolved Catalyst schema of a JDBC table
*/
def getSchema(resolver: Resolver, jdbcOptions: JDBCOptions): StructType = {
val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
val dialect = JdbcDialects.get(jdbcOptions.url)
val tableSchema = JdbcUtils.classifyException(
errorClass = "...",
messageParameters = Map(
"query" -> dialect.getSchemaQuery(jdbcOptions.tableOrQuery),
"url" -> jdbcOptions.url),
dialect = dialect,
description =
s"Failed to fetch schema for: ${dialect.getSchemaQuery(jdbcOptions.tableOrQuery)}",
isRuntime = false) {
JDBCRDD.resolveTable(jdbcOptions)
}
jdbcOptions.customSchema match {
case Some(customSchema) => JdbcUtils.getCustomSchema(
tableSchema, customSchema, resolver)
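For orientation, the JdbcUtils.classifyException wrapper used above runs the resolution block and, on failure, hands the error to the dialect for reclassification. A simplified sketch of that flow, inferred from the call sites in this PR rather than copied from the actual Spark implementation:

// Rough sketch: run `f`; if it throws, let the dialect map the vendor-specific
// failure (e.g. a SQLException error code) to a Spark error condition.
def classifyException[T](
    errorClass: String,
    messageParameters: Map[String, String],
    dialect: JdbcDialect,
    description: String,
    isRuntime: Boolean)(f: => T): T = {
  try {
    f
  } catch {
    case e: Throwable =>
      throw dialect.classifyException(e, errorClass, messageParameters, description, isRuntime)
  }
}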
@@ -152,6 +152,7 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError
override def removeSchemaCommentQuery(schema: String): String = {
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''"
}

override def classifyException(
e: Throwable,
errorClass: String,
@@ -33,6 +33,7 @@ import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType, TimestampType}

@@ -240,6 +241,11 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
// SYNTAX_ERROR_1, SYNTAX_ERROR_2
case 42000 | 42001 =>
throw QueryCompilationErrors.jdbcGeneratedQuerySyntaxError(
messageParameters.get("url").getOrElse(""),
messageParameters.get("query").getOrElse(""))
case _ => // do nothing
}
case _ => // do nothing
@@ -217,9 +217,17 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCError
namespace = messageParameters.get("namespace").toArray,
details = sqlException.getMessage,
cause = Some(e))
case 15335 if errorClass == "FAILED_JDBC.RENAME_TABLE" =>
val newTable = messageParameters("newName")
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
case 102 | 122 | 142 | 148 | 156 | 319 | 336 =>
throw QueryCompilationErrors.jdbcGeneratedQuerySyntaxError(
messageParameters.get("url").getOrElse(""),
messageParameters.get("query").getOrElse(""))
case 208 if errorClass == "..." => // Error code for "Invalid object name"
throw QueryCompilationErrors.jdbcGeneratedQueryGetSchemaError(
messageParameters.get("url").getOrElse(""),
messageParameters.get("query").getOrElse(""))
case _ =>
super.classifyException(e, errorClass, messageParameters, description, isRuntime)
}
@@ -369,6 +369,14 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError
val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
case 1064 if errorClass == "..." =>
throw QueryCompilationErrors.jdbcGeneratedQuerySyntaxError(
messageParameters.get("url").getOrElse(""),
messageParameters.get("query").getOrElse(""))
case 1146 if errorClass == "..." =>
throw QueryCompilationErrors.jdbcGeneratedQueryGetSchemaError(
messageParameters.get("url").getOrElse(""),
messageParameters.get("query").getOrElse(""))
case _ =>
super.classifyException(e, errorClass, messageParameters, description, isRuntime)
}
@@ -267,6 +267,14 @@ private case class PostgresDialect()
case sqlException: SQLException =>
sqlException.getSQLState match {
// https://www.postgresql.org/docs/14/errcodes-appendix.html
case "42601" if errorClass == "..." =>
throw QueryCompilationErrors.jdbcGeneratedQuerySyntaxError(
messageParameters.get("url").getOrElse(""),
messageParameters.get("query").getOrElse(""))
case "42P01" if errorClass == "..." =>
throw QueryCompilationErrors.jdbcGeneratedQueryGetSchemaError(
messageParameters.get("url").getOrElse(""),
messageParameters.get("query").getOrElse(""))
case "42P07" =>
if (errorClass == "FAILED_JDBC.CREATE_INDEX") {
throw new IndexAlreadyExistsException(
@@ -29,7 +29,7 @@ import scala.util.Random
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._

import org.apache.spark.{SparkException, SparkSQLException}
import org.apache.spark.{SparkException, SparkRuntimeException, SparkSQLException}
import org.apache.spark.sql.{AnalysisException, DataFrame, Observation, QueryTest, Row}
import org.apache.spark.sql.catalyst.{analysis, TableIdentifier}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -1515,6 +1515,27 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
assert(res === (foobarCnt, 0L, foobarCnt) :: Nil)
}

test("SPARK-49730: syntax error classification") {
checkErrorMatchPVals(
exception = intercept[SparkRuntimeException] {
val schema = StructType(
Seq(StructField("id", IntegerType, true, defaultMetadata(IntegerType))))

spark.read
.format("jdbc")
.schema(schema)
.option("url", urlWithUserAndPass)
.option("query", "SELECT * FRM tbl")
.load()
},
condition = "FAILED_JDBC.SYNTAX_ERROR",
parameters = Map(
"url" -> urlWithUserAndPass,
"query" ->
"SELECT \\* FROM \\(SELECT \\* FRM tbl\\) SPARK_GEN_SUBQ_\\d+ WHERE 1=0")
)
}

test("unsupported types") {
checkError(
exception = intercept[SparkSQLException] {