diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index ca69531c69a7..862f78702c4e 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -151,7 +151,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
         EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
         UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
       )
-    | insert | cache
+    | insert | cache | unCache
   )
 
   protected lazy val select: Parser[LogicalPlan] =
@@ -183,9 +183,19 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   }
 
   protected lazy val cache: Parser[LogicalPlan] =
-    (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ {
-      case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache)
+    // CACHE TABLE tableName [AS select], optionally terminated by ";".
+    CACHE ~ TABLE ~> ident ~ opt(AS ~> select) <~ opt(";") ^^ {
+      case tableName ~ None =>
+        CacheCommand(tableName, true)
+      case tableName ~ Some(plan) =>
+        CacheTableAsSelectCommand(tableName, plan)
     }
+
+  // UNCACHE TABLE tableName, optionally terminated by ";".
+  protected lazy val unCache: Parser[LogicalPlan] =
+    UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ {
+      case tableName => CacheCommand(tableName, false)
+    }
 
   protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index a01809c1fc5e..8366639fa0e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -75,3 +75,8 @@ case class DescribeCommand(
     AttributeReference("data_type", StringType, nullable = false)(),
     AttributeReference("comment", StringType, nullable = false)())
 }
+
+/**
+ * Returned for the "CACHE TABLE tableName AS SELECT .." command.
+ */
+case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7943d6e1b6fb..45687d960404 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -305,6 +305,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
       case logical.CacheCommand(tableName, cache) =>
         Seq(execution.CacheCommand(tableName, cache)(context))
+      case logical.CacheTableAsSelectCommand(tableName, plan) =>
+        Seq(execution.CacheTableAsSelectCommand(tableName, plan))
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 94543fc95b47..c2f48a902a3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -166,3 +166,23 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
       child.output.map(field => Row(field.name, field.dataType.toString, null))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * Physical command for `CACHE TABLE tableName AS SELECT ...`: registers `logicalPlan` as the
+ * temporary table `tableName` and then caches that table.
+ */
+@DeveloperApi
+case class CacheTableAsSelectCommand(tableName: String, logicalPlan: LogicalPlan)
+  extends LeafNode with Command {
+
+  override protected[sql] lazy val sideEffectResult = {
+    import sqlContext._
+    logicalPlan.registerTempTable(tableName)
+    cacheTable(tableName)
+    Seq.empty[Row]
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index befef46d9397..591592841e9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -119,4 +119,17 @@ class CachedTableSuite extends QueryTest {
     }
     assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached")
   }
+
+  test("CACHE TABLE tableName AS SELECT Star Table") {
+    TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+    TestSQLContext.sql("SELECT * FROM testCacheTable WHERE key = 1").collect()
+    assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
+    TestSQLContext.uncacheTable("testCacheTable")
+  }
+
+  test("'CACHE TABLE tableName AS SELECT ..'") {
+    TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+    assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
+    TestSQLContext.uncacheTable("testCacheTable")
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 21ecf17028db..0aa6292c0184 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -229,7 +229,17 @@ private[hive] object HiveQl {
             SetCommand(Some(key), Some(value))
         }
       } else if (sql.trim.toLowerCase.startsWith("cache table")) {
-        CacheCommand(sql.trim.drop(12).trim, true)
+        // Split on any whitespace run into at most: table name, keyword, query text.
+        sql.trim.drop(12).trim.split("\\s+", 3) match {
+          case Array(tableName) =>
+            CacheCommand(tableName, true)
+          case Array(tableName, keyword, select) if keyword.equalsIgnoreCase("as") =>
+            CacheTableAsSelectCommand(tableName, createPlan(select))
+          case _ =>
+            // Surfaces as a ParseException through the enclosing catch block.
+            throw new IllegalArgumentException(
+              "Expected: CACHE TABLE tableName [AS SELECT ...]")
+        }
       } else if (sql.trim.toLowerCase.startsWith("uncache table")) {
         CacheCommand(sql.trim.drop(14).trim, false)
       } else if (sql.trim.toLowerCase.startsWith("add jar")) {
@@ -243,15 +253,7 @@ private[hive] object HiveQl {
       } else if (sql.trim.startsWith("!")) {
         ShellCommand(sql.drop(1))
       } else {
-        val tree = getAst(sql)
-        if (nativeCommands contains tree.getText) {
-          NativeCommand(sql)
-        } else {
-          nodeToPlan(tree) match {
-            case NativePlaceholder => NativeCommand(sql)
-            case other => other
-          }
-        }
+        createPlan(sql)
       }
     } catch {
       case e: Exception => throw new ParseException(sql, e)
@@ -262,6 +264,23 @@ private[hive] object HiveQl {
         """.stripMargin)
     }
   }
+
+  /**
+   * Creates a LogicalPlan for a given HiveQL string. Statements whose root AST node text is
+   * contained in `nativeCommands`, or that `nodeToPlan` maps to `NativePlaceholder`, become a
+   * `NativeCommand` wrapping the original SQL text.
+   */
+  def createPlan(sql: String): LogicalPlan = {
+    val tree = getAst(sql)
+    if (nativeCommands contains tree.getText) {
+      NativeCommand(sql)
+    } else {
+      nodeToPlan(tree) match {
+        case NativePlaceholder => NativeCommand(sql)
+        case other => other
+      }
+    }
+  }
 
   def parseDdl(ddl: String): Seq[Attribute] = {
     val tree =