
Commit 7f53116

wangyum authored and gatorsmile committed
1 parent 60caa92 commit 7f53116

File tree

5 files changed: +206 −6 lines changed


sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ public class GetTablesOperation extends MetadataOperation {
   private final String schemaName;
   private final String tableName;
   private final List<String> tableTypes = new ArrayList<String>();
-  private final RowSet rowSet;
+  protected final RowSet rowSet;
   private final TableTypeMapping tableTypeMapping;

This one-word change widens rowSet from private to protected so that the Spark subclass added below can append rows to it directly.

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConverters.seqAsJavaListConverter
+
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils
+import org.apache.hive.service.cli._
+import org.apache.hive.service.cli.operation.GetTablesOperation
+import org.apache.hive.service.cli.session.HiveSession
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
+
+/**
+ * Spark's own GetTablesOperation
+ *
+ * @param sqlContext SQLContext to use
+ * @param parentSession a HiveSession from SessionManager
+ * @param catalogName catalog name. null if not applicable
+ * @param schemaName database name, null or a concrete database name
+ * @param tableName table name pattern
+ * @param tableTypes list of allowed table types, e.g. "TABLE", "VIEW"
+ */
+private[hive] class SparkGetTablesOperation(
+    sqlContext: SQLContext,
+    parentSession: HiveSession,
+    catalogName: String,
+    schemaName: String,
+    tableName: String,
+    tableTypes: JList[String])
+  extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) {
+
+  if (tableTypes != null) {
+    this.tableTypes.addAll(tableTypes)
+  }
+
+  override def runInternal(): Unit = {
+    setState(OperationState.RUNNING)
+    // Always use the latest class loader provided by executionHive's state.
+    val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
+    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
+
+    val catalog = sqlContext.sessionState.catalog
+    val schemaPattern = convertSchemaPattern(schemaName)
+    val matchingDbs = catalog.listDatabases(schemaPattern)
+
+    if (isAuthV2Enabled) {
+      val privObjs =
+        HivePrivilegeObjectUtils.getHivePrivDbObjects(seqAsJavaListConverter(matchingDbs).asJava)
+      val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
+      authorizeMetaGets(HiveOperationType.GET_TABLES, privObjs, cmdStr)
+    }
+
+    val tablePattern = convertIdentifierPattern(tableName, true)
+    matchingDbs.foreach { dbName =>
+      catalog.listTables(dbName, tablePattern).foreach { tableIdentifier =>
+        val catalogTable = catalog.getTableMetadata(tableIdentifier)
+        val tableType = tableTypeString(catalogTable.tableType)
+        if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(tableType)) {
+          val rowData = Array[AnyRef](
+            "",
+            catalogTable.database,
+            catalogTable.identifier.table,
+            tableType,
+            catalogTable.comment.getOrElse(""))
+          rowSet.addRow(rowData)
+        }
+      }
+    }
+    setState(OperationState.FINISHED)
+  }
+
+  private def tableTypeString(tableType: CatalogTableType): String = tableType match {
+    case EXTERNAL | MANAGED => "TABLE"
+    case VIEW => "VIEW"
+    case t =>
+      throw new IllegalArgumentException(s"Unknown table type is found at showCreateHiveTable: $t")
+  }
+}
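
For orientation, the operation above is what ultimately services standard JDBC metadata lookups against the Thrift server. A minimal client-side sketch, assuming a server already running; the object name, endpoint, and credentials are illustrative and not part of this commit:

    import java.sql.DriverManager

    object ListTablesExample {
      def main(args: Array[String]): Unit = {
        // Illustrative endpoint and credentials; point at any running
        // Spark Thrift Server. The Hive JDBC driver must be on the classpath.
        val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
        try {
          // DatabaseMetaData.getTables is carried as a Thrift GetTables request,
          // which the server now answers via SparkGetTablesOperation, reading
          // Spark's own SessionCatalog rather than a Hive metastore client.
          val rs = conn.getMetaData.getTables(null, "%", "%", Array("TABLE", "VIEW"))
          while (rs.next()) {
            println(s"${rs.getString("TABLE_SCHEM")}.${rs.getString("TABLE_NAME")} " +
              s"[${rs.getString("TABLE_TYPE")}]")
          }
        } finally {
          conn.close()
        }
      }
    }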

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala

Lines changed: 19 additions & 3 deletions
@@ -17,17 +17,17 @@
 
 package org.apache.spark.sql.hive.thriftserver.server
 
-import java.util.{Map => JMap}
+import java.util.{List => JList, Map => JMap}
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.hive.service.cli._
-import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, GetSchemasOperation, Operation, OperationManager}
+import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, GetSchemasOperation, MetadataOperation, Operation, OperationManager}
 import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation, SparkGetSchemasOperation}
+import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation, SparkGetSchemasOperation, SparkGetTablesOperation}
 import org.apache.spark.sql.internal.SQLConf
 
 /**

@@ -76,6 +76,22 @@ private[thriftserver] class SparkSQLOperationManager()
     operation
   }
 
+  override def newGetTablesOperation(
+      parentSession: HiveSession,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      tableTypes: JList[String]): MetadataOperation = synchronized {
+    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
+    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
+      " initialized or had already closed.")
+    val operation = new SparkGetTablesOperation(sqlContext, parentSession,
+      catalogName, schemaName, tableName, tableTypes)
+    handleToOperation.put(operation.getHandle, operation)
+    logDebug(s"Created GetTablesOperation with session=$parentSession.")
+    operation
+  }
+
   def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
     val iterator = confMap.entrySet().iterator()
     while (iterator.hasNext) {
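
For context on how the override above is reached: HiveServer2's CLI service layer dispatches a client's Thrift GetTables request down to the session's OperationManager. A simplified trace of that path, with method names from Hive's service module (illustrative flow, not verbatim code):

    // Simplified dispatch path through Hive's CLI service layer (illustrative):
    // TGetTablesReq (Thrift wire request)
    //   -> CLIService.getTables(sessionHandle, catalogName, schemaName, tableName, tableTypes)
    //   -> HiveSession.getTables(...)
    //   -> OperationManager.newGetTablesOperation(...)  // now returns SparkGetTablesOperation
    //   -> Operation.run() -> SparkGetTablesOperation.runInternal()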

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

Lines changed: 1 addition & 1 deletion
@@ -280,7 +280,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
     var defaultV2: String = null
     var data: ArrayBuffer[Int] = null
 
-    withMultipleConnectionJdbcStatement("test_map")(
+    withMultipleConnectionJdbcStatement("test_map", "db1.test_map2")(
       // create table
       { statement =>

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala

Lines changed: 86 additions & 1 deletion
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import java.util.Properties
+import java.util.{Arrays => JArrays, List => JList, Properties}
 
 import org.apache.hive.jdbc.{HiveConnection, HiveQueryResultSet, Utils => JdbcUtils}
 import org.apache.hive.service.auth.PlainSaslHelper

@@ -100,4 +100,89 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
       }
     }
   }
+
+  test("Spark's own GetTablesOperation(SparkGetTablesOperation)") {
+    def testGetTablesOperation(
+        schema: String,
+        tableNamePattern: String,
+        tableTypes: JList[String])(f: HiveQueryResultSet => Unit): Unit = {
+      val rawTransport = new TSocket("localhost", serverPort)
+      val connection = new HiveConnection(s"jdbc:hive2://localhost:$serverPort", new Properties)
+      val user = System.getProperty("user.name")
+      val transport = PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport)
+      val client = new TCLIService.Client(new TBinaryProtocol(transport))
+      transport.open()
+
+      var rs: HiveQueryResultSet = null
+
+      try {
+        val openResp = client.OpenSession(new TOpenSessionReq)
+        val sessHandle = openResp.getSessionHandle
+
+        val getTableReq = new TGetTablesReq(sessHandle)
+        getTableReq.setSchemaName(schema)
+        getTableReq.setTableName(tableNamePattern)
+        getTableReq.setTableTypes(tableTypes)
+
+        val getTableResp = client.GetTables(getTableReq)
+
+        JdbcUtils.verifySuccess(getTableResp.getStatus)
+
+        rs = new HiveQueryResultSet.Builder(connection)
+          .setClient(client)
+          .setSessionHandle(sessHandle)
+          .setStmtHandle(getTableResp.getOperationHandle)
+          .build()
+
+        f(rs)
+      } finally {
+        rs.close()
+        connection.close()
+        transport.close()
+        rawTransport.close()
+      }
+    }
+
+    def checkResult(tableNames: Seq[String], rs: HiveQueryResultSet): Unit = {
+      if (tableNames.nonEmpty) {
+        for (i <- tableNames.indices) {
+          assert(rs.next())
+          assert(rs.getString("TABLE_NAME") === tableNames(i))
+        }
+      } else {
+        assert(!rs.next())
+      }
+    }
+
+    withJdbcStatement("table1", "table2") { statement =>
+      Seq(
+        "CREATE TABLE table1(key INT, val STRING)",
+        "CREATE TABLE table2(key INT, val STRING)",
+        "CREATE VIEW view1 AS SELECT * FROM table2").foreach(statement.execute)
+
+      testGetTablesOperation("%", "%", null) { rs =>
+        checkResult(Seq("table1", "table2", "view1"), rs)
+      }
+
+      testGetTablesOperation("%", "table1", null) { rs =>
+        checkResult(Seq("table1"), rs)
+      }
+
+      testGetTablesOperation("%", "table_not_exist", null) { rs =>
+        checkResult(Seq.empty, rs)
+      }
+
+      testGetTablesOperation("%", "%", JArrays.asList("TABLE")) { rs =>
+        checkResult(Seq("table1", "table2"), rs)
+      }
+
+      testGetTablesOperation("%", "%", JArrays.asList("VIEW")) { rs =>
+        checkResult(Seq("view1"), rs)
+      }
+
+      testGetTablesOperation("%", "%", JArrays.asList("TABLE", "VIEW")) { rs =>
+        checkResult(Seq("table1", "table2", "view1"), rs)
+      }
+    }
+  }
 }
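
To run just this suite locally, one plausible invocation from a Spark source checkout (the module and profile names follow Spark's standard sbt build; treat this as a sketch and adjust to your environment):

    build/sbt -Phive-thriftserver "hive-thriftserver/testOnly *SparkMetadataOperationSuite"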
