@@ -22,14 +22,14 @@ import java.sql.Date
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
 import org.apache.hadoop.hive.ql.lib.Node
-import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse._
-import org.apache.hadoop.hive.ql.plan.PlanUtils
-import org.apache.spark.sql.AnalysisException
+import org.apache.hadoop.hive.ql.session.SessionState
 
+import org.apache.spark.sql.{AnalysisException, SparkSQLParser}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
@@ -62,7 +62,8 @@ case class CreateTableAsSelect(
     allowExisting: Boolean) extends UnaryNode with Command {
 
   override def output: Seq[Attribute] = Seq.empty[Attribute]
-  override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && childrenResolved
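+  // The CTAS node stays unresolved until the target database is specified
+  // and the output schema of the child query is known.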
+  override lazy val resolved: Boolean =
+    tableDesc.specifiedDatabase.isDefined && tableDesc.schema.size > 0 && childrenResolved
 }
 
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
@@ -240,12 +241,24 @@ private[hive] object HiveQl {
    * Otherwise, a NullPointerException will be thrown
    * when retrieving properties from HiveConf.
    */
-    val hContext = new Context(new HiveConf())
+    val hContext = new Context(hiveConf)
     val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
     hContext.clear()
     node
   }
 
+  /**
+   * Returns the HiveConf.
+   * TODO: get it from HiveContext?
+   */
+  private[this] def hiveConf(): HiveConf = {
+    val ss = SessionState.get() // SessionState is lazily initialized, so it can be null here
+    if (ss == null) {
+      new HiveConf()
+    } else {
+      ss.getConf
+    }
+  }
 
   /** Returns a LogicalPlan for a given HiveQL string. */
   def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
@@ -476,8 +489,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       DropTable(tableName, ifExists.nonEmpty)
     // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan"
     case Token("TOK_ANALYZE",
-           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
-           isNoscan) =>
+        Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
+        isNoscan) =>
       // Reference:
       // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
       if (partitionSpec.nonEmpty) {
@@ -547,13 +560,15 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       val (
           Some(tableNameParts) ::
           _ /* likeTable */ ::
+          externalTable ::
           Some(query) ::
           allowExisting +:
           ignores) =
         getClauses(
           Seq(
             "TOK_TABNAME",
             "TOK_LIKETABLE",
+            "EXTERNAL",
             "TOK_QUERY",
             "TOK_IFNOTEXISTS",
             "TOK_TABLECOMMENT",
@@ -576,43 +591,156 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           children)
       val (db, tableName) = extractDbNameTableName(tableNameParts)
 
-      var tableDesc =
-        HiveTable(
-          specifiedDatabase = db,
-          name = tableName,
-          schema = Seq.empty,
-          partitionColumns = Seq.empty,
-          properties = Map.empty,
-          serdeProperties = Map.empty,
-          tableType = ManagedTable,
-          location = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None)
-
-      // TODO: Handle all the cases here...
-      children.foreach {
-        case Token("TOK_TBLRCFILE", Nil) =>
-          import org.apache.hadoop.hive.ql.io.{RCFileInputFormat, RCFileOutputFormat}
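+      // Start from an empty table description and fill it in from the parsed clauses below.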
+      // TODO: add bucket support
+      var tableDesc: HiveTable = HiveTable(
+        specifiedDatabase = db,
+        name = tableName,
+        schema = Seq.empty[HiveColumn],
+        partitionColumns = Seq.empty[HiveColumn],
+        properties = Map[String, String](),
+        serdeProperties = Map[String, String](),
+        tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
+        location = None,
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        viewText = None)
+
+      // Default serde and input/output formats, taken from hive.default.fileformat.
+      tableDesc = if ("SequenceFile".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+          outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"),
+          serde = Option("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+      } else if ("RCFile".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+          serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
+      } else if ("ORC".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+          serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+      } else if ("PARQUET".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat =
+            Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+          outputFormat =
+            Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+          serde =
+            Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+      } else {
+        tableDesc.copy(
+          inputFormat =
+            Option("org.apache.hadoop.mapred.TextInputFormat"),
+          outputFormat =
+            Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+      }
+
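+      // Clauses given explicitly in the statement override the defaults chosen above.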
+      children.collect {
+        case list @ Token("TOK_TABCOLLIST", _) =>
+          val cols = BaseSemanticAnalyzer.getColumns(list, true)
+          if (cols != null) {
+            tableDesc = tableDesc.copy(
+              schema = cols.map { field =>
+                HiveColumn(field.getName, field.getType, field.getComment)
+              })
+          }
+        case Token("TOK_TABLECOMMENT", child :: Nil) =>
+          val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+          // TODO: support the SQL text
+          tableDesc = tableDesc.copy(viewText = Option(comment))
+        case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) =>
+          val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
+          if (cols != null) {
+            tableDesc = tableDesc.copy(
+              partitionColumns = cols.map { field =>
+                HiveColumn(field.getName, field.getType, field.getComment)
+              })
+          }
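+        // ROW FORMAT DELIMITED settings are stored as standard Hive serde property keys.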
+        case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) =>
+          val serdeParams = new java.util.HashMap[String, String]()
+          child match {
+            case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
+              val fieldDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild1.getText())
+              serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
+              serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
+              if (rowChild2.length > 1) {
+                val fieldEscape = BaseSemanticAnalyzer.unescapeSQLString(rowChild2(0).getText)
+                serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
+              }
+            case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
+              val collItemDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
+            case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
+              val mapKeyDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
+            case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
+              val lineDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              if (!(lineDelim == "\n") && !(lineDelim == "10")) {
+                throw new AnalysisException(
+                  SemanticAnalyzer.generateErrorMessage(
+                    rowChild,
+                    ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg))
+              }
+              serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
+            case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
+              val nullFormat = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              // TODO: support the nullFormat
+            case _ => assert(false)
+          }
           tableDesc = tableDesc.copy(
-            outputFormat = Option(classOf[RCFileOutputFormat].getName),
-            inputFormat = Option(classOf[RCFileInputFormat[_, _]].getName))
+            serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+        case Token("TOK_TABLELOCATION", child :: Nil) =>
+          var location = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+          location = EximUtil.relativeToAbsolutePath(hiveConf, location)
+          tableDesc = tableDesc.copy(location = Option(location))
+        case Token("TOK_TABLESERIALIZER", child :: Nil) =>
+          tableDesc = tableDesc.copy(
+            serde = Option(BaseSemanticAnalyzer.unescapeSQLString(child.getChild(0).getText)))
+          if (child.getChildCount == 2) {
+            val serdeParams = new java.util.HashMap[String, String]()
+            BaseSemanticAnalyzer.readProps(
+              (child.getChild(1).getChild(0)).asInstanceOf[ASTNode], serdeParams)
+            tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+          }
+        case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
+          throw new SemanticException(
+            s"Unrecognized file format in STORED AS clause: ${child.getText}")
 
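+        // STORED AS shortcuts: set the input/output formats, and only fill in the
+        // serde when one has not already been chosen.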
+        case Token("TOK_TBLRCFILE", Nil) =>
+          tableDesc = tableDesc.copy(
+            inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
           if (tableDesc.serde.isEmpty) {
             tableDesc = tableDesc.copy(
               serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
           }
+
         case Token("TOK_TBLORCFILE", Nil) =>
           tableDesc = tableDesc.copy(
             inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
-            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
-            serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+          if (tableDesc.serde.isEmpty) {
+            tableDesc = tableDesc.copy(
+              serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+          }
 
         case Token("TOK_TBLPARQUETFILE", Nil) =>
           tableDesc = tableDesc.copy(
-            inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
-            outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-            serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+            inputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+            outputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+          if (tableDesc.serde.isEmpty) {
+            tableDesc = tableDesc.copy(
+              serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+          }
 
         case Token("TOK_TABLESERIALIZER",
           Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
@@ -627,13 +755,26 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
 
         case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
           tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
+        case list @ Token("TOK_TABLEFILEFORMAT", _) =>
+          tableDesc = tableDesc.copy(
+            inputFormat =
+              Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(0).getText)),
+            outputFormat =
+              Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(1).getText)))
+        case Token("TOK_STORAGEHANDLER", _) =>
+          throw new AnalysisException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg())
+        case _ => // Unsupported features
+      }
 
-        case _ =>
+      if (tableDesc.serde.isEmpty) {
+        // add default serde
+        tableDesc = tableDesc.copy(
+          serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
       }
 
       CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)
 
-    // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
+    // If it's not a "CTAS" like the above, treat it as a native command.
     case Token("TOK_CREATETABLE", _) => NativePlaceholder
 
     // Support "TRUNCATE TABLE table_name [PARTITION partition_spec]"