diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index 9f30f40a173e..c879325c2c71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -227,8 +227,9 @@ abstract class DataType {
   def json: String = compact(render(jsonValue))
 
   def prettyJson: String = pretty(render(jsonValue))
-}
 
+  def toSimpleString: String = typeName
+}
 
 /**
  * :: DeveloperApi ::
@@ -240,9 +241,9 @@ abstract class DataType {
 @DeveloperApi
 case object NullType extends DataType {
   override def defaultSize: Int = 1
+  override def toSimpleString = "null"
 }
 
-
 protected[sql] object NativeType {
   val all = Seq(
     IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
@@ -300,6 +301,8 @@ case object StringType extends NativeType with PrimitiveType {
    * The default size of a value of the StringType is 4096 bytes.
    */
   override def defaultSize: Int = 4096
+
+  override def toSimpleString = "string"
 }
@@ -329,6 +332,8 @@ case object BinaryType extends NativeType with PrimitiveType {
    * The default size of a value of the BinaryType is 4096 bytes.
    */
   override def defaultSize: Int = 4096
+
+  override def toSimpleString = "binary"
 }
@@ -349,6 +354,8 @@ case object BooleanType extends NativeType with PrimitiveType {
    * The default size of a value of the BooleanType is 1 byte.
    */
   override def defaultSize: Int = 1
+
+  override def toSimpleString = "boolean"
 }
@@ -374,6 +381,8 @@ case object TimestampType extends NativeType {
    * The default size of a value of the TimestampType is 8 bytes.
    */
   override def defaultSize: Int = 8
+
+  override def toSimpleString = "timestamp"
 }
@@ -399,6 +408,8 @@ case object DateType extends NativeType {
    * The default size of a value of the DateType is 8 bytes.
    */
   override def defaultSize: Int = 8
+
+  override def toSimpleString = "date"
 }
@@ -450,6 +461,8 @@ case object LongType extends IntegralType {
    * The default size of a value of the LongType is 8 bytes.
    */
   override def defaultSize: Int = 8
+
+  override def toSimpleString = "bigint"
 }
@@ -472,6 +485,8 @@ case object IntegerType extends IntegralType {
    * The default size of a value of the IntegerType is 4 bytes.
    */
   override def defaultSize: Int = 4
+
+  override def toSimpleString = "int"
 }
@@ -494,6 +509,8 @@ case object ShortType extends IntegralType {
    * The default size of a value of the ShortType is 2 bytes.
    */
   override def defaultSize: Int = 2
+
+  override def toSimpleString = "smallint"
 }
@@ -516,6 +533,8 @@ case object ByteType extends IntegralType {
    * The default size of a value of the ByteType is 1 byte.
    */
   override def defaultSize: Int = 1
+
+  override def toSimpleString = "tinyint"
 }
@@ -575,6 +594,11 @@ case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
    * The default size of a value of the DecimalType is 4096 bytes.
    */
   override def defaultSize: Int = 4096
+
+  override def toSimpleString = precisionInfo match {
+    case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
+    case None => "decimal(10,0)"
+  }
 }
@@ -630,6 +654,8 @@ case object DoubleType extends FractionalType {
    * The default size of a value of the DoubleType is 8 bytes.
    */
   override def defaultSize: Int = 8
+
+  override def toSimpleString = "double"
 }
@@ -653,6 +679,8 @@ case object FloatType extends FractionalType {
    * The default size of a value of the FloatType is 4 bytes.
    */
   override def defaultSize: Int = 4
+
+  override def toSimpleString = "float"
 }
@@ -697,6 +725,8 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
    * (We assume that there are 100 elements).
    */
   override def defaultSize: Int = 100 * elementType.defaultSize
+
+  override def toSimpleString = s"array<${elementType.toSimpleString}>"
 }
@@ -871,6 +901,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
    * The default size of a value of the StructType is the total default sizes of all field types.
    */
   override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
+
+  override def toSimpleString = {
+    val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.toSimpleString}")
+    s"struct<${fieldTypes.mkString(",")}>"
+  }
 }
@@ -921,6 +956,8 @@ case class MapType(
    * (We assume that there are 100 elements).
    */
   override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize)
+
+  override def toSimpleString = s"map<${keyType.toSimpleString},${valueType.toSimpleString}>"
 }
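
The following standalone sketch is not part of the patch; it only illustrates the simple-string rendering the additions above are expected to produce for a nested schema once the patch is applied. The object and field names are made up for illustration.

import org.apache.spark.sql.types._

object ToSimpleStringDemo {
  def main(args: Array[String]): Unit = {
    // A nested schema that exercises the decimal, array, map and struct cases.
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("price", DecimalType(5, 1)),
      StructField("tags", ArrayType(StringType)),
      StructField("props", MapType(StringType, IntegerType))))

    // Expected output, given the implementations above:
    // struct<id:bigint,price:decimal(5,1),tags:array<string>,props:map<string,int>>
    println(schema.toSimpleString)
  }
}
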
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 52a31f01a435..701b69aa3d05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -21,10 +21,12 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import scala.collection.mutable.ArrayBuffer
 
 /**
  * A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -178,3 +180,34 @@ case class DescribeCommand(
     child.output.map(field => Row(field.name, field.dataType.toString, null))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class DDLDescribeCommand(
+    dbName: Option[String],
+    tableName: String, isExtended: Boolean) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
+    val tblRelation = dbName match {
+      case Some(db) => UnresolvedRelation(Seq(db, tableName))
+      case None => UnresolvedRelation(Seq(tableName))
+    }
+    val logicalRelation = sqlContext.executePlan(tblRelation).analyzed
+    val rows = new ArrayBuffer[Row]()
+    rows ++= logicalRelation.schema.fields.map { field =>
+      Row(field.name, field.dataType.toSimpleString, null)
+    }
+
+    /*
+     * TODO: if partitioned tables are supported in the future, add the header below:
+     * # Partition Information
+     * # col_name      data_type      comment
+     */
+    if (isExtended) { // TODO: support DESCRIBE EXTENDED output
+      // rows += Row("# extended", null, null)
+    }
+    rows
+  }
+}
+
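
A hedged sketch, not part of the patch, of driving the new command directly and bypassing the parser. It lives in the same package as the command because `RunnableCommand` may not be visible outside `org.apache.spark.sql`, and it assumes `table` is already registered with the given SQLContext.

package org.apache.spark.sql.execution

import org.apache.spark.sql.SQLContext

object DescribeDirectly {
  // Prints one (col_name, data_type, comment) row per column of the given table.
  def describe(sqlContext: SQLContext, table: String): Unit = {
    // dbName is None for a table registered directly in the catalog (e.g. a temporary table).
    val rows = DDLDescribeCommand(dbName = None, tableName = table, isExtended = false)
      .run(sqlContext)
    // The comment column is always null for now.
    rows.foreach(println)
  }
}
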
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 381298caba6f..34c7a57187b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -25,9 +25,10 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.{SchemaRDD, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
-import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.execution.{DDLDescribeCommand, RunnableCommand}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 
 /**
  * A parser for foreign DDL commands.
@@ -61,6 +62,8 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers with Logging {
   protected val TABLE = Keyword("TABLE")
   protected val USING = Keyword("USING")
   protected val OPTIONS = Keyword("OPTIONS")
+  protected val DESCRIBE = Keyword("DESCRIBE")
+  protected val EXTENDED = Keyword("EXTENDED")
 
   // Data types.
   protected val STRING = Keyword("STRING")
@@ -89,7 +92,7 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers with Logging {
 
   override val lexical = new SqlLexical(reservedWords)
 
-  protected lazy val ddl: Parser[LogicalPlan] = createTable
+  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable
 
   /**
    * `CREATE [TEMPORARY] TABLE avroTable
@@ -112,6 +115,16 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers with Logging {
   protected lazy val tableCols: Parser[Seq[StructField]] =
     "(" ~> repsep(column, ",") <~ ")"
 
+  /*
+   * `DESCRIBE [EXTENDED] [dbName.]tableName`
+   * Displays all columns of the table, i.e. the column name, the data type and the comment.
+   */
+  protected lazy val describeTable: Parser[LogicalPlan] =
+    (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
+      case e ~ db ~ tbl =>
+        DDLDescribeCommand(db, tbl, e.nonEmpty)
+    }
+
   protected lazy val options: Parser[Map[String, String]] =
     "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
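
A hedged usage sketch, not part of the patch, of the new grammar through a plain SQLContext. It assumes the `DDLScanSource` test relation defined in the test suite below is on the classpath; the table name mirrors that suite.

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object DescribeSyntaxDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "describe-syntax-demo")
    val sqlContext = new SQLContext(sc)

    // Register a temporary data source table to describe.
    sqlContext.sql(
      """CREATE TEMPORARY TABLE ddlPeople
        |USING org.apache.spark.sql.sources.DDLScanSource
        |OPTIONS (From '1', To '10')
      """.stripMargin)

    // Both forms parse via describeTable; EXTENDED currently adds no extra rows.
    sqlContext.sql("DESCRIBE ddlPeople").collect().foreach(println)
    sqlContext.sql("DESCRIBE EXTENDED ddlPeople").collect().foreach(println)

    sc.stop()
  }
}
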
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
new file mode 100644
index 000000000000..b0a5af3c52d8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+class DDLScanSource extends RelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    SimpleDDLScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext)
+  }
+}
+
+case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
+  extends TableScan {
+
+  override def schema =
+    StructType(Seq(
+      StructField("intType", IntegerType, nullable = false),
+      StructField("stringType", StringType, nullable = false),
+      StructField("dateType", DateType, nullable = false),
+      StructField("timestampType", TimestampType, nullable = false),
+      StructField("doubleType", DoubleType, nullable = false),
+      StructField("bigintType", LongType, nullable = false),
+      StructField("tinyintType", ByteType, nullable = false),
+      StructField("decimalType", DecimalType.Unlimited, nullable = false),
+      StructField("fixedDecimalType", DecimalType(5, 1), nullable = false),
+      StructField("binaryType", BinaryType, nullable = false),
+      StructField("booleanType", BooleanType, nullable = false),
+      StructField("smallIntType", ShortType, nullable = false),
+      StructField("floatType", FloatType, nullable = false),
+      StructField("mapType", MapType(StringType, StringType)),
+      StructField("arrayType", ArrayType(StringType)),
+      StructField("structType",
+        StructType(StructField("f1", StringType) ::
+          StructField("f2", IntegerType) :: Nil
+        )
+      )
+    ))
+
+  override def buildScan() = sqlContext.sparkContext.parallelize(from to to).map { e =>
+    Row(s"people$e", e * 2)
+  }
+}
+
+class DDLTestSuite extends DataSourceTest {
+  import caseInsensisitiveContext._
+
+  before {
+    sql(
+      """
+        |CREATE TEMPORARY TABLE ddlPeople
+        |USING org.apache.spark.sql.sources.DDLScanSource
+        |OPTIONS (
+        |  From '1',
+        |  To '10'
+        |)
+      """.stripMargin)
+  }
+
+  sqlTest(
+    "describe ddlPeople",
+    Seq(
+      Row("intType", "int", null),
+      Row("stringType", "string", null),
+      Row("dateType", "date", null),
+      Row("timestampType", "timestamp", null),
+      Row("doubleType", "double", null),
+      Row("bigintType", "bigint", null),
+      Row("tinyintType", "tinyint", null),
+      Row("decimalType", "decimal(10,0)", null),
+      Row("fixedDecimalType", "decimal(5,1)", null),
+      Row("binaryType", "binary", null),
+      Row("booleanType", "boolean", null),
+      Row("smallIntType", "smallint", null),
+      Row("floatType", "float", null),
+      Row("mapType", "map<string,string>", null),
+      Row("arrayType", "array<string>", null),
+      Row("structType", "struct<f1:string,f2:int>", null)
+    ))
"tinyint", null), + Row("decimalType", "decimal(10,0)", null), + Row("fixedDecimalType", "decimal(5,1)", null), + Row("binaryType", "binary", null), + Row("booleanType", "boolean", null), + Row("smallIntType", "smallint", null), + Row("floatType", "float", null), + Row("mapType", "map", null), + Row("arrayType", "array", null), + Row("structType", "struct", null) + )) + + sqlTest( + "describe extended ddlPeople", + Seq( + Row("intType", "int", null), + Row("stringType", "string", null), + Row("dateType", "date", null), + Row("timestampType", "timestamp", null), + Row("doubleType", "double", null), + Row("bigintType", "bigint", null), + Row("tinyintType", "tinyint", null), + Row("decimalType", "decimal(10,0)", null), + Row("fixedDecimalType", "decimal(5,1)", null), + Row("binaryType", "binary", null), + Row("booleanType", "boolean", null), + Row("smallIntType", "smallint", null), + Row("floatType", "float", null), + Row("mapType", "map", null), + Row("arrayType", "array", null), + Row("structType", "struct", null) + // Row("# extended", null, null) + )) +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 3e26fe367576..61c25de7508e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException} import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand} +import org.apache.spark.sql.hive.HiveQl.ParseException import org.apache.spark.sql.sources.DataSourceStrategy import org.apache.spark.sql.types._ @@ -70,9 +71,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { if (conf.dialect == "sql") { super.sql(sqlText) } else if (conf.dialect == "hiveql") { - new SchemaRDD(this, ddlParser(sqlText).getOrElse(HiveQl.parseSql(sqlText))) - } else { - sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'") + val ddlPlan = ddlParser(sqlText) + val basicPlan = try { + HiveQl.parseSql(sqlText) + } catch { + case e: Exception if ddlPlan.nonEmpty => ddlPlan.get + case e: Throwable => throw e + } + new SchemaRDD(this, basicPlan) + } else { + sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'") } }