From 90a14a7e98a7934817dba292cd1bc35ad49ddbba Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Mon, 26 Jan 2015 23:29:10 +0800
Subject: [PATCH 1/4] Implement Describe Table for SQLContext

---
 .../org/apache/spark/sql/SparkSQLParser.scala      | 14 ++++++++++++--
 .../apache/spark/sql/execution/commands.scala      | 15 +++++++++++----
 .../org/apache/spark/sql/SQLQuerySuite.scala       |  7 +++++++
 .../org/apache/spark/sql/hive/HiveQl.scala         | 17 +----------------
 4 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
index f1a4053b7911..7a00cb202778 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+
 import scala.util.parsing.combinator.RegexParsers
 
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{UncacheTableCommand, CacheTableCommand, SetCommand}
+import org.apache.spark.sql.execution.{UncacheTableCommand, CacheTableCommand, SetCommand, DescribeCommand}
 import org.apache.spark.sql.types.StringType
 
@@ -57,12 +59,14 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
 
   protected val AS = Keyword("AS")
   protected val CACHE = Keyword("CACHE")
+  protected val DESCRIBE = Keyword("DESCRIBE")
+  protected val EXTENDED = Keyword("EXTENDED")
   protected val LAZY = Keyword("LAZY")
   protected val SET = Keyword("SET")
   protected val TABLE = Keyword("TABLE")
   protected val UNCACHE = Keyword("UNCACHE")
 
-  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others
+  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | describe | others
 
   private lazy val cache: Parser[LogicalPlan] =
     CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
@@ -80,6 +84,12 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
       case input => SetCommandParser(input)
     }
 
+  private lazy val describe: Parser[LogicalPlan] =
+    DESCRIBE ~> EXTENDED.? ~ ident ^^ {
+      case isExtended ~ tableIdent =>
+        DescribeCommand((UnresolvedRelation(Seq(tableIdent),None)),false)
+    }
+
   private lazy val others: Parser[LogicalPlan] =
     wholeInput ^^ {
       case input => fallback(input)
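
A remark on the SparkSQLParser hunks above: the new `describe` rule accepts an optional EXTENDED keyword (`EXTENDED.?`), but the captured `isExtended` binding is discarded and `false` is hard-coded into the DescribeCommand, so at this stage DESCRIBE EXTENDED parses yet behaves like plain DESCRIBE. Below is a minimal, self-contained sketch of the same combinator shape using only the standard parser-combinator library; all names are illustrative, not Spark's:

    import scala.util.parsing.combinator.JavaTokenParsers

    object DescribeParserSketch extends JavaTokenParsers {
      case class Describe(table: String, extended: Boolean)

      // DESCRIBE [EXTENDED] <ident>: `opt` plays the role of `.?`, `~>` drops
      // the keyword it matched, and `^^` maps the parse result.
      def describe: Parser[Describe] =
        "DESCRIBE" ~> opt("EXTENDED") ~ ident ^^ {
          case isExtended ~ table => Describe(table, isExtended.isDefined)
        }

      def main(args: Array[String]): Unit = {
        println(parseAll(describe, "DESCRIBE EXTENDED testData")) // Describe(testData,true)
        println(parseAll(describe, "DESCRIBE testData"))          // Describe(testData,false)
      }
    }
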
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 52a31f01a435..788cf0fa7cfa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.execution
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
@@ -171,10 +172,16 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
  */
 @DeveloperApi
 case class DescribeCommand(
-    child: SparkPlan,
-    override val output: Seq[Attribute]) extends RunnableCommand {
+    logicalPlan: LogicalPlan,
+    isExtended: Boolean) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext) = {
-    child.output.map(field => Row(field.name, field.dataType.toString, null))
+    val plan = sqlContext.executePlan(logicalPlan).analyzed
+    plan.output.map(field => Row(field.name, field.dataType.toString, null))
   }
+
+  override val output: Seq[Attribute] = Seq(
+    AttributeReference("col_name", StringType, nullable = false)(),
+    AttributeReference("data_type", StringType, nullable = false)(),
+    AttributeReference("comment", StringType, nullable = true)())
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 03b44ca1d669..b46cfdf22036 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1034,4 +1034,11 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
     rdd.registerTempTable("distinctData")
     checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2))
   }
+
+  test("describe table") {
+    checkAnswer(sql("DESCRIBE EXTENDED testData"),Seq(
+      Row("key","IntegerType",null), Row("value","StringType",null)
+    ))
+  }
+
 }
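
Two details of the reworked command are worth calling out: it now pins a fixed three-column output schema (col_name, data_type, comment), with comment nullable — unlike the Hive-side DescribeCommand removed below, which declared it non-nullable — and run() simply maps the analyzed plan's output attributes to rows, so comment is always null. A toy, self-contained model of that mapping (the types are stand-ins, not Spark's):

    // Toy model of DescribeCommand.run(): one (col_name, data_type, comment)
    // row per output attribute of the analyzed plan.
    case class Attr(name: String, dataType: String)

    object DescribeRunSketch {
      def describe(output: Seq[Attr]): Seq[(String, String, String)] =
        output.map(a => (a.name, a.dataType, null))

      def main(args: Array[String]): Unit =
        describe(Seq(Attr("key", "IntegerType"), Attr("value", "StringType")))
          .foreach(println) // (key,IntegerType,null) then (value,StringType,null)
    }
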
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 5e29e57d9358..64c105b171ac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.DescribeCommand
 import org.apache.spark.sql.execution.ExplainCommand
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable}
 import org.apache.spark.sql.types._
@@ -46,22 +47,6 @@ import scala.collection.JavaConversions._
  */
 private[hive] case object NativePlaceholder extends Command
 
-/**
- * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
- * @param table The table to be described.
- * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
- *                   It is effective only when the table is a Hive table.
- */
-case class DescribeCommand(
-    table: LogicalPlan,
-    isExtended: Boolean) extends Command {
-  override def output = Seq(
-    // Column names are based on Hive.
-    AttributeReference("col_name", StringType, nullable = false)(),
-    AttributeReference("data_type", StringType, nullable = false)(),
-    AttributeReference("comment", StringType, nullable = false)())
-}
-
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
 private[hive] object HiveQl {
   protected val nativeCommands = Seq(

From 34508a3dcd8d3d9b212c15767ac096845e76b44d Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Tue, 27 Jan 2015 00:10:54 +0800
Subject: [PATCH 2/4] Modify HiveStrategies to adapt to the new DescribeCommand interface

---
 .../main/scala/org/apache/spark/sql/execution/commands.scala  | 4 ++--
 .../main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 788cf0fa7cfa..110fab335839 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -172,11 +172,11 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
  */
 @DeveloperApi
 case class DescribeCommand(
-    logicalPlan: LogicalPlan,
+    table: LogicalPlan,
     isExtended: Boolean) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext) = {
-    val plan = sqlContext.executePlan(logicalPlan).analyzed
+    val plan = sqlContext.executePlan(table).analyzed
     plan.output.map(field => Row(field.name, field.dataType.toString, null))
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 6952b126cf89..53b58c2d1868 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -229,7 +229,7 @@ private[hive] trait HiveStrategies {
           ExecutedCommand(
             DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil
         case o: LogicalPlan =>
-          ExecutedCommand(RunnableDescribeCommand(planLater(o), describe.output)) :: Nil
+          ExecutedCommand(RunnableDescribeCommand(o, describe.isExtended)) :: Nil
       }
 
     case _ => Nil
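
The point of patch 2 is easy to miss: the generic describe path in HiveStrategies now hands RunnableDescribeCommand the logical plan itself (`o`) instead of a pre-planned physical node (`planLater(o)`), so resolution happens lazily inside run() via sqlContext.executePlan(table).analyzed. A toy, runnable model of the resulting dispatch — every type below is a stub, and the MetastoreRelation branch is paraphrased from the hunk, not verbatim source:

    sealed trait Plan
    case class MetastoreRelation(name: String) extends Plan // stand-in for a Hive table
    case class OtherRelation(name: String) extends Plan     // any non-Hive relation

    sealed trait Command
    case class DescribeHiveTable(t: MetastoreRelation, extended: Boolean) extends Command
    case class RunnableDescribe(plan: Plan, extended: Boolean) extends Command

    object DescribeDispatchSketch {
      // Hive tables keep the metastore-backed command; everything else is
      // described from its analyzed schema at execution time.
      def plan(table: Plan, extended: Boolean): Command = table match {
        case t: MetastoreRelation => DescribeHiveTable(t, extended)
        case o                    => RunnableDescribe(o, extended)
      }

      def main(args: Array[String]): Unit = {
        println(plan(MetastoreRelation("hive_tbl"), extended = true))
        println(plan(OtherRelation("temp_tbl"), extended = false))
      }
    }
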
From a3bc0c92ba31eea47fdad694c856de71e50db165 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Tue, 27 Jan 2015 17:23:49 +0800
Subject: [PATCH 3/4] Move the DESCRIBE command parser to ddl.scala

---
 .../org/apache/spark/sql/SparkSQLParser.scala      | 15 ++-------------
 .../scala/org/apache/spark/sql/sources/ddl.scala   | 15 +++++++++++++--
 .../sql/hive/execution/HiveComparisonTest.scala    |  2 +-
 3 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
index 7a00cb202778..8aaa83def965 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -17,15 +17,12 @@
 package org.apache.spark.sql
-
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-
 import scala.util.parsing.combinator.RegexParsers
 
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{UncacheTableCommand, CacheTableCommand, SetCommand, DescribeCommand}
+import org.apache.spark.sql.execution.{UncacheTableCommand, CacheTableCommand, SetCommand}
 import org.apache.spark.sql.types.StringType
 
@@ -59,14 +56,12 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
 
   protected val AS = Keyword("AS")
   protected val CACHE = Keyword("CACHE")
-  protected val DESCRIBE = Keyword("DESCRIBE")
-  protected val EXTENDED = Keyword("EXTENDED")
   protected val LAZY = Keyword("LAZY")
   protected val SET = Keyword("SET")
   protected val TABLE = Keyword("TABLE")
   protected val UNCACHE = Keyword("UNCACHE")
 
-  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | describe | others
+  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others
 
   private lazy val cache: Parser[LogicalPlan] =
     CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
@@ -84,12 +79,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
       case input => SetCommandParser(input)
     }
 
-  private lazy val describe: Parser[LogicalPlan] =
-    DESCRIBE ~> EXTENDED.? ~ ident ^^ {
-      case isExtended ~ tableIdent =>
-        DescribeCommand((UnresolvedRelation(Seq(tableIdent),None)),false)
-    }
-
   private lazy val others: Parser[LogicalPlan] =
     wholeInput ^^ {
       case input => fallback(input)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 171b816a2633..38b470cadf88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -17,13 +17,15 @@
 package org.apache.spark.sql.sources
 
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+
 import scala.language.implicitConversions
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.{SchemaRDD, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.execution.{DescribeCommand, RunnableCommand}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -59,6 +61,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
   protected val TABLE = Keyword("TABLE")
   protected val USING = Keyword("USING")
   protected val OPTIONS = Keyword("OPTIONS")
+  protected val DESCRIBE = Keyword("DESCRIBE")
+  protected val EXTENDED = Keyword("EXTENDED")
 
   // Data types.
   protected val STRING = Keyword("STRING")
@@ -80,7 +84,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
 
   protected lazy val ddl: Parser[LogicalPlan] = createTable
 
-  protected def start: Parser[LogicalPlan] = ddl
+  protected def start: Parser[LogicalPlan] = ddl | describe
 
   /**
    * `CREATE [TEMPORARY] TABLE avroTable
@@ -164,6 +168,13 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
     mapType |
     structType |
     primitiveType
+
+  private lazy val describe: Parser[LogicalPlan] =
+    DESCRIBE ~> EXTENDED.? ~ rep1sep(ident, ".") ^^ {
+      case isExtended ~ tableIdent =>
+        DescribeCommand((UnresolvedRelation(tableIdent,None)),false)
+    }
+
 }
 
 object ResolvedDataSource {
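
Note the small parser upgrade that comes with the move to ddl.scala: `rep1sep(ident, ".")` accepts a dot-separated multi-part identifier, so DESCRIBE dbName.tableName now parses as well as a bare table name, and UnresolvedRelation receives the parts as a Seq[String] directly. A tiny self-contained demonstration of that combinator (illustrative names only):

    import scala.util.parsing.combinator.JavaTokenParsers

    object QualifiedNameSketch extends JavaTokenParsers {
      // One or more identifiers separated by dots, e.g. "table" or "db.table".
      def tableIdent: Parser[List[String]] = rep1sep(ident, ".")

      def main(args: Array[String]): Unit = {
        println(parseAll(tableIdent, "testData"))      // parsed: List(testData)
        println(parseAll(tableIdent, "mydb.testData")) // parsed: List(mydb, testData)
      }
    }
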
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index f8a957d55d57..ad1d9f8ec342 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -23,7 +23,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.execution.{SetCommand, ExplainCommand}
-import org.apache.spark.sql.hive.DescribeCommand
+import org.apache.spark.sql.execution.DescribeCommand
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._

From 27c449d9e2862c10a31298310ef9d2e51c2cb781 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Tue, 27 Jan 2015 17:25:39 +0800
Subject: [PATCH 4/4] Update SparkSQLParser.scala

---
 .../src/main/scala/org/apache/spark/sql/SparkSQLParser.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
index 8aaa83def965..f1a4053b7911 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql
+
 import scala.util.parsing.combinator.RegexParsers
 
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
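
Taken together, the series routes DESCRIBE [EXTENDED] [dbName.]tableName through the DDL parser for any SQLContext, backed by the relocated DescribeCommand; patch 4 simply restores SparkSQLParser.scala to its pre-series content (its index line ends at f1a4053b7911, the blob the file started from in patch 1). An end-to-end sketch of the behavior the series aims to enable, written against the Spark 1.2-era API — the table name and data mirror the new SQLQuerySuite test, and the printed form of each Row is approximate:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    case class TestData(key: Int, value: String)

    object DescribeDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("describe-demo"))
        val sqlContext = new SQLContext(sc)
        import sqlContext._ // brings in sql(...) and the RDD-to-SchemaRDD implicits

        sc.parallelize(Seq(TestData(1, "a"), TestData(2, "b")))
          .registerTempTable("testData")

        // Parsed by the new `describe` rule instead of falling through to the
        // unsupported-statement fallback:
        sql("DESCRIBE testData").collect().foreach(println)
        // expected, roughly:
        //   [key,IntegerType,null]
        //   [value,StringType,null]

        sc.stop()
      }
    }
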