sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
-import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -1033,14 +1033,14 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def createIndex(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String],
       options: JDBCOptions): Unit = {
     val dialect = JdbcDialects.get(options.url)
     executeStatement(conn, options,
-      dialect.createIndex(indexName, tableName, columns, columnsProperties, properties))
+      dialect.createIndex(indexName, tableIdent, columns, columnsProperties, properties))
   }

   /**
@@ -1049,10 +1049,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
     val dialect = JdbcDialects.get(options.url)
-    dialect.indexExists(conn, indexName, tableName, options)
+    dialect.indexExists(conn, indexName, tableIdent, options)
   }

   /**
@@ -1061,10 +1061,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def dropIndex(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Unit = {
     val dialect = JdbcDialects.get(options.url)
-    executeStatement(conn, options, dialect.dropIndex(indexName, tableName))
+    executeStatement(conn, options, dialect.dropIndex(indexName, tableIdent))
   }

   /**
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -61,22 +61,22 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
       JdbcUtils.classifyException(s"Failed to create index $indexName in $name",
         JdbcDialects.get(jdbcOptions.url)) {
         JdbcUtils.createIndex(
-          conn, indexName, name, columns, columnsProperties, properties, jdbcOptions)
+          conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions)
       }
     }
   }

   override def indexExists(indexName: String): Boolean = {
     JdbcUtils.withConnection(jdbcOptions) { conn =>
-      JdbcUtils.indexExists(conn, indexName, name, jdbcOptions)
+      JdbcUtils.indexExists(conn, indexName, ident, jdbcOptions)
     }
   }

   override def dropIndex(indexName: String): Unit = {
     JdbcUtils.withConnection(jdbcOptions) { conn =>
       JdbcUtils.classifyException(s"Failed to drop index $indexName in $name",
         JdbcDialects.get(jdbcOptions.url)) {
-        JdbcUtils.dropIndex(conn, indexName, name, jdbcOptions)
+        JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions)
       }
     }
   }
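Note: the point of this file's change is that `JDBCTable` now hands dialects its full `ident` instead of the flattened `name` string, so the namespace (schema) survives the trip. A minimal sketch of what an `Identifier` carries; the `test.people` identifier is borrowed from the test suite below:

```scala
import org.apache.spark.sql.connector.catalog.Identifier

// An Identifier keeps namespace and table name as separate parts, so a
// dialect can decide how (or whether) to schema-qualify generated SQL.
val ident: Identifier = Identifier.of(Array("test"), "people")

ident.namespace()   // Array("test") -- the schema part, lost with a plain String
ident.name()        // "people"      -- the bare table name
```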
56 changes: 53 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -17,18 +17,21 @@

 package org.apache.spark.sql.jdbc

-import java.sql.{SQLException, Types}
+import java.sql.{Connection, SQLException, Types}
 import java.util
 import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap

 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal

 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Expression
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}

 private[sql] object H2Dialect extends JdbcDialect {
@@ -74,6 +77,47 @@ private[sql] object H2Dialect extends JdbcDialect {
     functionMap.clear()
   }

+  // CREATE INDEX syntax
+  // https://www.h2database.com/html/commands.html#create_index
+  override def createIndex(
+      indexName: String,
+      tableIdent: Identifier,
+      columns: Array[NamedReference],
+      columnsProperties: util.Map[NamedReference, util.Map[String, String]],
+      properties: util.Map[String, String]): String = {
+    val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
+    val (indexType, _) = JdbcUtils.processIndexProperties(properties, "h2")
+
+    s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON " +
+      s"${tableNameWithSchema(tableIdent)} (${columnList.mkString(", ")})"
+  }
+
+  // DROP INDEX syntax
+  // https://www.h2database.com/html/commands.html#drop_index
+  override def dropIndex(indexName: String, tableIdent: Identifier): String = {
+    s"DROP INDEX ${indexNameWithSchema(tableIdent, indexName)}"
+  }
+
+  // See https://www.h2database.com/html/systemtables.html?#information_schema_indexes
+  override def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableIdent: Identifier,
+      options: JDBCOptions): Boolean = {
+    val sql = s"SELECT * FROM INFORMATION_SCHEMA.INDEXES WHERE " +
+      s"TABLE_SCHEMA = '${tableIdent.namespace().last}' AND " +
+      s"TABLE_NAME = '${tableIdent.name()}' AND INDEX_NAME = '$indexName'"
+    JdbcUtils.checkIfIndexExists(conn, sql, options)
+  }
+
+  private def tableNameWithSchema(ident: Identifier): String = {
+    (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
+  }
+
+  private def indexNameWithSchema(ident: Identifier, indexName: String): String = {
+    (ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".")
+  }
+
   override def classifyException(message: String, e: Throwable): AnalysisException = {
     e match {
       case exception: SQLException =>
@@ -88,6 +132,12 @@ private[sql] object H2Dialect extends JdbcDialect {
           // SCHEMA_NOT_FOUND_1
           case 90079 =>
             throw NoSuchNamespaceException(message, cause = Some(e))
+          // INDEX_ALREADY_EXISTS_1
+          case 42111 =>
+            throw new IndexAlreadyExistsException(message, cause = Some(e))
+          // INDEX_NOT_FOUND_1
+          case 42112 =>
+            throw new NoSuchIndexException(message, cause = Some(e))
           case _ => // do nothing
         }
       case _ => // do nothing
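Note: H2 resolves unqualified names against the current session schema, hence the two private helpers that schema-qualify both the table (for CREATE INDEX) and the index (for DROP INDEX). A rough sketch of the statements these builders should produce; the invocation is hypothetical, since `H2Dialect` is `private[sql]`:

```scala
import java.util.Collections
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

val ident = Identifier.of(Array("test"), "people")

// With no index properties, createIndex should render roughly:
//   CREATE INDEX "people_index" ON "test"."people" ("id")
H2Dialect.createIndex("people_index", ident,
  Array[NamedReference](FieldReference("id")),
  Collections.emptyMap(), Collections.emptyMap())

// dropIndex qualifies the *index* name with the schema:
//   DROP INDEX "test"."people_index"
H2Dialect.dropIndex("people_index", ident)
```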
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialect.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
-import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.catalog.index.TableIndex
@@ -473,15 +473,15 @@ abstract class JdbcDialect extends Serializable with Logging {
    * Build a create index SQL statement.
    *
    * @param indexName the name of the index to be created
-   * @param tableName the table on which index to be created
+   * @param tableIdent the table on which index to be created
    * @param columns the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
    * @return the SQL statement to use for creating the index.
    */
   def createIndex(
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
@@ -492,15 +492,15 @@ abstract class JdbcDialect extends Serializable with Logging {
    * Checks whether an index exists
    *
    * @param indexName the name of the index
-   * @param tableName the table name on which index to be checked
+   * @param tableIdent the table on which index to be checked
    * @param options JDBCOptions of the table
    * @return true if the index with `indexName` exists in the table with `tableName`,
    *         false otherwise
    */
   def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
     throw new UnsupportedOperationException("indexExists is not supported")
   }
@@ -509,10 +509,10 @@ abstract class JdbcDialect extends Serializable with Logging {
    * Build a drop index SQL statement.
    *
    * @param indexName the name of the index to be dropped.
-   * @param tableName the table name on which index to be dropped.
+   * @param tableIdent the table on which index to be dropped.
    * @return the SQL statement to use for dropping the index.
    */
-  def dropIndex(indexName: String, tableName: String): String = {
+  def dropIndex(indexName: String, tableIdent: Identifier): String = {
     throw new UnsupportedOperationException("dropIndex is not supported")
   }

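Note: this is the developer-facing API change, and it is source-breaking for third-party dialects: `createIndex`, `indexExists`, and `dropIndex` overrides must switch from `tableName: String` to `tableIdent: Identifier`. (Small doc nit: the `@return` line of `indexExists` still refers to `tableName`.) A minimal sketch of a custom dialect after the change; `MyDialect` and its URL prefix are hypothetical:

```scala
import java.util

import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.jdbc.JdbcDialect

// Hypothetical third-party dialect, updated for the Identifier-based hooks.
case object MyDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  override def createIndex(
      indexName: String,
      tableIdent: Identifier,
      columns: Array[NamedReference],
      columnsProperties: util.Map[NamedReference, util.Map[String, String]],
      properties: util.Map[String, String]): String = {
    // The identifier keeps the schema, so it can be quoted part by part
    // instead of splatting a pre-joined string into the statement.
    val table = (tableIdent.namespace() :+ tableIdent.name())
      .map(quoteIdentifier).mkString(".")
    val cols = columns.map(c => quoteIdentifier(c.fieldNames.head)).mkString(", ")
    s"CREATE INDEX ${quoteIdentifier(indexName)} ON $table ($cols)"
  }

  override def dropIndex(indexName: String, tableIdent: Identifier): String =
    s"DROP INDEX ${quoteIdentifier(indexName)}"
}
```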
sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuilder
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -150,7 +151,7 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
   override def createIndex(
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
@@ -159,7 +160,7 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {

     // columnsProperties doesn't apply to MySQL so it is ignored
     s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON" +
-      s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")})" +
+      s" ${quoteIdentifier(tableIdent.name())} (${columnList.mkString(", ")})" +
       s" ${indexPropertyList.mkString(" ")}"
   }

@@ -168,14 +169,15 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   override def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
-    val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)} WHERE key_name = '$indexName'"
+    val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableIdent.name())} " +
+      s"WHERE key_name = '$indexName'"
     JdbcUtils.checkIfIndexExists(conn, sql, options)
   }

-  override def dropIndex(indexName: String, tableName: String): String = {
-    s"DROP INDEX ${quoteIdentifier(indexName)} ON $tableName"
+  override def dropIndex(indexName: String, tableIdent: Identifier): String = {
+    s"DROP INDEX ${quoteIdentifier(indexName)} ON ${tableIdent.name()}"
   }

   // SHOW INDEX syntax
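Note: MySQL scopes indexes to the table's database, which is fixed by the JDBC URL, so only `tableIdent.name()` reaches the generated SQL. One thing a reviewer might flag: `dropIndex` interpolates the name unquoted (`ON ${tableIdent.name()}`) while `createIndex` goes through `quoteIdentifier`. A sketch of the expected output, with a hypothetical identifier:

```scala
import org.apache.spark.sql.connector.catalog.Identifier

// Only name() is used; the "mydb" namespace must match the database in the
// JDBC URL for these statements to resolve.
val ident = Identifier.of(Array("mydb"), "people")
// createIndex(...)                 => CREATE INDEX `people_index` ON `people` (`id`) ...
// dropIndex("people_index", ident) => DROP INDEX `people_index` ON people
```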
sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -24,6 +24,7 @@ import java.util.Locale
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
@@ -182,7 +183,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
   // https://www.postgresql.org/docs/14/sql-createindex.html
   override def createIndex(
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
@@ -194,7 +195,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
       indexProperties = "WITH (" + indexPropertyList.mkString(", ") + ")"
     }

-    s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableName)}" +
+    s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableIdent.name())}" +
       s" $indexType (${columnList.mkString(", ")}) $indexProperties"
   }

@@ -203,16 +204,16 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
   override def indexExists(
       conn: Connection,
       indexName: String,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
-    val sql = s"SELECT * FROM pg_indexes WHERE tablename = '$tableName' AND" +
+    val sql = s"SELECT * FROM pg_indexes WHERE tablename = '${tableIdent.name()}' AND" +
       s" indexname = '$indexName'"
     JdbcUtils.checkIfIndexExists(conn, sql, options)
   }

   // DROP INDEX syntax
   // https://www.postgresql.org/docs/14/sql-dropindex.html
-  override def dropIndex(indexName: String, tableName: String): String = {
+  override def dropIndex(indexName: String, tableIdent: Identifier): String = {
     s"DROP INDEX ${quoteIdentifier(indexName)}"
   }

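Note: a PostgreSQL index lives in the same schema as its table and `DROP INDEX` takes only the index name, so `dropIndex` ignores the identifier entirely. Worth noting that the `pg_indexes` lookup filters on `tablename` and `indexname` but not `schemaname`, so identically named tables in two schemas could still collide. A sketch of the expected output, with a hypothetical identifier:

```scala
import org.apache.spark.sql.connector.catalog.Identifier

val ident = Identifier.of(Array("public"), "people")
// createIndex(...)                 => CREATE INDEX "people_index" ON "people" ("id") ...
// dropIndex("people_index", ident) => DROP INDEX "people_index"
```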
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -28,7 +28,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, Offset, Sort}
 import org.apache.spark.sql.connector.{IntegralAverage, StrLen}
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.functions.{ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -2217,4 +2219,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       JdbcDialects.registerDialect(H2Dialect)
     }
   }
+
+  test("Test INDEX Using SQL") {
+    val loaded = Catalogs.load("h2", conf)
+    val jdbcTable = loaded.asInstanceOf[TableCatalog]
+      .loadTable(Identifier.of(Array("test"), "people"))
+      .asInstanceOf[SupportsIndex]
+    assert(jdbcTable != null)
+    assert(jdbcTable.indexExists("people_index") == false)
+
+    sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
+    assert(jdbcTable.indexExists("people_index"))
+
+    sql(s"DROP INDEX people_index ON TABLE h2.test.people")
+    assert(jdbcTable.indexExists("people_index") == false)
+  }
 }
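Note: the new test drives everything through the SQL surface (`CREATE INDEX ... ON TABLE` / `DROP INDEX ... ON TABLE`). Outside the suite, the same flow needs an H2-backed catalog registered first; a sketch with illustrative settings (the suite does the equivalent in its own setup, and catalog options generally need to be set before the catalog is first loaded):

```scala
// Illustrative session; the URL and driver values are placeholders.
spark.conf.set("spark.sql.catalog.h2",
  "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb0")
spark.conf.set("spark.sql.catalog.h2.driver", "org.h2.Driver")

spark.sql("CREATE INDEX people_index ON TABLE h2.test.people (id)")
spark.sql("DROP INDEX people_index ON TABLE h2.test.people")
```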