@@ -20,7 +20,8 @@ import java.sql.{Connection, DriverManager}
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -63,6 +64,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {

test("show tables") {
checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "people")))
// Check a namespace that does not exist
checkAnswer(sql("SHOW TABLES IN h2.bad_test"), Seq())
}

test("drop a table and test whether the table exists") {
@@ -72,6 +75,11 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "to_drop"), Row("test", "people")))
sql("DROP TABLE h2.test.to_drop")
checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "people")))
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[NoSuchTableException] {
sql(s"DROP TABLE $table")
}
}
}

test("rename a table") {
@@ -87,6 +95,26 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
sql("SHOW TABLES IN h2.test"),
Seq(Row("test", "dst_table"), Row("test", "people")))
}
// Rename a non-existent table, or a table in a non-existent namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[org.h2.jdbc.JdbcSQLException] {
sql(s"ALTER TABLE $table RENAME TO test.dst_table")
}
}
// Rename to an existing table
withTable("h2.test.dst_table") {
withConnection { conn =>
conn.prepareStatement("""CREATE TABLE "test"."dst_table" (id INTEGER)""").executeUpdate()
}
withTable("h2.test.src_table") {
withConnection { conn =>
conn.prepareStatement("""CREATE TABLE "test"."src_table" (id INTEGER)""").executeUpdate()
}
intercept[org.h2.jdbc.JdbcSQLException] {
sql("ALTER TABLE h2.test.src_table RENAME TO h2.test.dst_table")
}
}
}
}

test("load a table") {
@@ -95,6 +123,11 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
.add("NAME", StringType)
.add("ID", IntegerType)
assert(t.schema === expectedSchema)
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
spark.table(s"h2.$table").schema
}
}
}

test("create a table") {
@@ -105,6 +138,15 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
sql("SHOW TABLES IN h2.test"),
Seq(Row("test", "people"), Row("test", "new_table")))
}
withTable("h2.test.new_table") {
sql("CREATE TABLE h2.test.new_table(i INT, j STRING) USING _")
intercept[AnalysisException] {
sql("CREATE TABLE h2.test.new_table(i INT, j STRING) USING _")
}
}
intercept[org.h2.jdbc.JdbcSQLException] {
sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING) USING _")
}
}

test("alter table ... add column") {
@@ -121,16 +163,38 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
t = spark.table("h2.test.alt_table")
expectedSchema = expectedSchema.add("C3", DoubleType)
assert(t.schema === expectedSchema)
// Add a column that already exists
intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table ADD COLUMNS (C3 DOUBLE)")
}
}
// Add a column to a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
sql(s"ALTER TABLE $table ADD COLUMNS (C4 STRING)")
}
}
}

test("alter table ... rename column") {
withTable("h2.test.alt_table") {
sql("CREATE TABLE h2.test.alt_table (ID INTEGER) USING _")
sql("CREATE TABLE h2.test.alt_table (ID INTEGER, C0 INTEGER) USING _")
sql("ALTER TABLE h2.test.alt_table RENAME COLUMN ID TO C")
val t = spark.table("h2.test.alt_table")
val expectedSchema = new StructType().add("C", IntegerType)
val expectedSchema = new StructType()
.add("C", IntegerType)
.add("C0", IntegerType)
assert(t.schema === expectedSchema)
// Rename to a column name that already exists
intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table RENAME COLUMN C TO C0")
}
}
// Rename a column in a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
sql(s"ALTER TABLE $table RENAME COLUMN ID TO C")
}
}
}

@@ -141,6 +205,16 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val t = spark.table("h2.test.alt_table")
val expectedSchema = new StructType().add("C2", IntegerType)
assert(t.schema === expectedSchema)
// Drop a non-existent column
intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table DROP COLUMN bad_column")
}
}
// Drop a column from a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
sql(s"ALTER TABLE $table DROP COLUMN C1")
}
}
}

@@ -151,6 +225,20 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val t = spark.table("h2.test.alt_table")
val expectedSchema = new StructType().add("ID", DoubleType)
assert(t.schema === expectedSchema)
// Update a non-existent column
intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN bad_column TYPE DOUBLE")
}
// Update a column to an invalid type
intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN id TYPE bad_type")
}
}
// Update a column type in a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
sql(s"ALTER TABLE $table ALTER COLUMN id TYPE DOUBLE")
}
}
}

@@ -161,6 +249,16 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val t = spark.table("h2.test.alt_table")
val expectedSchema = new StructType().add("ID", IntegerType, nullable = true)
assert(t.schema === expectedSchema)
// Update the nullability of a non-existent column
intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN bad_column DROP NOT NULL")
}
}
// Update column nullability in a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
sql(s"ALTER TABLE $table ALTER COLUMN ID DROP NOT NULL")
}
}
}

@@ -171,6 +269,16 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN ID COMMENT 'test'")
}
assert(thrown.getMessage.contains("Unsupported TableChange"))
// Update the comment of a non-existent column
intercept[AnalysisException] {
sql("ALTER TABLE h2.test.alt_table ALTER COLUMN bad_column COMMENT 'test'")
}
}
// Update a column comment in a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
intercept[AnalysisException] {
sql(s"ALTER TABLE $table ALTER COLUMN ID COMMENT 'test'")
}
}
}
}
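
For context: these tests run against an in-memory H2 database exposed as the `h2` catalog, but the suite's setup sits outside the hunks shown above. The sketch below is one plausible way the catalog registration and the `withConnection` helper could be wired up, assuming Spark's `spark.sql.catalog.<name>` registration convention and the `JDBCTableCatalog` class; the class name, option keys, and H2 URL here are assumptions for illustration, not lines taken from this PR.

```scala
// Sketch only: a minimal H2-backed setup for a suite like the one above.
import java.sql.{Connection, DriverManager}
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.test.SharedSparkSession

class H2CatalogSetupSketch extends QueryTest with SharedSparkSession {
  // Keep the in-memory database alive for the whole JVM so every test sees it.
  val url = "jdbc:h2:mem:testdb0;DB_CLOSE_DELAY=-1"

  // Register the JDBC catalog under the name "h2", so identifiers like
  // h2.test.people resolve through it.
  override def sparkConf: SparkConf = super.sparkConf
    .set("spark.sql.catalog.h2", classOf[JDBCTableCatalog].getName)
    .set("spark.sql.catalog.h2.url", url)
    .set("spark.sql.catalog.h2.driver", "org.h2.Driver")

  // Helper mirroring the withConnection calls above: run a block against H2
  // directly, bypassing Spark, and always close the connection afterwards.
  def withConnection[T](f: Connection => T): T = {
    val conn = DriverManager.getConnection(url, new Properties())
    try f(conn) finally conn.close()
  }
}
```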