Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,24 @@ private object DB2Dialect extends JdbcDialect {
override def renameTable(oldTable: String, newTable: String): String = {
s"RENAME TABLE $oldTable TO $newTable"
}

// scalastyle:off line.size.limit
// See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0000888.html
// scalastyle:on line.size.limit
override def getUpdateColumnTypeQuery(
tableName: String,
columnName: String,
newDataType: String): String =
s"ALTER TABLE $tableName ALTER COLUMN $columnName SET DATA TYPE $newDataType"

// scalastyle:off line.size.limit
// See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0000888.html
// scalastyle:on line.size.limit
override def getUpdateColumnNullabilityQuery(
tableName: String,
columnName: String,
isNullable: Boolean): String = {
val nullable = if (isNullable) "DROP NOT NULL" else "SET NOT NULL"
s"ALTER TABLE $tableName ALTER COLUMN $columnName $nullable"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,27 @@ private object DerbyDialect extends JdbcDialect {
override def renameTable(oldTable: String, newTable: String): String = {
s"RENAME TABLE $oldTable TO $newTable"
}

// See https://db.apache.org/derby/docs/10.5/ref/rrefsqljrenamecolumnstatement.html
override def getRenameColumnQuery(
tableName: String,
columnName: String,
newName: String): String =
s"RENAME COLUMN $tableName.$columnName TO $newName"

// See https://db.apache.org/derby/docs/10.5/ref/rrefsqlj81859.html#rrefsqlj81859__rrefsqlj37860
override def getUpdateColumnTypeQuery(
tableName: String,
columnName: String,
newDataType: String): String =
s"ALTER TABLE $tableName ALTER COLUMN $columnName SET DATA TYPE $newDataType"

// See https://db.apache.org/derby/docs/10.5/ref/rrefsqlj81859.html#rrefsqlj81859__rrefsqlj37860
override def getUpdateColumnNullabilityQuery(
tableName: String,
columnName: String,
isNullable: Boolean): String = {
val nullable = if (isNullable) "NULL" else "NOT NULL"
s"ALTER TABLE $tableName ALTER COLUMN $columnName $nullable"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ abstract class JdbcDialect extends Serializable {

/**
* Alter an existing table.
* TODO (SPARK-32523): Override this method in the dialects that have different syntax.
*
* @param tableName The name of the table to be altered.
* @param changes Changes to apply to the table.
Expand All @@ -212,28 +211,50 @@ abstract class JdbcDialect extends Serializable {
case add: AddColumn if add.fieldNames.length == 1 =>
val dataType = JdbcUtils.getJdbcType(add.dataType(), this).databaseTypeDefinition
val name = add.fieldNames
updateClause += s"ALTER TABLE $tableName ADD COLUMN ${name(0)} $dataType"
updateClause += getAddColumnQuery(tableName, name(0), dataType)
case rename: RenameColumn if rename.fieldNames.length == 1 =>
val name = rename.fieldNames
updateClause += s"ALTER TABLE $tableName RENAME COLUMN ${name(0)} TO ${rename.newName}"
updateClause += getRenameColumnQuery(tableName, name(0), rename.newName)
case delete: DeleteColumn if delete.fieldNames.length == 1 =>
val name = delete.fieldNames
updateClause += s"ALTER TABLE $tableName DROP COLUMN ${name(0)}"
updateClause += getDeleteColumnQuery(tableName, name(0))
case updateColumnType: UpdateColumnType if updateColumnType.fieldNames.length == 1 =>
val name = updateColumnType.fieldNames
val dataType = JdbcUtils.getJdbcType(updateColumnType.newDataType(), this)
.databaseTypeDefinition
updateClause += s"ALTER TABLE $tableName ALTER COLUMN ${name(0)} $dataType"
updateClause += getUpdateColumnTypeQuery(tableName, name(0), dataType)
case updateNull: UpdateColumnNullability if updateNull.fieldNames.length == 1 =>
val name = updateNull.fieldNames
val nullable = if (updateNull.nullable()) "NULL" else "NOT NULL"
updateClause += s"ALTER TABLE $tableName ALTER COLUMN ${name(0)} SET $nullable"
updateClause += getUpdateColumnNullabilityQuery(tableName, name(0), updateNull.nullable())
case _ =>
throw new SQLFeatureNotSupportedException(s"Unsupported TableChange $change")
}
}
updateClause.result()
}

def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String =
s"ALTER TABLE $tableName ADD COLUMN $columnName $dataType"

def getRenameColumnQuery(tableName: String, columnName: String, newName: String): String =
s"ALTER TABLE $tableName RENAME COLUMN $columnName TO $newName"

def getDeleteColumnQuery(tableName: String, columnName: String): String =
s"ALTER TABLE $tableName DROP COLUMN $columnName"

def getUpdateColumnTypeQuery(
tableName: String,
columnName: String,
newDataType: String): String =
s"ALTER TABLE $tableName ALTER COLUMN $columnName $newDataType"

def getUpdateColumnNullabilityQuery(
tableName: String,
columnName: String,
isNullable: Boolean): String = {
val nullable = if (isNullable) "NULL" else "NOT NULL"
s"ALTER TABLE $tableName ALTER COLUMN $columnName SET $nullable"
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,24 @@ private object MsSqlServerDialect extends JdbcDialect {
override def renameTable(oldTable: String, newTable: String): String = {
s"EXEC sp_rename $oldTable, $newTable"
}

// scalastyle:off line.size.limit
// See https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-rename-transact-sql?view=sql-server-ver15
// scalastyle:on line.size.limit
override def getRenameColumnQuery(
tableName: String,
columnName: String,
newName: String): String =
s"EXEC sp_rename '$tableName.$columnName', '$newName', 'COLUMN'"

// scalastyle:off line.size.limit
// see https://docs.microsoft.com/en-us/sql/t-sql/statements/alter-table-transact-sql?view=sql-server-ver15
// scalastyle:on line.size.limit
override def getUpdateColumnNullabilityQuery(
tableName: String,
columnName: String,
isNullable: Boolean): String = {
val nullable = if (isNullable) "NULL" else "NOT NULL"
s"ALTER TABLE $tableName ALTER COLUMN $columnName $nullable"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.jdbc

import java.sql.Types
import java.sql.{SQLFeatureNotSupportedException, Types}
import java.util.Locale

import org.apache.spark.sql.types.{BooleanType, DataType, LongType, MetadataBuilder}
Expand Down Expand Up @@ -48,4 +48,25 @@ private case object MySQLDialect extends JdbcDialect {
}

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

// See https://dev.mysql.com/doc/refman/8.0/en/alter-table.html
override def getUpdateColumnTypeQuery(
tableName: String,
columnName: String,
newDataType: String): String =
s"ALTER TABLE $tableName MODIFY COLUMN $columnName $newDataType"

// See https://dev.mysql.com/doc/refman/8.0/en/alter-table.html
// require to have column data type to change the column nullability
// ALTER TABLE tbl_name MODIFY [COLUMN] col_name column_definition
// column_definition:
// data_type [NOT NULL | NULL]
// e.g. ALTER TABLE t1 MODIFY b INT NOT NULL;
// We don't have column data type here, so throw Exception for now
override def getUpdateColumnNullabilityQuery(
tableName: String,
columnName: String,
isNullable: Boolean): String = {
throw new SQLFeatureNotSupportedException(s"UpdateColumnNullability is not supported")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,23 @@ private case object OracleDialect extends JdbcDialect {
case _ => s"TRUNCATE TABLE $table"
}
}

// see https://docs.oracle.com/cd/B28359_01/server.111/b28286/statements_3001.htm#SQLRF01001
override def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String =
s"ALTER TABLE $tableName ADD $columnName $dataType"

// see https://docs.oracle.com/cd/B28359_01/server.111/b28286/statements_3001.htm#SQLRF01001
override def getUpdateColumnTypeQuery(
tableName: String,
columnName: String,
newDataType: String): String =
s"ALTER TABLE $tableName MODIFY $columnName $newDataType"

override def getUpdateColumnNullabilityQuery(
tableName: String,
columnName: String,
isNullable: Boolean): String = {
val nullable = if (isNullable) "NULL" else "NOT NULL"
s"ALTER TABLE $tableName MODIFY $columnName $nullable"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,20 @@ private object PostgresDialect extends JdbcDialect {
}
}

// See https://www.postgresql.org/docs/12/sql-altertable.html
override def getUpdateColumnTypeQuery(
tableName: String,
columnName: String,
newDataType: String): String = {
s"ALTER TABLE $tableName ALTER COLUMN $columnName TYPE $newDataType"
}

// See https://www.postgresql.org/docs/12/sql-altertable.html
override def getUpdateColumnNullabilityQuery(
tableName: String,
columnName: String,
isNullable: Boolean): String = {
val nullable = if (isNullable) "DROP NOT NULL" else "SET NOT NULL"
s"ALTER TABLE $tableName ALTER COLUMN $columnName $nullable"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.jdbc

import java.sql.SQLFeatureNotSupportedException
import java.util.Locale

import org.apache.spark.sql.types._
Expand Down Expand Up @@ -55,4 +56,31 @@ private case object TeradataDialect extends JdbcDialect {
override def renameTable(oldTable: String, newTable: String): String = {
s"RENAME TABLE $oldTable TO $newTable"
}

// See https://docs.teradata.com/reader/scPHvjfglIlB8F70YliLAw/Fl27dlrgTKo4W~zk~cDJMA
override def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String =
s"ALTER TABLE $tableName ADD $columnName $dataType"

// See https://docs.teradata.com/reader/scPHvjfglIlB8F70YliLAw/Fl27dlrgTKo4W~zk~cDJMA
override def getRenameColumnQuery(
tableName: String,
columnName: String,
newName: String): String =
s"ALTER TABLE $tableName RENAME $columnName TO $newName"

// See https://docs.teradata.com/reader/scPHvjfglIlB8F70YliLAw/Fl27dlrgTKo4W~zk~cDJMA
override def getDeleteColumnQuery(tableName: String, columnName: String): String =
s"ALTER TABLE $tableName DROP $columnName"

override def getUpdateColumnTypeQuery(
tableName: String,
columnName: String,
newDataType: String): String =
throw new SQLFeatureNotSupportedException(s"UpdateColumnType is not supported")

override def getUpdateColumnNullabilityQuery(
tableName: String,
columnName: String,
isNullable: Boolean): String =
throw new SQLFeatureNotSupportedException(s"UpdateColumnNullability is not supported")
}