Commit 5b7ae19

patch
1 parent a79a9f9 commit 5b7ae19

5 files changed: +209 additions, -8 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala

Lines changed: 30 additions & 3 deletions
@@ -224,8 +224,9 @@ abstract class DataType {
   def json: String = compact(render(jsonValue))

   def prettyJson: String = pretty(render(jsonValue))
-}

+  def toSimpleString: String = typeName
+}

 /**
  * :: DeveloperApi ::
@@ -235,8 +236,9 @@ abstract class DataType {
  * @group dataType
  */
 @DeveloperApi
-case object NullType extends DataType
-
+case object NullType extends DataType {
+  override def toSimpleString = "null"
+}

 object NativeType {
   val all = Seq(
@@ -300,6 +302,7 @@ case object StringType extends NativeType with PrimitiveType {
   private[sql] type JvmType = String
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "string"
 }


@@ -324,6 +327,7 @@ case object BinaryType extends NativeType with PrimitiveType {
       x.length - y.length
     }
   }
+  override def toSimpleString = "binary"
 }


@@ -339,6 +343,7 @@ case object BooleanType extends NativeType with PrimitiveType {
   private[sql] type JvmType = Boolean
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "boolean"
 }


@@ -359,6 +364,7 @@ case object TimestampType extends NativeType {
   private[sql] val ordering = new Ordering[JvmType] {
     def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
   }
+  override def toSimpleString = "timestamp"
 }


@@ -379,6 +385,7 @@ case object DateType extends NativeType {
   private[sql] val ordering = new Ordering[JvmType] {
     def compare(x: Date, y: Date) = x.compareTo(y)
   }
+  override def toSimpleString = "date"
 }


@@ -425,6 +432,7 @@ case object LongType extends IntegralType {
   private[sql] val numeric = implicitly[Numeric[Long]]
   private[sql] val integral = implicitly[Integral[Long]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "bigint"
 }


@@ -442,6 +450,7 @@ case object IntegerType extends IntegralType {
   private[sql] val numeric = implicitly[Numeric[Int]]
   private[sql] val integral = implicitly[Integral[Int]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "int"
 }


@@ -459,6 +468,7 @@ case object ShortType extends IntegralType {
   private[sql] val numeric = implicitly[Numeric[Short]]
   private[sql] val integral = implicitly[Integral[Short]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "smallint"
 }


@@ -476,6 +486,7 @@ case object ByteType extends IntegralType {
   private[sql] val numeric = implicitly[Numeric[Byte]]
   private[sql] val integral = implicitly[Integral[Byte]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "tinyint"
 }


@@ -530,6 +541,11 @@ case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalT
     case Some(PrecisionInfo(precision, scale)) => s"DecimalType($precision,$scale)"
     case None => "DecimalType()"
   }
+
+  override def toSimpleString = precisionInfo match {
+    case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
+    case None => "decimal(10,0)"
+  }
 }


@@ -580,6 +596,7 @@ case object DoubleType extends FractionalType {
   private[sql] val fractional = implicitly[Fractional[Double]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
   private[sql] val asIntegral = DoubleAsIfIntegral
+  override def toSimpleString = "double"
 }


@@ -598,6 +615,7 @@ case object FloatType extends FractionalType {
   private[sql] val fractional = implicitly[Fractional[Float]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
   private[sql] val asIntegral = FloatAsIfIntegral
+  override def toSimpleString = "float"
 }


@@ -636,6 +654,8 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
     ("type" -> typeName) ~
       ("elementType" -> elementType.jsonValue) ~
       ("containsNull" -> containsNull)
+
+  override def toSimpleString = s"array<${elementType.toSimpleString}>"
 }


@@ -805,6 +825,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
   override def length: Int = fields.length

   override def iterator: Iterator[StructField] = fields.iterator
+
+  override def toSimpleString = {
+    val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.toSimpleString}")
+    s"struct<${fieldTypes.mkString(",")}>"
+  }
 }


@@ -848,6 +873,8 @@ case class MapType(
     ("keyType" -> keyType.jsonValue) ~
       ("valueType" -> valueType.jsonValue) ~
       ("valueContainsNull" -> valueContainsNull)
+
+  override def toSimpleString = s"map<${keyType.toSimpleString},${valueType.toSimpleString}>"
 }

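Taken together, these overrides make toSimpleString recurse through nested types and print Hive-style names (int, bigint, array<...>, map<...>, struct<...>), defaulting to typeName for anything without an override. A minimal standalone sketch of the same recursion, written against hypothetical SimpleType classes rather than Spark's DataType hierarchy, shows the strings this is expected to produce:

    // Illustrative sketch only: SimpleType, SInt, SString, etc. are hypothetical
    // stand-ins, not Spark classes; the recursion mirrors the overrides in this diff.
    sealed trait SimpleType { def toSimpleString: String }
    case object SInt extends SimpleType { def toSimpleString = "int" }
    case object SString extends SimpleType { def toSimpleString = "string" }
    case class SArray(element: SimpleType) extends SimpleType {
      def toSimpleString = s"array<${element.toSimpleString}>"
    }
    case class SMap(key: SimpleType, value: SimpleType) extends SimpleType {
      def toSimpleString = s"map<${key.toSimpleString},${value.toSimpleString}>"
    }
    case class SStruct(fields: Seq[(String, SimpleType)]) extends SimpleType {
      def toSimpleString = {
        val cols = fields.map { case (n, t) => n + ":" + t.toSimpleString }
        s"struct<${cols.mkString(",")}>"
      }
    }

    object SimpleStringDemo extends App {
      println(SStruct(Seq("f1" -> SString, "f2" -> SInt)).toSimpleString) // struct<f1:string,f2:int>
      println(SMap(SString, SArray(SInt)).toSimpleString)                 // map<string,array<int>>
    }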

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 34 additions & 1 deletion
@@ -21,10 +21,12 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import scala.collection.mutable.ArrayBuffer

 /**
  * A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -178,3 +180,34 @@ case class DescribeCommand(
     child.output.map(field => Row(field.name, field.dataType.toString, null))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class DDLDescribeCommand(
+    dbName: Option[String],
+    tableName: String, isExtended: Boolean) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
+    val tblRelation = dbName match {
+      case Some(db) => UnresolvedRelation(Seq(db, tableName))
+      case None => UnresolvedRelation(Seq(tableName))
+    }
+    val logicalRelation = sqlContext.executePlan(tblRelation).analyzed
+    val rows = new ArrayBuffer[Row]()
+    rows ++= logicalRelation.schema.fields.map{field =>
+      Row(field.name, field.dataType.toSimpleString, null)}
+
+    /*
+     * TODO if future support partition table, add header below:
+     * # Partition Information
+     * # col_name data_type comment
+     */
+    if (isExtended) { // TODO describe extended table
+      // rows += Row("# extended", null, null)
+    }
+    rows
+  }
+}
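run() resolves the (optionally database-qualified) relation, then emits one row per schema field in the classic DESCRIBE layout of (col_name, data_type, comment), with the comment column left null for now. A stripped-down sketch of that mapping, using a hypothetical Field case class and describeRows helper instead of Spark's StructField and Row, looks like this:

    import scala.collection.mutable.ArrayBuffer

    // Hypothetical stand-ins (Field, describeRows) -- not Spark API -- mirroring
    // how run() above flattens a resolved schema into DESCRIBE output rows.
    case class Field(name: String, simpleType: String)

    def describeRows(fields: Seq[Field]): Seq[(String, String, Any)] = {
      val rows = new ArrayBuffer[(String, String, Any)]()
      rows ++= fields.map(f => (f.name, f.simpleType, null)) // (col_name, data_type, comment)
      rows
    }

    // describeRows(Seq(Field("intType", "int"), Field("arrayType", "array<string>")))
    //   => ArrayBuffer((intType,int,null), (arrayType,array<string>,null))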

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 15 additions & 2 deletions
@@ -25,9 +25,10 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.{SchemaRDD, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
-import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.execution.{DDLDescribeCommand, RunnableCommand}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

 /**
  * A parser for foreign DDL commands.
@@ -61,6 +62,8 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
   protected val TABLE = Keyword("TABLE")
   protected val USING = Keyword("USING")
   protected val OPTIONS = Keyword("OPTIONS")
+  protected val DESCRIBE = Keyword("DESCRIBE")
+  protected val EXTENDED = Keyword("EXTENDED")

   // Data types.
   protected val STRING = Keyword("STRING")
@@ -89,7 +92,7 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi

   override val lexical = new SqlLexical(reservedWords)

-  protected lazy val ddl: Parser[LogicalPlan] = createTable
+  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable

   /**
   * `CREATE [TEMPORARY] TABLE avroTable
@@ -112,6 +115,16 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi

   protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"

+  /*
+   * describe [extended] table avroTable
+   * This will display all columns of table `avroTable` includes column_name,column_type,nullable
+   */
+  protected lazy val describeTable: Parser[LogicalPlan] =
+    (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
+      case e ~ db ~ tbl =>
+        DDLDescribeCommand(db, tbl, e.nonEmpty)
+  }
+
   protected lazy val options: Parser[Map[String, String]] =
     "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }

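The describeTable production accepts `DESCRIBE [EXTENDED] [db.]table` and passes the optional database, the table name, and the extended flag to DDLDescribeCommand. A self-contained sketch of the same grammar, built on plain StandardTokenParsers from scala-parser-combinators (uppercase keywords only, none of SqlLexical's case-insensitive handling, and not Spark's DDLParser), behaves like this:

    import scala.util.parsing.combinator.syntactical.StandardTokenParsers

    // Minimal illustrative grammar: DescribeSketch and its result tuple are
    // assumptions for this example, not part of Spark.
    object DescribeSketch extends StandardTokenParsers {
      lexical.reserved ++= Seq("DESCRIBE", "EXTENDED")
      lexical.delimiters += "."

      // DESCRIBE [EXTENDED] [db.]table  =>  (Option[db], table, extended?)
      def describeTable: Parser[(Option[String], String, Boolean)] =
        ("DESCRIBE" ~> opt("EXTENDED")) ~ opt(ident <~ ".") ~ ident ^^ {
          case e ~ db ~ tbl => (db, tbl, e.nonEmpty)
        }

      def parse(input: String) = phrase(describeTable)(new lexical.Scanner(input))

      def main(args: Array[String]): Unit = {
        println(parse("DESCRIBE EXTENDED mydb.people")) // successful parse: (Some(mydb),people,true)
        println(parse("DESCRIBE people"))               // successful parse: (None,people,false)
      }
    }

Because `ddl` is now `createTable | describeTable`, the DDL parser only produces a plan for these two statement shapes; anything else still falls through to the regular SQL or HiveQL parser.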

Lines changed: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+class DDLScanSource extends RelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    SimpleDDLScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext)
+  }
+}
+
+case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
+  extends TableScan {
+
+  override def schema =
+    StructType(Seq(
+      StructField("intType", IntegerType, nullable = false),
+      StructField("stringType", StringType, nullable = false),
+      StructField("dateType", DateType, nullable = false),
+      StructField("timestampType", TimestampType, nullable = false),
+      StructField("doubleType", DoubleType, nullable = false),
+      StructField("bigintType", LongType, nullable = false),
+      StructField("tinyintType", ByteType, nullable = false),
+      StructField("decimalType", DecimalType.Unlimited, nullable = false),
+      StructField("fixedDecimalType", DecimalType(5,1), nullable = false),
+      StructField("binaryType", BinaryType, nullable = false),
+      StructField("booleanType", BooleanType, nullable = false),
+      StructField("smallIntType", ShortType, nullable = false),
+      StructField("floatType", FloatType, nullable = false),
+      StructField("mapType", MapType(StringType, StringType)),
+      StructField("arrayType", ArrayType(StringType)),
+      StructField("structType",
+        StructType(StructField("f1",StringType) ::
+          (StructField("f2",IntegerType)) :: Nil
+        )
+      )
+    ))
+
+
+  override def buildScan() = sqlContext.sparkContext.parallelize(from to to).
+    map(e => Row(s"people$e",e*2))
+}
+
+class DDLTestSuit extends DataSourceTest {
+  import caseInsensisitiveContext._
+
+  before {
+    sql(
+      """
+      |CREATE TEMPORARY TABLE ddlPeople
+      |USING org.apache.spark.sql.sources.DDLScanSource
+      |OPTIONS (
+      | From '1',
+      | To '10'
+      |)
+      """.stripMargin)
+  }
+
+  sqlTest(
+      "describe ddlPeople",
+      Seq(
+        Row("intType", "int", null),
+        Row("stringType", "string", null),
+        Row("dateType", "date", null),
+        Row("timestampType", "timestamp", null),
+        Row("doubleType", "double", null),
+        Row("bigintType", "bigint", null),
+        Row("tinyintType", "tinyint", null),
+        Row("decimalType", "decimal(10,0)", null),
+        Row("fixedDecimalType", "decimal(5,1)", null),
+        Row("binaryType", "binary", null),
+        Row("booleanType", "boolean", null),
+        Row("smallIntType", "smallint", null),
+        Row("floatType", "float", null),
+        Row("mapType", "map<string,string>", null),
+        Row("arrayType", "array<string>", null),
+        Row("structType", "struct<f1:string,f2:int>", null)
+      ))
+
+  sqlTest(
+      "describe extended ddlPeople",
+      Seq(
+        Row("intType", "int", null),
+        Row("stringType", "string", null),
+        Row("dateType", "date", null),
+        Row("timestampType", "timestamp", null),
+        Row("doubleType", "double", null),
+        Row("bigintType", "bigint", null),
+        Row("tinyintType", "tinyint", null),
+        Row("decimalType", "decimal(10,0)", null),
+        Row("fixedDecimalType", "decimal(5,1)", null),
+        Row("binaryType", "binary", null),
+        Row("booleanType", "boolean", null),
+        Row("smallIntType", "smallint", null),
+        Row("floatType", "float", null),
+        Row("mapType", "map<string,string>", null),
+        Row("arrayType", "array<string>", null),
+        Row("structType", "struct<f1:string,f2:int>", null)
+        // Row("# extended", null, null)
+      ))
+}
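Outside the test harness the same path is reachable through a plain SQLContext. A rough usage sketch, assuming a SQLContext named sqlContext and the ddlPeople temporary table registered exactly as in the before block above:

    // Usage sketch only: `sqlContext` and the `ddlPeople` temporary table are
    // assumed to exist, registered by the CREATE TEMPORARY TABLE statement above.
    val described = sqlContext.sql("describe ddlPeople")
    described.collect().foreach { row =>
      // Each row is (column_name, data_type, comment); comment is currently null.
      println(s"${row(0)}\t${row(1)}\t${row(2)}")
    }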

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 10 additions & 2 deletions
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException}
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
+import org.apache.spark.sql.hive.HiveQl.ParseException
 import org.apache.spark.sql.sources.DataSourceStrategy
 import org.apache.spark.sql.types._

@@ -70,8 +71,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     if (conf.dialect == "sql") {
       super.sql(sqlText)
     } else if (conf.dialect == "hiveql") {
-      new SchemaRDD(this, ddlParser(sqlText).getOrElse(HiveQl.parseSql(sqlText)))
-    } else {
+      val ddlPlan = ddlParser(sqlText)
+      val basicPlan = try {
+        HiveQl.parseSql(sqlText)
+      }catch {
+        case e: Exception if ddlPlan.nonEmpty => ddlPlan.get
+        case e: Throwable => throw e
+      }
+      new SchemaRDD(this, basicPlan)
+    } else {
       sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
     }
   }
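The ordering here is worth noting: the DDL parse result is computed up front, but HiveQl.parseSql still wins whenever it succeeds; the DDL plan is only used as a fallback when Hive's parser throws. Previously, getOrElse meant a successful DDL parse took precedence. A small standalone sketch of that fallback shape, using hypothetical names rather than Spark's:

    // Illustrative only: parseWithFallback, primary, and fallback are assumed
    // names for this example. The primary parser wins; the fallback result is
    // used only if the primary throws and the fallback actually produced a plan.
    def parseWithFallback[A](fallback: => Option[A])(primary: => A): A = {
      val fallbackPlan = fallback
      try primary
      catch {
        case e: Exception if fallbackPlan.nonEmpty => fallbackPlan.get
      }
    }

    // e.g. parseWithFallback(ddlParser("describe t"))(HiveQl.parseSql("describe t"))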
