```diff
@@ -29,23 +29,6 @@ abstract class Command extends LeafNode {
   def output: Seq[Attribute] = Seq.empty
 }
 
-/**
- * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
- * commands that are passed directly to another system.
- */
-case class NativeCommand(cmd: String) extends Command {
-  override def output =
-    Seq(AttributeReference("result", StringType, nullable = false)())
-}
-
-/**
- * Commands of the form "SET [key [= value] ]".
- */
-case class DFSCommand(kv: Option[(String, Option[String])]) extends Command {
-  override def output = Seq(
-    AttributeReference("DFS output", StringType, nullable = false)())
-}
-
 /**
  *
  * Commands of the form "SET [key [= value] ]".
```

```diff
@@ -329,7 +329,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   val strategies: Seq[Strategy] =
     extraStrategies ++ (
-      CommandStrategy(self) ::
+      CommandStrategy ::
       DataSourceStrategy ::
       TakeOrdered ::
       HashAggregation ::
```
```diff
@@ -302,17 +302,20 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
-  case class CommandStrategy(context: SQLContext) extends Strategy {
+  case object CommandStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case r: RunnableCommand => ExecutedCommand(r) :: Nil
```

> **Contributor:** Can't we get rid of the strategy entirely and just have this single line added to `BasicOperators`? That was kind of my goal with this refactoring.
>
> **Contributor Author:** I will try; maybe we can add a rule to the analyzer that translates `logical.XXXCommand` into a `RunnableCommand`.
>
> **Contributor:** I don't think it needs to be in the Analyzer. I think just this single line in the `BasicOperators` strategy should be sufficient.
>
> **Contributor Author:** But for the SET command, catalyst first parses it as `logical.SetCommand`; at what point would it become `execution.SetCommand` (a `RunnableCommand`)?
>
> **Contributor Author:** We cannot avoid the step that translates `logical.XXXCommand` into a `RunnableCommand`, so `CommandStrategy` stays here.
>
> **Contributor:** Okay, I see the problem now. These commands still live inside of catalyst and thus cannot be `RunnableCommand`s. I would argue that that should be changed. My whole goal here was to eliminate boilerplate from the planner, and this still seems to have a bunch of it. That said, this could be done in a follow-up PR.
>
> **Contributor Author:** Yes, I think we can parse commands in sql/core instead of in catalyst in the follow-up PR.
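As an illustration of the reviewer's suggestion, here is a minimal sketch (not code from this PR; the object shape and import paths are assumptions based on this branch) of what folding command planning into `BasicOperators` could look like, assuming every logical command were already a `RunnableCommand`:

```scala
import org.apache.spark.sql.catalyst.planning.GenericStrategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{ExecutedCommand, RunnableCommand, SparkPlan}

// Hypothetical sketch: if every logical command were already a
// RunnableCommand, the dedicated CommandStrategy would collapse into a
// single extra case in BasicOperators.
object BasicOperators extends GenericStrategy[SparkPlan] {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // The one added line: wrap the command so it executes eagerly, exactly once.
    case r: RunnableCommand => ExecutedCommand(r) :: Nil
    // ... the existing cases for projections, joins, aggregates, etc. ...
    case _ => Nil
  }
}
```

The sticking point raised in the thread above is that `logical.SetCommand` and friends are defined in catalyst, which cannot depend on sql/core's `RunnableCommand`, so a translation step in the planner is still needed for now.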

```diff
       case logical.SetCommand(kv) =>
-        Seq(execution.SetCommand(kv, plan.output)(context))
+        Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)))
       case logical.ExplainCommand(logicalPlan, extended) =>
-        Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
+        Seq(ExecutedCommand(
+          execution.ExplainCommand(logicalPlan, plan.output, extended)))
       case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
-        Seq(execution.CacheTableCommand(tableName, optPlan, isLazy))
+        Seq(ExecutedCommand(
+          execution.CacheTableCommand(tableName, optPlan, isLazy)))
       case logical.UncacheTableCommand(tableName) =>
-        Seq(execution.UncacheTableCommand(tableName))
+        Seq(ExecutedCommand(
+          execution.UncacheTableCommand(tableName)))
       case _ => Nil
     }
   }
```
```diff
@@ -26,34 +26,20 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.{SQLConf, SQLContext}
 
-// TODO: DELETE ME...
-trait Command {
-  this: SparkPlan =>
-
-  /**
-   * A concrete command should override this lazy field to wrap up any side effects caused by the
-   * command or any other computation that should be evaluated exactly once. The value of this field
-   * can be used as the contents of the corresponding RDD generated from the physical plan of this
-   * command.
-   *
-   * The `execute()` method of all the physical command classes should reference `sideEffectResult`
-   * so that the command can be executed eagerly right after the command query is created.
-   */
-  protected lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]
-
-  override def executeCollect(): Array[Row] = sideEffectResult.toArray
-
-  override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
-}
-
-// TODO: Replace command with runnable command.
+/**
+ * A logical command that is executed for its side-effects. `RunnableCommand`s are
+ * wrapped in `ExecutedCommand` during execution.
+ */
 trait RunnableCommand extends logical.Command {
```

> **Contributor:** Let's add some Scaladoc:
>
> ```scala
> /**
>  * A logical command that is executed for its side-effects. `RunnableCommand`s are
>  * wrapped in `ExecutedCommand` during execution.
>  */
> ```
>
> **Contributor:** And below, on `ExecutedCommand`:
>
> ```scala
> /**
>  * A physical operator that executes the run method of a `RunnableCommand` and
>  * saves the result to prevent multiple executions.
>  */
> ```
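To make the contract concrete, here is a minimal hypothetical command under the new interface. It is not part of this PR; the class is invented for illustration, and the import paths are assumptions based on this branch:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.execution.RunnableCommand

// Hypothetical command: all logic lives in run(); the planner wraps it
// in ExecutedCommand, which invokes run() once and caches the rows.
case class HelloCommand(who: String) extends RunnableCommand {
  override def output: Seq[Attribute] =
    Seq(AttributeReference("result", StringType, nullable = false)())

  override def run(sqlContext: SQLContext): Seq[Row] =
    Seq(Row(s"hello, $who"))
}
```

When planned, `ExecutedCommand(HelloCommand("world"))` would return a single row containing `hello, world`, with `run` invoked exactly once no matter how many times the resulting RDD is evaluated.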

```diff
   self: Product =>
 
   def output: Seq[Attribute]
   def run(sqlContext: SQLContext): Seq[Row]
 }
 
+/**
+ * A physical operator that executes the run method of a `RunnableCommand` and
+ * saves the result to prevent multiple executions.
+ */
 case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
   /**
    * A concrete command should override this lazy field to wrap up any side effects caused by the
@@ -79,43 +65,41 @@ case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribute])(
-    @transient context: SQLContext)
-  extends LeafNode with Command with Logging {
+case class SetCommand(
+    kv: Option[(String, Option[String])],
+    override val output: Seq[Attribute]) extends RunnableCommand with Logging {
 
-  override protected lazy val sideEffectResult: Seq[Row] = kv match {
+  override def run(sqlContext: SQLContext) = kv match {
     // Configures the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
       logWarning(
         s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
           s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      context.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
+      sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
       Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value"))
 
     // Configures a single property.
     case Some((key, Some(value))) =>
-      context.setConf(key, value)
+      sqlContext.setConf(key, value)
       Seq(Row(s"$key=$value"))
 
-    // Queries all key-value pairs that are set in the SQLConf of the context. Notice that different
-    // from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties
-    // while "SET -v" returns all properties.)
+    // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
+    // Notice that different from Hive, here "SET -v" is an alias of "SET".
+    // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.)
     case Some(("-v", None)) | None =>
-      context.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
+      sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
 
     // Queries the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
       logWarning(
         s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
           s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}"))
+      Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}"))
 
     // Queries a single property.
     case Some((key, None)) =>
-      Seq(Row(s"$key=${context.getConf(key, "<undefined>")}"))
+      Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}"))
   }
 
-  override def otherCopyArgs = context :: Nil
 }
 
 /**
@@ -128,22 +112,19 @@ case class SetCommand(kv: Option[(String, Option[String])], output: Seq[Attribut
  */
 @DeveloperApi
 case class ExplainCommand(
-    logicalPlan: LogicalPlan, output: Seq[Attribute], extended: Boolean)(
-    @transient context: SQLContext)
-  extends LeafNode with Command {
+    logicalPlan: LogicalPlan,
+    override val output: Seq[Attribute], extended: Boolean) extends RunnableCommand {
 
   // Run through the optimizer to generate the physical plan.
-  override protected lazy val sideEffectResult: Seq[Row] = try {
+  override def run(sqlContext: SQLContext) = try {
     // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
-    val queryExecution = context.executePlan(logicalPlan)
+    val queryExecution = sqlContext.executePlan(logicalPlan)
     val outputString = if (extended) queryExecution.toString else queryExecution.simpleString
 
     outputString.split("\n").map(Row(_))
   } catch { case cause: TreeNodeException[_] =>
     ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
   }
 
-  override def otherCopyArgs = context :: Nil
 }
 
 /**
@@ -153,10 +134,9 @@ case class ExplainCommand(
 case class CacheTableCommand(
     tableName: String,
     plan: Option[LogicalPlan],
-    isLazy: Boolean)
-  extends LeafNode with Command {
+    isLazy: Boolean) extends RunnableCommand {
 
-  override protected lazy val sideEffectResult = {
+  override def run(sqlContext: SQLContext) = {
     import sqlContext._
 
     plan.foreach(_.registerTempTable(tableName))
@@ -178,8 +158,9 @@ case class CacheTableCommand(
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class UncacheTableCommand(tableName: String) extends LeafNode with Command {
-  override protected lazy val sideEffectResult: Seq[Row] = {
+case class UncacheTableCommand(tableName: String) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
     sqlContext.table(tableName).unpersist()
     Seq.empty[Row]
   }
@@ -191,11 +172,11 @@ case class UncacheTableCommand(tableName: String) extends LeafNode with Command
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
-    @transient context: SQLContext)
-  extends LeafNode with Command {
+case class DescribeCommand(
+    child: SparkPlan,
+    override val output: Seq[Attribute]) extends RunnableCommand {
 
-  override protected lazy val sideEffectResult: Seq[Row] = {
+  override def run(sqlContext: SQLContext) = {
     Row("# Registered as a temporary table", null, null) +:
       child.output.map(field => Row(field.name, field.dataType.toString, null))
   }
```
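A brief usage sketch of the new flow (a hypothetical session, not taken from this PR; the local-mode setup is illustrative): the parser produces `logical.SetCommand`, `CommandStrategy` rewrites it to `ExecutedCommand(execution.SetCommand(...))`, and `ExecutedCommand` runs it eagerly and caches the result.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("set-demo"))
val sqlContext = new SQLContext(sc)

// Parsed as logical.SetCommand, planned as
// ExecutedCommand(execution.SetCommand(...)), and executed once via run().
sqlContext.sql("SET spark.sql.shuffle.partitions=10")

// Querying a single key follows the same path and returns "key=value" rows.
sqlContext.sql("SET spark.sql.shuffle.partitions").collect().foreach(println)
```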

```diff
@@ -21,6 +21,7 @@ import scala.language.implicitConversions
 
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, SqlLexical}
+import org.apache.spark.sql.hive.execution.{AddJar, AddFile, HiveNativeCommand}
 
 /**
  * A parser that recognizes all HiveQL constructs together with Spark SQL specific extensions.
@@ -52,7 +53,7 @@ private[hive] class ExtendedHiveQlParser extends AbstractSparkSQLParser {
 
   protected lazy val dfs: Parser[LogicalPlan] =
     DFS ~> wholeInput ^^ {
-      case command => NativeCommand(command.trim)
+      case command => HiveNativeCommand(command.trim)
     }
 
   private lazy val addFile: Parser[LogicalPlan] =
```

```diff
@@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types.DecimalType
 import org.apache.spark.sql.catalyst.types.decimal.Decimal
-import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand}
-import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
+import org.apache.spark.sql.execution.{SparkPlan, ExecutedCommand, ExtractPythonUdfs, QueryExecutionException}
+import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
 import org.apache.spark.sql.sources.DataSourceStrategy
 
 /**
@@ -340,7 +340,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
   override val strategies: Seq[Strategy] = extraStrategies ++ Seq(
     DataSourceStrategy,
-    CommandStrategy(self),
+    CommandStrategy,
     HiveCommandStrategy(self),
     TakeOrdered,
     ParquetOperations,
@@ -369,11 +369,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * execution is simply passed back to Hive.
    */
   def stringResult(): Seq[String] = executedPlan match {
-    case describeHiveTableCommand: DescribeHiveTableCommand =>
+    case ExecutedCommand(desc: DescribeHiveTableCommand) =>
       // If it is a describe command for a Hive table, we want to have the output format
       // be similar with Hive.
-      describeHiveTableCommand.hiveString
-    case command: PhysicalCommand =>
+      desc.run(self).map {
+        case Row(name: String, dataType: String, comment) =>
+          Seq(name, dataType,
+            Option(comment.asInstanceOf[String]).getOrElse(""))
+            .map(s => String.format(s"%-20s", s))
+            .mkString("\t")
+      }
+    case command: ExecutedCommand =>
       command.executeCollect().map(_.head.toString)
 
     case other =>
@@ -386,7 +392,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
   override def simpleString: String =
     logical match {
-      case _: NativeCommand => "<Native command: executed by Hive>"
+      case _: HiveNativeCommand => "<Native command: executed by Hive>"
       case _: SetCommand => "<SET command: executed by Hive, and noted by SQLContext>"
       case _ => super.simpleString
     }
```
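For illustration, a standalone sketch (not this PR's code) of what the `%-20s` formatting in the new `DescribeHiveTableCommand` branch produces: each column is left-justified to 20 characters and the columns are tab-joined, mimicking Hive's `DESCRIBE` output.

```scala
// Standalone illustration of the describe-output formatting above.
val fields = Seq(("key", "string", null: String), ("value", "int", "a comment"))
val lines = fields.map { case (name, dataType, comment) =>
  Seq(name, dataType, Option(comment).getOrElse(""))
    .map(s => String.format("%-20s", s))  // pad each column to 20 chars
    .mkString("\t")
}
lines.foreach(println)
// key                 	string              	
// value               	int                 	a comment
```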

```diff
@@ -20,6 +20,8 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import org.apache.spark.sql.execution.SparkPlan
+
 import scala.util.parsing.combinator.RegexParsers
 
 import org.apache.hadoop.util.ReflectionUtils
@@ -286,14 +288,24 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         Some(sa.getQB().getTableDesc)
       }
 
-      CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
+      execution.CreateTableAsSelect(
+        databaseName,
+        tableName,
+        child,
+        allowExisting,
+        desc)
 
     case p: LogicalPlan if p.resolved => p
 
     case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
       val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
       val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
-      CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, None)
+      execution.CreateTableAsSelect(
+        databaseName,
+        tableName,
+        child,
+        allowExisting,
+        None)
   }
 }
```

**sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala** (13 changes: 3 additions & 10 deletions)

```diff
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.types.decimal.Decimal
+import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable}
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -44,14 +45,6 @@ import scala.collection.JavaConversions._
  */
 private[hive] case object NativePlaceholder extends Command
 
-private[hive] case class AddFile(filePath: String) extends Command
-
-private[hive] case class AddJar(path: String) extends Command
-
-private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command
-
-private[hive] case class AnalyzeTable(tableName: String) extends Command
-
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
 private[hive] object HiveQl {
   protected val nativeCommands = Seq(
@@ -239,10 +232,10 @@ private[hive] object HiveQl {
     try {
       val tree = getAst(sql)
       if (nativeCommands contains tree.getText) {
-        NativeCommand(sql)
+        HiveNativeCommand(sql)
       } else {
         nodeToPlan(tree) match {
-          case NativePlaceholder => NativeCommand(sql)
+          case NativePlaceholder => HiveNativeCommand(sql)
           case other => other
         }
       }
```