From a8d3756b88414e1c85acb2a0c6ce6abe7c894a58 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 1 Mar 2015 19:39:54 +0800 Subject: [PATCH 1/5] Combine two CreateTableAsSelect case blocks. --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 58 ++++++------------- .../org/apache/spark/sql/hive/HiveQl.scala | 7 ++- 2 files changed, 22 insertions(+), 43 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d3ad36432826..ba5546540f18 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -523,7 +523,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // TODO extra is in type of ASTNode which means the logical plan is not resolved // Need to think about how to implement the CreateTableAsSelect.resolved - case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) => + case CreateTableAsSelect(db, tableName, child, allowExisting, extraInfo: Option[ASTNode]) => val (dbName, tblName) = processDatabaseAndTableName(db, tableName) val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) @@ -531,19 +531,23 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val desc: Option[CreateTableDesc] = if (tableExists(Seq(databaseName, tblName))) { None } else { - val sa = new SemanticAnalyzer(hive.hiveconf) { - override def analyzeInternal(ast: ASTNode) { - // A hack to intercept the SemanticAnalyzer.analyzeInternal, - // to ignore the SELECT clause of the CTAS - val method = classOf[SemanticAnalyzer].getDeclaredMethod( - "analyzeCreateTable", classOf[ASTNode], classOf[QB]) - method.setAccessible(true) - method.invoke(this, ast, this.getQB) - } + extraInfo match { + case Some(extra: ASTNode) => + val sa = new SemanticAnalyzer(hive.hiveconf) { + override def analyzeInternal(ast: ASTNode) { + // A hack to intercept the SemanticAnalyzer.analyzeInternal, + // to ignore the SELECT clause of the CTAS + val method = classOf[SemanticAnalyzer].getDeclaredMethod( + "analyzeCreateTable", classOf[ASTNode], classOf[QB]) + method.setAccessible(true) + method.invoke(this, ast, this.getQB) + } + } + + sa.analyze(extra, new Context(hive.hiveconf)) + Some(sa.getQB().getTableDesc) + case None => None } - - sa.analyze(extra, new Context(hive.hiveconf)) - Some(sa.getQB().getTableDesc) } // Check if the query specifies file format or storage handler. @@ -581,34 +585,6 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with } case p: LogicalPlan if p.resolved => p - - case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) => - val (dbName, tblName) = processDatabaseAndTableName(db, tableName) - if (hive.convertCTAS) { - if (dbName.isDefined) { - throw new AnalysisException( - "Cannot specify database name in a CTAS statement " + - "when spark.sql.hive.convertCTAS is set to true.") - } - - val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists - CreateTableUsingAsSelect( - tblName, - hive.conf.defaultDataSourceName, - temporary = false, - mode, - options = Map.empty[String, String], - child - ) - } else { - val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) - execution.CreateTableAsSelect( - databaseName, - tableName, - child, - allowExisting, - None) - } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 98263f602e9e..be66fe70b0ca 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -556,8 +556,11 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C "TOK_TABLEPROPERTIES"), children) val (db, tableName) = extractDbNameTableName(tableNameParts) - - CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, Some(node)) + if (ignores.exists(_ != None)) { + CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, Some(node)) + } else { + CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, None) + } // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command. case Token("TOK_CREATETABLE", _) => NativePlaceholder From 5b459994251c73a42aef7193743b4dd91f9bc49b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 18 Mar 2015 11:58:48 +0800 Subject: [PATCH 2/5] For comment. --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 30 ++++++++----------- .../org/apache/spark/sql/hive/HiveQl.scala | 6 +--- 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 227918359cb4..e29b61c61b20 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -527,7 +527,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // TODO extra is in type of ASTNode which means the logical plan is not resolved // Need to think about how to implement the CreateTableAsSelect.resolved - case CreateTableAsSelect(db, tableName, child, allowExisting, extraInfo: Option[ASTNode]) => + case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) => val (dbName, tblName) = processDatabaseAndTableName(db, tableName) val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) @@ -535,23 +535,19 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val desc: Option[CreateTableDesc] = if (tableExists(Seq(databaseName, tblName))) { None } else { - extraInfo match { - case Some(extra: ASTNode) => - val sa = new SemanticAnalyzer(hive.hiveconf) { - override def analyzeInternal(ast: ASTNode) { - // A hack to intercept the SemanticAnalyzer.analyzeInternal, - // to ignore the SELECT clause of the CTAS - val method = classOf[SemanticAnalyzer].getDeclaredMethod( - "analyzeCreateTable", classOf[ASTNode], classOf[QB]) - method.setAccessible(true) - method.invoke(this, ast, this.getQB) - } - } - - sa.analyze(extra, new Context(hive.hiveconf)) - Some(sa.getQB().getTableDesc) - case None => None + val sa = new SemanticAnalyzer(hive.hiveconf) { + override def analyzeInternal(ast: ASTNode) { + // A hack to intercept the SemanticAnalyzer.analyzeInternal, + // to ignore the SELECT clause of the CTAS + val method = classOf[SemanticAnalyzer].getDeclaredMethod( + "analyzeCreateTable", classOf[ASTNode], classOf[QB]) + method.setAccessible(true) + method.invoke(this, ast, this.getQB) + } } + + sa.analyze(extra, new Context(hive.hiveconf)) + Some(sa.getQB().getTableDesc) } // Check if the query specifies file format or storage handler. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 15ffde43a370..df39e4eecc8f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -557,11 +557,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C "TOK_TABLEPROPERTIES"), children) val (db, tableName) = extractDbNameTableName(tableNameParts) - if (ignores.exists(_ != None)) { - CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, Some(node)) - } else { - CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, None) - } + CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, Some(node)) // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command. case Token("TOK_CREATETABLE", _) => NativePlaceholder From 5b611cb5b3cbdcd39ce08c15ead83921866d1c5d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 24 Mar 2015 00:54:41 +0800 Subject: [PATCH 3/5] For comment. --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 2 +- .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 4 ++-- .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 384fe53a6836..e530b49e86ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -142,7 +142,7 @@ case class CreateTableAsSelect[T]( tableName: String, child: LogicalPlan, allowExisting: Boolean, - desc: Option[T] = None) extends UnaryNode { + desc: T) extends UnaryNode { override def output = Seq.empty[Attribute] override lazy val resolved = databaseName != None && childrenResolved } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index efe28e53fde3..ac0a31094ffe 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -527,7 +527,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // TODO extra is in type of ASTNode which means the logical plan is not resolved // Need to think about how to implement the CreateTableAsSelect.resolved - case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) => + case CreateTableAsSelect(db, tableName, child, allowExisting, extra: ASTNode) => val (dbName, tblName) = processDatabaseAndTableName(db, tableName) val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) @@ -545,7 +545,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with method.invoke(this, ast, this.getQB) } } - + sa.analyze(extra, new Context(hive.hiveconf)) Some(sa.getQB().getTableDesc) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 0abaa55efa91..28b11993b169 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -557,7 +557,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C "TOK_TABLEPROPERTIES"), children) val (db, tableName) = extractDbNameTableName(tableNameParts) - CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, Some(node)) + CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, node) // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command. case Token("TOK_CREATETABLE", _) => NativePlaceholder From 4389607fc63dc4bf00942f991f7bfa394c809f22 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 24 Mar 2015 10:02:01 +0800 Subject: [PATCH 4/5] Add specific node CreateHiveTableAsSelect and remove type parameter of CreateTableAsSelect. --- .../sql/catalyst/plans/logical/basicOperators.scala | 13 +++++++------ .../main/scala/org/apache/spark/sql/DataFrame.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 13 +++++++++++-- .../scala/org/apache/spark/sql/hive/HiveQl.scala | 2 +- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index e530b49e86ce..627829be2f68 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -137,12 +137,13 @@ case class InsertIntoTable( } } -case class CreateTableAsSelect[T]( - databaseName: Option[String], - tableName: String, - child: LogicalPlan, - allowExisting: Boolean, - desc: T) extends UnaryNode { +trait CreateTableAsSelect extends UnaryNode { + self: Product => + def databaseName: Option[String] + def tableName: String + def child: LogicalPlan + def allowExisting: Boolean + override def output = Seq.empty[Attribute] override lazy val resolved = databaseName != None && childrenResolved } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 5aece166aad2..2ade95d8bb5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -141,7 +141,7 @@ class DataFrame private[sql]( // happen right away to let these side effects take place eagerly. case _: Command | _: InsertIntoTable | - _: CreateTableAsSelect[_] | + _: CreateTableAsSelect | _: CreateTableUsingAsSelect | _: WriteToFile => LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index ac0a31094ffe..0573bfb1ac62 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -24,6 +24,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable} import org.apache.hadoop.hive.metastore.{TableType, Warehouse} import org.apache.hadoop.hive.ql.metadata._ +import org.apache.hadoop.hive.ql.lib.Node import org.apache.hadoop.hive.ql.plan.CreateTableDesc import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe @@ -527,7 +528,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // TODO extra is in type of ASTNode which means the logical plan is not resolved // Need to think about how to implement the CreateTableAsSelect.resolved - case CreateTableAsSelect(db, tableName, child, allowExisting, extra: ASTNode) => + case CreateHiveTableAsSelect(db, tableName, child, allowExisting, extra) => val (dbName, tblName) = processDatabaseAndTableName(db, tableName) val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) @@ -546,7 +547,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with } } - sa.analyze(extra, new Context(hive.hiveconf)) + sa.analyze(extra.asInstanceOf[ASTNode], new Context(hive.hiveconf)) Some(sa.getQB().getTableDesc) } @@ -663,6 +664,14 @@ private[hive] case class InsertIntoHiveTable( } } +private[hive] case class CreateHiveTableAsSelect( + databaseName: Option[String], + tableName: String, + child: LogicalPlan, + allowExisting: Boolean, + desc: Node) extends CreateTableAsSelect { +} + private[hive] case class MetastoreRelation (databaseName: String, tableName: String, alias: Option[String]) (val table: TTable, val partitions: Seq[TPartition]) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 28b11993b169..4d895ade33da 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -557,7 +557,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C "TOK_TABLEPROPERTIES"), children) val (db, tableName) = extractDbNameTableName(tableNameParts) - CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, node) + CreateHiveTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, node) // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command. case Token("TOK_CREATETABLE", _) => NativePlaceholder From 7d82ede1d0ca00259b741e241e90705017a68056 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 12 Apr 2015 14:37:40 +0800 Subject: [PATCH 5/5] Add UnaryCommand and make CreateTableAsSelect below it. --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 2 +- .../spark/sql/catalyst/plans/logical/commands.scala | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index d600355c0b23..3a5c2eef28b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -137,7 +137,7 @@ case class InsertIntoTable( } } -trait CreateTableAsSelect extends UnaryNode { +abstract class CreateTableAsSelect extends UnaryCommand { self: Product => def databaseName: Option[String] def tableName: String diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala index 45905f8ef98c..980ba007b5fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala @@ -27,3 +27,11 @@ abstract class Command extends LeafNode { self: Product => def output: Seq[Attribute] = Seq.empty } + +/** + * An UnaryNode that represents a non-query command to be executed by the system. + */ +abstract class UnaryCommand extends UnaryNode { + self: Product => + def output: Seq[Attribute] = Seq.empty +}