Skip to content

Commit 8e2da8d

Browse files
committed
Support ALTER TABLE in JDBC v2 Table Catalog: update type and nullability of columns (DB2 dialect)
1 parent 3099fd9 commit 8e2da8d

File tree

2 files changed

+133
-0
lines changed

2 files changed

+133
-0
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.jdbc.v2
19+
20+
import java.sql.Connection
21+
22+
import org.scalatest.time.SpanSugar._
23+
24+
import org.apache.spark.SparkConf
25+
import org.apache.spark.sql.AnalysisException
26+
import org.apache.spark.sql.catalyst.parser.ParseException
27+
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
28+
import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
29+
import org.apache.spark.sql.test.SharedSparkSession
30+
import org.apache.spark.sql.types._
31+
import org.apache.spark.tags.DockerTest
32+
33+
/**
34+
* To run this test suite for a specific version (e.g., ibmcom/db2:11.5.4.0):
35+
* {{{
36+
* DB2_DOCKER_IMAGE_NAME=ibmcom/db2:11.5.4.0
37+
* ./build/sbt -Pdocker-integration-tests "test-only *DB2IntegrationSuite"
38+
* }}}
39+
*/
40+
@DockerTest
41+
class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with SharedSparkSession {
42+
override val db = new DatabaseOnDocker {
43+
override val imageName = sys.env.getOrElse("DB2_DOCKER_IMAGE_NAME", "ibmcom/db2:11.5.4.0")
44+
override val env = Map(
45+
"DB2INST1_PASSWORD" -> "rootpass",
46+
"LICENSE" -> "accept",
47+
"DBNAME" -> "foo",
48+
"ARCHIVE_LOGS" -> "false",
49+
"AUTOCONFIG" -> "false"
50+
)
51+
override val usesIpc = false
52+
override val jdbcPort: Int = 50000
53+
override val privileged = true
54+
override def getJdbcUrl(ip: String, port: Int): String =
55+
s"jdbc:db2://$ip:$port/foo:user=db2inst1;password=rootpass;retrieveMessagesFromServerOnGetMessage=true;" //scalastyle:ignore
56+
}
57+
58+
override def sparkConf: SparkConf = super.sparkConf
59+
.set("spark.sql.catalog.db2", classOf[JDBCTableCatalog].getName)
60+
.set("spark.sql.catalog.db2.url", db.getJdbcUrl(dockerIp, externalPort))
61+
62+
override def dataPreparation(conn: Connection): Unit = {}
63+
64+
test("SPARK-33034: ALTER TABLE ... update column type") {
65+
withTable("db2.alt_table") {
66+
sql("CREATE TABLE db2.alt_table (ID INTEGER) USING _")
67+
sql("ALTER TABLE db2.alt_table ALTER COLUMN id TYPE DOUBLE")
68+
val t = spark.table("db2.alt_table")
69+
val expectedSchema = new StructType().add("ID", DoubleType)
70+
assert(t.schema === expectedSchema)
71+
// Update column type from DOUBLE to STRING
72+
val msg1 = intercept[AnalysisException] {
73+
sql("ALTER TABLE db2.alt_table ALTER COLUMN id TYPE VARCHAR(10)")
74+
}.getMessage
75+
assert(msg1.contains("Cannot update alt_table field ID: double cannot be cast to varchar"))
76+
// Update not existing column
77+
val msg2 = intercept[AnalysisException] {
78+
sql("ALTER TABLE db2.alt_table ALTER COLUMN bad_column TYPE DOUBLE")
79+
}.getMessage
80+
assert(msg2.contains("Cannot update missing field bad_column"))
81+
// Update column to wrong type
82+
val msg3 = intercept[ParseException] {
83+
sql("ALTER TABLE db2.alt_table ALTER COLUMN id TYPE bad_type")
84+
}.getMessage
85+
assert(msg3.contains("DataType bad_type is not supported"))
86+
}
87+
// Update column type in not existing table
88+
val msg = intercept[AnalysisException] {
89+
sql(s"ALTER TABLE db2.not_existing_table ALTER COLUMN id TYPE DOUBLE")
90+
}.getMessage
91+
assert(msg.contains("Table not found"))
92+
}
93+
94+
test("SPARK-33034: ALTER TABLE ... update column nullability") {
95+
withTable("db2.alt_table") {
96+
sql("CREATE TABLE db2.alt_table (ID STRING NOT NULL) USING _")
97+
sql("ALTER TABLE db2.alt_table ALTER COLUMN ID DROP NOT NULL")
98+
val t = spark.table("db2.alt_table")
99+
val expectedSchema = new StructType().add("ID", StringType, nullable = true)
100+
assert(t.schema === expectedSchema)
101+
// Update nullability of not existing column
102+
val msg = intercept[AnalysisException] {
103+
sql("ALTER TABLE db2.alt_table ALTER COLUMN bad_column DROP NOT NULL")
104+
}.getMessage
105+
assert(msg.contains("Cannot update missing field bad_column"))
106+
}
107+
// Update column nullability in not existing table
108+
val msg = intercept[AnalysisException] {
109+
sql(s"ALTER TABLE db2.not_existing_table ALTER COLUMN ID DROP NOT NULL")
110+
}.getMessage
111+
assert(msg.contains("Table not found"))
112+
}
113+
}

sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,24 @@ private object DB2Dialect extends JdbcDialect {
5858
override def renameTable(oldTable: String, newTable: String): String = {
5959
s"RENAME TABLE $oldTable TO $newTable"
6060
}
61+
62+
// scalastyle:off line.size.limit
63+
// See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0000888.html
64+
// scalastyle:on line.size.limit
65+
override def getUpdateColumnTypeQuery(
66+
tableName: String,
67+
columnName: String,
68+
newDataType: String): String =
69+
s"ALTER TABLE $tableName ALTER COLUMN $columnName SET DATA TYPE $newDataType"
70+
71+
// scalastyle:off line.size.limit
72+
// See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0000888.html
73+
// scalastyle:on line.size.limit
74+
override def getUpdateColumnNullabilityQuery(
75+
tableName: String,
76+
columnName: String,
77+
isNullable: Boolean): String = {
78+
val nullable = if (isNullable) "DROP NOT NULL" else "SET NOT NULL"
79+
s"ALTER TABLE $tableName ALTER COLUMN $columnName $nullable"
80+
}
6181
}

0 commit comments

Comments
 (0)