From 125f542da6d8f7d79646ed26bc805ddcfb5081a8 Mon Sep 17 00:00:00 2001 From: scwf Date: Tue, 16 Dec 2014 21:21:55 +0800 Subject: [PATCH 1/5] factory command in spark sql --- .../sql/catalyst/plans/logical/commands.scala | 17 ----- .../spark/sql/execution/SparkStrategies.scala | 11 ++-- .../apache/spark/sql/execution/commands.scala | 62 ++++++------------- .../apache/spark/sql/hive/HiveContext.scala | 10 +-- .../org/apache/spark/sql/hive/HiveQl.scala | 9 +++ .../spark/sql/hive/HiveStrategies.scala | 28 +++++---- .../org/apache/spark/sql/hive/TestHive.scala | 2 +- .../hive/execution/CreateTableAsSelect.scala | 21 ++----- .../execution/DescribeHiveTableCommand.scala | 16 ++--- .../hive/execution/InsertIntoHiveTable.scala | 15 +++-- .../sql/hive/execution/NativeCommand.scala | 16 ++--- .../spark/sql/hive/execution/commands.scala | 34 ++++------ .../spark/sql/hive/StatisticsSuite.scala | 1 - .../hive/execution/HiveComparisonTest.scala | 6 +- 14 files changed, 100 insertions(+), 148 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala index 1d513d778976..5a1863953eae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala @@ -29,23 +29,6 @@ abstract class Command extends LeafNode { def output: Seq[Attribute] = Seq.empty } -/** - * Returned for commands supported by a given parser, but not catalyst. In general these are DDL - * commands that are passed directly to another system. - */ -case class NativeCommand(cmd: String) extends Command { - override def output = - Seq(AttributeReference("result", StringType, nullable = false)()) -} - -/** - * Commands of the form "SET [key [= value] ]". - */ -case class DFSCommand(kv: Option[(String, Option[String])]) extends Command { - override def output = Seq( - AttributeReference("DFS output", StringType, nullable = false)()) -} - /** * * Commands of the form "SET [key [= value] ]". 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 1225d18857af..cad3eb0998d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -306,13 +306,16 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommand(r) :: Nil case logical.SetCommand(kv) => - Seq(execution.SetCommand(kv, plan.output)(context)) + Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)(context))) case logical.ExplainCommand(logicalPlan, extended) => - Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context)) + Seq(ExecutedCommand( + execution.ExplainCommand(logicalPlan, plan.output, extended)(context))) case logical.CacheTableCommand(tableName, optPlan, isLazy) => - Seq(execution.CacheTableCommand(tableName, optPlan, isLazy)) + Seq(ExecutedCommand( + execution.CacheTableCommand(tableName, optPlan, isLazy))) case logical.UncacheTableCommand(tableName) => - Seq(execution.UncacheTableCommand(tableName)) + Seq(ExecutedCommand( + execution.UncacheTableCommand(tableName))) case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index afe3f3f07440..15baf7b81d3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -26,27 +26,6 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.{SQLConf, SQLContext} -// TODO: DELETE ME... -trait Command { - this: SparkPlan => - - /** - * A concrete command should override this lazy field to wrap up any side effects caused by the - * command or any other computation that should be evaluated exactly once. The value of this field - * can be used as the contents of the corresponding RDD generated from the physical plan of this - * command. - * - * The `execute()` method of all the physical command classes should reference `sideEffectResult` - * so that the command can be executed eagerly right after the command query is created. - */ - protected lazy val sideEffectResult: Seq[Row] = Seq.empty[Row] - - override def executeCollect(): Array[Row] = sideEffectResult.toArray - - override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1) -} - -// TODO: Replace command with runnable command. trait RunnableCommand extends logical.Command { self: Product => @@ -79,11 +58,12 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan { * :: DeveloperApi :: */ @DeveloperApi -case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribute])( - @transient context: SQLContext) - extends LeafNode with Command with Logging { +case class SetCommand(kv: Option[(String, Option[String])], _output: Seq[Attribute])( + @transient context: SQLContext) extends RunnableCommand with Logging { + + override def output = _output - override protected lazy val sideEffectResult: Seq[Row] = kv match { + override def run(sqlContext: SQLContext) = kv match { // Configures the deprecated "mapred.reduce.tasks" property. 
case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) => logWarning( @@ -114,8 +94,6 @@ case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribut case Some((key, None)) => Seq(Row(s"$key=${context.getConf(key, "")}")) } - - override def otherCopyArgs = context :: Nil } /** @@ -128,12 +106,13 @@ case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribut */ @DeveloperApi case class ExplainCommand( - logicalPlan: LogicalPlan, output: Seq[Attribute], extended: Boolean)( - @transient context: SQLContext) - extends LeafNode with Command { + logicalPlan: LogicalPlan, _output: Seq[Attribute], extended: Boolean)( + @transient context: SQLContext) extends RunnableCommand { + + override def output = _output // Run through the optimizer to generate the physical plan. - override protected lazy val sideEffectResult: Seq[Row] = try { + override def run(sqlContext: SQLContext) = try { // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. val queryExecution = context.executePlan(logicalPlan) val outputString = if (extended) queryExecution.toString else queryExecution.simpleString @@ -142,8 +121,6 @@ case class ExplainCommand( } catch { case cause: TreeNodeException[_] => ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } - - override def otherCopyArgs = context :: Nil } /** @@ -153,10 +130,9 @@ case class ExplainCommand( case class CacheTableCommand( tableName: String, plan: Option[LogicalPlan], - isLazy: Boolean) - extends LeafNode with Command { + isLazy: Boolean) extends RunnableCommand { - override protected lazy val sideEffectResult = { + override def run(sqlContext: SQLContext) = { import sqlContext._ plan.foreach(_.registerTempTable(tableName)) @@ -178,8 +154,9 @@ case class CacheTableCommand( * :: DeveloperApi :: */ @DeveloperApi -case class UncacheTableCommand(tableName: String) extends LeafNode with Command { - override protected lazy val sideEffectResult: Seq[Row] = { +case class UncacheTableCommand(tableName: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext) = { sqlContext.table(tableName).unpersist() Seq.empty[Row] } @@ -191,11 +168,12 @@ case class UncacheTableCommand(tableName: String) extends LeafNode with Command * :: DeveloperApi :: */ @DeveloperApi -case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( - @transient context: SQLContext) - extends LeafNode with Command { +case class DescribeCommand(child: SparkPlan, _output: Seq[Attribute])( + @transient context: SQLContext) extends RunnableCommand { + + override def output = _output - override protected lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { Row("# Registered as a temporary table", null, null) +: child.output.map(field => Row(field.name, field.dataType.toString, null)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 6008e46fefa8..c8c921f7d72b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types.DecimalType import org.apache.spark.sql.catalyst.types.decimal.Decimal -import 
org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand} -import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand +import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException} +import org.apache.spark.sql.hive.execution.{NativeCommand, DescribeHiveTableCommand} import org.apache.spark.sql.sources.DataSourceStrategy /** @@ -369,11 +369,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { * execution is simply passed back to Hive. */ def stringResult(): Seq[String] = executedPlan match { - case describeHiveTableCommand: DescribeHiveTableCommand => + case ExecutedCommand(desc: DescribeHiveTableCommand) => // If it is a describe command for a Hive table, we want to have the output format // be similar with Hive. - describeHiveTableCommand.hiveString - case command: PhysicalCommand => + desc.hiveString + case command: ExecutedCommand => command.executeCollect().map(_.head.toString) case other => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index cd4e5a239ec6..26cce031b6cd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -52,6 +52,15 @@ private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends private[hive] case class AnalyzeTable(tableName: String) extends Command +/** + * Returned for commands supported by a given parser, but not catalyst. In general these are DDL + * commands that are passed directly to another system. + */ +private[hive] case class NativeCommand(cmd: String) extends Command { + override def output = + Seq(AttributeReference("result", StringType, nullable = false)()) +} + /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. 
*/ private[hive] object HiveQl { protected val nativeCommands = Seq( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 5f02e95ac3c3..38bf5f492010 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.types.StringType -import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan, PhysicalRDD} +import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.parquet.ParquetRelation @@ -183,19 +183,19 @@ private[hive] trait HiveStrategies { table, partition, planLater(child), overwrite)(hiveContext) :: Nil case logical.CreateTableAsSelect( Some(database), tableName, child, allowExisting, Some(desc: CreateTableDesc)) => - execution.CreateTableAsSelect( + ExecutedCommand(execution.CreateTableAsSelect( database, tableName, child, allowExisting, - Some(desc)) :: Nil + Some(desc))(hiveContext)) :: Nil case logical.CreateTableAsSelect(Some(database), tableName, child, allowExisting, None) => - execution.CreateTableAsSelect( + ExecutedCommand(execution.CreateTableAsSelect( database, tableName, child, allowExisting, - None) :: Nil + None)(hiveContext)) :: Nil case _ => Nil } } @@ -226,23 +226,27 @@ private[hive] trait HiveStrategies { case class HiveCommandStrategy(context: HiveContext) extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil + case hive.NativeCommand(sql) => ExecutedCommand( + execution.NativeCommand(sql, plan.output)(context)) :: Nil - case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil + case hive.DropTable(tableName, ifExists) => ExecutedCommand( + execution.DropTable(tableName, ifExists)(context)) :: Nil - case hive.AddJar(path) => execution.AddJar(path) :: Nil + case hive.AddJar(path) => ExecutedCommand(execution.AddJar(path)(context)) :: Nil - case hive.AddFile(path) => execution.AddFile(path) :: Nil + case hive.AddFile(path) => ExecutedCommand(execution.AddFile(path)(context)) :: Nil - case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil + case hive.AnalyzeTable(tableName) => ExecutedCommand( + execution.AnalyzeTable(tableName)(context)) :: Nil case describe: logical.DescribeCommand => val resolvedTable = context.executePlan(describe.table).analyzed resolvedTable match { case t: MetastoreRelation => - Seq(DescribeHiveTableCommand(t, describe.output, describe.isExtended)(context)) + ExecutedCommand( + DescribeHiveTableCommand(t, describe.output, describe.isExtended)(context)) :: Nil case o: LogicalPlan => - Seq(DescribeCommand(planLater(o), describe.output)(context)) + ExecutedCommand(DescribeCommand(planLater(o), describe.output)(context)) :: Nil } case _ => Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index eedb57de52ba..792c9b88a7bf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ 
-37,7 +37,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.util.Utils import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan, NativeCommand} +import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive._ import org.apache.spark.sql.SQLConf diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index b83689ceabb8..b7df5d3830f9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution import org.apache.hadoop.hive.ql.plan.CreateTableDesc import org.apache.spark.annotation.Experimental -import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} -import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode} +import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.MetastoreRelation @@ -44,11 +44,7 @@ case class CreateTableAsSelect( tableName: String, query: LogicalPlan, allowExisting: Boolean, - desc: Option[CreateTableDesc]) extends LeafNode with Command { - - def output = Seq.empty - - private[this] def sc = sqlContext.asInstanceOf[HiveContext] + desc: Option[CreateTableDesc])(@transient sc: HiveContext) extends RunnableCommand { // A lazy computing of the metastoreRelation private[this] lazy val metastoreRelation: MetastoreRelation = { @@ -61,7 +57,7 @@ case class CreateTableAsSelect( } } - override protected[sql] lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data // processing. 
@@ -78,13 +74,4 @@ case class CreateTableAsSelect( Seq.empty[Row] } - - override def execute(): RDD[Row] = { - sideEffectResult - sparkContext.emptyRDD[Row] - } - - override def argString: String = { - s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]\n" + query.toString - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index 5d98834c6fb3..18c29ddf106c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -22,11 +22,11 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} -import org.apache.spark.sql.execution.{Command, LeafNode} +import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation} import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.sql.SQLContext /** * Implementation for "describe [extended] table". @@ -36,13 +36,13 @@ import org.apache.spark.sql.hive.HiveShim @DeveloperApi case class DescribeHiveTableCommand( table: MetastoreRelation, - output: Seq[Attribute], + _output: Seq[Attribute], isExtended: Boolean)( @transient context: HiveContext) - extends LeafNode with Command { + extends RunnableCommand { // Strings with the format like Hive. It is used for result comparison in our unit tests. - lazy val hiveString: Seq[String] = sideEffectResult.map { + lazy val hiveString: Seq[String] = run(context).map { case Row(name: String, dataType: String, comment) => Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse("")) @@ -50,7 +50,9 @@ case class DescribeHiveTableCommand( .mkString("\t") } - override protected lazy val sideEffectResult: Seq[Row] = { + override def output = _output + + override def run(sqlContext: SQLContext) = { // Trying to mimic the format of Hive's output. But not exactly the same. 
var results: Seq[(String, String, String)] = Nil @@ -75,6 +77,4 @@ case class DescribeHiveTableCommand( Row(name, dataType, comment) } } - - override def otherCopyArgs = context :: Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 81390f626726..04b4d0bb7f56 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -21,7 +21,6 @@ import java.util import scala.collection.JavaConversions._ -import org.apache.hadoop.hive.common.`type`.HiveVarchar import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.metastore.MetaStoreUtils @@ -31,14 +30,12 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg} import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector} import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.catalyst.types.decimal.Decimal -import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode} +import org.apache.spark.sql.execution.{UnaryNode, SparkPlan} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.{ ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.HiveShim._ @@ -52,9 +49,7 @@ case class InsertIntoHiveTable( table: MetastoreRelation, partition: Map[String, Option[String]], child: SparkPlan, - overwrite: Boolean) - (@transient sc: HiveContext) - extends UnaryNode with Command with HiveInspectors { + overwrite: Boolean)(@transient sc: HiveContext) extends UnaryNode with HiveInspectors { @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass @transient private lazy val hiveContext = new Context(sc.hiveconf) @@ -134,7 +129,7 @@ case class InsertIntoHiveTable( * * Note: this is run once and then kept to avoid double insertions. */ - override protected[sql] lazy val sideEffectResult: Seq[Row] = { + protected[sql] lazy val sideEffectResult: Seq[Row] = { // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. val tableDesc = table.tableDesc @@ -256,4 +251,8 @@ case class InsertIntoHiveTable( // TODO: implement hive compatibility as rules. 
Seq.empty[Row] } + + override def executeCollect(): Array[Row] = sideEffectResult.toArray + + override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala index 6930c2babd11..3804377025b1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala @@ -18,21 +18,21 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row} -import org.apache.spark.sql.execution.{Command, LeafNode} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} +import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext /** * :: DeveloperApi :: */ @DeveloperApi case class NativeCommand( - sql: String, output: Seq[Attribute])( - @transient context: HiveContext) - extends LeafNode with Command { + sql: String, + _output: Seq[Attribute])(@transient context: HiveContext) + extends RunnableCommand { - override protected lazy val sideEffectResult: Seq[Row] = context.runSqlHive(sql).map(Row(_)) + override def output = _output - override def otherCopyArgs = context :: Nil + override def run(sqlContext: SQLContext) = context.runSqlHive(sql).map(Row(_)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 903075edf7e0..9246ac535afb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.execution.{Command, LeafNode} +import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext /** * :: DeveloperApi :: @@ -32,12 +32,9 @@ import org.apache.spark.sql.hive.HiveContext * in the Hive metastore. */ @DeveloperApi -case class AnalyzeTable(tableName: String) extends LeafNode with Command { - def hiveContext = sqlContext.asInstanceOf[HiveContext] +case class AnalyzeTable(tableName: String)(hiveContext: HiveContext) extends RunnableCommand { - def output = Seq.empty - - override protected lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { hiveContext.analyze(tableName) Seq.empty[Row] } @@ -48,12 +45,11 @@ case class AnalyzeTable(tableName: String) extends LeafNode with Command { * Drops a table from the metastore and removes it if it is cached. 
*/ @DeveloperApi -case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command { - def hiveContext = sqlContext.asInstanceOf[HiveContext] - - def output = Seq.empty +case class DropTable( + tableName: String, + ifExists: Boolean)(hiveContext: HiveContext) extends RunnableCommand { - override protected lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { val ifExistsClause = if (ifExists) "IF EXISTS " else "" hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") hiveContext.catalog.unregisterTable(None, tableName) @@ -65,12 +61,9 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with * :: DeveloperApi :: */ @DeveloperApi -case class AddJar(path: String) extends LeafNode with Command { - def hiveContext = sqlContext.asInstanceOf[HiveContext] +case class AddJar(path: String)(hiveContext: HiveContext) extends RunnableCommand { - override def output = Seq.empty - - override protected lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { hiveContext.runSqlHive(s"ADD JAR $path") hiveContext.sparkContext.addJar(path) Seq.empty[Row] @@ -81,12 +74,9 @@ case class AddJar(path: String) extends LeafNode with Command { * :: DeveloperApi :: */ @DeveloperApi -case class AddFile(path: String) extends LeafNode with Command { - def hiveContext = sqlContext.asInstanceOf[HiveContext] - - override def output = Seq.empty +case class AddFile(path: String)(hiveContext: HiveContext) extends RunnableCommand { - override protected lazy val sideEffectResult: Seq[Row] = { + override def run(sqlContext: SQLContext) = { hiveContext.runSqlHive(s"ADD FILE $path") hiveContext.sparkContext.addFile(path) Seq.empty[Row] diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index a90fc023e67d..5192c2a2a2a8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -22,7 +22,6 @@ import org.scalatest.BeforeAndAfterAll import scala.reflect.ClassTag import org.apache.spark.sql.{SQLConf, QueryTest} -import org.apache.spark.sql.catalyst.plans.logical.NativeCommand import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 44eb4cfa5933..214eeb465d02 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -19,12 +19,12 @@ package org.apache.spark.sql.hive.execution import java.io._ +import org.apache.spark.sql.hive import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen} import org.apache.spark.Logging import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand => LogicalNativeCommand} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive.test.TestHive @@ -142,14 +142,14 @@ abstract class HiveComparisonTest // Hack: Hive simply prints the result of a SET command to screen, // and does not return it as a 
query answer. case _: SetCommand => Seq("0") - case LogicalNativeCommand(c) if c.toLowerCase.contains("desc") => + case hive.NativeCommand(c) if c.toLowerCase.contains("desc") => answer .filterNot(nonDeterministicLine) .map(_.replaceAll("from deserializer", "")) .map(_.replaceAll("None", "")) .map(_.trim) .filterNot(_ == "") - case _: LogicalNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "") + case _: hive.NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "") case _: ExplainCommand => answer case _: DescribeCommand => // Filter out non-deterministic lines and lines which do not have actual results but From 5d20010284a0969d04c6c5a0d68290ea514bfab8 Mon Sep 17 00:00:00 2001 From: wangfei Date: Wed, 17 Dec 2014 10:42:40 +0800 Subject: [PATCH 2/5] address comments --- .../org/apache/spark/sql/SQLContext.scala | 2 +- .../spark/sql/execution/SparkStrategies.scala | 6 +- .../apache/spark/sql/execution/commands.scala | 47 +++++------ .../spark/sql/hive/ExtendedHiveQlParser.scala | 3 +- .../apache/spark/sql/hive/HiveContext.scala | 6 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 16 +++- .../org/apache/spark/sql/hive/HiveQl.scala | 22 +----- .../spark/sql/hive/HiveStrategies.scala | 18 +---- .../org/apache/spark/sql/hive/TestHive.scala | 6 +- .../hive/execution/CreateTableAsSelect.scala | 4 + .../execution/DescribeHiveTableCommand.scala | 12 +-- ...eCommand.scala => HiveNativeCommand.scala} | 77 ++++++++++--------- .../spark/sql/hive/execution/commands.scala | 13 ++-- .../spark/sql/hive/StatisticsSuite.scala | 11 +-- .../hive/execution/HiveComparisonTest.scala | 5 +- 15 files changed, 118 insertions(+), 130 deletions(-) rename sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/{NativeCommand.scala => HiveNativeCommand.scala} (70%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 31cc4170aa86..6c0a7b9bd36e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -329,7 +329,7 @@ class SQLContext(@transient val sparkContext: SparkContext) val strategies: Seq[Strategy] = extraStrategies ++ ( - CommandStrategy(self) :: + CommandStrategy :: DataSourceStrategy :: TakeOrdered :: HashAggregation :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index cad3eb0998d5..a203dbc7e0bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -302,14 +302,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - case class CommandStrategy(context: SQLContext) extends Strategy { + case object CommandStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommand(r) :: Nil case logical.SetCommand(kv) => - Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)(context))) + Seq(ExecutedCommand(execution.SetCommand(kv, plan.output))) case logical.ExplainCommand(logicalPlan, extended) => Seq(ExecutedCommand( - execution.ExplainCommand(logicalPlan, plan.output, extended)(context))) + execution.ExplainCommand(logicalPlan, plan.output, extended))) case logical.CacheTableCommand(tableName, optPlan, isLazy) => Seq(ExecutedCommand( 
execution.CacheTableCommand(tableName, optPlan, isLazy))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 15baf7b81d3a..b8fa4b019953 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -26,13 +26,20 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.{SQLConf, SQLContext} +/** + * A logical command that is executed for its side-effects. `RunnableCommand`s are + * wrapped in `ExecutedCommand` during execution. + */ trait RunnableCommand extends logical.Command { self: Product => - def output: Seq[Attribute] def run(sqlContext: SQLContext): Seq[Row] } +/** + * A physical operator that executes the run method of a `RunnableCommand` and + * saves the result to prevent multiple executions. + */ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan { /** * A concrete command should override this lazy field to wrap up any side effects caused by the @@ -58,10 +65,9 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan { * :: DeveloperApi :: */ @DeveloperApi -case class SetCommand(kv: Option[(String, Option[String])], _output: Seq[Attribute])( - @transient context: SQLContext) extends RunnableCommand with Logging { - - override def output = _output +case class SetCommand( + kv: Option[(String, Option[String])], + override val output: Seq[Attribute]) extends RunnableCommand with Logging { override def run(sqlContext: SQLContext) = kv match { // Configures the deprecated "mapred.reduce.tasks" property. @@ -69,30 +75,30 @@ case class SetCommand(kv: Option[(String, Option[String])], _output: Seq[Attribu logWarning( s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") - context.setConf(SQLConf.SHUFFLE_PARTITIONS, value) + sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value) Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value")) // Configures a single property. case Some((key, Some(value))) => - context.setConf(key, value) + sqlContext.setConf(key, value) Seq(Row(s"$key=$value")) - // Queries all key-value pairs that are set in the SQLConf of the context. Notice that different - // from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties - // while "SET -v" returns all properties.) + // Queries all key-value pairs that are set in the SQLConf of the sqlContext. + // Notice that different from Hive, here "SET -v" is an alias of "SET". + // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.) case Some(("-v", None)) | None => - context.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq + sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq // Queries the deprecated "mapred.reduce.tasks" property. case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) => logWarning( s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.") - Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}")) + Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}")) // Queries a single property. 
case Some((key, None)) => - Seq(Row(s"$key=${context.getConf(key, "")}")) + Seq(Row(s"$key=${sqlContext.getConf(key, "")}")) } } @@ -106,15 +112,13 @@ case class SetCommand(kv: Option[(String, Option[String])], _output: Seq[Attribu */ @DeveloperApi case class ExplainCommand( - logicalPlan: LogicalPlan, _output: Seq[Attribute], extended: Boolean)( - @transient context: SQLContext) extends RunnableCommand { - - override def output = _output + logicalPlan: LogicalPlan, + override val output: Seq[Attribute], extended: Boolean) extends RunnableCommand { // Run through the optimizer to generate the physical plan. override def run(sqlContext: SQLContext) = try { // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. - val queryExecution = context.executePlan(logicalPlan) + val queryExecution = sqlContext.executePlan(logicalPlan) val outputString = if (extended) queryExecution.toString else queryExecution.simpleString outputString.split("\n").map(Row(_)) @@ -168,10 +172,9 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand { * :: DeveloperApi :: */ @DeveloperApi -case class DescribeCommand(child: SparkPlan, _output: Seq[Attribute])( - @transient context: SQLContext) extends RunnableCommand { - - override def output = _output +case class DescribeCommand( + child: SparkPlan, + override val output: Seq[Attribute]) extends RunnableCommand { override def run(sqlContext: SQLContext) = { Row("# Registered as a temporary table", null, null) +: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala index 430ffb29989e..ebf7003ff9e5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala @@ -21,6 +21,7 @@ import scala.language.implicitConversions import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, SqlLexical} +import org.apache.spark.sql.hive.execution.{AddJar, AddFile, HiveNativeCommand} /** * A parser that recognizes all HiveQL constructs together with Spark SQL specific extensions. 
@@ -52,7 +53,7 @@ private[hive] class ExtendedHiveQlParser extends AbstractSparkSQLParser { protected lazy val dfs: Parser[LogicalPlan] = DFS ~> wholeInput ^^ { - case command => NativeCommand(command.trim) + case command => HiveNativeCommand(command.trim) } private lazy val addFile: Parser[LogicalPlan] = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index c8c921f7d72b..177229d46766 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types.DecimalType import org.apache.spark.sql.catalyst.types.decimal.Decimal import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException} -import org.apache.spark.sql.hive.execution.{NativeCommand, DescribeHiveTableCommand} +import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand} import org.apache.spark.sql.sources.DataSourceStrategy /** @@ -340,7 +340,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override val strategies: Seq[Strategy] = extraStrategies ++ Seq( DataSourceStrategy, - CommandStrategy(self), + CommandStrategy, HiveCommandStrategy(self), TakeOrdered, ParquetOperations, @@ -386,7 +386,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def simpleString: String = logical match { - case _: NativeCommand => "" + case _: HiveNativeCommand => "" case _: SetCommand => "" case _ => super.simpleString } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d8b10b78c6c5..140764fc550f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.hive import java.io.IOException import java.util.{List => JList} +import org.apache.spark.sql.execution.SparkPlan + import scala.util.parsing.combinator.RegexParsers import org.apache.hadoop.util.ReflectionUtils @@ -286,14 +288,24 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Some(sa.getQB().getTableDesc) } - CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc) + execution.CreateTableAsSelect( + databaseName, + tableName, + child, + allowExisting, + desc)(SparkPlan.currentContext.get().asInstanceOf[HiveContext]) case p: LogicalPlan if p.resolved => p case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) => val (dbName, tblName) = processDatabaseAndTableName(db, tableName) val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) - CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, None) + execution.CreateTableAsSelect( + databaseName, + tableName, + child, + allowExisting, + None)(SparkPlan.currentContext.get().asInstanceOf[HiveContext]) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 26cce031b6cd..2c868df0eaaa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -33,6 +33,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.types.decimal.Decimal +import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable} /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -44,23 +45,6 @@ import scala.collection.JavaConversions._ */ private[hive] case object NativePlaceholder extends Command -private[hive] case class AddFile(filePath: String) extends Command - -private[hive] case class AddJar(path: String) extends Command - -private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command - -private[hive] case class AnalyzeTable(tableName: String) extends Command - -/** - * Returned for commands supported by a given parser, but not catalyst. In general these are DDL - * commands that are passed directly to another system. - */ -private[hive] case class NativeCommand(cmd: String) extends Command { - override def output = - Seq(AttributeReference("result", StringType, nullable = false)()) -} - /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */ private[hive] object HiveQl { protected val nativeCommands = Seq( @@ -248,10 +232,10 @@ private[hive] object HiveQl { try { val tree = getAst(sql) if (nativeCommands contains tree.getText) { - NativeCommand(sql) + HiveNativeCommand(sql) } else { nodeToPlan(tree) match { - case NativePlaceholder => NativeCommand(sql) + case NativePlaceholder => HiveNativeCommand(sql) case other => other } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 38bf5f492010..d6a3817d4066 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.hive -import org.apache.hadoop.hive.ql.parse.ASTNode import org.apache.hadoop.hive.ql.plan.CreateTableDesc import org.apache.spark.annotation.Experimental @@ -226,27 +225,14 @@ private[hive] trait HiveStrategies { case class HiveCommandStrategy(context: HiveContext) extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case hive.NativeCommand(sql) => ExecutedCommand( - execution.NativeCommand(sql, plan.output)(context)) :: Nil - - case hive.DropTable(tableName, ifExists) => ExecutedCommand( - execution.DropTable(tableName, ifExists)(context)) :: Nil - - case hive.AddJar(path) => ExecutedCommand(execution.AddJar(path)(context)) :: Nil - - case hive.AddFile(path) => ExecutedCommand(execution.AddFile(path)(context)) :: Nil - - case hive.AnalyzeTable(tableName) => ExecutedCommand( - execution.AnalyzeTable(tableName)(context)) :: Nil - case describe: logical.DescribeCommand => val resolvedTable = context.executePlan(describe.table).analyzed resolvedTable match { case t: MetastoreRelation => ExecutedCommand( - DescribeHiveTableCommand(t, describe.output, describe.isExtended)(context)) :: Nil + DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil case o: LogicalPlan => - ExecutedCommand(DescribeCommand(planLater(o), describe.output)(context)) :: Nil + ExecutedCommand(DescribeCommand(planLater(o), describe.output)) :: Nil } case _ => Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala 
index 792c9b88a7bf..b2149bd95a33 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -20,9 +20,6 @@ package org.apache.spark.sql.hive.test import java.io.File import java.util.{Set => JavaSet} -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.session.SessionState - import scala.collection.mutable import scala.language.implicitConversions @@ -41,6 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPl import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive._ import org.apache.spark.sql.SQLConf +import org.apache.spark.sql.hive.execution.HiveNativeCommand /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -161,7 +159,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { abstract class QueryExecution extends super.QueryExecution { override lazy val analyzed = { val describedTables = logical match { - case NativeCommand(describedTable(tbl)) => tbl :: Nil + case HiveNativeCommand(describedTable(tbl)) => tbl :: Nil case CacheTableCommand(tbl, _, _) => tbl :: Nil case _ => Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index b7df5d3830f9..949146dc132d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -74,4 +74,8 @@ case class CreateTableAsSelect( Seq.empty[Row] } + + override def argString: String = { + s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]\n" + query.toString + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index 18c29ddf106c..d3210337a24e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} -import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.execution.{SparkPlan, RunnableCommand} import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation} import org.apache.spark.sql.hive.HiveShim import org.apache.spark.sql.SQLContext @@ -36,13 +36,11 @@ import org.apache.spark.sql.SQLContext @DeveloperApi case class DescribeHiveTableCommand( table: MetastoreRelation, - _output: Seq[Attribute], - isExtended: Boolean)( - @transient context: HiveContext) - extends RunnableCommand { + override val output: Seq[Attribute], + isExtended: Boolean) extends RunnableCommand { // Strings with the format like Hive. It is used for result comparison in our unit tests. 
- lazy val hiveString: Seq[String] = run(context).map { + lazy val hiveString: Seq[String] = run(SparkPlan.currentContext.get()).map { case Row(name: String, dataType: String, comment) => Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse("")) @@ -50,8 +48,6 @@ case class DescribeHiveTableCommand( .mkString("\t") } - override def output = _output - override def run(sqlContext: SQLContext) = { // Trying to mimic the format of Hive's output. But not exactly the same. var results: Seq[(String, String, String)] = Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala similarity index 70% rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala index 3804377025b1..d8469c61770f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala @@ -1,38 +1,39 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} -import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.sql.SQLContext - -/** - * :: DeveloperApi :: - */ -@DeveloperApi -case class NativeCommand( - sql: String, - _output: Seq[Attribute])(@transient context: HiveContext) - extends RunnableCommand { - - override def output = _output - - override def run(sqlContext: SQLContext) = context.runSqlHive(sql).map(Row(_)) -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row} +import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.types.StringType + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class HiveNativeCommand( + sql: String) extends RunnableCommand { + + override def output = + Seq(AttributeReference("result", StringType, nullable = false)()) + + override def run(sqlContext: SQLContext) = + sqlContext.asInstanceOf[HiveContext].runSqlHive(sql).map(Row(_)) +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 9246ac535afb..6fc4153f6a5d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -32,10 +32,10 @@ import org.apache.spark.sql.SQLContext * in the Hive metastore. */ @DeveloperApi -case class AnalyzeTable(tableName: String)(hiveContext: HiveContext) extends RunnableCommand { +case class AnalyzeTable(tableName: String) extends RunnableCommand { override def run(sqlContext: SQLContext) = { - hiveContext.analyze(tableName) + sqlContext.asInstanceOf[HiveContext].analyze(tableName) Seq.empty[Row] } } @@ -47,9 +47,10 @@ case class AnalyzeTable(tableName: String)(hiveContext: HiveContext) extends Run @DeveloperApi case class DropTable( tableName: String, - ifExists: Boolean)(hiveContext: HiveContext) extends RunnableCommand { + ifExists: Boolean) extends RunnableCommand { override def run(sqlContext: SQLContext) = { + val hiveContext = sqlContext.asInstanceOf[HiveContext] val ifExistsClause = if (ifExists) "IF EXISTS " else "" hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") hiveContext.catalog.unregisterTable(None, tableName) @@ -61,9 +62,10 @@ case class DropTable( * :: DeveloperApi :: */ @DeveloperApi -case class AddJar(path: String)(hiveContext: HiveContext) extends RunnableCommand { +case class AddJar(path: String) extends RunnableCommand { override def run(sqlContext: SQLContext) = { + val hiveContext = sqlContext.asInstanceOf[HiveContext] hiveContext.runSqlHive(s"ADD JAR $path") hiveContext.sparkContext.addJar(path) Seq.empty[Row] @@ -74,9 +76,10 @@ case class AddJar(path: String)(hiveContext: HiveContext) extends RunnableComman * :: DeveloperApi :: */ @DeveloperApi -case class AddFile(path: String)(hiveContext: HiveContext) extends RunnableCommand { +case class AddFile(path: String) extends RunnableCommand { override def run(sqlContext: SQLContext) = { + val hiveContext = sqlContext.asInstanceOf[HiveContext] hiveContext.runSqlHive(s"ADD FILE $path") hiveContext.sparkContext.addFile(path) Seq.empty[Row] diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 5192c2a2a2a8..ff4071d8e2f1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.{SQLConf, QueryTest} import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ 
+import org.apache.spark.sql.hive.execution._ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { TestHive.reset() @@ -50,19 +51,19 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { assertAnalyzeCommand( "ANALYZE TABLE Table1 COMPUTE STATISTICS", - classOf[NativeCommand]) + classOf[HiveNativeCommand]) assertAnalyzeCommand( "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS", - classOf[NativeCommand]) + classOf[HiveNativeCommand]) assertAnalyzeCommand( "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan", - classOf[NativeCommand]) + classOf[HiveNativeCommand]) assertAnalyzeCommand( "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS", - classOf[NativeCommand]) + classOf[HiveNativeCommand]) assertAnalyzeCommand( "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS noscan", - classOf[NativeCommand]) + classOf[HiveNativeCommand]) assertAnalyzeCommand( "ANALYZE TABLE Table1 COMPUTE STATISTICS nOscAn", diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 214eeb465d02..8011f9b8773b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution import java.io._ -import org.apache.spark.sql.hive import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen} import org.apache.spark.Logging @@ -142,14 +141,14 @@ abstract class HiveComparisonTest // Hack: Hive simply prints the result of a SET command to screen, // and does not return it as a query answer. 
case _: SetCommand => Seq("0") - case hive.NativeCommand(c) if c.toLowerCase.contains("desc") => + case HiveNativeCommand(c) if c.toLowerCase.contains("desc") => answer .filterNot(nonDeterministicLine) .map(_.replaceAll("from deserializer", "")) .map(_.replaceAll("None", "")) .map(_.trim) .filterNot(_ == "") - case _: hive.NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "") + case _: HiveNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "") case _: ExplainCommand => answer case _: DescribeCommand => // Filter out non-deterministic lines and lines which do not have actual results but From 4033bed9934cb5b53b0119de8037f8584e5fc414 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 17 Dec 2014 22:04:32 +0800 Subject: [PATCH 3/5] remove CreateTableAsSelect in hivestrategy --- .../apache/spark/sql/hive/HiveStrategies.scala | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index d6a3817d4066..9ebb627715b3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -180,21 +180,6 @@ private[hive] trait HiveStrategies { case hive.InsertIntoHiveTable(table: MetastoreRelation, partition, child, overwrite) => execution.InsertIntoHiveTable( table, partition, planLater(child), overwrite)(hiveContext) :: Nil - case logical.CreateTableAsSelect( - Some(database), tableName, child, allowExisting, Some(desc: CreateTableDesc)) => - ExecutedCommand(execution.CreateTableAsSelect( - database, - tableName, - child, - allowExisting, - Some(desc))(hiveContext)) :: Nil - case logical.CreateTableAsSelect(Some(database), tableName, child, allowExisting, None) => - ExecutedCommand(execution.CreateTableAsSelect( - database, - tableName, - child, - allowExisting, - None)(hiveContext)) :: Nil case _ => Nil } } From 0e03be86e595c0edac3923b8795aded706f28410 Mon Sep 17 00:00:00 2001 From: wangfei Date: Thu, 18 Dec 2014 09:44:38 +0800 Subject: [PATCH 4/5] address comments --- .../apache/spark/sql/hive/HiveContext.scala | 10 ++++++-- .../spark/sql/hive/HiveMetastoreCatalog.scala | 4 +-- .../spark/sql/hive/HiveStrategies.scala | 6 ++--- .../hive/execution/CreateTableAsSelect.scala | 25 +++++++++---------- .../execution/DescribeHiveTableCommand.scala | 9 ------- .../hive/execution/HiveNativeCommand.scala | 3 +-- .../hive/execution/InsertIntoHiveTable.scala | 5 ++-- 7 files changed, 27 insertions(+), 35 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 177229d46766..860738d9421f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types.DecimalType import org.apache.spark.sql.catalyst.types.decimal.Decimal -import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException} +import org.apache.spark.sql.execution.{SparkPlan, ExecutedCommand, ExtractPythonUdfs, QueryExecutionException} import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand} import 
org.apache.spark.sql.sources.DataSourceStrategy @@ -372,7 +372,13 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { case ExecutedCommand(desc: DescribeHiveTableCommand) => // If it is a describe command for a Hive table, we want to have the output format // be similar with Hive. - desc.hiveString + desc.run(self).map { + case Row(name: String, dataType: String, comment) => + Seq(name, dataType, + Option(comment.asInstanceOf[String]).getOrElse("")) + .map(s => String.format(s"%-20s", s)) + .mkString("\t") + } case command: ExecutedCommand => command.executeCollect().map(_.head.toString) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 140764fc550f..b31a3ec25096 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -293,7 +293,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with tableName, child, allowExisting, - desc)(SparkPlan.currentContext.get().asInstanceOf[HiveContext]) + desc) case p: LogicalPlan if p.resolved => p @@ -305,7 +305,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with tableName, child, allowExisting, - None)(SparkPlan.currentContext.get().asInstanceOf[HiveContext]) + None) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 9ebb627715b3..16073e59ffe3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.hive -import org.apache.hadoop.hive.ql.plan.CreateTableDesc - import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ @@ -176,10 +174,10 @@ private[hive] trait HiveStrategies { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) => execution.InsertIntoHiveTable( - table, partition, planLater(child), overwrite)(hiveContext) :: Nil + table, partition, planLater(child), overwrite) :: Nil case hive.InsertIntoHiveTable(table: MetastoreRelation, partition, child, overwrite) => execution.InsertIntoHiveTable( - table, partition, planLater(child), overwrite)(hiveContext) :: Nil + table, partition, planLater(child), overwrite) :: Nil case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 949146dc132d..6868c72bcc8a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -44,24 +44,23 @@ case class CreateTableAsSelect( tableName: String, query: LogicalPlan, allowExisting: Boolean, - desc: Option[CreateTableDesc])(@transient sc: HiveContext) extends RunnableCommand { + desc: Option[CreateTableDesc]) extends RunnableCommand { - // A lazy computing of the metastoreRelation - private[this] lazy val metastoreRelation: MetastoreRelation = { - // Create Hive Table - sc.catalog.createTable(database, tableName, 
query.output, allowExisting, desc) + override def run(sqlContext: SQLContext) = { + val hiveContext = sqlContext.asInstanceOf[HiveContext] + val metastoreRelation: MetastoreRelation = { + // Create Hive Table + hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc) - // Get the Metastore Relation - sc.catalog.lookupRelation(Some(database), tableName, None) match { - case r: MetastoreRelation => r + // Get the Metastore Relation + hiveContext.catalog.lookupRelation(Some(database), tableName, None) match { + case r: MetastoreRelation => r + } } - } - - override def run(sqlContext: SQLContext) = { // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data // processing. - if (sc.catalog.tableExists(Some(database), tableName)) { + if (hiveContext.catalog.tableExists(Some(database), tableName)) { if (allowExisting) { // table already exists, will do nothing, to keep consistent with Hive } else { @@ -69,7 +68,7 @@ case class CreateTableAsSelect( new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName") } } else { - sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd + hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd } Seq.empty[Row] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index d3210337a24e..bfacc51ef57a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -39,15 +39,6 @@ case class DescribeHiveTableCommand( override val output: Seq[Attribute], isExtended: Boolean) extends RunnableCommand { - // Strings with the format like Hive. It is used for result comparison in our unit tests. - lazy val hiveString: Seq[String] = run(SparkPlan.currentContext.get()).map { - case Row(name: String, dataType: String, comment) => - Seq(name, dataType, - Option(comment.asInstanceOf[String]).getOrElse("")) - .map(s => String.format(s"%-20s", s)) - .mkString("\t") - } - override def run(sqlContext: SQLContext) = { // Trying to mimic the format of Hive's output. But not exactly the same. 
var results: Seq[(String, String, String)] = Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala index d8469c61770f..8ba818af5f9d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala @@ -28,8 +28,7 @@ import org.apache.spark.sql.catalyst.types.StringType * :: DeveloperApi :: */ @DeveloperApi -case class HiveNativeCommand( - sql: String) extends RunnableCommand { +case class HiveNativeCommand(sql: String) extends RunnableCommand { override def output = Seq(AttributeReference("result", StringType, nullable = false)()) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 04b4d0bb7f56..ca0ec1513917 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -49,8 +49,9 @@ case class InsertIntoHiveTable( table: MetastoreRelation, partition: Map[String, Option[String]], child: SparkPlan, - overwrite: Boolean)(@transient sc: HiveContext) extends UnaryNode with HiveInspectors { + overwrite: Boolean) extends UnaryNode with HiveInspectors { + @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext] @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass @transient private lazy val hiveContext = new Context(sc.hiveconf) @transient private lazy val db = Hive.get(sc.hiveconf) @@ -61,8 +62,6 @@ case class InsertIntoHiveTable( serializer } - override def otherCopyArgs = sc :: Nil - def output = child.output def saveAsHiveFile( From 51a82f2ae3fe9d28455940d953de7b76306f49b2 Mon Sep 17 00:00:00 2001 From: wangfei Date: Thu, 18 Dec 2014 16:53:18 +0800 Subject: [PATCH 5/5] fix test failure --- .../apache/spark/sql/hive/execution/CreateTableAsSelect.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 6868c72bcc8a..fe21454e7fb3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -48,7 +48,7 @@ case class CreateTableAsSelect( override def run(sqlContext: SQLContext) = { val hiveContext = sqlContext.asInstanceOf[HiveContext] - val metastoreRelation: MetastoreRelation = { + lazy val metastoreRelation: MetastoreRelation = { // Create Hive Table hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc)
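
Note on PATCH 5/5: the only change is `val metastoreRelation` becoming `lazy val`, but that is what makes the CTAS command behave correctly once the table-creation side effect moved inside `run` in PATCH 4/5. With a strict `val`, the Hive table would be created the moment `run` starts, so the later `tableExists` check would always succeed and the branch that actually populates the table would be skipped. Below is a condensed Scala sketch of the command as it stands after patches 4 and 5; the signature and control flow are reconstructed from the hunks above (and slightly simplified), not a verbatim copy of the file.

    // Sketch only: condensed from the CreateTableAsSelect hunks in PATCH 4/5 and 5/5.
    case class CreateTableAsSelect(
        database: String,
        tableName: String,
        query: LogicalPlan,
        allowExisting: Boolean,
        desc: Option[CreateTableDesc]) extends RunnableCommand {

      override def run(sqlContext: SQLContext) = {
        val hiveContext = sqlContext.asInstanceOf[HiveContext]

        // Lazy: no metastore call happens until this val is first referenced.
        lazy val metastoreRelation: MetastoreRelation = {
          // Create the Hive table, then look it up as a MetastoreRelation.
          hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc)
          hiveContext.catalog.lookupRelation(Some(database), tableName, None) match {
            case r: MetastoreRelation => r
          }
        }

        if (hiveContext.catalog.tableExists(Some(database), tableName)) {
          if (!allowExisting) {
            throw new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(
              s"$database.$tableName")
          }
          // allowExisting == true: leave the existing table untouched, as Hive does.
        } else {
          // First (and only) reference to metastoreRelation: the table is created here,
          // then immediately populated with the query result.
          hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
        }

        Seq.empty[Row]
      }
    }

An equivalent fix would be to call `createTable` explicitly inside the else branch after the existence check; the `lazy val` achieves the same ordering while keeping table creation and relation lookup in one place.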