@@ -1072,10 +1072,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
    */
   def listIndexes(
       conn: Connection,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Array[TableIndex] = {
     val dialect = JdbcDialects.get(options.url)
-    dialect.listIndexes(conn, tableName, options)
+    dialect.listIndexes(conn, tableIdent, options)
   }

   private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
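
With this signature, callers identify the table through a catalog Identifier (namespace plus name) instead of a pre-formatted string, leaving quoting and formatting to the dialect. A minimal sketch of a call site; the connection and JDBC options are placeholders supplied by the caller and are not part of this diff:

```scala
import org.apache.spark.sql.connector.catalog.Identifier

// Hypothetical call site: "test" is the schema/namespace, "people" the table name.
val tableIdent: Identifier = Identifier.of(Array("test"), "people")
// JdbcUtils.listIndexes(conn, tableIdent, options)  // conn: java.sql.Connection, options: JDBCOptions
```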
@@ -83,7 +83,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt

   override def listIndexes(): Array[TableIndex] = {
     JdbcUtils.withConnection(jdbcOptions) { conn =>
-      JdbcUtils.listIndexes(conn, name, jdbcOptions)
+      JdbcUtils.listIndexes(conn, ident, jdbcOptions)
     }
   }
 }
66 changes: 63 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -25,12 +25,14 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal

+import org.apache.commons.lang3.StringUtils
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
-import org.apache.spark.sql.connector.expressions.Expression
-import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
@@ -110,6 +112,64 @@ private[sql] object H2Dialect extends JdbcDialect {
     JdbcUtils.checkIfIndexExists(conn, sql, options)
   }

+  // See
+  // https://www.h2database.com/html/systemtables.html?#information_schema_indexes
+  // https://www.h2database.com/html/systemtables.html?#information_schema_index_columns
+  override def listIndexes(
+      conn: Connection,
+      tableIdent: Identifier,
+      options: JDBCOptions): Array[TableIndex] = {
+    val sql = {
+      s"""
+         | SELECT
+         | i.INDEX_CATALOG AS INDEX_CATALOG,
+         | i.INDEX_SCHEMA AS INDEX_SCHEMA,
+         | i.INDEX_NAME AS INDEX_NAME,
+         | i.INDEX_TYPE_NAME AS INDEX_TYPE_NAME,
+         | i.REMARKS as REMARKS,
+         | ic.COLUMN_NAME AS COLUMN_NAME
+         | FROM INFORMATION_SCHEMA.INDEXES i, INFORMATION_SCHEMA.INDEX_COLUMNS ic
+         | WHERE i.TABLE_CATALOG = ic.TABLE_CATALOG
+         | AND i.TABLE_SCHEMA = ic.TABLE_SCHEMA
+         | AND i.TABLE_NAME = ic.TABLE_NAME
+         | AND i.INDEX_CATALOG = ic.INDEX_CATALOG
+         | AND i.INDEX_SCHEMA = ic.INDEX_SCHEMA
+         | AND i.INDEX_NAME = ic.INDEX_NAME
+         | AND i.TABLE_NAME = '${tableIdent.name()}'
+         | AND i.INDEX_SCHEMA = '${tableIdent.namespace().last}'
+         |""".stripMargin
+    }
+    var indexMap: Map[String, TableIndex] = Map()
+    try {
+      JdbcUtils.executeQuery(conn, options, sql) { rs =>
+        while (rs.next()) {
+          val indexName = rs.getString("INDEX_NAME")
+          val colName = rs.getString("COLUMN_NAME")
+          val indexType = rs.getString("INDEX_TYPE_NAME")
+          val indexComment = rs.getString("REMARKS")
+          if (indexMap.contains(indexName)) {
+            val index = indexMap(indexName)
+            val newIndex = new TableIndex(indexName, indexType,
+              index.columns() :+ FieldReference(colName),
+              index.columnProperties, index.properties)
+            indexMap += (indexName -> newIndex)
+          } else {
+            val properties = new util.Properties()
+            if (StringUtils.isNotEmpty(indexComment)) properties.put("COMMENT", indexComment)
Contributor Author: In fact, I found a possible null pointer in the H2 implementation of listIndexes: the default value of indexComment is null, whereas the default value in MySQLDialect is an empty string.
@huaxingao

+            val index = new TableIndex(indexName, indexType, Array(FieldReference(colName)),
+              new util.HashMap[NamedReference, util.Properties](), properties)
+            indexMap += (indexName -> index)
+          }
+        }
+      }
+    } catch {
+      case _: Exception =>
+        logWarning("Cannot retrieve index info.")
+    }
+
+    indexMap.values.toArray
+  }
+
   private def tableNameWithSchema(ident: Identifier): String = {
     (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
   }
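
The review comment above points at a real difference between the two dialects: H2 returns null in REMARKS when an index has no comment, while the corresponding value in MySQL comes back as an empty string, which is why the added code guards with StringUtils.isNotEmpty before storing a COMMENT property. A standalone sketch of that normalization, with a hypothetical helper name (readIndexComment is illustrative, not from the PR):

```scala
import java.sql.ResultSet
import java.util.Properties

object IndexCommentSketch {
  // Illustrative helper, not part of the PR: it normalizes the index-comment column so
  // that H2 (null when no comment is set) and MySQL ("" when no comment is set) both
  // end up with no COMMENT property rather than a null or empty entry.
  def readIndexComment(rs: ResultSet, column: String): Properties = {
    val properties = new Properties()
    Option(rs.getString(column)) // Option(null) is None, so H2's null REMARKS is safe
      .filter(_.nonEmpty)        // drops MySQL's empty-string comment
      .foreach(comment => properties.put("COMMENT", comment))
    properties
  }
}
```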
@@ -161,7 +221,7 @@ private[sql] object H2Dialect extends JdbcDialect {
         funcName: String, isDistinct: Boolean, inputs: Array[String]): String =
       if (isDistinct && distinctUnsupportedAggregateFunctions.contains(funcName)) {
         throw new UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " +
-          s"support aggregate function: $funcName with DISTINCT");
+          s"support aggregate function: $funcName with DISTINCT")
       } else {
         super.visitAggregateFunction(funcName, isDistinct, inputs)
       }
@@ -522,7 +522,7 @@ abstract class JdbcDialect extends Serializable with Logging {
    */
   def listIndexes(
       conn: Connection,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Array[TableIndex] = {
     throw new UnsupportedOperationException("listIndexes is not supported")
   }
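
Because the base-class hook now takes an Identifier rather than a table-name string, a dialect that overrides it sees the namespace and the table name separately. A rough sketch of what such an override could look like: ExampleDialect, its URL prefix, and its body are hypothetical; only the listIndexes signature mirrors the diff, and the body simply returns no indexes.

```scala
import java.sql.Connection

import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.jdbc.JdbcDialect

// Hypothetical dialect, not part of the PR: it only illustrates the shape of the
// new Identifier-based hook.
object ExampleDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:example")

  override def listIndexes(
      conn: Connection,
      tableIdent: Identifier,
      options: JDBCOptions): Array[TableIndex] = {
    // The Identifier carries the namespace (schema) and the table name separately.
    val schema = tableIdent.namespace().lastOption.getOrElse("PUBLIC")
    val table = tableIdent.name()
    // A real dialect would query its index metadata here (for example via
    // conn.getMetaData.getIndexInfo(null, schema, table, false, true)) and fold the
    // rows into TableIndex instances, as the H2 and MySQL overrides in this PR do.
    logWarning(s"listIndexes is only sketched for $schema.$table; returning no indexes")
    Array.empty[TableIndex]
  }
}
```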
@@ -184,9 +184,9 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
   override def listIndexes(
       conn: Connection,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Array[TableIndex] = {
-    val sql = s"SHOW INDEXES FROM $tableName"
+    val sql = s"SHOW INDEXES FROM ${tableIdent.name()}"
     var indexMap: Map[String, TableIndex] = Map()
     try {
       JdbcUtils.executeQuery(conn, options, sql) { rs =>
@@ -2259,11 +2259,20 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .asInstanceOf[SupportsIndex]
     assert(jdbcTable != null)
     assert(jdbcTable.indexExists("people_index") == false)
+    val indexes1 = jdbcTable.listIndexes()
+    assert(indexes1.isEmpty)

     sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
     assert(jdbcTable.indexExists("people_index"))
+    val indexes2 = jdbcTable.listIndexes()
+    assert(!indexes2.isEmpty)
+    assert(indexes2.size == 1)
+    val tableIndex = indexes2.head
+    assert(tableIndex.indexName() == "people_index")

     sql(s"DROP INDEX people_index ON TABLE h2.test.people")
     assert(jdbcTable.indexExists("people_index") == false)
+    val indexes3 = jdbcTable.listIndexes()
+    assert(indexes3.isEmpty)
   }
 }
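
A natural extension of this test, not part of the PR, would be to also look at which columns the returned index covers via TableIndex.columns(); a hedged sketch that would slot in after the CREATE INDEX assertions:

```scala
// Hypothetical extra check, same test scope: the index created on the "id" column
// should cover exactly one field. The exact identifier casing depends on how H2
// stores it, so only the column count is asserted here.
val indexedColumns = tableIndex.columns().flatMap(_.fieldNames())
assert(indexedColumns.length == 1)
```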