
Commit 15ec6d2

Added "(UN)CACHE TABLE" SQL/HiveQL statements
1 parent a9a461c commit 15ec6d2

8 files changed, +78 -8 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 9 additions & 1 deletion
@@ -111,6 +111,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val APPROXIMATE = Keyword("APPROXIMATE")
   protected val AVG = Keyword("AVG")
   protected val BY = Keyword("BY")
+  protected val CACHE = Keyword("CACHE")
   protected val CAST = Keyword("CAST")
   protected val COUNT = Keyword("COUNT")
   protected val DESC = Keyword("DESC")
@@ -149,7 +150,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val SEMI = Keyword("SEMI")
   protected val STRING = Keyword("STRING")
   protected val SUM = Keyword("SUM")
+  protected val TABLE = Keyword("TABLE")
   protected val TRUE = Keyword("TRUE")
+  protected val UNCACHE = Keyword("UNCACHE")
   protected val UNION = Keyword("UNION")
   protected val WHERE = Keyword("WHERE")
 
@@ -189,7 +192,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
         UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
         UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
       )
-    | insert
+    | insert | cache
   )
 
   protected lazy val select: Parser[LogicalPlan] =
@@ -220,6 +223,11 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
       InsertIntoTable(r, Map[String, Option[String]](), s, overwrite)
     }
 
+  protected lazy val cache: Parser[LogicalPlan] =
+    (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ {
+      case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache)
+    }
+
   protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
 
   protected lazy val projection: Parser[Expression] =
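Taken together, the new keywords and the `cache` production let the standalone SQL parser accept `CACHE TABLE <identifier>` and `UNCACHE TABLE <identifier>` and turn them into `CacheCommand` plans. A minimal usage sketch, assuming an existing `SQLContext` named `sqlContext` and a table `people` already registered against it (both names are hypothetical, not part of this commit):

  // Hypothetical names: sqlContext is an existing SQLContext, "people" is a registered table.
  sqlContext.sql("CACHE TABLE people")    // parses to CacheCommand("people", doCache = true)
  // ... subsequent scans of "people" can now hit the in-memory columnar cache ...
  sqlContext.sql("UNCACHE TABLE people")  // parses to CacheCommand("people", doCache = false)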

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 5 additions & 0 deletions
@@ -129,6 +129,11 @@ case class ExplainCommand(plan: LogicalPlan) extends Command {
   override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
 }
 
+/**
+ * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
+ */
+case class CacheCommand(tableName: String, doCache: Boolean) extends Command
+
 /**
  * A logical plan node with single child.
  */

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 4 additions & 3 deletions
@@ -239,10 +239,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.SetCommand(key, value) =>
         Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
       case logical.ExplainCommand(child) =>
-        val qe = context.executePlan(child)
-        Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context))
+        val executedPlan = context.executePlan(child).executedPlan
+        Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
+      case logical.CacheCommand(tableName, cache) =>
+        Seq(execution.CacheCommandPhysical(tableName, cache)(context))
       case _ => Nil
     }
   }
-
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 23 additions & 0 deletions
@@ -65,3 +65,26 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
 
   override def otherCopyArgs = context :: Nil
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+  extends LeafNode {
+
+  lazy val commandSideEffect = {
+    if (doCache) {
+      context.cacheTable(tableName)
+    } else {
+      context.uncacheTable(tableName)
+    }
+  }
+
+  override def execute(): RDD[Row] = {
+    commandSideEffect
+    context.emptyResult
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
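One point worth noting about `CacheCommandPhysical`: the cache/uncache call lives in a `lazy val`, so even if `execute()` runs more than once the side effect is applied only on first evaluation. A minimal, Spark-free sketch of that pattern (class and member names here are illustrative only, not part of the commit):

  // Illustrative sketch of the lazy-val-as-one-shot-side-effect pattern.
  class OneShotCommand(tableName: String) {
    // The body runs the first time the val is forced, and never again.
    private lazy val sideEffect: Unit = println(s"caching $tableName")

    def execute(): Seq[Nothing] = {
      sideEffect   // forces the side effect at most once
      Seq.empty    // commands return an empty result
    }
  }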

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 14 additions & 0 deletions
@@ -70,4 +70,18 @@ class CachedTableSuite extends QueryTest {
     TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
     TestSQLContext.uncacheTable("testData")
   }
+
+  test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
+    TestSQLContext.sql("CACHE TABLE testData")
+    TestSQLContext.table("testData").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => // Found evidence of caching
+      case _ => fail(s"Table 'testData' should be cached")
+    }
+
+    TestSQLContext.sql("UNCACHE TABLE testData")
+    TestSQLContext.table("testData").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => fail(s"Table 'testData' should not be cached")
+      case _ => // Found evidence of uncaching
+    }
+  }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 6 additions & 2 deletions
@@ -218,6 +218,10 @@ private[hive] object HiveQl {
         case Array(key, value) => // "set key=value"
           SetCommand(Some(key), Some(value))
       }
+    } else if (sql.toLowerCase.startsWith("cache table")) {
+      CacheCommand(sql.drop(12).trim, true)
+    } else if (sql.toLowerCase.startsWith("uncache table")) {
+      CacheCommand(sql.drop(14).trim, false)
     } else if (sql.toLowerCase.startsWith("add jar")) {
       AddJar(sql.drop(8))
     } else if (sql.toLowerCase.startsWith("add file")) {
@@ -839,11 +843,11 @@ private[hive] object HiveQl {
     case Token("TOK_FUNCTIONDI", Token(SUM(), Nil) :: arg :: Nil) => SumDistinct(nodeToExpr(arg))
     case Token("TOK_FUNCTION", Token(MAX(), Nil) :: arg :: Nil) => Max(nodeToExpr(arg))
     case Token("TOK_FUNCTION", Token(MIN(), Nil) :: arg :: Nil) => Min(nodeToExpr(arg))
-
+
     /* System functions about string operations */
     case Token("TOK_FUNCTION", Token(UPPER(), Nil) :: arg :: Nil) => Upper(nodeToExpr(arg))
     case Token("TOK_FUNCTION", Token(LOWER(), Nil) :: arg :: Nil) => Lower(nodeToExpr(arg))
-
+
     /* Casts */
     case Token("TOK_FUNCTION", Token("TOK_STRING", Nil) :: arg :: Nil) =>
       Cast(nodeToExpr(arg), StringType)
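On the HiveQL path the statement is recognized by a plain prefix check rather than by the parser, and the table name is recovered by dropping the prefix: "CACHE TABLE " is 12 characters and "UNCACHE TABLE " is 14, so `drop(12)`/`drop(14)` followed by `trim` leaves only the identifier. A small self-contained check of that arithmetic (illustrative only, not part of the commit):

  // Illustrative only: verifies the drop/trim arithmetic used above.
  object CachePrefixCheck extends App {
    assert("CACHE TABLE src".drop(12).trim == "src")    // drops the "CACHE TABLE " prefix
    assert("UNCACHE TABLE src".drop(14).trim == "src")  // drops the "UNCACHE TABLE " prefix
  }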

sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala

Lines changed: 3 additions & 2 deletions
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NativeCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive._
 
@@ -103,7 +103,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
   val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
     new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
   } else {
-    new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" +
+    new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" +
       File.separator + "resources")
   }
 
@@ -130,6 +130,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
   override lazy val analyzed = {
     val describedTables = logical match {
       case NativeCommand(describedTable(tbl)) => tbl :: Nil
+      case CacheCommand(tbl, _) => tbl :: Nil
       case _ => Nil
     }
 

sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala

Lines changed: 14 additions & 0 deletions
@@ -56,4 +56,18 @@ class CachedTableSuite extends HiveComparisonTest {
       TestHive.uncacheTable("src")
     }
   }
+
+  test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") {
+    TestHive.hql("CACHE TABLE src")
+    TestHive.table("src").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => // Found evidence of caching
+      case _ => fail(s"Table 'src' should be cached")
+    }
+
+    TestHive.hql("UNCACHE TABLE src")
+    TestHive.table("src").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => fail(s"Table 'src' should not be cached")
+      case _ => // Found evidence of uncaching
+    }
+  }
 }
