@@ -22,14 +22,15 @@ import java.sql.Date
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
 import org.apache.hadoop.hive.ql.lib.Node
-import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse._
 import org.apache.hadoop.hive.ql.plan.PlanUtils
-import org.apache.spark.sql.AnalysisException
+import org.apache.hadoop.hive.ql.session.SessionState
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
@@ -62,7 +63,13 @@ case class CreateTableAsSelect(
     allowExisting: Boolean) extends UnaryNode with Command {
 
   override def output: Seq[Attribute] = Seq.empty[Attribute]
-  override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && childrenResolved
+  override lazy val resolved: Boolean =
+    tableDesc.specifiedDatabase.isDefined &&
+    tableDesc.schema.size > 0 &&
+    tableDesc.serde.isDefined &&
+    tableDesc.inputFormat.isDefined &&
+    tableDesc.outputFormat.isDefined &&
+    childrenResolved
 }
 
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
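
The stricter `resolved` check above keeps the plan unresolved until the schema and all storage descriptors are known, so downstream planning never sees a half-filled HiveTable. A minimal sketch of an analyzer rule that would satisfy it, assuming hypothetical text-format fallbacks (the rule name and default strings are illustrative, not the commit's actual resolution logic; schema and database resolution would happen elsewhere):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    object FillStorageDefaults extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // Fill in any missing storage fields; once they are all defined,
        // `resolved` can flip to true and planning may proceed.
        case c @ CreateTableAsSelect(desc, _, _) if desc.serde.isEmpty ||
            desc.inputFormat.isEmpty || desc.outputFormat.isEmpty =>
          c.copy(tableDesc = desc.copy(
            serde = desc.serde.orElse(
              Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")),
            inputFormat = desc.inputFormat.orElse(
              Some("org.apache.hadoop.mapred.TextInputFormat")),
            outputFormat = desc.outputFormat.orElse(
              Some("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))))
      }
    }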
@@ -240,12 +247,23 @@ private[hive] object HiveQl {
     * Otherwise, there will be a NullPointerException
     * when retrieving properties from HiveConf.
     */
-    val hContext = new Context(new HiveConf())
+    val hContext = new Context(hiveConf)
     val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
     hContext.clear()
     node
   }
 
+  /**
+   * Returns the HiveConf, falling back to a fresh instance when no session is active.
+   */
+  private[this] def hiveConf(): HiveConf = {
+    val ss = SessionState.get() // SessionState is lazily initialized, so it can be null here
+    if (ss == null) {
+      new HiveConf()
+    } else {
+      ss.getConf
+    }
+  }
 
   /** Returns a LogicalPlan for a given HiveQL string. */
   def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
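
The null check in `hiveConf()` is needed because `SessionState.get()` is thread-local and returns null until a Hive session has been started. An equivalent, more idiomatic formulation (illustration only, not part of the commit):

    val conf: HiveConf = Option(SessionState.get()).map(_.getConf).getOrElse(new HiveConf())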
@@ -476,8 +494,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       DropTable(tableName, ifExists.nonEmpty)
     // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan"
     case Token("TOK_ANALYZE",
-           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
-           isNoscan) =>
+        Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
+        isNoscan) =>
       // Reference:
       // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
       if (partitionSpec.nonEmpty) {
@@ -547,13 +565,15 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         val (
             Some(tableNameParts) ::
             _ /* likeTable */ ::
+            externalTable ::
             Some(query) ::
             allowExisting +:
             ignores) =
           getClauses(
             Seq(
               "TOK_TABNAME",
               "TOK_LIKETABLE",
+              "EXTERNAL",
               "TOK_QUERY",
               "TOK_IFNOTEXISTS",
               "TOK_TABLECOMMENT",
@@ -576,43 +596,153 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             children)
         val (db, tableName) = extractDbNameTableName(tableNameParts)
 
-        var tableDesc =
-          HiveTable(
-            specifiedDatabase = db,
-            name = tableName,
-            schema = Seq.empty,
-            partitionColumns = Seq.empty,
-            properties = Map.empty,
-            serdeProperties = Map.empty,
-            tableType = ManagedTable,
-            location = None,
-            inputFormat = None,
-            outputFormat = None,
-            serde = None)
-
-        // TODO: Handle all the cases here...
-        children.foreach {
-          case Token("TOK_TBLRCFILE", Nil) =>
-            import org.apache.hadoop.hive.ql.io.{RCFileInputFormat, RCFileOutputFormat}
+        // TODO: add bucket support
+        var tableDesc: HiveTable = HiveTable(
+          specifiedDatabase = db,
+          name = tableName,
+          schema = Seq.empty[HiveColumn],
+          partitionColumns = Seq.empty[HiveColumn],
+          properties = Map[String, String](),
+          serdeProperties = Map[String, String](),
+          tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
+          location = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          viewText = None)
+
+        // default storage type abbreviation (e.g. RCFile, ORC, PARQUET, etc.)
+        val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
+        // handle the default format for the storage type abbreviation
+        tableDesc = if ("SequenceFile".equalsIgnoreCase(defaultStorageType)) {
+          tableDesc.copy(
+            inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+            outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
+        } else if ("RCFile".equalsIgnoreCase(defaultStorageType)) {
+          tableDesc.copy(
+            inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+            serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
+        } else if ("ORC".equalsIgnoreCase(defaultStorageType)) {
+          tableDesc.copy(
+            inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+            serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+        } else if ("PARQUET".equalsIgnoreCase(defaultStorageType)) {
+          tableDesc.copy(
+            inputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+            outputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+            serde =
+              Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+        } else {
+          tableDesc.copy(
+            inputFormat =
+              Option("org.apache.hadoop.mapred.TextInputFormat"),
+            outputFormat =
+              Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+        }
+
+        children.collect {
+          case list @ Token("TOK_TABCOLLIST", _) =>
+            val cols = BaseSemanticAnalyzer.getColumns(list, true)
+            if (cols != null) {
+              tableDesc = tableDesc.copy(
+                schema = cols.map { field =>
+                  HiveColumn(field.getName, field.getType, field.getComment)
+                })
+            }
+          case Token("TOK_TABLECOMMENT", child :: Nil) =>
+            val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+            // TODO: support the sql text
+            tableDesc = tableDesc.copy(viewText = Option(comment))
+          case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) =>
+            val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
+            if (cols != null) {
+              tableDesc = tableDesc.copy(
+                partitionColumns = cols.map { field =>
+                  HiveColumn(field.getName, field.getType, field.getComment)
+                })
+            }
+          case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) =>
+            val serdeParams = new java.util.HashMap[String, String]()
+            child match {
+              case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
+                val fieldDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild1.getText())
+                serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
+                serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
+                if (rowChild2.length > 1) {
+                  val fieldEscape = BaseSemanticAnalyzer.unescapeSQLString(rowChild2(0).getText)
+                  serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
+                }
+              case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
+                val collItemDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+                serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
+              case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
+                val mapKeyDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+                serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
+              case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
+                val lineDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+                if (!(lineDelim == "\n") && !(lineDelim == "10")) {
+                  throw new AnalysisException(
+                    SemanticAnalyzer.generateErrorMessage(
+                      rowChild,
+                      ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg))
+                }
+                serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
+              case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
+                val nullFormat = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+                // TODO: support the nullFormat
+              case _ => assert(false)
+            }
+            tableDesc = tableDesc.copy(
+              serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+          case Token("TOK_TABLELOCATION", child :: Nil) =>
+            var location = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+            location = EximUtil.relativeToAbsolutePath(hiveConf, location)
+            tableDesc = tableDesc.copy(location = Option(location))
+          case Token("TOK_TABLESERIALIZER", child :: Nil) =>
             tableDesc = tableDesc.copy(
-              outputFormat = Option(classOf[RCFileOutputFormat].getName),
-              inputFormat = Option(classOf[RCFileInputFormat[_, _]].getName))
+              serde = Option(BaseSemanticAnalyzer.unescapeSQLString(child.getChild(0).getText)))
+            if (child.getChildCount == 2) {
+              val serdeParams = new java.util.HashMap[String, String]()
+              BaseSemanticAnalyzer.readProps(
+                (child.getChild(1).getChild(0)).asInstanceOf[ASTNode], serdeParams)
+              tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+            }
+          case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
+            throw new SemanticException(
+              s"Unrecognized file format in STORED AS clause: ${child.getText}")
 
+          case Token("TOK_TBLRCFILE", Nil) =>
+            tableDesc = tableDesc.copy(
+              inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+              outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
             if (tableDesc.serde.isEmpty) {
              tableDesc = tableDesc.copy(
                serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
             }
+
           case Token("TOK_TBLORCFILE", Nil) =>
             tableDesc = tableDesc.copy(
               inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
-              outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
-              serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+              outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+            if (tableDesc.serde.isEmpty) {
+              tableDesc = tableDesc.copy(
+                serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+            }
 
           case Token("TOK_TBLPARQUETFILE", Nil) =>
             tableDesc = tableDesc.copy(
-              inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
-              outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-              serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+              inputFormat =
+                Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+              outputFormat =
+                Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+            if (tableDesc.serde.isEmpty) {
+              tableDesc = tableDesc.copy(
+                serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+            }
 
           case Token("TOK_TABLESERIALIZER",
                  Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
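
Because the default branch above reads hive.default.fileformat through `hiveConf`, a session-level setting should now steer CTAS statements that omit STORED AS. A hedged illustration, assuming a HiveContext named `hiveContext` (whether a SET propagates into the parser's HiveConf depends on the SessionState in use):

    hiveContext.sql("SET hive.default.fileformat=ORC")
    hiveContext.sql("CREATE TABLE t_orc AS SELECT 1 AS id")
    // t_orc should default to OrcInputFormat / OrcOutputFormat / OrcSerde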
@@ -627,13 +757,20 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
 
           case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
             tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
-
-          case _ =>
+          case list @ Token("TOK_TABLEFILEFORMAT", _) =>
+            tableDesc = tableDesc.copy(
+              inputFormat =
+                Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(0).getText)),
+              outputFormat =
+                Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(1).getText)))
+          case Token("TOK_STORAGEHANDLER", _) =>
+            throw new AnalysisException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg())
+          case _ => // Unsupported features
         }
 
         CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)
 
-      // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
+      // If it's not a "CTAS" like above, treat it as a native command
       case Token("TOK_CREATETABLE", _) => NativePlaceholder
 
       // Support "TRUNCATE TABLE table_name [PARTITION partition_spec]"
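
Taken together, these cases let CTAS statements with explicit storage clauses be parsed into CreateTableAsSelect instead of falling through to Hive as a native command. A usage sketch, assuming a HiveContext named `hiveContext` and an existing table `src`:

    hiveContext.sql("""
      CREATE TABLE IF NOT EXISTS dst
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      STORED AS RCFILE
      AS SELECT key, value FROM src
    """)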