Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -75,10 +76,14 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "to_drop"), Row("test", "people")))
sql("DROP TABLE h2.test.to_drop")
checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "people")))
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[NoSuchTableException] {
Seq(
"h2.test.not_existing_table" -> "Table test.not_existing_table not found",
"h2.bad_test.not_existing_table" -> "Table bad_test.not_existing_table not found"
).foreach { case (table, expectedMsg) =>
val msg = intercept[NoSuchTableException] {
sql(s"DROP TABLE $table")
}
}.getMessage
assert(msg.contains(expectedMsg))
}
}

Expand All @@ -96,10 +101,14 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
Seq(Row("test", "dst_table"), Row("test", "people")))
}
// Rename not existing table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[org.h2.jdbc.JdbcSQLException] {
Seq(
"h2.test.not_existing_table" -> "Table \"not_existing_table\" not found",
"h2.bad_test.not_existing_table" -> "Schema \"bad_test\" not found"
).foreach { case (table, expectedMsg) =>
val msg = intercept[org.h2.jdbc.JdbcSQLException] {
sql(s"ALTER TABLE $table RENAME TO test.dst_table")
}
}.getMessage
assert(msg.contains(expectedMsg))
}
// Rename to an existing table
withTable("h2.test.dst_table") {
Expand All @@ -110,9 +119,10 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
withConnection { conn =>
conn.prepareStatement("""CREATE TABLE "test"."src_table" (id INTEGER)""").executeUpdate()
}
intercept[org.h2.jdbc.JdbcSQLException] {
sql("ALTER TABLE h2.test.src_table RENAME TO h2.test.dst_table")
}
val msg = intercept[org.h2.jdbc.JdbcSQLException] {
sql("ALTER TABLE h2.test.src_table RENAME TO test.dst_table")
}.getMessage
assert(msg.contains("Table \"dst_table\" already exists"))
}
}
}
Expand All @@ -124,9 +134,10 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
.add("ID", IntegerType)
assert(t.schema === expectedSchema)
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
spark.table(s"h2.$table").schema
}
val msg = intercept[AnalysisException] {
spark.table(table).schema
}.getMessage
assert(msg.contains("Table or view not found"))
}
}

Expand All @@ -140,13 +151,15 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
}
withTable("h2.test.new_table") {
sql("CREATE TABLE h2.test.new_table(i INT, j STRING) USING _")
intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql("CREATE TABLE h2.test.new_table(i INT, j STRING) USING _")
}
}.getMessage
assert(msg.contains("Table test.new_table already exists"))
}
intercept[org.h2.jdbc.JdbcSQLException] {
val msg = intercept[org.h2.jdbc.JdbcSQLException] {
sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING) USING _")
}
}.getMessage
assert(msg.contains("Schema \"bad_test\" not found"))
}

test("alter table ... add column") {
Expand All @@ -164,15 +177,17 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
expectedSchema = expectedSchema.add("C3", DoubleType)
assert(t.schema === expectedSchema)
// Add already existing column
intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table ADD COLUMNS (C3 DOUBLE)")
}
}.getMessage
assert(msg.contains("Cannot add column, because C3 already exists"))
}
// Add a column to not existing table and namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table ADD COLUMNS (C4 STRING)")
}
}.getMessage
assert(msg.contains("Table not found"))
}
}

Expand All @@ -186,15 +201,17 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
.add("C0", IntegerType)
assert(t.schema === expectedSchema)
// Rename to already existing column
intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table RENAME COLUMN C TO C0")
}
}.getMessage
assert(msg.contains("Cannot rename column, because C0 already exists"))
}
// Rename a column in not existing table and namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table RENAME COLUMN ID TO C")
}
}.getMessage
assert(msg.contains("Table not found"))
}
}

Expand All @@ -206,15 +223,17 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val expectedSchema = new StructType().add("C2", IntegerType)
assert(t.schema === expectedSchema)
// Drop not existing column
intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table DROP COLUMN bad_column")
}
}.getMessage
assert(msg.contains("Cannot delete missing field bad_column in test.alt_table schema"))
}
// Drop a column to not existing table and namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table DROP COLUMN C1")
}
}.getMessage
assert(msg.contains("Table not found"))
}
}

Expand All @@ -226,19 +245,22 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val expectedSchema = new StructType().add("ID", DoubleType)
assert(t.schema === expectedSchema)
// Update not existing column
intercept[AnalysisException] {
val msg1 = intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN bad_column TYPE DOUBLE")
}
}.getMessage
assert(msg1.contains("Cannot update missing field bad_column in test.alt_table schema"))
// Update column to wrong type
intercept[AnalysisException] {
val msg2 = intercept[ParseException] {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN id TYPE bad_type")
}
}.getMessage
assert(msg2.contains("DataType bad_type is not supported"))
}
// Update column type in not existing table and namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table ALTER COLUMN id TYPE DOUBLE")
}
}.getMessage
assert(msg.contains("Table not found"))
}
}

Expand All @@ -250,35 +272,39 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val expectedSchema = new StructType().add("ID", IntegerType, nullable = true)
assert(t.schema === expectedSchema)
// Update nullability of not existing column
intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN bad_column DROP NOT NULL")
}
}.getMessage
assert(msg.contains("Cannot update missing field bad_column in test.alt_table"))
}
// Update column nullability in not existing table and namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table ALTER COLUMN ID DROP NOT NULL")
}
}.getMessage
assert(msg.contains("Table not found"))
}
}

test("alter table ... update column comment not supported") {
withTable("h2.test.alt_table") {
sql("CREATE TABLE h2.test.alt_table (ID INTEGER) USING _")
val thrown = intercept[java.sql.SQLFeatureNotSupportedException] {
val msg1 = intercept[java.sql.SQLFeatureNotSupportedException] {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN ID COMMENT 'test'")
}
assert(thrown.getMessage.contains("Unsupported TableChange"))
}.getMessage
assert(msg1.contains("Unsupported TableChange"))
// Update comment for not existing column
intercept[AnalysisException] {
val msg2 = intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN bad_column COMMENT 'test'")
}
}.getMessage
assert(msg2.contains("Cannot update missing field bad_column in test.alt_table"))
}
// Update column comments in not existing table and namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table ALTER COLUMN ID COMMENT 'test'")
}
}.getMessage
assert(msg.contains("Table not found"))
}
}
}