diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index fa4c032fcb012..5d8838906bfc1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
-import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -1033,14 +1033,14 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def createIndex(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String],
       options: JDBCOptions): Unit = {
     val dialect = JdbcDialects.get(options.url)
     executeStatement(conn, options,
-      dialect.createIndex(indexName, tableName, columns, columnsProperties, properties))
+      dialect.createIndex(indexName, tableIdent, columns, columnsProperties, properties))
   }
 
   /**
@@ -1049,10 +1049,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
     val dialect = JdbcDialects.get(options.url)
-    dialect.indexExists(conn, indexName, tableName, options)
+    dialect.indexExists(conn, indexName, tableIdent, options)
   }
 
   /**
@@ -1061,10 +1061,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def dropIndex(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Unit = {
     val dialect = JdbcDialects.get(options.url)
-    executeStatement(conn, options, dialect.dropIndex(indexName, tableName))
+    executeStatement(conn, options, dialect.dropIndex(indexName, tableIdent))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 31c0167ab492a..be8e1c68b7cf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -61,14 +61,14 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
       JdbcUtils.classifyException(s"Failed to create index $indexName in $name",
         JdbcDialects.get(jdbcOptions.url)) {
         JdbcUtils.createIndex(
-          conn, indexName, name, columns, columnsProperties, properties, jdbcOptions)
+          conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions)
       }
     }
   }
 
   override def indexExists(indexName: String): Boolean = {
     JdbcUtils.withConnection(jdbcOptions) { conn =>
-      JdbcUtils.indexExists(conn, indexName, name, jdbcOptions)
+      JdbcUtils.indexExists(conn, indexName, ident, jdbcOptions)
     }
   }
 
@@ -76,7 +76,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
     JdbcUtils.withConnection(jdbcOptions) { conn =>
       JdbcUtils.classifyException(s"Failed to drop index $indexName in $name",
         JdbcDialects.get(jdbcOptions.url)) {
-        JdbcUtils.dropIndex(conn, indexName, name, jdbcOptions)
+        JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions)
       }
     }
   }
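`JDBCTable` previously handed the dialect a single pre-quoted `name` string; it now passes its `ident: Identifier`, so a dialect can see the namespace (schema) as well as the bare table name. A minimal sketch of what a dialect now receives, assuming an illustrative table `test.people` and column `id`:

    import org.apache.spark.sql.connector.catalog.Identifier
    import org.apache.spark.sql.connector.expressions.FieldReference

    // Identifier separates the schema path from the table name.
    val ident = Identifier.of(Array("test"), "people")
    assert(ident.namespace().sameElements(Array("test")))
    assert(ident.name() == "people")

    // Index columns reach createIndex as NamedReferences.
    val columns = Array(FieldReference("id"))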
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index e58473bb2b31a..d41929225a8a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{SQLException, Types}
+import java.sql.{Connection, SQLException, Types}
+import java.util
 import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
@@ -25,10 +26,12 @@ import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Expression
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
 
 private[sql] object H2Dialect extends JdbcDialect {
@@ -74,6 +77,47 @@ private[sql] object H2Dialect extends JdbcDialect {
     functionMap.clear()
   }
 
+  // CREATE INDEX syntax
+  // https://www.h2database.com/html/commands.html#create_index
+  override def createIndex(
+      indexName: String,
+      tableIdent: Identifier,
+      columns: Array[NamedReference],
+      columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+      properties: util.Map[String, String]): String = {
+    val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
+    val (indexType, _) = JdbcUtils.processIndexProperties(properties, "h2")
+
+    s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON " +
+      s"${tableNameWithSchema(tableIdent)} (${columnList.mkString(", ")})"
+  }
+
+  // DROP INDEX syntax
+  // https://www.h2database.com/html/commands.html#drop_index
+  override def dropIndex(indexName: String, tableIdent: Identifier): String = {
+    s"DROP INDEX ${indexNameWithSchema(tableIdent, indexName)}"
+  }
+
+  // See https://www.h2database.com/html/systemtables.html?#information_schema_indexes
+  override def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableIdent: Identifier,
+      options: JDBCOptions): Boolean = {
+    val sql = s"SELECT * FROM INFORMATION_SCHEMA.INDEXES WHERE " +
+      s"TABLE_SCHEMA = '${tableIdent.namespace().last}' AND " +
+      s"TABLE_NAME = '${tableIdent.name()}' AND INDEX_NAME = '$indexName'"
+    JdbcUtils.checkIfIndexExists(conn, sql, options)
+  }
+
+  private def tableNameWithSchema(ident: Identifier): String = {
+    (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
+  }
+
+  private def indexNameWithSchema(ident: Identifier, indexName: String): String = {
+    (ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".")
+  }
+
   override def classifyException(message: String, e: Throwable): AnalysisException = {
     e match {
       case exception: SQLException =>
@@ -88,6 +132,12 @@ private[sql] object H2Dialect extends JdbcDialect {
           // SCHEMA_NOT_FOUND_1
           case 90079 =>
             throw NoSuchNamespaceException(message, cause = Some(e))
+          // INDEX_ALREADY_EXISTS_1
+          case 42111 =>
+            throw new IndexAlreadyExistsException(message, cause = Some(e))
+          // INDEX_NOT_FOUND_1
+          case 42112 =>
+            throw new NoSuchIndexException(message, cause = Some(e))
           case _ => // do nothing
         }
       case _ => // do nothing
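The two new private helpers fold the identifier's namespace into a quoted, dot-separated name, which is what lets H2 create and drop indexes in non-default schemas. A self-contained sketch of the same logic, with a local stand-in for the dialect's `quoteIdentifier` (simplified: the real method also escapes embedded quotes):

    object H2NameSketch {
      // Stand-in for JdbcDialect.quoteIdentifier (no quote escaping).
      private def quoteIdentifier(name: String): String = "\"" + name + "\""

      def tableNameWithSchema(namespace: Array[String], table: String): String =
        (namespace :+ table).map(quoteIdentifier).mkString(".")

      def main(args: Array[String]): Unit =
        // Prints "test"."people" - the fully qualified form used in the SQL above.
        println(tableNameWithSchema(Array("test"), "people"))
    }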
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index ba3a3f50ecad6..d77299bdc0c0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
-import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.catalog.index.TableIndex
@@ -473,7 +473,7 @@ abstract class JdbcDialect extends Serializable with Logging {
    * Build a create index SQL statement.
    *
    * @param indexName the name of the index to be created
-   * @param tableName the table on which index to be created
+   * @param tableIdent the table on which index to be created
    * @param columns the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
@@ -481,7 +481,7 @@ abstract class JdbcDialect extends Serializable with Logging {
    */
   def createIndex(
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
@@ -492,7 +492,7 @@ abstract class JdbcDialect extends Serializable with Logging {
    * Checks whether an index exists
    *
    * @param indexName the name of the index
-   * @param tableName the table name on which index to be checked
+   * @param tableIdent the table on which index to be checked
    * @param options JDBCOptions of the table
    * @return true if the index with `indexName` exists in the table with `tableName`,
    *         false otherwise
@@ -500,7 +500,7 @@ abstract class JdbcDialect extends Serializable with Logging {
   def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
     throw new UnsupportedOperationException("indexExists is not supported")
   }
@@ -509,10 +509,10 @@ abstract class JdbcDialect extends Serializable with Logging {
    * Build a drop index SQL statement.
    *
    * @param indexName the name of the index to be dropped.
-   * @param tableName the table name on which index to be dropped.
+   * @param tableIdent the table on which index to be dropped.
    * @return the SQL statement to use for dropping the index.
    */
-  def dropIndex(indexName: String, tableName: String): String = {
+  def dropIndex(indexName: String, tableIdent: Identifier): String = {
     throw new UnsupportedOperationException("dropIndex is not supported")
   }
 
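This changes the `JdbcDialect` developer API: a third-party dialect that overrode the old `String`-based methods no longer overrides anything and falls through to the `UnsupportedOperationException` defaults above. A minimal sketch of a custom dialect ported to the new signatures (the dialect and its JDBC URL scheme are hypothetical):

    import java.util

    import org.apache.spark.sql.connector.catalog.Identifier
    import org.apache.spark.sql.connector.expressions.NamedReference
    import org.apache.spark.sql.jdbc.JdbcDialect

    class MyDialect extends JdbcDialect {
      // Hypothetical URL scheme.
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

      override def createIndex(
          indexName: String,
          tableIdent: Identifier,
          columns: Array[NamedReference],
          columnsProperties: util.Map[NamedReference, util.Map[String, String]],
          properties: util.Map[String, String]): String = {
        val cols = columns.map(c => quoteIdentifier(c.fieldNames.head)).mkString(", ")
        s"CREATE INDEX ${quoteIdentifier(indexName)} ON" +
          s" ${quoteIdentifier(tableIdent.name())} ($cols)"
      }

      override def dropIndex(indexName: String, tableIdent: Identifier): String =
        s"DROP INDEX ${quoteIdentifier(indexName)}"
    }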
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 96b544bb03ef3..cc04b5c7c9296 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuilder
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -150,7 +151,7 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
   override def createIndex(
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
@@ -159,7 +160,7 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
 
     // columnsProperties doesn't apply to MySQL so it is ignored
     s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON" +
-      s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")})" +
+      s" ${quoteIdentifier(tableIdent.name())} (${columnList.mkString(", ")})" +
       s" ${indexPropertyList.mkString(" ")}"
   }
 
@@ -168,14 +169,15 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   override def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
-    val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)} WHERE key_name = '$indexName'"
+    val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableIdent.name())} " +
+      s"WHERE key_name = '$indexName'"
     JdbcUtils.checkIfIndexExists(conn, sql, options)
   }
 
-  override def dropIndex(indexName: String, tableName: String): String = {
-    s"DROP INDEX ${quoteIdentifier(indexName)} ON $tableName"
+  override def dropIndex(indexName: String, tableIdent: Identifier): String = {
+    s"DROP INDEX ${quoteIdentifier(indexName)} ON ${tableIdent.name()}"
   }
 
   // SHOW INDEX syntax
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 551f8d6262191..cb78bc806e2f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -24,6 +24,7 @@ import java.util.Locale
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
@@ -182,7 +183,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
   // https://www.postgresql.org/docs/14/sql-createindex.html
   override def createIndex(
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
@@ -194,7 +195,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
       indexProperties = "WITH (" + indexPropertyList.mkString(", ") + ")"
     }
 
-    s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableName)}" +
+    s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableIdent.name())}" +
       s" $indexType (${columnList.mkString(", ")}) $indexProperties"
   }
 
@@ -203,16 +204,16 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
   override def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
-    val sql = s"SELECT * FROM pg_indexes WHERE tablename = '$tableName' AND" +
+    val sql = s"SELECT * FROM pg_indexes WHERE tablename = '${tableIdent.name()}' AND" +
       s" indexname = '$indexName'"
     JdbcUtils.checkIfIndexExists(conn, sql, options)
   }
 
   // DROP INDEX syntax
   // https://www.postgresql.org/docs/14/sql-dropindex.html
-  override def dropIndex(indexName: String, tableName: String): String = {
-    s"DROP INDEX ${quoteIdentifier(indexName)}"
+  override def dropIndex(indexName: String, tableIdent: Identifier): String = {
+    s"DROP INDEX ${quoteIdentifier(indexName)}"
   }
 
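The dialects intentionally use different parts of the identifier: H2 qualifies both table and index with the schema, MySQL scopes an index to its table (`DROP INDEX ... ON tbl`), and PostgreSQL drops by index name alone. A quick way to see the generated SQL after this change (URLs illustrative; expected output in comments):

    import org.apache.spark.sql.connector.catalog.Identifier
    import org.apache.spark.sql.jdbc.JdbcDialects

    val ident = Identifier.of(Array("test"), "people")

    // MySQL: DROP INDEX `idx` ON people
    println(JdbcDialects.get("jdbc:mysql://host/db").dropIndex("idx", ident))

    // PostgreSQL: DROP INDEX "idx"
    println(JdbcDialects.get("jdbc:postgresql://host/db").dropIndex("idx", ident))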
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 2dd6280091bc0..7608c0b148d69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -28,7 +28,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, Offset, Sort}
 import org.apache.spark.sql.connector.{IntegralAverage, StrLen}
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.functions.{ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -2217,4 +2219,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       JdbcDialects.registerDialect(H2Dialect)
     }
   }
+
+  test("Test INDEX Using SQL") {
+    val loaded = Catalogs.load("h2", conf)
+    val jdbcTable = loaded.asInstanceOf[TableCatalog]
+      .loadTable(Identifier.of(Array("test"), "people"))
+      .asInstanceOf[SupportsIndex]
+    assert(jdbcTable != null)
+    assert(jdbcTable.indexExists("people_index") == false)
+
+    sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
+    assert(jdbcTable.indexExists("people_index"))
+
+    sql(s"DROP INDEX people_index ON TABLE h2.test.people")
+    assert(jdbcTable.indexExists("people_index") == false)
+  }
 }
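The test relies on the suite's existing H2 catalog registration. In a standalone session the equivalent setup would look roughly like the following (the in-memory H2 URL and option values are illustrative, and the table h2.test.people is assumed to exist on the H2 side); the `CREATE INDEX`/`DROP INDEX` statements then route through `SupportsIndex` on the JDBC table:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      // Register a JDBC catalog named "h2".
      .config("spark.sql.catalog.h2",
        "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
      .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1")
      .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
      .getOrCreate()

    spark.sql("CREATE INDEX people_index ON TABLE h2.test.people (id)")
    spark.sql("DROP INDEX people_index ON TABLE h2.test.people")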