Skip to content

Commit be7a2fc

Browse files
committed
[SPARK-13078][SQL] API and test cases for internal catalog
This pull request creates an internal catalog API. The creation of this API is the first step towards consolidating SQLContext and HiveContext. I envision we will have two different implementations in Spark 2.0: (1) a simple in-memory implementation, and (2) an implementation based on the current HiveClient (ClientWrapper). I took a look at Hive's internal metastore interface/implementation, and then created this API based on it. I believe this is the minimal set needed in order to achieve all the needed functionality. Author: Reynold Xin <[email protected]> Closes #10982 from rxin/SPARK-13078.
1 parent a2973fe commit be7a2fc

File tree

4 files changed

+710
-0
lines changed

4 files changed

+710
-0
lines changed
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.catalog
19+
20+
import scala.collection.mutable
21+
22+
import org.apache.spark.sql.AnalysisException
23+
24+
25+
/**
 * An in-memory (ephemeral) implementation of the system catalog.
 *
 * All state is held in nested mutable hash maps: database name -> database descriptor, which in
 * turn maps table names and function names to their definitions. Nothing is persisted.
 *
 * All public methods should be synchronized for thread-safety.
 */
class InMemoryCatalog extends Catalog {

  /**
   * Mutable holder for a table definition plus a map of its partitions.
   * No method in this class reads or writes `partitions` yet.
   */
  private class TableDesc(var table: Table) {
    val partitions = new mutable.HashMap[String, TablePartition]
  }

  /** Mutable holder for a database definition plus its tables and functions, keyed by name. */
  private class DatabaseDesc(var db: Database) {
    val tables = new mutable.HashMap[String, TableDesc]
    val functions = new mutable.HashMap[String, Function]
  }

  /** Root of the catalog: database name -> descriptor. */
  private val catalog = new mutable.HashMap[String, DatabaseDesc]

  /**
   * Returns the entries of `names` that fully match `pattern`, where every `*` in the pattern
   * is treated as the regex `.*`. All other characters keep their regex meaning.
   */
  private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
    val regex = pattern.replaceAll("\\*", ".*").r
    names.filter { name => regex.pattern.matcher(name).matches() }
  }

  /** Whether `funcName` is registered in `db`. The database must already exist. */
  private def existsFunction(db: String, funcName: String): Boolean = {
    catalog(db).functions.contains(funcName)
  }

  /** Whether `table` is registered in `db`. The database must already exist. */
  private def existsTable(db: String, table: String): Boolean = {
    catalog(db).tables.contains(table)
  }

  /** Throws an [[AnalysisException]] if `db` is not in the catalog. */
  private def assertDbExists(db: String): Unit = {
    if (!catalog.contains(db)) {
      throw new AnalysisException(s"Database $db does not exist")
    }
  }

  /** Throws an [[AnalysisException]] if either `db` or the function `funcName` in it is missing. */
  private def assertFunctionExists(db: String, funcName: String): Unit = {
    assertDbExists(db)
    if (!existsFunction(db, funcName)) {
      throw new AnalysisException(s"Function $funcName does not exist in $db database")
    }
  }

  /** Throws an [[AnalysisException]] if either `db` or the table `table` in it is missing. */
  private def assertTableExists(db: String, table: String): Unit = {
    assertDbExists(db)
    if (!existsTable(db, table)) {
      throw new AnalysisException(s"Table $table does not exist in $db database")
    }
  }

  // --------------------------------------------------------------------------
  // Databases
  // --------------------------------------------------------------------------

  /**
   * Registers a new, empty database.
   *
   * @param dbDefinition definition to store; its `name` is the catalog key
   * @param ifNotExists when true, an already existing database is silently left untouched
   * @throws AnalysisException if the database exists and `ifNotExists` is false
   */
  override def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit = synchronized {
    if (!catalog.contains(dbDefinition.name)) {
      catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
    } else if (!ifNotExists) {
      throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
    }
  }

  /**
   * Removes a database together with everything registered in it.
   *
   * @param db name of the database to drop
   * @param ignoreIfNotExists when true, dropping a missing database is a no-op
   * @param cascade when false, the database must contain no tables and no functions
   * @throws AnalysisException if the database is missing (and not ignored), or is non-empty
   *                           while `cascade` is false
   */
  override def dropDatabase(
      db: String,
      ignoreIfNotExists: Boolean,
      cascade: Boolean): Unit = synchronized {
    catalog.get(db) match {
      case Some(desc) =>
        if (!cascade) {
          // If cascade is false, make sure the database is empty.
          if (desc.tables.nonEmpty) {
            throw new AnalysisException(s"Database $db is not empty. One or more tables exist.")
          }
          if (desc.functions.nonEmpty) {
            throw new AnalysisException(s"Database $db is not empty. One or more functions exist.")
          }
        }
        // Remove the database.
        catalog.remove(db)
      case None =>
        if (!ignoreIfNotExists) {
          throw new AnalysisException(s"Database $db does not exist")
        }
    }
  }

  /**
   * Replaces the stored definition of `db`. Renaming is not supported: `dbDefinition.name`
   * must equal `db` (checked with `assert`).
   */
  override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized {
    assertDbExists(db)
    assert(db == dbDefinition.name)
    catalog(db).db = dbDefinition
  }

  /** Returns the definition of `db`, or throws an [[AnalysisException]] if it does not exist. */
  override def getDatabase(db: String): Database = synchronized {
    assertDbExists(db)
    catalog(db).db
  }

  /** Returns the names of all databases, in no particular order. */
  override def listDatabases(): Seq[String] = synchronized {
    catalog.keySet.toSeq
  }

  /** Returns the names of all databases matching `pattern` (`*` is a wildcard). */
  override def listDatabases(pattern: String): Seq[String] = synchronized {
    filterPattern(listDatabases(), pattern)
  }

  // --------------------------------------------------------------------------
  // Tables
  // --------------------------------------------------------------------------

  /**
   * Registers a table in `db`.
   *
   * @param ifNotExists when true, an already existing table is silently left untouched
   * @throws AnalysisException if `db` is missing, or the table exists and `ifNotExists` is false
   */
  override def createTable(db: String, tableDefinition: Table, ifNotExists: Boolean)
    : Unit = synchronized {
    assertDbExists(db)
    if (!existsTable(db, tableDefinition.name)) {
      catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition))
    } else if (!ifNotExists) {
      throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
    }
  }

  /**
   * Removes a table from `db`.
   *
   * @param ignoreIfNotExists when true, dropping a missing table is a no-op
   * @throws AnalysisException if `db` is missing, or the table is missing and not ignored
   */
  override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean)
    : Unit = synchronized {
    assertDbExists(db)
    if (existsTable(db, table)) {
      catalog(db).tables.remove(table)
    } else if (!ignoreIfNotExists) {
      throw new AnalysisException(s"Table $table does not exist in $db database")
    }
  }

  /**
   * Renames a table inside `db`, keeping its descriptor (and partitions map).
   *
   * Renaming a table to its current name is a no-op.
   *
   * @throws AnalysisException if `db` or `oldName` is missing, or if a different table named
   *                           `newName` already exists
   */
  override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
    assertTableExists(db, oldName)
    // Guard the same-name case: inserting and then removing the same key would drop the table.
    if (oldName != newName) {
      // Refuse to clobber an unrelated table that already owns the target name.
      if (existsTable(db, newName)) {
        throw new AnalysisException(s"Table $newName already exists in $db database")
      }
      val tables = catalog(db).tables
      val desc = tables(oldName)
      desc.table = desc.table.copy(name = newName)
      tables.remove(oldName)
      tables.put(newName, desc)
    }
  }

  /**
   * Replaces the stored definition of `table`. Renaming is not supported here:
   * `tableDefinition.name` must equal `table` (checked with `assert`).
   */
  override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized {
    assertTableExists(db, table)
    assert(table == tableDefinition.name)
    catalog(db).tables(table).table = tableDefinition
  }

  /** Returns the definition of `table`, or throws an [[AnalysisException]] if it is missing. */
  override def getTable(db: String, table: String): Table = synchronized {
    assertTableExists(db, table)
    catalog(db).tables(table).table
  }

  /** Returns the names of all tables in `db`, in no particular order. */
  override def listTables(db: String): Seq[String] = synchronized {
    assertDbExists(db)
    catalog(db).tables.keySet.toSeq
  }

  /** Returns the names of all tables in `db` matching `pattern` (`*` is a wildcard). */
  override def listTables(db: String, pattern: String): Seq[String] = synchronized {
    // listTables(db) already verifies that the database exists.
    filterPattern(listTables(db), pattern)
  }

  // --------------------------------------------------------------------------
  // Partitions
  // --------------------------------------------------------------------------

  /** Not implemented yet; always throws [[UnsupportedOperationException]]. */
  override def alterPartition(db: String, table: String, part: TablePartition)
    : Unit = synchronized {
    throw new UnsupportedOperationException
  }

  /** Not implemented yet; always throws [[UnsupportedOperationException]]. */
  override def alterPartitions(db: String, table: String, parts: Seq[TablePartition])
    : Unit = synchronized {
    throw new UnsupportedOperationException
  }

  // --------------------------------------------------------------------------
  // Functions
  // --------------------------------------------------------------------------

  /**
   * Registers a function in `db`.
   *
   * @param ifNotExists when true, an already existing function is silently left untouched
   * @throws AnalysisException if `db` is missing, or the function exists and `ifNotExists`
   *                           is false
   */
  override def createFunction(
      db: String, func: Function, ifNotExists: Boolean): Unit = synchronized {
    assertDbExists(db)

    if (!existsFunction(db, func.name)) {
      catalog(db).functions.put(func.name, func)
    } else if (!ifNotExists) {
      throw new AnalysisException(s"Function ${func.name} already exists in $db database")
    }
  }

  /** Removes `funcName` from `db`; throws an [[AnalysisException]] if either is missing. */
  override def dropFunction(db: String, funcName: String): Unit = synchronized {
    assertFunctionExists(db, funcName)
    catalog(db).functions.remove(funcName)
  }

  /**
   * Replaces the definition of `funcName`. If `funcDefinition.name` differs from `funcName`
   * this is also a rename: the old entry is removed and the definition is stored under its
   * new name.
   *
   * @throws AnalysisException if `db` or `funcName` is missing
   */
  override def alterFunction(db: String, funcName: String, funcDefinition: Function)
    : Unit = synchronized {
    assertFunctionExists(db, funcName)
    val functions = catalog(db).functions
    if (funcName != funcDefinition.name) {
      // Also a rename; remove the old one and add the new one back
      functions.remove(funcName)
    }
    // Key by the definition's own name so that a renamed function is reachable under it.
    functions.put(funcDefinition.name, funcDefinition)
  }

  /** Returns the definition of `funcName`, or throws an [[AnalysisException]] if missing. */
  override def getFunction(db: String, funcName: String): Function = synchronized {
    assertFunctionExists(db, funcName)
    catalog(db).functions(funcName)
  }

  /** Returns the names of all functions in `db` matching `pattern` (`*` is a wildcard). */
  override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
    assertDbExists(db)
    filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
  }

}

0 commit comments

Comments
 (0)