
Commit 326b388

Merge branch 'master' of github.com:apache/spark into equals

Conflicts:
	sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

2 parents bd19807 + f479cf3  commit 326b388

23 files changed: +2409 −427 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 56 additions & 55 deletions
@@ -66,43 +66,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected case class Keyword(str: String)
 
   protected implicit def asParser(k: Keyword): Parser[String] =
-    allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
-
-  protected class SqlLexical extends StdLexical {
-    case class FloatLit(chars: String) extends Token {
-      override def toString = chars
-    }
-    override lazy val token: Parser[Token] = (
-      identChar ~ rep( identChar | digit ) ^^
-        { case first ~ rest => processIdent(first :: rest mkString "") }
-      | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
-          case i ~ None => NumericLit(i mkString "")
-          case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
-        }
-      | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
-        { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
-      | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
-        { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
-      | EofCh ^^^ EOF
-      | '\'' ~> failure("unclosed string literal")
-      | '\"' ~> failure("unclosed string literal")
-      | delim
-      | failure("illegal character")
-    )
-
-    override def identChar = letter | elem('.') | elem('_')
-
-    override def whitespace: Parser[Any] = rep(
-      whitespaceChar
-      | '/' ~ '*' ~ comment
-      | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
-      | '#' ~ rep( chrExcept(EofCh, '\n') )
-      | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
-      | '/' ~ '*' ~ failure("unclosed comment")
-    )
-  }
-
-  override val lexical = new SqlLexical
+    lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
 
   protected val ALL = Keyword("ALL")
   protected val AND = Keyword("AND")

@@ -161,24 +125,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
     this.getClass
       .getMethods
       .filter(_.getReturnType == classOf[Keyword])
-      .map(_.invoke(this).asInstanceOf[Keyword])
-
-  /** Generate all variations of upper and lower case of a given string */
-  private def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
-    if (s == "") {
-      Stream(prefix)
-    } else {
-      allCaseVersions(s.tail, prefix + s.head.toLower) ++
-        allCaseVersions(s.tail, prefix + s.head.toUpper)
-    }
-  }
+      .map(_.invoke(this).asInstanceOf[Keyword].str)
 
-  lexical.reserved ++= reservedWords.flatMap(w => allCaseVersions(w.str))
-
-  lexical.delimiters += (
-    "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
-    ",", ";", "%", "{", "}", ":", "[", "]"
-  )
+  override val lexical = new SqlLexical(reservedWords)
 
   protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
     exprs.zipWithIndex.map {

@@ -383,7 +332,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
     elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
 
   protected lazy val baseExpression: PackratParser[Expression] =
-    expression ~ "[" ~ expression <~ "]" ^^ {
+    expression ~ "[" ~ expression <~ "]" ^^ {
       case base ~ _ ~ ordinal => GetItem(base, ordinal)
     } |
     TRUE ^^^ Literal(true, BooleanType) |

@@ -399,3 +348,55 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected lazy val dataType: Parser[DataType] =
     STRING ^^^ StringType
 }
+
+class SqlLexical(val keywords: Seq[String]) extends StdLexical {
+  case class FloatLit(chars: String) extends Token {
+    override def toString = chars
+  }
+
+  reserved ++= keywords.flatMap(w => allCaseVersions(w))
+
+  delimiters += (
+    "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
+    ",", ";", "%", "{", "}", ":", "[", "]"
+  )
+
+  override lazy val token: Parser[Token] = (
+    identChar ~ rep( identChar | digit ) ^^
+      { case first ~ rest => processIdent(first :: rest mkString "") }
+    | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
+        case i ~ None => NumericLit(i mkString "")
+        case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
+      }
+    | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
+      { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
+    | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
+      { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
+    | EofCh ^^^ EOF
+    | '\'' ~> failure("unclosed string literal")
+    | '\"' ~> failure("unclosed string literal")
+    | delim
+    | failure("illegal character")
+  )
+
+  override def identChar = letter | elem('_') | elem('.')
+
+  override def whitespace: Parser[Any] = rep(
+    whitespaceChar
+    | '/' ~ '*' ~ comment
+    | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
+    | '#' ~ rep( chrExcept(EofCh, '\n') )
+    | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
+    | '/' ~ '*' ~ failure("unclosed comment")
+  )
+
+  /** Generate all variations of upper and lower case of a given string */
+  def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
+    if (s == "") {
+      Stream(prefix)
+    } else {
+      allCaseVersions(s.tail, prefix + s.head.toLower) ++
+        allCaseVersions(s.tail, prefix + s.head.toUpper)
+    }
+  }
+}
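
Not part of the commit: a minimal, self-contained sketch of what the relocated allCaseVersions helper produces, which is how the new SqlLexical(keywords) constructor registers every casing of a keyword as a reserved word. The object and main method names below are illustrative only.

// Illustrative sketch (CaseVersionsDemo is not in the commit): the same
// recursion as SqlLexical.allCaseVersions, run on a single keyword.
object CaseVersionsDemo {
  def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
    if (s == "") {
      Stream(prefix)
    } else {
      allCaseVersions(s.tail, prefix + s.head.toLower) ++
        allCaseVersions(s.tail, prefix + s.head.toUpper)
    }
  }

  def main(args: Array[String]): Unit = {
    // "AS" expands to its 2^2 casings, all of which the lexer reserves:
    // List(as, aS, As, AS)
    println(allCaseVersions("AS").toList)
  }
}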

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala

Lines changed: 2 additions & 0 deletions
@@ -50,6 +50,8 @@ case class GetItem(child: Expression, ordinal: Expression) extends Expression {
       null
     } else {
       if (child.dataType.isInstanceOf[ArrayType]) {
+        // TODO: consider using Array[_] for ArrayType child to avoid
+        // boxing of primitives
         val baseValue = value.asInstanceOf[Seq[_]]
         val o = key.asInstanceOf[Int]
         if (o >= baseValue.size || o < 0) {
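
The new TODO flags that an ArrayType child currently evaluates to a Seq[_], so primitive elements are boxed before GetItem indexes into them. A rough standalone sketch of that lookup (simplified; the expression and null-handling machinery around it is omitted, and the helper name is illustrative):

// Simplified stand-in for the ArrayType branch of GetItem.eval: index into a
// boxed Seq[_], returning null when the ordinal is out of range.
def getItem(baseValue: Seq[_], ordinal: Int): Any =
  if (ordinal >= baseValue.size || ordinal < 0) null else baseValue(ordinal)

// getItem(Seq(10, 20, 30), 1) == 20
// getItem(Seq(10, 20, 30), 5) == null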

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala

Lines changed: 16 additions & 0 deletions
@@ -60,3 +60,19 @@ case class ExplainCommand(plan: LogicalPlan) extends Command {
  * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
  */
 case class CacheCommand(tableName: String, doCache: Boolean) extends Command
+
+/**
+ * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+ * @param table The table to be described.
+ * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
+ *                   It is effective only when the table is a Hive table.
+ */
+case class DescribeCommand(
+    table: LogicalPlan,
+    isExtended: Boolean) extends Command {
+  override def output = Seq(
+    // Column names are based on Hive.
+    BoundReference(0, AttributeReference("col_name", StringType, nullable = false)()),
+    BoundReference(1, AttributeReference("data_type", StringType, nullable = false)()),
+    BoundReference(2, AttributeReference("comment", StringType, nullable = false)()))
+}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala

Lines changed: 91 additions & 7 deletions
@@ -19,35 +19,108 @@ package org.apache.spark.sql.catalyst.types
 
 import java.sql.Timestamp
 
-import scala.reflect.runtime.universe.{typeTag, TypeTag}
+import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.spark.sql.catalyst.expressions.Expression
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror}
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.util.Utils
+
+/**
+ *
+ */
+object DataType extends RegexParsers {
+  protected lazy val primitiveType: Parser[DataType] =
+    "StringType" ^^^ StringType |
+    "FloatType" ^^^ FloatType |
+    "IntegerType" ^^^ IntegerType |
+    "ByteType" ^^^ ByteType |
+    "ShortType" ^^^ ShortType |
+    "DoubleType" ^^^ DoubleType |
+    "LongType" ^^^ LongType |
+    "BinaryType" ^^^ BinaryType |
+    "BooleanType" ^^^ BooleanType |
+    "DecimalType" ^^^ DecimalType |
+    "TimestampType" ^^^ TimestampType
+
+  protected lazy val arrayType: Parser[DataType] =
+    "ArrayType" ~> "(" ~> dataType <~ ")" ^^ ArrayType
+
+  protected lazy val mapType: Parser[DataType] =
+    "MapType" ~> "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ {
+      case t1 ~ _ ~ t2 => MapType(t1, t2)
+    }
+
+  protected lazy val structField: Parser[StructField] =
+    ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
+      case name ~ tpe ~ nullable =>
+        StructField(name, tpe, nullable = nullable)
+    }
+
+  protected lazy val boolVal: Parser[Boolean] =
+    "true" ^^^ true |
+    "false" ^^^ false
+
+
+  protected lazy val structType: Parser[DataType] =
+    "StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
+      case fields => new StructType(fields)
+    }
+
+  protected lazy val dataType: Parser[DataType] =
+    arrayType |
+    mapType |
+    structType |
+    primitiveType
+
+  /**
+   * Parses a string representation of a DataType.
+   *
+   * TODO: Generate parser as pickler...
+   */
+  def apply(asString: String): DataType = parseAll(dataType, asString) match {
+    case Success(result, _) => result
+    case failure: NoSuccess => sys.error(s"Unsupported dataType: $asString, $failure")
+  }
+}
 
 abstract class DataType {
   /** Matches any expression that evaluates to this DataType */
   def unapply(a: Expression): Boolean = a match {
     case e: Expression if e.dataType == this => true
     case _ => false
   }
+
+  def isPrimitive: Boolean = false
 }
 
 case object NullType extends DataType
 
+trait PrimitiveType extends DataType {
+  override def isPrimitive = true
+}
+
 abstract class NativeType extends DataType {
   type JvmType
   @transient val tag: TypeTag[JvmType]
   val ordering: Ordering[JvmType]
+
+  @transient val classTag = {
+    val mirror = runtimeMirror(Utils.getSparkClassLoader)
+    ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
+  }
 }
 
-case object StringType extends NativeType {
+case object StringType extends NativeType with PrimitiveType {
   type JvmType = String
   @transient lazy val tag = typeTag[JvmType]
   val ordering = implicitly[Ordering[JvmType]]
 }
-case object BinaryType extends DataType {
+case object BinaryType extends DataType with PrimitiveType {
   type JvmType = Array[Byte]
 }
-case object BooleanType extends NativeType {
+case object BooleanType extends NativeType with PrimitiveType {
   type JvmType = Boolean
   @transient lazy val tag = typeTag[JvmType]
   val ordering = implicitly[Ordering[JvmType]]

@@ -63,7 +136,7 @@ case object TimestampType extends NativeType {
   }
 }
 
-abstract class NumericType extends NativeType {
+abstract class NumericType extends NativeType with PrimitiveType {
   // Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
   // implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
   // type parameter and and add a numeric annotation (i.e., [JvmType : Numeric]). This gets

@@ -154,6 +227,17 @@ case object FloatType extends FractionalType {
 case class ArrayType(elementType: DataType) extends DataType
 
 case class StructField(name: String, dataType: DataType, nullable: Boolean)
-case class StructType(fields: Seq[StructField]) extends DataType
+
+object StructType {
+  def fromAttributes(attributes: Seq[Attribute]): StructType = {
+    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
+  }
+
+  // def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq)
+}
+
+case class StructType(fields: Seq[StructField]) extends DataType {
+  def toAttributes = fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+}
 
 case class MapType(keyType: DataType, valueType: DataType) extends DataType
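
A small usage sketch (not from the commit) of the DataType parser added above: it consumes the default toString of a data type and rebuilds the equivalent object, which is useful for round-tripping schemas through string metadata. REPL-style; it assumes the catalyst types package is on the classpath.

import org.apache.spark.sql.catalyst.types._

// Round-trip two simple types through their string form via DataType.apply.
val arrayOfString = DataType("ArrayType(StringType)")
assert(arrayOfString == ArrayType(StringType))

val stringToInt = DataType(MapType(StringType, IntegerType).toString)
assert(stringToInt == MapType(StringType, IntegerType))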

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def parquetFile(path: String): SchemaRDD =
-    new SchemaRDD(this, parquet.ParquetRelation(path))
+    new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration)))
 
   /**
    * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].
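
A hedged usage sketch (not part of the diff): with this change the SparkContext's Hadoop configuration travels with the relation, so filesystem settings such as fs.defaultFS are honored when the path is resolved. The path and the pre-existing SparkContext `sc` below are assumptions.

// spark-shell style sketch; `sc` is an already-created SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// The relation created here now carries sc.hadoopConfiguration.
val people = sqlContext.parquetFile("hdfs:///tmp/people.parquet")
println(people.count())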

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 3 additions & 1 deletion
@@ -99,7 +99,9 @@ class JavaSQLContext(val sqlContext: SQLContext) {
    * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
    */
   def parquetFile(path: String): JavaSchemaRDD =
-    new JavaSchemaRDD(sqlContext, ParquetRelation(path))
+    new JavaSchemaRDD(
+      sqlContext,
+      ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration)))
 
   /**
    * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 1 deletion
@@ -154,7 +154,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.WriteToFile(path, child) =>
         val relation =
           ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
-        InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
+        // Note: overwrite=false because otherwise the metadata we just created will be deleted
+        InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sparkContext) :: Nil
       case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
         InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 21 additions & 0 deletions
@@ -121,3 +121,24 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:
 
   override def output: Seq[Attribute] = Seq.empty
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends LeafNode with Command {
+
+  override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
+    Seq(("# Registered as a temporary table", null, null)) ++
+      child.output.map(field => (field.name, field.dataType.toString, null))
+  }
+
+  override def execute(): RDD[Row] = {
+    val rows = sideEffectResult.map {
+      case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
+    }
+    context.sparkContext.parallelize(rows, 1)
+  }
+}
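
To make the shape of the output concrete, here is a self-contained sketch (illustrative names only; Field, DescribeSketch, and the sample schema are not from the commit) of the row-building step in sideEffectResult/execute above: a header triple followed by one (col_name, data_type, comment) triple per output attribute, with nulls preserved.

// Stand-alone mirror of DescribeCommand.sideEffectResult for a table with
// columns key: IntegerType and value: StringType.
case class Field(name: String, dataType: String)

object DescribeSketch {
  def main(args: Array[String]): Unit = {
    val schema = Seq(Field("key", "IntegerType"), Field("value", "StringType"))
    val described: Seq[(String, String, String)] =
      Seq(("# Registered as a temporary table", null, null)) ++
        schema.map(f => (f.name, f.dataType, null))

    // Each triple becomes one row of the DESCRIBE result.
    described.foreach { case (name, dataType, comment) =>
      println(s"$name\t$dataType\t$comment")
    }
  }
}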
