Skip to content

Commit 0266a0c

Browse files
liancheng authored and marmbrus committed
[SPARK-1968][SQL] SQL/HiveQL command for caching/uncaching tables
JIRA issue: [SPARK-1968](https://issues.apache.org/jira/browse/SPARK-1968) This PR added support for SQL/HiveQL command for caching/uncaching tables: ``` scala> sql("CACHE TABLE src") ... res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98 == Query Plan == CacheCommandPhysical src, true scala> table("src") ... res1: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:98 == Query Plan == InMemoryColumnarTableScan [key#0,value#1], (HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None), false scala> isCached("src") res2: Boolean = true scala> sql("CACHE TABLE src") ... res3: org.apache.spark.sql.SchemaRDD = SchemaRDD[4] at RDD at SchemaRDD.scala:98 == Query Plan == CacheCommandPhysical src, false scala> table("src") ... res4: org.apache.spark.sql.SchemaRDD = SchemaRDD[11] at RDD at SchemaRDD.scala:98 == Query Plan == HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None scala> isCached("src") res5: Boolean = false ``` Things also work for `hql`. Author: Cheng Lian <[email protected]> Closes apache#1038 from liancheng/sqlCacheTable and squashes the following commits: ecb7194 [Cheng Lian] Trimmed the SQL string before parsing special commands 6f4ce42 [Cheng Lian] Moved logical command classes to a separate file 3458a24 [Cheng Lian] Added comment for public API f0ffacc [Cheng Lian] Added isCached() predicate 15ec6d2 [Cheng Lian] Added "(UN)CACHE TABLE" SQL/HiveQL statements
1 parent 0402bd7 commit 0266a0c

File tree

10 files changed

+152
-47
lines changed

10 files changed

+152
-47
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -111,6 +111,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
111111
protected val APPROXIMATE = Keyword("APPROXIMATE")
112112
protected val AVG = Keyword("AVG")
113113
protected val BY = Keyword("BY")
114+
protected val CACHE = Keyword("CACHE")
114115
protected val CAST = Keyword("CAST")
115116
protected val COUNT = Keyword("COUNT")
116117
protected val DESC = Keyword("DESC")
@@ -149,7 +150,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
149150
protected val SEMI = Keyword("SEMI")
150151
protected val STRING = Keyword("STRING")
151152
protected val SUM = Keyword("SUM")
153+
protected val TABLE = Keyword("TABLE")
152154
protected val TRUE = Keyword("TRUE")
155+
protected val UNCACHE = Keyword("UNCACHE")
153156
protected val UNION = Keyword("UNION")
154157
protected val WHERE = Keyword("WHERE")
155158

@@ -189,7 +192,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
189192
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
190193
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
191194
)
192-
| insert
195+
| insert | cache
193196
)
194197

195198
protected lazy val select: Parser[LogicalPlan] =
@@ -220,6 +223,11 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
220223
InsertIntoTable(r, Map[String, Option[String]](), s, overwrite)
221224
}
222225

226+
protected lazy val cache: Parser[LogicalPlan] =
227+
(CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ {
228+
case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache)
229+
}
230+
223231
protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
224232

225233
protected lazy val projection: Parser[Expression] =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 1 addition & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
2020
import org.apache.spark.sql.catalyst.errors.TreeNodeException
2121
import org.apache.spark.sql.catalyst.expressions._
2222
import org.apache.spark.sql.catalyst.plans.QueryPlan
23-
import org.apache.spark.sql.catalyst.types.{StringType, StructType}
23+
import org.apache.spark.sql.catalyst.types.StructType
2424
import org.apache.spark.sql.catalyst.trees
2525

2626
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
@@ -96,39 +96,6 @@ abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
9696
def references = Set.empty
9797
}
9898

99-
/**
100-
* A logical node that represents a non-query command to be executed by the system. For example,
101-
* commands can be used by parsers to represent DDL operations.
102-
*/
103-
abstract class Command extends LeafNode {
104-
self: Product =>
105-
def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
106-
}
107-
108-
/**
109-
* Returned for commands supported by a given parser, but not catalyst. In general these are DDL
110-
* commands that are passed directly to another system.
111-
*/
112-
case class NativeCommand(cmd: String) extends Command
113-
114-
/**
115-
* Commands of the form "SET (key) (= value)".
116-
*/
117-
case class SetCommand(key: Option[String], value: Option[String]) extends Command {
118-
override def output = Seq(
119-
AttributeReference("key", StringType, nullable = false)(),
120-
AttributeReference("value", StringType, nullable = false)()
121-
)
122-
}
123-
124-
/**
125-
* Returned by a parser when the users only wants to see what query plan would be executed, without
126-
* actually performing the execution.
127-
*/
128-
case class ExplainCommand(plan: LogicalPlan) extends Command {
129-
override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
130-
}
131-
13299
/**
133100
* A logical plan node with single child.
134101
*/
Lines changed: 60 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.plans.logical
19+
20+
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
21+
import org.apache.spark.sql.catalyst.types.StringType
22+
23+
/**
24+
* A logical node that represents a non-query command to be executed by the system. For example,
25+
* commands can be used by parsers to represent DDL operations.
26+
*/
27+
abstract class Command extends LeafNode {
28+
self: Product =>
29+
def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
30+
}
31+
32+
/**
33+
* Returned for commands supported by a given parser, but not catalyst. In general these are DDL
34+
* commands that are passed directly to another system.
35+
*/
36+
case class NativeCommand(cmd: String) extends Command
37+
38+
/**
39+
* Commands of the form "SET (key) (= value)".
40+
*/
41+
case class SetCommand(key: Option[String], value: Option[String]) extends Command {
42+
override def output = Seq(
43+
AttributeReference("key", StringType, nullable = false)(),
44+
AttributeReference("value", StringType, nullable = false)()
45+
)
46+
}
47+
48+
/**
49+
* Returned by a parser when the users only wants to see what query plan would be executed, without
50+
* actually performing the execution.
51+
*/
52+
case class ExplainCommand(plan: LogicalPlan) extends Command {
53+
override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
54+
}
55+
56+
/**
57+
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
58+
*/
59+
case class CacheCommand(tableName: String, doCache: Boolean) extends Command
60+

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -188,6 +188,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
188188
}
189189
}
190190

191+
/** Returns true if the table is currently cached in-memory. */
192+
def isCached(tableName: String): Boolean = {
193+
val relation = catalog.lookupRelation(None, tableName)
194+
EliminateAnalysisOperators(relation) match {
195+
case SparkLogicalPlan(_: InMemoryColumnarTableScan) => true
196+
case _ => false
197+
}
198+
}
199+
191200
protected[sql] class SparkPlanner extends SparkStrategies {
192201
val sparkContext = self.sparkContext
193202

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -239,10 +239,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
239239
case logical.SetCommand(key, value) =>
240240
Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
241241
case logical.ExplainCommand(child) =>
242-
val qe = context.executePlan(child)
243-
Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context))
242+
val executedPlan = context.executePlan(child).executedPlan
243+
Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
244+
case logical.CacheCommand(tableName, cache) =>
245+
Seq(execution.CacheCommandPhysical(tableName, cache)(context))
244246
case _ => Nil
245247
}
246248
}
247-
248249
}

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -65,3 +65,26 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
6565

6666
override def otherCopyArgs = context :: Nil
6767
}
68+
69+
/**
70+
* :: DeveloperApi ::
71+
*/
72+
@DeveloperApi
73+
case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
74+
extends LeafNode {
75+
76+
lazy val commandSideEffect = {
77+
if (doCache) {
78+
context.cacheTable(tableName)
79+
} else {
80+
context.uncacheTable(tableName)
81+
}
82+
}
83+
84+
override def execute(): RDD[Row] = {
85+
commandSideEffect
86+
context.emptyResult
87+
}
88+
89+
override def output: Seq[Attribute] = Seq.empty
90+
}

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -70,4 +70,20 @@ class CachedTableSuite extends QueryTest {
7070
TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
7171
TestSQLContext.uncacheTable("testData")
7272
}
73+
74+
test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
75+
TestSQLContext.sql("CACHE TABLE testData")
76+
TestSQLContext.table("testData").queryExecution.executedPlan match {
77+
case _: InMemoryColumnarTableScan => // Found evidence of caching
78+
case _ => fail(s"Table 'testData' should be cached")
79+
}
80+
assert(TestSQLContext.isCached("testData"), "Table 'testData' should be cached")
81+
82+
TestSQLContext.sql("UNCACHE TABLE testData")
83+
TestSQLContext.table("testData").queryExecution.executedPlan match {
84+
case _: InMemoryColumnarTableScan => fail(s"Table 'testData' should not be cached")
85+
case _ => // Found evidence of uncaching
86+
}
87+
assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached")
88+
}
7389
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -218,15 +218,19 @@ private[hive] object HiveQl {
218218
case Array(key, value) => // "set key=value"
219219
SetCommand(Some(key), Some(value))
220220
}
221-
} else if (sql.toLowerCase.startsWith("add jar")) {
221+
} else if (sql.trim.toLowerCase.startsWith("cache table")) {
222+
CacheCommand(sql.drop(12).trim, true)
223+
} else if (sql.trim.toLowerCase.startsWith("uncache table")) {
224+
CacheCommand(sql.drop(14).trim, false)
225+
} else if (sql.trim.toLowerCase.startsWith("add jar")) {
222226
AddJar(sql.drop(8))
223-
} else if (sql.toLowerCase.startsWith("add file")) {
227+
} else if (sql.trim.toLowerCase.startsWith("add file")) {
224228
AddFile(sql.drop(9))
225-
} else if (sql.startsWith("dfs")) {
229+
} else if (sql.trim.startsWith("dfs")) {
226230
DfsCommand(sql)
227-
} else if (sql.startsWith("source")) {
231+
} else if (sql.trim.startsWith("source")) {
228232
SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
229-
} else if (sql.startsWith("!")) {
233+
} else if (sql.trim.startsWith("!")) {
230234
ShellCommand(sql.drop(1))
231235
} else {
232236
val tree = getAst(sql)
@@ -839,11 +843,11 @@ private[hive] object HiveQl {
839843
case Token("TOK_FUNCTIONDI", Token(SUM(), Nil) :: arg :: Nil) => SumDistinct(nodeToExpr(arg))
840844
case Token("TOK_FUNCTION", Token(MAX(), Nil) :: arg :: Nil) => Max(nodeToExpr(arg))
841845
case Token("TOK_FUNCTION", Token(MIN(), Nil) :: arg :: Nil) => Min(nodeToExpr(arg))
842-
846+
843847
/* System functions about string operations */
844848
case Token("TOK_FUNCTION", Token(UPPER(), Nil) :: arg :: Nil) => Upper(nodeToExpr(arg))
845849
case Token("TOK_FUNCTION", Token(LOWER(), Nil) :: arg :: Nil) => Lower(nodeToExpr(arg))
846-
850+
847851
/* Casts */
848852
case Token("TOK_FUNCTION", Token("TOK_STRING", Nil) :: arg :: Nil) =>
849853
Cast(nodeToExpr(arg), StringType)

sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
3232

3333
import org.apache.spark.{SparkConf, SparkContext}
3434
import org.apache.spark.sql.catalyst.analysis._
35-
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NativeCommand}
35+
import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand}
3636
import org.apache.spark.sql.catalyst.util._
3737
import org.apache.spark.sql.hive._
3838

@@ -103,7 +103,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
103103
val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
104104
new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
105105
} else {
106-
new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" +
106+
new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" +
107107
File.separator + "resources")
108108
}
109109

@@ -130,6 +130,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
130130
override lazy val analyzed = {
131131
val describedTables = logical match {
132132
case NativeCommand(describedTable(tbl)) => tbl :: Nil
133+
case CacheCommand(tbl, _) => tbl :: Nil
133134
case _ => Nil
134135
}
135136

sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -56,4 +56,20 @@ class CachedTableSuite extends HiveComparisonTest {
5656
TestHive.uncacheTable("src")
5757
}
5858
}
59+
60+
test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") {
61+
TestHive.hql("CACHE TABLE src")
62+
TestHive.table("src").queryExecution.executedPlan match {
63+
case _: InMemoryColumnarTableScan => // Found evidence of caching
64+
case _ => fail(s"Table 'src' should be cached")
65+
}
66+
assert(TestHive.isCached("src"), "Table 'src' should be cached")
67+
68+
TestHive.hql("UNCACHE TABLE src")
69+
TestHive.table("src").queryExecution.executedPlan match {
70+
case _: InMemoryColumnarTableScan => fail(s"Table 'src' should not be cached")
71+
case _ => // Found evidence of uncaching
72+
}
73+
assert(!TestHive.isCached("src"), "Table 'src' should not be cached")
74+
}
5975
}

0 commit comments

Comments (0)