
Commit 4d8d070

OopsOutOfMemory authored and rxin committed
[SPARK-5135][SQL] Add support for describe table to DDL in SQLContext
Hi rxin, marmbrus. I considered your suggestion (in apache#4127) and have now re-written it; it is up to date. Could you please review it?

Author: OopsOutOfMemory <[email protected]>

Closes apache#4227 from OopsOutOfMemory/describe and squashes the following commits:

053826f [OopsOutOfMemory] describe
1 parent a83936e commit 4d8d070

File tree: 10 files changed (+190, -28 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala

Lines changed: 24 additions & 2 deletions
@@ -227,8 +227,9 @@ abstract class DataType {
   def json: String = compact(render(jsonValue))

   def prettyJson: String = pretty(render(jsonValue))
-}

+  def simpleString: String = typeName
+}

 /**
  * :: DeveloperApi ::
@@ -242,7 +243,6 @@ case object NullType extends DataType {
   override def defaultSize: Int = 1
 }

-
 protected[sql] object NativeType {
   val all = Seq(
     IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
@@ -448,6 +448,8 @@ case object LongType extends IntegralType {
    * The default size of a value of the LongType is 8 bytes.
    */
   override def defaultSize: Int = 8
+
+  override def simpleString = "bigint"
 }

@@ -470,6 +472,8 @@ case object IntegerType extends IntegralType {
    * The default size of a value of the IntegerType is 4 bytes.
    */
   override def defaultSize: Int = 4
+
+  override def simpleString = "int"
 }

@@ -492,6 +496,8 @@ case object ShortType extends IntegralType {
    * The default size of a value of the ShortType is 2 bytes.
    */
   override def defaultSize: Int = 2
+
+  override def simpleString = "smallint"
 }

@@ -514,6 +520,8 @@ case object ByteType extends IntegralType {
    * The default size of a value of the ByteType is 1 byte.
    */
   override def defaultSize: Int = 1
+
+  override def simpleString = "tinyint"
 }

@@ -573,6 +581,11 @@ case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
    * The default size of a value of the DecimalType is 4096 bytes.
    */
   override def defaultSize: Int = 4096
+
+  override def simpleString = precisionInfo match {
+    case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
+    case None => "decimal(10,0)"
+  }
 }

@@ -695,6 +708,8 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
    * (We assume that there are 100 elements).
    */
   override def defaultSize: Int = 100 * elementType.defaultSize
+
+  override def simpleString = s"array<${elementType.simpleString}>"
 }

@@ -870,6 +885,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
    * The default size of a value of the StructType is the total default sizes of all field types.
    */
   override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
+
+  override def simpleString = {
+    val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
+    s"struct<${fieldTypes.mkString(",")}>"
+  }
 }

@@ -920,6 +940,8 @@ case class MapType(
    * (We assume that there are 100 elements).
    */
   override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize)
+
+  override def simpleString = s"map<${keyType.simpleString},${valueType.simpleString}>"
 }
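
The simpleString overrides added above give every DataType a short Hive-style name, which the new DESCRIBE output uses for its data_type column. A minimal sketch of what these methods return, based only on the definitions in this diff (the object and value names are illustrative, not part of the patch):

import org.apache.spark.sql.types._

object SimpleStringDemo extends App {
  // Atomic types map to their Hive-style names.
  println(IntegerType.simpleString)        // int
  println(LongType.simpleString)           // bigint
  println(DecimalType(5, 1).simpleString)  // decimal(5,1)

  // Complex types recurse into their element, key/value, and field types.
  val schema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("tags", ArrayType(StringType)),
    StructField("props", MapType(StringType, LongType))))
  println(schema.simpleString)  // struct<id:int,tags:array<string>,props:map<string,bigint>>
}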

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 12 additions & 1 deletion
@@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.parquet._
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.sources.{DescribeCommand => LogicalDescribeCommand}
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.sources._

-
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>

@@ -337,6 +338,16 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting =>
         sys.error("allowExisting should be set to false when creating a temporary table.")

+      case LogicalDescribeCommand(table, isExtended) =>
+        val resultPlan = self.sqlContext.executePlan(table).executedPlan
+        ExecutedCommand(
+          RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil
+
+      case LogicalDescribeCommand(table, isExtended) =>
+        val resultPlan = self.sqlContext.executePlan(table).executedPlan
+        ExecutedCommand(
+          RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil
+
       case _ => Nil
     }
   }
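
With the strategy above in place, a DESCRIBE statement over a data-source relation plans to an ExecutedCommand wrapping the runnable DescribeCommand. A hedged sanity-check sketch, assuming a temporary table named ddlPeople has already been registered as in the test suite added later in this commit:

// Each returned Row is (col_name, data_type, comment); the data_type strings
// come from the simpleString methods added in dataTypes.scala above.
val rows = sqlContext.sql("DESCRIBE ddlPeople").collect()
rows.foreach(r => println(s"${r(0)} | ${r(1)} | ${r(2)}"))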

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 8 additions & 2 deletions
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import scala.collection.mutable.ArrayBuffer

 /**
  * A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -176,9 +177,14 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
 @DeveloperApi
 case class DescribeCommand(
     child: SparkPlan,
-    override val output: Seq[Attribute]) extends RunnableCommand {
+    override val output: Seq[Attribute],
+    isExtended: Boolean) extends RunnableCommand {

   override def run(sqlContext: SQLContext) = {
-    child.output.map(field => Row(field.name, field.dataType.toString, null))
+    child.schema.fields.map { field =>
+      val cmtKey = "comment"
+      val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
+      Row(field.name, field.dataType.simpleString, comment)
+    }
   }
 }
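
The run method looks up an optional "comment" entry in each field's column metadata and otherwise falls back to an empty string. A minimal sketch of how such a comment is attached to a schema so that DESCRIBE can surface it (the field names are illustrative; the same pattern appears in the test suite below):

import org.apache.spark.sql.types._

// A field carrying a "comment" entry in its metadata; DescribeCommand.run reads it back.
val age = StructField("age", IntegerType, nullable = false,
  new MetadataBuilder().putString("comment", "age in whole years").build())

// A field without the key shows an empty comment in the DESCRIBE output.
val name = StructField("name", StringType)

val schema = StructType(Seq(age, name))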

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 37 additions & 2 deletions
@@ -23,6 +23,8 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -50,7 +52,6 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
     }
   }

-
   // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword`
   // properties via reflection the class in runtime for constructing the SqlLexical object
   protected val CREATE = Keyword("CREATE")
@@ -61,6 +62,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
   protected val EXISTS = Keyword("EXISTS")
   protected val USING = Keyword("USING")
   protected val OPTIONS = Keyword("OPTIONS")
+  protected val DESCRIBE = Keyword("DESCRIBE")
+  protected val EXTENDED = Keyword("EXTENDED")
   protected val AS = Keyword("AS")
   protected val COMMENT = Keyword("COMMENT")

@@ -82,7 +85,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
   protected val MAP = Keyword("MAP")
   protected val STRUCT = Keyword("STRUCT")

-  protected lazy val ddl: Parser[LogicalPlan] = createTable
+  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable

   protected def start: Parser[LogicalPlan] = ddl

@@ -136,6 +139,22 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {

   protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"

+  /*
+   * describe [extended] table avroTable
+   * This will display all columns of table `avroTable` includes column_name,column_type,nullable
+   */
+  protected lazy val describeTable: Parser[LogicalPlan] =
+    (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
+      case e ~ db ~ tbl =>
+        val tblIdentifier = db match {
+          case Some(dbName) =>
+            Seq(dbName, tbl)
+          case None =>
+            Seq(tbl)
+        }
+        DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
+    }
+
   protected lazy val options: Parser[Map[String, String]] =
     "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }

@@ -274,6 +293,22 @@ object ResolvedDataSource {

 private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)

+/**
+ * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+ * @param table The table to be described.
+ * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
+ *                   It is effective only when the table is a Hive table.
+ */
+private[sql] case class DescribeCommand(
+    table: LogicalPlan,
+    isExtended: Boolean) extends Command {
+  override def output = Seq(
+    // Column names are based on Hive.
+    AttributeReference("col_name", StringType, nullable = false)(),
+    AttributeReference("data_type", StringType, nullable = false)(),
+    AttributeReference("comment", StringType, nullable = false)())
+}
+
 private[sql] case class CreateTableUsing(
     tableName: String,
     userSpecifiedSchema: Option[StructType],
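
The describeTable production accepts an optional EXTENDED keyword and an optional database qualifier before the table name. A hedged sketch of the statement shapes the grammar above parses (table and database names are illustrative; per the DescribeCommand scaladoc, EXTENDED only adds extra detail for Hive tables):

// All three shapes are accepted by the parser; keywords are case-insensitive.
sqlContext.sql("DESCRIBE ddlPeople")
sqlContext.sql("DESCRIBE EXTENDED ddlPeople")
sqlContext.sql("DESCRIBE EXTENDED someDb.someTable")
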
sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala (new file)

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+class DDLScanSource extends RelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    SimpleDDLScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext)
+  }
+}
+
+case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
+  extends TableScan {
+
+  override def schema =
+    StructType(Seq(
+      StructField("intType", IntegerType, nullable = false,
+        new MetadataBuilder().putString("comment", "test comment").build()),
+      StructField("stringType", StringType, nullable = false),
+      StructField("dateType", DateType, nullable = false),
+      StructField("timestampType", TimestampType, nullable = false),
+      StructField("doubleType", DoubleType, nullable = false),
+      StructField("bigintType", LongType, nullable = false),
+      StructField("tinyintType", ByteType, nullable = false),
+      StructField("decimalType", DecimalType.Unlimited, nullable = false),
+      StructField("fixedDecimalType", DecimalType(5,1), nullable = false),
+      StructField("binaryType", BinaryType, nullable = false),
+      StructField("booleanType", BooleanType, nullable = false),
+      StructField("smallIntType", ShortType, nullable = false),
+      StructField("floatType", FloatType, nullable = false),
+      StructField("mapType", MapType(StringType, StringType)),
+      StructField("arrayType", ArrayType(StringType)),
+      StructField("structType",
+        StructType(StructField("f1",StringType) ::
+          (StructField("f2",IntegerType)) :: Nil
+        )
+      )
+    ))
+
+
+  override def buildScan() = sqlContext.sparkContext.parallelize(from to to).
+    map(e => Row(s"people$e", e * 2))
+}
+
+class DDLTestSuite extends DataSourceTest {
+  import caseInsensisitiveContext._
+
+  before {
+    sql(
+      """
+      |CREATE TEMPORARY TABLE ddlPeople
+      |USING org.apache.spark.sql.sources.DDLScanSource
+      |OPTIONS (
+      | From '1',
+      | To '10'
+      |)
+      """.stripMargin)
+  }
+
+  sqlTest(
+      "describe ddlPeople",
+      Seq(
+        Row("intType", "int", "test comment"),
+        Row("stringType", "string", ""),
+        Row("dateType", "date", ""),
+        Row("timestampType", "timestamp", ""),
+        Row("doubleType", "double", ""),
+        Row("bigintType", "bigint", ""),
+        Row("tinyintType", "tinyint", ""),
+        Row("decimalType", "decimal(10,0)", ""),
+        Row("fixedDecimalType", "decimal(5,1)", ""),
+        Row("binaryType", "binary", ""),
+        Row("booleanType", "boolean", ""),
+        Row("smallIntType", "smallint", ""),
+        Row("floatType", "float", ""),
+        Row("mapType", "map<string,string>", ""),
+        Row("arrayType", "array<string>", ""),
+        Row("structType", "struct<f1:string,f2:int>", "")
+      ))
+}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 1 addition & 1 deletion (the removed and added lines differ only in whitespace, which this view does not show)

@@ -75,7 +75,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       DataFrame(this,
         ddlParser(sqlText, exceptionOnError = false).getOrElse(HiveQl.parseSql(substituted)))
     } else {
-      sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
+      sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
     }
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 1 addition & 16 deletions
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.ExplainCommand
+import org.apache.spark.sql.sources.DescribeCommand
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema}
 import org.apache.spark.sql.types._

@@ -47,22 +48,6 @@ import scala.collection.JavaConversions._
  */
 private[hive] case object NativePlaceholder extends Command

-/**
- * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
- * @param table The table to be described.
- * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
- *                   It is effective only when the table is a Hive table.
- */
-case class DescribeCommand(
-    table: LogicalPlan,
-    isExtended: Boolean) extends Command {
-  override def output = Seq(
-    // Column names are based on Hive.
-    AttributeReference("col_name", StringType, nullable = false)(),
-    AttributeReference("data_type", StringType, nullable = false)(),
-    AttributeReference("comment", StringType, nullable = false)())
-}
-
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
 private[hive] object HiveQl {
   protected val nativeCommands = Seq(

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 5 additions & 1 deletion
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.sources.DescribeCommand
 import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
@@ -240,8 +241,11 @@ private[hive] trait HiveStrategies {
         case t: MetastoreRelation =>
           ExecutedCommand(
             DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil
+
         case o: LogicalPlan =>
-          ExecutedCommand(RunnableDescribeCommand(planLater(o), describe.output)) :: Nil
+          val resultPlan = context.executePlan(o).executedPlan
+          ExecutedCommand(RunnableDescribeCommand(
+            resultPlan, describe.output, describe.isExtended)) :: Nil
       }

       case _ => Nil
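
On the Hive side, metastore tables keep going through DescribeHiveTableCommand, while any other resolved relation (for example, a temporary data-source table registered in a HiveContext) is now planned through the shared runnable DescribeCommand. A hedged usage sketch (table names are illustrative):

// EXTENDED adds Hive-specific detail only for metastore tables, per the
// DescribeCommand scaladoc; for other relations it behaves like plain DESCRIBE.
hiveContext.sql("DESCRIBE EXTENDED some_hive_table").collect().foreach(println)
hiveContext.sql("DESCRIBE someTempDataSourceTable").collect().foreach(println)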
