diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 430ca9edab799..908e03726d887 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -58,4 +58,24 @@ private object DB2Dialect extends JdbcDialect { override def renameTable(oldTable: String, newTable: String): String = { s"RENAME TABLE $oldTable TO $newTable" } + + // scalastyle:off line.size.limit + // See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0000888.html + // scalastyle:on line.size.limit + override def getUpdateColumnTypeQuery( + tableName: String, + columnName: String, + newDataType: String): String = + s"ALTER TABLE $tableName ALTER COLUMN $columnName SET DATA TYPE $newDataType" + + // scalastyle:off line.size.limit + // See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0000888.html + // scalastyle:on line.size.limit + override def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = { + val nullable = if (isNullable) "DROP NOT NULL" else "SET NOT NULL" + s"ALTER TABLE $tableName ALTER COLUMN $columnName $nullable" + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala index 9ca8879be31e0..c25a024c4278f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala @@ -50,4 +50,27 @@ private object DerbyDialect extends JdbcDialect { override def renameTable(oldTable: String, newTable: String): String = { s"RENAME TABLE $oldTable TO $newTable" } + + // See https://db.apache.org/derby/docs/10.5/ref/rrefsqljrenamecolumnstatement.html + override def getRenameColumnQuery( + tableName: String, + columnName: String, + newName: String): String = + s"RENAME COLUMN $tableName.$columnName TO $newName" + + // See https://db.apache.org/derby/docs/10.5/ref/rrefsqlj81859.html#rrefsqlj81859__rrefsqlj37860 + override def getUpdateColumnTypeQuery( + tableName: String, + columnName: String, + newDataType: String): String = + s"ALTER TABLE $tableName ALTER COLUMN $columnName SET DATA TYPE $newDataType" + + // See https://db.apache.org/derby/docs/10.5/ref/rrefsqlj81859.html#rrefsqlj81859__rrefsqlj37860 + override def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = { + val nullable = if (isNullable) "NULL" else "NOT NULL" + s"ALTER TABLE $tableName ALTER COLUMN $columnName $nullable" + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index cea5a20917532..644f59d370b09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -199,7 +199,6 @@ abstract class JdbcDialect extends Serializable { /** * Alter an existing table. - * TODO (SPARK-32523): Override this method in the dialects that have different syntax. * * @param tableName The name of the table to be altered. * @param changes Changes to apply to the table. @@ -212,28 +211,50 @@ abstract class JdbcDialect extends Serializable { case add: AddColumn if add.fieldNames.length == 1 => val dataType = JdbcUtils.getJdbcType(add.dataType(), this).databaseTypeDefinition val name = add.fieldNames - updateClause += s"ALTER TABLE $tableName ADD COLUMN ${name(0)} $dataType" + updateClause += getAddColumnQuery(tableName, name(0), dataType) case rename: RenameColumn if rename.fieldNames.length == 1 => val name = rename.fieldNames - updateClause += s"ALTER TABLE $tableName RENAME COLUMN ${name(0)} TO ${rename.newName}" + updateClause += getRenameColumnQuery(tableName, name(0), rename.newName) case delete: DeleteColumn if delete.fieldNames.length == 1 => val name = delete.fieldNames - updateClause += s"ALTER TABLE $tableName DROP COLUMN ${name(0)}" + updateClause += getDeleteColumnQuery(tableName, name(0)) case updateColumnType: UpdateColumnType if updateColumnType.fieldNames.length == 1 => val name = updateColumnType.fieldNames val dataType = JdbcUtils.getJdbcType(updateColumnType.newDataType(), this) .databaseTypeDefinition - updateClause += s"ALTER TABLE $tableName ALTER COLUMN ${name(0)} $dataType" + updateClause += getUpdateColumnTypeQuery(tableName, name(0), dataType) case updateNull: UpdateColumnNullability if updateNull.fieldNames.length == 1 => val name = updateNull.fieldNames - val nullable = if (updateNull.nullable()) "NULL" else "NOT NULL" - updateClause += s"ALTER TABLE $tableName ALTER COLUMN ${name(0)} SET $nullable" + updateClause += getUpdateColumnNullabilityQuery(tableName, name(0), updateNull.nullable()) case _ => throw new SQLFeatureNotSupportedException(s"Unsupported TableChange $change") } } updateClause.result() } + + def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String = + s"ALTER TABLE $tableName ADD COLUMN $columnName $dataType" + + def getRenameColumnQuery(tableName: String, columnName: String, newName: String): String = + s"ALTER TABLE $tableName RENAME COLUMN $columnName TO $newName" + + def getDeleteColumnQuery(tableName: String, columnName: String): String = + s"ALTER TABLE $tableName DROP COLUMN $columnName" + + def getUpdateColumnTypeQuery( + tableName: String, + columnName: String, + newDataType: String): String = + s"ALTER TABLE $tableName ALTER COLUMN $columnName $newDataType" + + def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = { + val nullable = if (isNullable) "NULL" else "NOT NULL" + s"ALTER TABLE $tableName ALTER COLUMN $columnName SET $nullable" + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 1c6e8c359aa15..631eed7d8774b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -64,4 +64,24 @@ private object MsSqlServerDialect extends JdbcDialect { override def renameTable(oldTable: String, newTable: String): String = { s"EXEC sp_rename $oldTable, $newTable" } + + // scalastyle:off line.size.limit + // See https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-rename-transact-sql?view=sql-server-ver15 + // scalastyle:on line.size.limit + override def getRenameColumnQuery( + tableName: String, + columnName: String, + newName: String): String = + s"EXEC sp_rename '$tableName.$columnName', '$newName', 'COLUMN'" + + // scalastyle:off line.size.limit + // see https://docs.microsoft.com/en-us/sql/t-sql/statements/alter-table-transact-sql?view=sql-server-ver15 + // scalastyle:on line.size.limit + override def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = { + val nullable = if (isNullable) "NULL" else "NOT NULL" + s"ALTER TABLE $tableName ALTER COLUMN $columnName $nullable" + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 24b31b14d9427..0ec854a874eff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.jdbc -import java.sql.Types +import java.sql.{SQLFeatureNotSupportedException, Types} import java.util.Locale import org.apache.spark.sql.types.{BooleanType, DataType, LongType, MetadataBuilder} @@ -48,4 +48,25 @@ private case object MySQLDialect extends JdbcDialect { } override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + + // See https://dev.mysql.com/doc/refman/8.0/en/alter-table.html + override def getUpdateColumnTypeQuery( + tableName: String, + columnName: String, + newDataType: String): String = + s"ALTER TABLE $tableName MODIFY COLUMN $columnName $newDataType" + + // See https://dev.mysql.com/doc/refman/8.0/en/alter-table.html + // require to have column data type to change the column nullability + // ALTER TABLE tbl_name MODIFY [COLUMN] col_name column_definition + // column_definition: + // data_type [NOT NULL | NULL] + // e.g. ALTER TABLE t1 MODIFY b INT NOT NULL; + // We don't have column data type here, so throw Exception for now + override def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = { + throw new SQLFeatureNotSupportedException(s"UpdateColumnNullability is not supported") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index 4c0623729e00d..a5d40537b0dd3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -112,4 +112,23 @@ private case object OracleDialect extends JdbcDialect { case _ => s"TRUNCATE TABLE $table" } } + + // see https://docs.oracle.com/cd/B28359_01/server.111/b28286/statements_3001.htm#SQLRF01001 + override def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String = + s"ALTER TABLE $tableName ADD $columnName $dataType" + + // see https://docs.oracle.com/cd/B28359_01/server.111/b28286/statements_3001.htm#SQLRF01001 + override def getUpdateColumnTypeQuery( + tableName: String, + columnName: String, + newDataType: String): String = + s"ALTER TABLE $tableName MODIFY $columnName $newDataType" + + override def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = { + val nullable = if (isNullable) "NULL" else "NOT NULL" + s"ALTER TABLE $tableName MODIFY $columnName $nullable" + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index a1ce25a0464c3..507a760e94e02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -126,4 +126,20 @@ private object PostgresDialect extends JdbcDialect { } } + // See https://www.postgresql.org/docs/12/sql-altertable.html + override def getUpdateColumnTypeQuery( + tableName: String, + columnName: String, + newDataType: String): String = { + s"ALTER TABLE $tableName ALTER COLUMN $columnName TYPE $newDataType" + } + + // See https://www.postgresql.org/docs/12/sql-altertable.html + override def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = { + val nullable = if (isNullable) "DROP NOT NULL" else "SET NOT NULL" + s"ALTER TABLE $tableName ALTER COLUMN $columnName $nullable" + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala index 58fe62cb6e088..7010ecf81c548 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.jdbc +import java.sql.SQLFeatureNotSupportedException import java.util.Locale import org.apache.spark.sql.types._ @@ -55,4 +56,31 @@ private case object TeradataDialect extends JdbcDialect { override def renameTable(oldTable: String, newTable: String): String = { s"RENAME TABLE $oldTable TO $newTable" } + + // See https://docs.teradata.com/reader/scPHvjfglIlB8F70YliLAw/Fl27dlrgTKo4W~zk~cDJMA + override def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String = + s"ALTER TABLE $tableName ADD $columnName $dataType" + + // See https://docs.teradata.com/reader/scPHvjfglIlB8F70YliLAw/Fl27dlrgTKo4W~zk~cDJMA + override def getRenameColumnQuery( + tableName: String, + columnName: String, + newName: String): String = + s"ALTER TABLE $tableName RENAME $columnName TO $newName" + + // See https://docs.teradata.com/reader/scPHvjfglIlB8F70YliLAw/Fl27dlrgTKo4W~zk~cDJMA + override def getDeleteColumnQuery(tableName: String, columnName: String): String = + s"ALTER TABLE $tableName DROP $columnName" + + override def getUpdateColumnTypeQuery( + tableName: String, + columnName: String, + newDataType: String): String = + throw new SQLFeatureNotSupportedException(s"UpdateColumnType is not supported") + + override def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = + throw new SQLFeatureNotSupportedException(s"UpdateColumnNullability is not supported") }