Skip to content

Commit 77252af

Browse files
committed
init import: create database and create function.
1 parent 138c300 commit 77252af

File tree

6 files changed

+130
-4
lines changed

6 files changed

+130
-4
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
2323
import scala.collection.mutable
2424
import scala.collection.mutable.ArrayBuffer
2525

26-
import org.apache.spark.sql.AnalysisException
26+
import org.apache.spark.sql.{AnalysisException, Row}
2727
import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
2828
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
2929

@@ -50,6 +50,10 @@ trait Catalog {
5050
throw new UnsupportedOperationException
5151
}
5252

53+
def runNativeCommand(sql: String): Seq[Row] = {
54+
throw new UnsupportedOperationException
55+
}
56+
5357
/**
5458
* Returns tuples of (tableName, isTemporary) for all tables in the given database.
5559
* isTemporary is a Boolean value indicates if a table is a temporary or not.

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,61 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
6262
val tableIdent = extractTableIdent(nameParts)
6363
RefreshTable(tableIdent)
6464

65+
case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: createDatabaseArgs) =>
66+
val Seq(
67+
allowExisting,
68+
dbLocation,
69+
databaseComment,
70+
dbprops) = getClauses(Seq(
71+
"TOK_IFNOTEXISTS",
72+
"TOK_DATABASELOCATION",
73+
"TOK_DATABASECOMMENT",
74+
"TOK_DATABASEPROPERTIES"), createDatabaseArgs)
75+
76+
val location = dbLocation.map {
77+
case Token("TOK_DATABASELOCATION", Token(loc, Nil) :: Nil) => unquoteString(loc)
78+
}
79+
val comment = databaseComment.map {
80+
case Token("TOK_DATABASECOMMENT", Token(comment, Nil) :: Nil) => unquoteString(comment)
81+
}
82+
val props: Map[String, String] = dbprops.toSeq.flatMap {
83+
case Token("TOK_DATABASEPROPERTIES", propList) =>
84+
propList.flatMap {
85+
case Token("TOK_DBPROPLIST", props) =>
86+
props.map {
87+
case Token("TOK_TABLEPROPERTY", keysAndValue) =>
88+
val key = keysAndValue.init.map(x => unquoteString(x.text)).mkString(".")
89+
val value = unquoteString(keysAndValue.last.text)
90+
(key, value)
91+
}
92+
}
93+
}.toMap
94+
95+
CreateDataBase(databaseName, allowExisting.isDefined, location, comment, props)(node.source)
96+
97+
case Token("TOK_CREATEFUNCTION", func :: as :: createFuncArgs) =>
98+
val funcName = func.map(x => unquoteString(x.text)).mkString(".")
99+
val asName = unquoteString(as.text)
100+
val Seq(
101+
rList,
102+
temp) = getClauses(Seq(
103+
"TOK_RESOURCE_LIST",
104+
"TOK_TEMPORARY"), createFuncArgs)
105+
106+
val resourcesMap: Map[String, String] = rList.toSeq.flatMap {
107+
case Token("TOK_RESOURCE_LIST", resources) =>
108+
resources.map {
109+
case Token("TOK_RESOURCE_URI", rType :: Token(rPath, Nil) :: Nil) =>
110+
val resourceType = rType match {
111+
case Token("TOK_JAR", Nil) => "jar"
112+
case Token("TOK_FILE", Nil) => "file"
113+
case Token("TOK_ARCHIVE", Nil) => "archive"
114+
}
115+
(resourceType, unquoteString(rPath))
116+
}
117+
}.toMap
118+
CreateFunction(funcName, asName, resourcesMap, temp.isDefined)(node.source)
119+
65120
case Token("TOK_CREATETABLEUSING", createTableArgs) =>
66121
val Seq(
67122
temp,

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,3 +418,32 @@ case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
418418

419419
override val output: Seq[Attribute] = Seq.empty
420420
}
421+
422+
case class CreateDataBase(
423+
databaseName: String,
424+
allowExisting: Boolean,
425+
path: Option[String],
426+
comment: Option[String],
427+
props: Map[String, String])(sql: String) extends RunnableCommand with Logging {
428+
429+
override def run(sqlContext: SQLContext): Seq[Row] = {
430+
sqlContext.catalog.runNativeCommand(sql)
431+
}
432+
433+
override val output: Seq[Attribute] =
434+
Seq(AttributeReference("result", StringType, nullable = false)())
435+
}
436+
437+
case class CreateFunction(
438+
functionName: String,
439+
asName: String,
440+
resourcesMap: Map[String, String],
441+
isTemp: Boolean)(sql: String) extends RunnableCommand with Logging {
442+
443+
override def run(sqlContext: SQLContext): Seq[Row] = {
444+
sqlContext.catalog.runNativeCommand(sql)
445+
}
446+
447+
override val output: Seq[Attribute] =
448+
Seq(AttributeReference("result", StringType, nullable = false)())
449+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution
19+
20+
import org.apache.spark.sql.catalyst.plans.PlanTest
21+
22+
class SparkQlSuite extends PlanTest {
23+
val parser = new SparkQl()
24+
25+
test("create database") {
26+
parser.parsePlan("CREATE DATABASE IF NOT EXISTS database_name " +
27+
"COMMENT 'database_comment' LOCATION '/home/user/db' " +
28+
"WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
29+
}
30+
31+
test("create function") {
32+
parser.parsePlan("CREATE TEMPORARY FUNCTION helloworld as " +
33+
"'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar', " +
34+
"FILE 'path/to/file'")
35+
}
36+
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.metadata._
3131
import org.apache.hadoop.hive.ql.plan.TableDesc
3232

3333
import org.apache.spark.Logging
34-
import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
34+
import org.apache.spark.sql.{AnalysisException, SaveMode, Row, SQLContext}
3535
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
3636
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
3737
import org.apache.spark.sql.catalyst.expressions._
@@ -715,6 +715,10 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
715715
override def setCurrentDatabase(databaseName: String): Unit = {
716716
client.setCurrentDatabase(databaseName)
717717
}
718+
719+
override def runNativeCommand(sql: String): Seq[Row] = {
720+
hive.runSqlHive(sql).map(Row(_))
721+
}
718722
}
719723

720724
/**

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,6 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
107107
"TOK_ALTERVIEW_PROPERTIES",
108108
"TOK_ALTERVIEW_RENAME",
109109

110-
"TOK_CREATEDATABASE",
111-
"TOK_CREATEFUNCTION",
112110
"TOK_CREATEINDEX",
113111
"TOK_CREATEMACRO",
114112
"TOK_CREATEROLE",

0 commit comments

Comments
 (0)