From b803fc80efec026784b87c468b2597e5efbb6708 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Thu, 11 Sep 2014 15:53:45 +0530 Subject: [PATCH 01/18] Add CACHE TABLE AS SELECT ... This feature allows the user to create and cache a table from a SELECT query. Example: ADD CACHE TABLE tableName AS SELECT * FROM TEST_TABLE. Spark treats this type of SQL as a command and caches the result eagerly. It can be executed from both SQLContext and HiveContext. Signed-off-by: ravipesala --- .../apache/spark/sql/catalyst/SqlParser.scala | 9 +++++++- .../sql/catalyst/plans/logical/commands.scala | 5 +++++ .../spark/sql/execution/SparkStrategies.scala | 2 ++ .../apache/spark/sql/execution/commands.scala | 21 +++++++++++++++++++ .../apache/spark/sql/CachedTableSuite.scala | 16 ++++++++++++++ .../org/apache/spark/sql/hive/HiveQl.scala | 13 +++++++++++- 6 files changed, 64 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index ca69531c69a7..c0f1314a2349 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -127,6 +127,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val SUBSTRING = Keyword("SUBSTRING") protected val SQRT = Keyword("SQRT") protected val ABS = Keyword("ABS") + protected val ADD = Keyword("ADD") // Use reflection to find the reserved words defined in this class. protected val reservedWords = @@ -151,7 +152,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) - | insert | cache + | insert | cache | addCache ) protected lazy val select: Parser[LogicalPlan] = @@ -181,6 +182,12 @@ class SqlParser extends StandardTokenParsers with PackratParsers { val overwrite: Boolean = o.getOrElse("") == "OVERWRITE" InsertIntoTable(r, Map[String, Option[String]](), s, overwrite) } + + protected lazy val addCache: Parser[LogicalPlan] = + ADD ~ CACHE ~ TABLE ~> ident ~ AS ~ select <~ opt(";") ^^ { + case tableName ~ as ~ s => + CacheTableAsSelectCommand(tableName,s) + } protected lazy val cache: Parser[LogicalPlan] = (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala index a01809c1fc5e..948fd53a3d8c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala @@ -75,3 +75,8 @@ case class DescribeCommand( AttributeReference("data_type", StringType, nullable = false)(), AttributeReference("comment", StringType, nullable = false)()) } + +/** + * Returned for the "ADD CACHE TABLE tableName AS SELECT .." command.
+ */ +case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 7943d6e1b6fb..aa205d278b5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -305,6 +305,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context)) case logical.CacheCommand(tableName, cache) => Seq(execution.CacheCommand(tableName, cache)(context)) + case logical.CacheTableAsSelectCommand(tableName,plan) => + Seq(execution.CacheTableAsSelectCommand(tableName, plan)(context)) case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 94543fc95b47..5bbf783ea875 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -166,3 +166,24 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( child.output.map(field => Row(field.name, field.dataType.toString, null)) } } + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class CacheTableAsSelectCommand(tableName: String,plan: LogicalPlan)( + @transient context: SQLContext) + extends LeafNode with Command { + + override protected[sql] lazy val sideEffectResult = { + context.catalog.registerTable(None, tableName, sqlContext.executePlan(plan).analyzed) + context.cacheTable(tableName) + //It does the caching eager. + //TODO : Does it really require to collect? 
+ context.table(tableName).collect + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty + +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index befef46d9397..a5bc74170833 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -119,4 +119,20 @@ class CachedTableSuite extends QueryTest { } assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached") } + + test("ADD CACHE TABLE tableName AS SELECT Star Table") { + TestSQLContext.sql("ADD CACHE TABLE testCacheTable AS SELECT * FROM testData") + TestSQLContext.sql("SELECT * FROM testCacheTable WHERE key = 1").collect() + TestSQLContext.uncacheTable("testCacheTable") + } + + test("'ADD CACHE TABLE tableName AS SELECT ..'") { + TestSQLContext.sql("ADD CACHE TABLE testCacheTable AS SELECT * FROM testData") + TestSQLContext.table("testCacheTable").queryExecution.executedPlan match { + case _: InMemoryColumnarTableScan => // Found evidence of caching + case _ => fail(s"Table 'testCacheTable' should be cached") + } + assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached") + TestSQLContext.uncacheTable("testCacheTable") + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index c98287c6aa66..104a81680a05 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -216,6 +216,17 @@ private[hive] object HiveQl { /** Returns a LogicalPlan for a given HiveQL string. */ def parseSql(sql: String): LogicalPlan = { + if (sql.trim.toLowerCase.startsWith("add cache table")) { + sql.trim.drop(16).split(" ").toSeq match { + case Seq(tableName,as, xs@_*) => CacheTableAsSelectCommand(tableName,processSql(sql.trim.drop(16+tableName.length()+as.length()+1))) + } + } else { + processSql(sql) + } + } + + /** Returns a LogicalPlan for a given HiveQL string. 
*/ + def processSql(sql : String) = { try { if (sql.trim.toLowerCase.startsWith("set")) { // Split in two parts since we treat the part before the first "=" @@ -233,7 +244,7 @@ private[hive] object HiveQl { } else if (sql.trim.toLowerCase.startsWith("uncache table")) { CacheCommand(sql.trim.drop(14).trim, false) } else if (sql.trim.toLowerCase.startsWith("add jar")) { - AddJar(sql.trim.drop(8).trim) + NativeCommand(sql) } else if (sql.trim.toLowerCase.startsWith("add file")) { AddFile(sql.trim.drop(9)) } else if (sql.trim.toLowerCase.startsWith("dfs")) { From 4e858d83b0020a1701ed65eac7047ee2978329db Mon Sep 17 00:00:00 2001 From: ravipesala Date: Sat, 13 Sep 2014 18:06:49 +0530 Subject: [PATCH 02/18] Updated parser to support add cache table command --- .../org/apache/spark/sql/hive/HiveQl.scala | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 104a81680a05..ae2256cda412 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -214,19 +214,9 @@ private[hive] object HiveQl { */ def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql)) + /** Returns a LogicalPlan for a given HiveQL string. */ - def parseSql(sql: String): LogicalPlan = { - if (sql.trim.toLowerCase.startsWith("add cache table")) { - sql.trim.drop(16).split(" ").toSeq match { - case Seq(tableName,as, xs@_*) => CacheTableAsSelectCommand(tableName,processSql(sql.trim.drop(16+tableName.length()+as.length()+1))) - } - } else { - processSql(sql) - } - } - - /** Returns a LogicalPlan for a given HiveQL string. 
*/ - def processSql(sql : String) = { + def parseSql(sql : String) = { try { if (sql.trim.toLowerCase.startsWith("set")) { // Split in two parts since we treat the part before the first "=" @@ -254,14 +244,12 @@ private[hive] object HiveQl { } else if (sql.trim.startsWith("!")) { ShellCommand(sql.drop(1)) } else { - val tree = getAst(sql) - if (nativeCommands contains tree.getText) { - NativeCommand(sql) + if (sql.trim.toLowerCase.startsWith("add cache table")) { + sql.trim.drop(16).split(" ").toSeq match { + case Seq(tableName,as, xs@_*) => CacheTableAsSelectCommand(tableName,createPlan(sql.trim.drop(16+tableName.length()+as.length()+1))) + } } else { - nodeToPlan(tree) match { - case NativePlaceholder => NativeCommand(sql) - case other => other - } + createPlan(sql) } } } catch { @@ -274,6 +262,18 @@ private[hive] object HiveQl { } } + def createPlan(sql: String) ={ + val tree = getAst(sql) + if (nativeCommands contains tree.getText) { + NativeCommand(sql) + } else { + nodeToPlan(tree) match { + case NativePlaceholder => NativeCommand(sql) + case other => other + } + } + } + def parseDdl(ddl: String): Seq[Attribute] = { val tree = try { From 13c8e27c33e8934bbd6fb458536675e97c3d8798 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Sat, 13 Sep 2014 22:45:10 +0530 Subject: [PATCH 03/18] Updated parser to support add cache table command --- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index ae2256cda412..deecd5d95e79 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -216,7 +216,7 @@ private[hive] object HiveQl { /** Returns a LogicalPlan for a given HiveQL string. */ - def parseSql(sql : String) = { + def parseSql(sql : String): LogicalPlan = { try { if (sql.trim.toLowerCase.startsWith("set")) { // Split in two parts since we treat the part before the first "=" From 7459ce36775126f4c0636585c1d29f30ab35fd06 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Sat, 13 Sep 2014 23:09:28 +0530 Subject: [PATCH 04/18] Added comment --- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index deecd5d95e79..6f945d90673c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -262,6 +262,7 @@ private[hive] object HiveQl { } } + /** Creates LogicalPlan for a given HiveQL string. 
*/ def createPlan(sql: String) ={ val tree = getAst(sql) if (nativeCommands contains tree.getText) { NativeCommand(sql) } else { nodeToPlan(tree) match { case NativePlaceholder => NativeCommand(sql) case other => other } } } From 6758f808d14ec7a3da0953f7720f7f5b9a4e8a85 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Sat, 13 Sep 2014 23:37:25 +0530 Subject: [PATCH 05/18] Changed style --- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 6f945d90673c..b3258b62d592 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -216,7 +216,7 @@ private[hive] object HiveQl { /** Returns a LogicalPlan for a given HiveQL string. */ - def parseSql(sql : String): LogicalPlan = { + def parseSql(sql: String): LogicalPlan = { try { if (sql.trim.toLowerCase.startsWith("set")) { // Split in two parts since we treat the part before the first "=" From e3265d0773515821b1a908bb94025ac79807e325 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Mon, 15 Sep 2014 03:16:09 +0530 Subject: [PATCH 11/18] Updated the code as per the comments by Admin in pull request. --- .../apache/spark/sql/catalyst/SqlParser.scala | 20 ++++++++------- .../spark/sql/execution/SparkStrategies.scala | 2 +- .../apache/spark/sql/execution/commands.scala | 12 ++++----- .../apache/spark/sql/CachedTableSuite.scala | 12 +++------ .../org/apache/spark/sql/hive/HiveQl.scala | 25 +++++++++---------- 5 files changed, 33 insertions(+), 38 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index c0f1314a2349..861ab81523d3 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -152,7 +152,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) - | insert | cache | addCache + | insert | cache | unCache ) protected lazy val select: Parser[LogicalPlan] = @@ -182,17 +182,19 @@ class SqlParser extends StandardTokenParsers with PackratParsers { val overwrite: Boolean = o.getOrElse("") == "OVERWRITE" InsertIntoTable(r, Map[String, Option[String]](), s, overwrite) } - - protected lazy val addCache: Parser[LogicalPlan] = - ADD ~ CACHE ~ TABLE ~> ident ~ AS ~ select <~ opt(";") ^^ { - case tableName ~ as ~ s => - CacheTableAsSelectCommand(tableName,s) - } protected lazy val cache: Parser[LogicalPlan] = - (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ { - case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache) + CACHE ~ TABLE ~> ident ~ opt(AS) ~ opt(select) <~ opt(";") ^^ { + case tableName ~ None ~ None => + CacheCommand(tableName, true) + case tableName ~ as ~ Some(plan) => + CacheTableAsSelectCommand(tableName,plan) } + + protected lazy val unCache: Parser[LogicalPlan] = + UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ { + case tableName => CacheCommand(tableName, false) + } protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index aa205d278b5b..2b84fc988911 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -306,7 +306,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.CacheCommand(tableName, cache) => Seq(execution.CacheCommand(tableName, cache)(context)) case logical.CacheTableAsSelectCommand(tableName,plan) => - Seq(execution.CacheTableAsSelectCommand(tableName, plan)(context)) + Seq(execution.CacheTableAsSelectCommand(tableName, plan)) case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 5bbf783ea875..1535291f7908 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -171,16 +171,14 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( * :: DeveloperApi :: */ @DeveloperApi -case class CacheTableAsSelectCommand(tableName: String,plan: LogicalPlan)( - @transient context: SQLContext) +case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends LeafNode with Command { override protected[sql] lazy val sideEffectResult = { - context.catalog.registerTable(None, tableName, sqlContext.executePlan(plan).analyzed) - context.cacheTable(tableName) - //It does the caching eager. - //TODO : Does it really require to collect? - context.table(tableName).collect + sqlContext.catalog.registerTable(None, tableName, sqlContext.executePlan(plan).analyzed) + sqlContext.cacheTable(tableName) + // It does the caching eager. + sqlContext.table(tableName).count Seq.empty[Row] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index a5bc74170833..5c9a4750eb45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -120,18 +120,14 @@ class CachedTableSuite extends QueryTest { assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached") } - test("ADD CACHE TABLE tableName AS SELECT Star Table") { - TestSQLContext.sql("ADD CACHE TABLE testCacheTable AS SELECT * FROM testData") + test("CACHE TABLE tableName AS SELECT Star Table") { + TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") TestSQLContext.sql("SELECT * FROM testCacheTable WHERE key = 1").collect() TestSQLContext.uncacheTable("testCacheTable") } - test("'ADD CACHE TABLE tableName AS SELECT ..'") { - TestSQLContext.sql("ADD CACHE TABLE testCacheTable AS SELECT * FROM testData") - TestSQLContext.table("testCacheTable").queryExecution.executedPlan match { - case _: InMemoryColumnarTableScan => // Found evidence of caching - case _ => fail(s"Table 'testCacheTable' should be cached") - } + test("'CACHE TABLE tableName AS SELECT ..'") { + TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached") TestSQLContext.uncacheTable("testCacheTable") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index b8c5d903c5c3..6fa5e11df357 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -214,7 +214,6 @@ private[hive] object HiveQl { */ def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql)) - /** Returns a LogicalPlan for a given HiveQL string. */ def parseSql(sql: String): LogicalPlan = { try { @@ -230,11 +229,17 @@ private[hive] object HiveQl { SetCommand(Some(key), Some(value)) } } else if (sql.trim.toLowerCase.startsWith("cache table")) { - CacheCommand(sql.trim.drop(12).trim, true) + sql.trim.drop(12).trim.split(" ").toSeq match { + case Seq(tableName) => + CacheCommand(tableName, true) + case Seq(tableName,as, select@_*) => + CacheTableAsSelectCommand(tableName, + createPlan(sql.trim.drop(12 + tableName.length() + as.length() + 2))) + } } else if (sql.trim.toLowerCase.startsWith("uncache table")) { CacheCommand(sql.trim.drop(14).trim, false) } else if (sql.trim.toLowerCase.startsWith("add jar")) { - NativeCommand(sql) + AddJar(sql.trim.drop(8).trim) } else if (sql.trim.toLowerCase.startsWith("add file")) { AddFile(sql.trim.drop(9)) } else if (sql.trim.toLowerCase.startsWith("dfs")) { @@ -244,13 +249,7 @@ private[hive] object HiveQl { } else if (sql.trim.startsWith("!")) { ShellCommand(sql.drop(1)) } else { - if (sql.trim.toLowerCase.startsWith("add cache table")) { - sql.trim.drop(16).split(" ").toSeq match { - case Seq(tableName,as, xs@_*) => CacheTableAsSelectCommand(tableName,createPlan(sql.trim.drop(16+tableName.length()+as.length()+1))) - } - } else { - createPlan(sql) - } + createPlan(sql) } } catch { case e: Exception => throw new ParseException(sql, e) @@ -261,9 +260,9 @@ private[hive] object HiveQl { """.stripMargin) } } - + /** Creates LogicalPlan for a given HiveQL string. */ - def createPlan(sql: String) ={ + def createPlan(sql: String) = { val tree = getAst(sql) if (nativeCommands contains tree.getText) { NativeCommand(sql) @@ -1109,7 +1108,7 @@ private[hive] object HiveQl { case Token("TOK_FUNCTION", Token(functionName, Nil) :: children) => HiveGenericUdtf(functionName, attributes, children.map(nodeToExpr)) - + case a: ASTNode => throw new NotImplementedError( s"""No parse rules for ASTNode type: ${a.getType}, text: ${a.getText}, tree: From d8b37b25cb893bf130d403011425161ae89dd187 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Mon, 15 Sep 2014 11:32:55 +0530 Subject: [PATCH 12/18] Updated as per the comments by Admin --- .../org/apache/spark/sql/catalyst/SqlParser.scala | 12 +++--------- .../spark/sql/catalyst/plans/logical/commands.scala | 2 +- .../apache/spark/sql/execution/SparkStrategies.scala | 2 +- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index be86fbdcaf3c..144cd1d22a24 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -182,18 +182,12 @@ class SqlParser extends StandardTokenParsers with PackratParsers { val overwrite: Boolean = o.getOrElse("") == "OVERWRITE" InsertIntoTable(r, Map[String, Option[String]](), s, overwrite) } - - protected lazy val addCache: Parser[LogicalPlan] = - ADD ~ CACHE ~ TABLE ~> ident ~ AS ~ select <~ opt(";") ^^ { - case tableName ~ as ~ s => - CacheTableAsSelectCommand(tableName,s) - } protected lazy val cache: Parser[LogicalPlan] = - CACHE ~ TABLE ~> ident ~ opt(AS) ~ opt(select) <~ opt(";") ^^ { - case tableName ~ None ~ 
None => + CACHE ~ TABLE ~> ident ~ opt(AS ~ select) <~ opt(";") ^^ { + case tableName ~ None => CacheCommand(tableName, true) - case tableName ~ as ~ Some(plan) => + case tableName ~ Some(as ~ plan) => CacheTableAsSelectCommand(tableName,plan) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala index 948fd53a3d8c..8366639fa0e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala @@ -77,6 +77,6 @@ case class DescribeCommand( } /** - * Returned for the "ADD CACHE TABLE tableName AS SELECT .." command. + * Returned for the "CACHE TABLE tableName AS SELECT .." command. */ case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 2b84fc988911..45687d960404 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -305,7 +305,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context)) case logical.CacheCommand(tableName, cache) => Seq(execution.CacheCommand(tableName, cache)(context)) - case logical.CacheTableAsSelectCommand(tableName,plan) => + case logical.CacheTableAsSelectCommand(tableName, plan) => Seq(execution.CacheTableAsSelectCommand(tableName, plan)) case _ => Nil } From 8c9993cb2786a5c23bdb2328eb46a28823e1f9c6 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Mon, 15 Sep 2014 11:38:24 +0530 Subject: [PATCH 13/18] Changed the style --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 144cd1d22a24..7c7116cd2737 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -188,7 +188,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { case tableName ~ None => CacheCommand(tableName, true) case tableName ~ Some(as ~ plan) => - CacheTableAsSelectCommand(tableName,plan) + CacheTableAsSelectCommand(tableName, plan) } protected lazy val unCache: Parser[LogicalPlan] = From fb1759bc4f4db17a321041c2167d86d431b0132e Mon Sep 17 00:00:00 2001 From: ravipesala Date: Mon, 15 Sep 2014 11:56:54 +0530 Subject: [PATCH 14/18] Updated as per Admin comments --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 1 - .../test/scala/org/apache/spark/sql/CachedTableSuite.scala | 1 + .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 5 ++--- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 7c7116cd2737..068cb49ef6d3 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ 
-127,7 +127,6 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val SUBSTRING = Keyword("SUBSTRING") protected val SQRT = Keyword("SQRT") protected val ABS = Keyword("ABS") - protected val ADD = Keyword("ADD") // Use reflection to find the reserved words defined in this class. protected val reservedWords = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 5c9a4750eb45..591592841e9f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -123,6 +123,7 @@ class CachedTableSuite extends QueryTest { test("CACHE TABLE tableName AS SELECT Star Table") { TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") TestSQLContext.sql("SELECT * FROM testCacheTable WHERE key = 1").collect() + assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached") TestSQLContext.uncacheTable("testCacheTable") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 4db533cdf496..7977494f9b6e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -213,7 +213,6 @@ private[hive] object HiveQl { * Returns the AST for the given SQL string. */ def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql)) - /** Returns a LogicalPlan for a given HiveQL string. */ def parseSql(sql: String): LogicalPlan = { @@ -240,7 +239,7 @@ private[hive] object HiveQl { } else if (sql.trim.toLowerCase.startsWith("uncache table")) { CacheCommand(sql.trim.drop(14).trim, false) } else if (sql.trim.toLowerCase.startsWith("add jar")) { - NativeCommand(sql) + AddJar(sql.trim.drop(8).trim) } else if (sql.trim.toLowerCase.startsWith("add file")) { AddFile(sql.trim.drop(9)) } else if (sql.trim.toLowerCase.startsWith("dfs")) { @@ -1109,7 +1108,7 @@ private[hive] object HiveQl { case Token("TOK_FUNCTION", Token(functionName, Nil) :: children) => HiveGenericUdtf(functionName, attributes, children.map(nodeToExpr)) - + case a: ASTNode => throw new NotImplementedError( s"""No parse rules for ASTNode type: ${a.getType}, text: ${a.getText}, tree: From 394d5ca28fd39a5785b6eca7f6c476701df31702 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Mon, 15 Sep 2014 12:00:30 +0530 Subject: [PATCH 15/18] Changed style --- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 7977494f9b6e..86d2aad71607 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -213,7 +213,7 @@ private[hive] object HiveQl { * Returns the AST for the given SQL string. */ def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql)) - + /** Returns a LogicalPlan for a given HiveQL string. */ def parseSql(sql: String): LogicalPlan = { try { From d6e469dde1f6362029b1a8bd66db06508c333b35 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Tue, 16 Sep 2014 16:09:16 +0530 Subject: [PATCH 16/18] Code review comments by Admin are handled. 
--- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 4 ++-- .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 068cb49ef6d3..862f78702c4e 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -183,10 +183,10 @@ class SqlParser extends StandardTokenParsers with PackratParsers { } protected lazy val cache: Parser[LogicalPlan] = - CACHE ~ TABLE ~> ident ~ opt(AS ~ select) <~ opt(";") ^^ { + CACHE ~ TABLE ~> ident ~ opt(AS ~> select) <~ opt(";") ^^ { case tableName ~ None => CacheCommand(tableName, true) - case tableName ~ Some(as ~ plan) => + case tableName ~ Some(plan) => CacheTableAsSelectCommand(tableName, plan) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 86d2aad71607..0aa6292c0184 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -232,9 +232,8 @@ private[hive] object HiveQl { sql.trim.drop(12).trim.split(" ").toSeq match { case Seq(tableName) => CacheCommand(tableName, true) - case Seq(tableName,as, select@_*) => - CacheTableAsSelectCommand(tableName, - createPlan(sql.trim.drop(12 + tableName.length() + as.length() + 2))) + case Seq(tableName, _, select @ _*) => + CacheTableAsSelectCommand(tableName, createPlan(select.mkString(" ").trim)) } } else if (sql.trim.toLowerCase.startsWith("uncache table")) { CacheCommand(sql.trim.drop(14).trim, false) From 8059cd261dd079ab583531484dd02af452a74a18 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Wed, 17 Sep 2014 01:39:11 +0530 Subject: [PATCH 17/18] Changed the behaviour from eager caching to lazy caching. --- .../main/scala/org/apache/spark/sql/execution/commands.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 1535291f7908..fd711d8ffb73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -177,8 +177,6 @@ case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) override protected[sql] lazy val sideEffectResult = { sqlContext.catalog.registerTable(None, tableName, sqlContext.executePlan(plan).analyzed) sqlContext.cacheTable(tableName) - // It does the caching eager. - sqlContext.table(tableName).count Seq.empty[Row] } From a5f0beb395836c76b3e7883ef7f1f61433645500 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Thu, 18 Sep 2014 12:24:33 +0530 Subject: [PATCH 18/18] Simplified the code as per Admin comment. 
--- .../scala/org/apache/spark/sql/execution/commands.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index fd711d8ffb73..c2f48a902a3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -171,12 +171,13 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( * :: DeveloperApi :: */ @DeveloperApi -case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) +case class CacheTableAsSelectCommand(tableName: String, logicalPlan: LogicalPlan) extends LeafNode with Command { override protected[sql] lazy val sideEffectResult = { - sqlContext.catalog.registerTable(None, tableName, sqlContext.executePlan(plan).analyzed) - sqlContext.cacheTable(tableName) + import sqlContext._ + logicalPlan.registerTempTable(tableName) + cacheTable(tableName) Seq.empty[Row] }
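For reference, the sketch below is not part of the patch series; it is a minimal usage example of the final behaviour these patches converge on (CACHE TABLE name AS SELECT, lazily materialized per patch 17). It assumes a Spark 1.1-era SQLContext instance named sqlContext, and the table names records and cachedRecords are hypothetical stand-ins, not identifiers from the patches.

// Hypothetical setup: a table named "records" is assumed to be registered.
// The statement below is parsed by the `cache` rule in SqlParser and becomes
// a CacheTableAsSelectCommand, which registers the SELECT's plan as a temp
// table and marks it cached.
sqlContext.sql("CACHE TABLE cachedRecords AS SELECT * FROM records")

// The catalog reports the table as cached right away, as the tests above
// assert, but after patch 17 the data is only materialized on first use.
assert(sqlContext.isCached("cachedRecords"))
val rows = sqlContext.sql("SELECT * FROM cachedRecords WHERE key = 1").collect()

// Uncaching still goes through the existing UNCACHE TABLE path / API.
sqlContext.uncacheTable("cachedRecords")

Note the design shift across the series: the command first forced materialization with collect, then count (patch 11), and finally dropped materialization altogether (patch 17), trading a full scan at definition time for some latency on the first query against the cached table.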