
Commit 774f1df

huaxingao authored and chenzhx committed
[SPARK-36914][SQL] Implement dropIndex and listIndexes in JDBC (MySQL dialect)
### What changes were proposed in this pull request?

This PR implements `dropIndex` and `listIndexes` in the MySQL dialect.

### Why are the changes needed?

As a subtask of the V2 index support, this PR completes the implementation of JDBC V2 index support.

### Does this PR introduce _any_ user-facing change?

Yes: `dropIndex`/`listIndexes` in DS V2 JDBC.

### How was this patch tested?

New tests.

Closes apache#34236 from huaxingao/listIndexJDBC.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent efa440b commit 774f1df

9 files changed: +245 −67 lines


external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala

Lines changed: 11 additions & 22 deletions
@@ -24,8 +24,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
-import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -121,31 +119,22 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
     assert(t.schema === expectedSchema)
   }
 
-  override def testIndex(tbl: String): Unit = {
-    val loaded = Catalogs.load("mysql", conf)
-    val jdbcTable = loaded.asInstanceOf[TableCatalog]
-      .loadTable(Identifier.of(Array.empty[String], "new_table"))
-      .asInstanceOf[SupportsIndex]
-    assert(jdbcTable.indexExists("i1") == false)
-    assert(jdbcTable.indexExists("i2") == false)
+  override def supportsIndex: Boolean = true
 
+  override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
     val properties = new util.Properties();
     properties.put("KEY_BLOCK_SIZE", "10")
     properties.put("COMMENT", "'this is a comment'")
-    jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+    // MySQL doesn't allow property set on individual column, so use empty Array for
+    // column properties
+    jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
       Array.empty[util.Map[NamedReference, util.Properties]], properties)
 
-    jdbcTable.createIndex("i2", "",
-      Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
-      Array.empty[util.Map[NamedReference, util.Properties]], new util.Properties)
-
-    assert(jdbcTable.indexExists("i1") == true)
-    assert(jdbcTable.indexExists("i2") == true)
-
-    val m = intercept[IndexAlreadyExistsException] {
-      jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
-        Array.empty[util.Map[NamedReference, util.Properties]], properties)
-    }.getMessage
-    assert(m.contains("Failed to create index: i1 in new_table"))
+    var index = jdbcTable.listIndexes()
+    // The index property size is actually 1. Even though the index is created
+    // with properties "KEY_BLOCK_SIZE", "10" and "COMMENT", "'this is a comment'", when
+    // retrieving index using `SHOW INDEXES`, MySQL only returns `COMMENT`.
+    assert(index(0).properties.size == 1)
+    assert(index(0).properties.get("COMMENT").equals("this is a comment"))
   }
 }
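
The `SHOW INDEXES` caveat in the new comment is a MySQL behavior, not a Spark one: the `Index_comment` column is reported back, but storage parameters such as `KEY_BLOCK_SIZE` are not echoed per index. A standalone sketch to observe this with plain JDBC, assuming a local MySQL with the same `new_table` (the URL and credentials are placeholders, not part of this patch):

```scala
import java.sql.DriverManager

// Placeholder connection details for a local MySQL instance.
val conn = DriverManager.getConnection(
  "jdbc:mysql://localhost:3306/test", "user", "password")
try {
  val rs = conn.createStatement().executeQuery("SHOW INDEX FROM new_table")
  while (rs.next()) {
    // Key_name, Index_type and Index_comment come back;
    // KEY_BLOCK_SIZE is not part of this result set.
    println(s"${rs.getString("Key_name")} ${rs.getString("Index_type")} " +
      s"comment='${rs.getString("Index_comment")}'")
  }
} finally {
  conn.close()
}
```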

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala

Lines changed: 97 additions & 13 deletions
@@ -17,11 +17,15 @@
 
 package org.apache.spark.sql.jdbc.v2
 
+import java.util
+
 import org.apache.log4j.Level
 
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Sample}
+import org.apache.spark.sql.{AnalysisException, DataFrame}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sample}
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
+import org.apache.spark.sql.connector.expressions.aggregate.GeneralAggregateFunc
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
@@ -186,6 +190,96 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite {
     }
   }
 
+  def supportsIndex: Boolean = false
+  def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
+
+  test("SPARK-36913: Test INDEX") {
+    if (supportsIndex) {
+      withTable(s"$catalogName.new_table") {
+        sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT," +
+          s" col4 INT, col5 INT)")
+        val loaded = Catalogs.load(catalogName, conf)
+        val jdbcTable = loaded.asInstanceOf[TableCatalog]
+          .loadTable(Identifier.of(Array.empty[String], "new_table"))
+          .asInstanceOf[SupportsIndex]
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == false)
+
+        val properties = new util.Properties();
+        val indexType = "DUMMY"
+        var m = intercept[UnsupportedOperationException] {
+          jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
+            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+        }.getMessage
+        assert(m.contains(s"Index Type $indexType is not supported." +
+          s" The supported Index Types are: BTREE and HASH"))
+
+        jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
+          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+        jdbcTable.createIndex("i2", "",
+          Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
+          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+        assert(jdbcTable.indexExists("i1") == true)
+        assert(jdbcTable.indexExists("i2") == true)
+
+        m = intercept[IndexAlreadyExistsException] {
+          jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+        }.getMessage
+        assert(m.contains("Failed to create index: i1 in new_table"))
+
+        var index = jdbcTable.listIndexes()
+        assert(index.length == 2)
+
+        assert(index(0).indexName.equals("i1"))
+        assert(index(0).indexType.equals("BTREE"))
+        var cols = index(0).columns
+        assert(cols.length == 1)
+        assert(cols(0).describe().equals("col1"))
+        assert(index(0).properties.size == 0)
+
+        assert(index(1).indexName.equals("i2"))
+        assert(index(1).indexType.equals("BTREE"))
+        cols = index(1).columns
+        assert(cols.length == 3)
+        assert(cols(0).describe().equals("col2"))
+        assert(cols(1).describe().equals("col3"))
+        assert(cols(2).describe().equals("col5"))
+        assert(index(1).properties.size == 0)
+
+        jdbcTable.dropIndex("i1")
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == true)
+
+        index = jdbcTable.listIndexes()
+        assert(index.length == 1)
+
+        assert(index(0).indexName.equals("i2"))
+        assert(index(0).indexType.equals("BTREE"))
+        cols = index(0).columns
+        assert(cols.length == 3)
+        assert(cols(0).describe().equals("col2"))
+        assert(cols(1).describe().equals("col3"))
+        assert(cols(2).describe().equals("col5"))
+
+        jdbcTable.dropIndex("i2")
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == false)
+        index = jdbcTable.listIndexes()
+        assert(index.length == 0)
+
+        m = intercept[NoSuchIndexException] {
+          jdbcTable.dropIndex("i2")
+        }.getMessage
+        assert(m.contains("Failed to drop index: i2"))
+
+        testIndexProperties(jdbcTable)
+      }
+    }
+  }
+
 def supportsTableSample: Boolean = false
 
 private def samplePushed(df: DataFrame): Boolean = {
@@ -219,16 +313,6 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite {
     scan.schema.names.sameElements(Seq(col))
   }
 
-
-  def testIndex(tbl: String): Unit = {}
-
-  test("SPARK-36913: Test INDEX") {
-    withTable(s"$catalogName.new_table") {
-      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
-      testIndex(s"$catalogName.new_table")
-    }
-  }
-
 test("SPARK-37038: Test TABLESAMPLE") {
   if (supportsTableSample) {
     withTable(s"$catalogName.new_table") {

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java

Lines changed: 3 additions & 4 deletions
@@ -42,7 +42,7 @@ public interface SupportsIndex extends Table {
    * @param columns the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
-   * @throws IndexAlreadyExistsException If the index already exists (optional)
+   * @throws IndexAlreadyExistsException If the index already exists.
    */
   void createIndex(String indexName,
       String indexType,
@@ -55,10 +55,9 @@ void createIndex(String indexName,
    * Drops the index with the given name.
    *
    * @param indexName the name of the index to be dropped.
-   * @return true if the index is dropped
-   * @throws NoSuchIndexException If the index does not exist (optional)
+   * @throws NoSuchIndexException If the index does not exist.
    */
-  boolean dropIndex(String indexName) throws NoSuchIndexException;
+  void dropIndex(String indexName) throws NoSuchIndexException;
 
   /**
    * Checks whether an index exists in this table.
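
Tightening the contract from "returns false / optional exception" to "throws `NoSuchIndexException`" means callers branch with try/catch rather than on a boolean. A minimal caller-side sketch (assuming `table` was loaded elsewhere as a `SupportsIndex`):

```scala
import org.apache.spark.sql.catalyst.analysis.NoSuchIndexException
import org.apache.spark.sql.connector.catalog.index.SupportsIndex

// Drop an index, tolerating the case where it is already gone.
def dropIfExists(table: SupportsIndex, indexName: String): Unit = {
  try {
    table.dropIndex(indexName) // returns Unit now; throws instead of returning false
  } catch {
    case _: NoSuchIndexException => // already absent; nothing to do
  }
}
```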

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java

Lines changed: 5 additions & 7 deletions
@@ -53,27 +53,25 @@ public TableIndex(
   /**
    * @return the Index name.
    */
-  String indexName() { return indexName; }
+  public String indexName() { return indexName; }
 
   /**
    * @return the indexType of this Index.
    */
-  String indexType() { return indexType; }
+  public String indexType() { return indexType; }
 
   /**
    * @return the column(s) this Index is on. Could be multi columns (a multi-column index).
    */
-  NamedReference[] columns() { return columns; }
+  public NamedReference[] columns() { return columns; }
 
   /**
    * @return the map of column and column property map.
    */
-  Map<NamedReference, Properties> columnProperties() { return columnProperties; }
+  public Map<NamedReference, Properties> columnProperties() { return columnProperties; }
 
   /**
    * Returns the index properties.
    */
-  Properties properties() {
-    return properties;
-  }
+  public Properties properties() { return properties; }
 }
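
The accessors move from package-private to `public` because code outside `org.apache.spark.sql.connector.catalog.index` (the JDBC code paths and the tests above) now reads the index metadata. A short consumer sketch, assuming `jdbcTable` is a `SupportsIndex`:

```scala
// One summary line per index, via the now-public accessors.
jdbcTable.listIndexes().foreach { idx =>
  val cols = idx.columns().map(_.describe()).mkString(", ")
  println(s"${idx.indexName()} (${idx.indexType()}) on [$cols], " +
    s"properties=${idx.properties()}")
}
```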

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala

Lines changed: 2 additions & 2 deletions
@@ -96,5 +96,5 @@ class NoSuchPartitionsException(message: String) extends AnalysisException(message)
 class NoSuchTempFunctionException(func: String)
   extends AnalysisException(s"Temporary function '$func' not found")
 
-class NoSuchIndexException(indexName: String)
-  extends AnalysisException(s"Index '$indexName' not found")
+class NoSuchIndexException(message: String, cause: Option[Throwable] = None)
+  extends AnalysisException(message, cause = cause)
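
The widened constructor lets the JDBC layer raise this error with a full message and the underlying driver exception attached, instead of only an index name. A sketch of the intended call shape (the message mirrors the tests above; the helper itself is illustrative):

```scala
import java.sql.SQLException

import org.apache.spark.sql.catalyst.analysis.NoSuchIndexException

// Illustrative: preserve the driver exception as the cause so the original
// stack trace survives into the analysis-level error.
def wrapDropFailure(indexName: String, e: SQLException): Nothing = {
  throw new NoSuchIndexException(s"Failed to drop index: $indexName", cause = Some(e))
}
```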

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

Lines changed: 24 additions & 0 deletions
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
@@ -1050,6 +1051,29 @@
     dialect.indexExists(conn, indexName, tableName, options)
   }
 
+  /**
+   * Drop an index.
+   */
+  def dropIndex(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Unit = {
+    val dialect = JdbcDialects.get(options.url)
+    executeStatement(conn, options, dialect.dropIndex(indexName, tableName))
+  }
+
+  /**
+   * List all the indexes in a table.
+   */
+  def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    val dialect = JdbcDialects.get(options.url)
+    dialect.listIndexes(conn, tableName, options)
+  }
+
   private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
     val statement = conn.createStatement
     try {
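
Both helpers follow the existing `JdbcUtils` pattern: resolve the dialect from the JDBC URL, then either execute the dialect-generated SQL (`dropIndex`) or delegate outright (`listIndexes`, which needs the live connection to query metadata). A usage sketch combining them with `JdbcUtils.withConnection`, the connection helper also used by `JDBCTable` below (options construction is abbreviated):

```scala
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}

// `options` is assumed to be built from the usual url/dbtable parameter map.
def dropAllIndexes(options: JDBCOptions, tableName: String): Unit = {
  JdbcUtils.withConnection(options) { conn =>
    JdbcUtils.listIndexes(conn, tableName, options).foreach { idx =>
      JdbcUtils.dropIndex(conn, idx.indexName(), tableName, options)
    }
  }
}
```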

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala

Lines changed: 10 additions & 3 deletions
@@ -73,11 +73,18 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)
     }
   }
 
-  override def dropIndex(indexName: String): Boolean = {
-    throw new UnsupportedOperationException("dropIndex is not supported yet")
+  override def dropIndex(indexName: String): Unit = {
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.classifyException(s"Failed to drop index: $indexName",
+        JdbcDialects.get(jdbcOptions.url)) {
+        JdbcUtils.dropIndex(conn, indexName, name, jdbcOptions)
+      }
+    }
   }
 
   override def listIndexes(): Array[TableIndex] = {
-    throw new UnsupportedOperationException("listIndexes is not supported yet")
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.listIndexes(conn, name, jdbcOptions)
+    }
   }
 }
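
`JdbcUtils.classifyException` runs the block and, on failure, asks the dialect to map the raw driver error into the typed exception the API documents; that mapping is what makes `dropIndex` on a missing index surface as `NoSuchIndexException`. A sketch of what the dialect-side mapping could look like for MySQL, where error code 1091 (`ER_CANT_DROP_FIELD_OR_KEY`) signals a missing key; the committed mapping lives in the MySQL dialect file, the one changed file not shown in this view, so treat the details as assumptions:

```scala
import java.sql.SQLException

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.NoSuchIndexException

// Hypothetical dialect-side override sketch, not the committed code.
def classifyException(message: String, e: Throwable): AnalysisException = {
  e match {
    case sql: SQLException if sql.getErrorCode == 1091 =>
      // ER_CANT_DROP_FIELD_OR_KEY: the index to drop does not exist.
      new NoSuchIndexException(message, cause = Some(e))
    case _ => new AnalysisException(message, cause = Some(e))
  }
}
```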

sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala

Lines changed: 24 additions & 1 deletion
@@ -31,6 +31,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Avg, Count, CountStar, Max, Min, Sum}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -327,14 +328,15 @@ abstract class JdbcDialect extends Serializable with Logging {
   }
 
   /**
-   * Creates an index.
+   * Build a create index SQL statement.
    *
    * @param indexName the name of the index to be created
    * @param indexType the type of the index to be created
    * @param tableName the table on which index to be created
    * @param columns the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
+   * @return the SQL statement to use for creating the index.
    */
   def createIndex(
       indexName: String,
@@ -363,6 +365,27 @@
     throw new UnsupportedOperationException("indexExists is not supported")
   }
 
+  /**
+   * Build a drop index SQL statement.
+   *
+   * @param indexName the name of the index to be dropped.
+   * @param tableName the table name on which index to be dropped.
+   * @return the SQL statement to use for dropping the index.
+   */
+  def dropIndex(indexName: String, tableName: String): String = {
+    throw new UnsupportedOperationException("dropIndex is not supported")
+  }
+
+  /**
+   * Lists all the indexes in this table.
+   */
+  def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    throw new UnsupportedOperationException("listIndexes is not supported")
+  }
+
   /**
    * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
    * @param message The error message to be placed to the returned exception.
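
The two new hooks are deliberately asymmetric: `dropIndex` only builds a SQL string (execution stays in `JdbcUtils.dropIndex`), while `listIndexes` receives the connection because it must run a metadata query and parse the result set. A rough sketch of how a MySQL-flavoured dialect could satisfy both, using MySQL's `DROP INDEX ... ON ...` and `SHOW INDEXES`; the committed MySQLDialect change is the one file of this commit not shown above, so treat this as an assumption-laden illustration, not the shipped code:

```scala
import java.sql.Connection
import java.util

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

object MySqlIndexSketch {
  // dropIndex only builds the statement; JdbcUtils.dropIndex executes it.
  def dropIndex(indexName: String, tableName: String): String =
    s"DROP INDEX $indexName ON $tableName"

  // listIndexes needs the connection: rows of SHOW INDEXES that share a
  // Key_name belong to one (possibly multi-column) index, so group first.
  def listIndexes(
      conn: Connection,
      tableName: String,
      options: JDBCOptions): Array[TableIndex] = {
    val rs = conn.createStatement().executeQuery(s"SHOW INDEXES FROM $tableName")
    val rows = ArrayBuffer.empty[(String, String, String, String)]
    while (rs.next()) {
      rows += ((rs.getString("Key_name"), rs.getString("Index_type"),
        rs.getString("Column_name"), rs.getString("Index_comment")))
    }
    rows.groupBy(_._1).map { case (keyName, group) =>
      val properties = new util.Properties()
      val comment = group.head._4
      // Matches the test expectation above: only COMMENT round-trips.
      if (comment != null && comment.nonEmpty) properties.put("COMMENT", comment)
      new TableIndex(keyName, group.head._2,
        group.map(r => FieldReference(r._3)).toArray,
        new util.HashMap[NamedReference, util.Properties](), properties)
    }.toArray
  }
}
```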
