35 commits
4ebf362
First cut at SQLConf inside SQLContext.
concretevitamin Jun 3, 2014
cb722c1
Finish up SQLConf patch.
concretevitamin Jun 3, 2014
ce22d80
Fix parsing issues.
concretevitamin Jun 3, 2014
0ecea46
Changes for HiveQl and HiveContext.
concretevitamin Jun 3, 2014
d0c4578
Tmux typo.
concretevitamin Jun 3, 2014
3b0c71b
Remove Parser for set commands. A few other fixes.
concretevitamin Jun 4, 2014
2276929
Fix default hive result for set commands in HiveComparisonTest.
concretevitamin Jun 4, 2014
41acd75
Add a test for hql() in HiveQuerySuite.
concretevitamin Jun 4, 2014
0f00d86
Add a test for singleton set command in SQL.
concretevitamin Jun 4, 2014
c1017c2
WIP in changing SetCommand to take two Options (for different semanti…
concretevitamin Jun 4, 2014
efd82db
Clean up semantics of several cases of SET.
concretevitamin Jun 4, 2014
c651797
Add commands.scala.
concretevitamin Jun 4, 2014
5b67985
New line at EOF.
concretevitamin Jun 4, 2014
6983180
Move a SET test to SQLQuerySuite and make it complete.
concretevitamin Jun 4, 2014
b14b83e
In a HiveContext, make SQLConf a subset of HiveConf.
concretevitamin Jun 5, 2014
13279e6
Refactor the logic of eagerly processing SET commands.
concretevitamin Jun 5, 2014
41d7f09
Fix imports.
concretevitamin Jun 5, 2014
2ea8cdc
Wrap long line.
concretevitamin Jun 5, 2014
c2067e8
Mark SQLContext transient and put it in a second param list.
concretevitamin Jun 5, 2014
555599c
Bullet-proof (relatively) parsing SET per review comment.
concretevitamin Jun 5, 2014
d52e1bd
De-hardcode number of shuffle partitions for BasicOperators (read fro…
concretevitamin Jun 5, 2014
b766af9
Remove a test.
concretevitamin Jun 5, 2014
1ce8a5e
Invoke runSqlHive() in SQLConf#get for the HiveContext case.
concretevitamin Jun 5, 2014
f8983d1
Minor changes per review comments.
concretevitamin Jun 6, 2014
271f0b1
Minor change.
concretevitamin Jun 6, 2014
88dd0c8
Remove redundant SET Keyword.
concretevitamin Jun 6, 2014
e9856c4
Use java.util.Collections.synchronizedMap on a Java HashMap.
concretevitamin Jun 6, 2014
22d9ed7
Fix output() of Set physical. Add SQLConf param accessor method.
concretevitamin Jun 6, 2014
5f7e6d8
Add default num partitions.
concretevitamin Jun 6, 2014
baa5d29
Remove default param for shuffle partitions accessor.
concretevitamin Jun 9, 2014
dd19666
Update a comment.
concretevitamin Jun 9, 2014
26c40eb
Make SQLConf a trait and have SQLContext mix it in.
concretevitamin Jun 9, 2014
c129b86
Merge remote-tracking branch 'upstream/master' into sqlconf
concretevitamin Jun 10, 2014
d74dde5
Remove the redundant mkQueryExecution() method.
concretevitamin Jun 10, 2014
4968c11
Very minor cleanup.
concretevitamin Jun 10, 2014
@@ -41,10 +41,25 @@ import org.apache.spark.sql.catalyst.types._
* for a SQL like language should checkout the HiveQL support in the sql/hive sub-project.
*/
class SqlParser extends StandardTokenParsers with PackratParsers {

def apply(input: String): LogicalPlan = {
phrase(query)(new lexical.Scanner(input)) match {
case Success(r, x) => r
case x => sys.error(x.toString)
// Special-case out set commands since the value fields can be
// complex to handle without RegexParsers. Also this approach
// is clearer for the several possible cases of set commands.
if (input.trim.toLowerCase.startsWith("set")) {
input.trim.drop(3).split("=", 2).map(_.trim) match {
case Array("") => // "set"
SetCommand(None, None)
case Array(key) => // "set key"
SetCommand(Some(key), None)
case Array(key, value) => // "set key=value"
SetCommand(Some(key), Some(value))
}
} else {
phrase(query)(new lexical.Scanner(input)) match {
case Success(r, x) => r
case x => sys.error(x.toString)
}
}
}
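For reference, here is a small stand-alone sketch (hypothetical helper, not part of the patch) showing how the split("=", 2) dispatch above maps the three SET forms onto SetCommand's two Options:

// Hypothetical illustration of the SET dispatch in SqlParser.apply; parseSet is made up.
def parseSet(input: String): (Option[String], Option[String]) =
  input.trim.drop(3).split("=", 2).map(_.trim) match {
    case Array("")         => (None, None)               // "SET"           -> list all pairs
    case Array(key)        => (Some(key), None)          // "SET key"       -> query one key
    case Array(key, value) => (Some(key), Some(value))   // "SET key=value" -> assign
  }

// parseSet("SET")                              == (None, None)
// parseSet("SET spark.sql.shuffle.partitions") == (Some("spark.sql.shuffle.partitions"), None)
// parseSet("SET a.b = 10")                     == (Some("a.b"), Some("10"))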

@@ -169,11 +184,13 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
}
}

protected lazy val query: Parser[LogicalPlan] =
protected lazy val query: Parser[LogicalPlan] = (
select * (
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
) | insert
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
| insert
)

protected lazy val select: Parser[LogicalPlan] =
SELECT ~> opt(DISTINCT) ~ projections ~
@@ -102,7 +102,7 @@ abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
*/
abstract class Command extends LeafNode {
self: Product =>
def output: Seq[Attribute] = Seq.empty
def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
}

/**
@@ -111,6 +111,16 @@ abstract class Command extends LeafNode {
*/
case class NativeCommand(cmd: String) extends Command

/**
* Commands of the form "SET (key) (= value)".
*/
case class SetCommand(key: Option[String], value: Option[String]) extends Command {
override def output = Seq(
AttributeReference("key", StringType, nullable = false)(),
AttributeReference("value", StringType, nullable = false)()
)
}

/**
* Returned by a parser when the user only wants to see what query plan would be executed, without
* actually performing the execution.
78 changes: 78 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import java.util.Properties

import scala.collection.JavaConverters._

/**
* SQLConf holds mutable config parameters and hints. These can be set and
* queried either by passing SET commands into Spark SQL's DSL
* functions (sql(), hql(), etc.), or by programmatically using setters and
* getters of this class. This class is thread-safe.
*/
trait SQLConf {

/** Number of partitions to use for shuffle operators. */
private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt

@transient
private val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())

def set(props: Properties): Unit = {
props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
}

def set(key: String, value: String): Unit = {
require(key != null, "key cannot be null")
require(value != null, s"value cannot be null for ${key}")
settings.put(key, value)
}

def get(key: String): String = {
if (!settings.containsKey(key)) {
throw new NoSuchElementException(key)
}
settings.get(key)
}

def get(key: String, defaultValue: String): String = {
if (!settings.containsKey(key)) defaultValue else settings.get(key)
}

def getAll: Array[(String, String)] = settings.asScala.toArray

def getOption(key: String): Option[String] = {
if (!settings.containsKey(key)) None else Some(settings.get(key))
}

def contains(key: String): Boolean = settings.containsKey(key)

def toDebugString: String = {
settings.synchronized {
settings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n")
}
}

private[spark] def clear() {
settings.clear()
}

}
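A quick sketch of how the trait is intended to be used (illustrative only; assumes an existing SparkContext named sc, and that SQLContext mixes in SQLConf as in this patch):

// Config values can be set programmatically through the mixed-in SQLConf...
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.set("spark.sql.shuffle.partitions", "10")
assert(sqlContext.get("spark.sql.shuffle.partitions") == "10")
// The planner reads this value through the private[spark] numShufflePartitions accessor.

// ...or with a SET statement; both paths write to the same synchronized map.
sqlContext.sql("SET spark.sql.shuffle.partitions=20")
assert(sqlContext.get("spark.sql.shuffle.partitions") == "20")

// Lookups with and without defaults.
sqlContext.getOption("not.a.key")            // None
sqlContext.get("some.other.key", "fallback") // "fallback" (key not set)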
39 changes: 31 additions & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
@@ -52,6 +52,7 @@ import org.apache.spark.sql.parquet.ParquetRelation
@AlphaComponent
class SQLContext(@transient val sparkContext: SparkContext)
extends Logging
with SQLConf
with dsl.ExpressionConversions
with Serializable {

@@ -190,6 +191,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext = self.sparkContext

def numPartitions = self.numShufflePartitions

val strategies: Seq[Strategy] =
CommandStrategy(self) ::
TakeOrdered ::
@@ -246,37 +249,57 @@ class SQLContext(@transient val sparkContext: SparkContext)
@transient
protected[sql] val planner = new SparkPlanner

@transient
protected[sql] lazy val emptyResult =
sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)

/**
* Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
* inserting shuffle operations as needed.
*/
@transient
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
val batches =
Batch("Add exchange", Once, AddExchange) ::
Batch("Add exchange", Once, AddExchange(self)) ::
Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
}

// TODO: or should we make QueryExecution protected[sql]?
protected[sql] def mkQueryExecution(plan: LogicalPlan) = new QueryExecution {
val logical = plan
}

/**
* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
*/
protected abstract class QueryExecution {
def logical: LogicalPlan

def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
case SetCommand(key, value) =>
Contributor:
I'm a bit confused that the actual logic for the set command is not handled in SetCommandPhysical but here as a special case. I think we can remove eagerlyProcess and processCmd in this file with the help of the changes made in PR #948, so that we can move the actual processing logic of the set command back to SetCommandPhysical. @rxin @marmbrus What do you think?

Contributor (author):
Hey @liancheng -- so this function eagerlyProcess is supposed to be triggered as soon as hql("set ...") is typed in / is being evaluated. The actual physical operator SetCommandPhysical handles evaluating the actual contents of the SchemaRDD. These contents are not (and need not be, in the spirit of RDDs) eagerly processed at this particular place.

Contributor:
Hi @concretevitamin, previously I thought the only important thing about a SchemaRDD made from an eagerly executed statement (a Hive native command, a set command, some insertion command, etc.) was its side effect, not the content of the RDD itself. But now I agree that when users call .collect() on such an RDD, we should present some meaningful result.

As for the eager evaluation of the RDD content: IMHO, for command SchemaRDDs, the RDD contents are either empty or generated at the same time the command is executed (at least I can't find a counterexample in the current code base), so this shouldn't be an issue.

So I think the constraints presented here are:

  1. The side effect of a command SchemaRDD should take place eagerly;
  2. The side effect of a command SchemaRDD should take place once and only once;
  3. When .collect() method is called, something meaningful, usually the output message lines of the command, should be presented.

Then how about adding a lazy field inside all the physical command nodes to wrap up the side effect and hold the command output? Take the SetCommandPhysical as an example:

trait PhysicalCommand {
  // Wraps the command's side effect and holds its output; overriding it with a
  // lazy val ensures the side effect runs at most once.
  def commandOutput: Any
}

case class SetCommandPhysical(
    key: Option[String], value: Option[String], output: Seq[Attribute])(
    @transient context: SQLContext)
  extends LeafNode
  with PhysicalCommand {

  override lazy val commandOutput = {
    // Perform the side effect, and record appropriate output
    ???
  }

  def execute(): RDD[Row] = {
    val row = new GenericRow(Array[Any](commandOutput))
    context.sparkContext.parallelize(Seq(row), 1)
  }
}

In this way, all the constraints are met:

  1. Eager evaluation: done by the toRdd call in SchemaRDDLike (PR #948, "[SPARK-1852] prevents queries with sorts submitting jobs prematurely"),
  2. Side effect should take place once and only once: ensured by the lazy commandOutput field,
  3. Present meaningful output as RDD contents: command output is held by commandOutput and returned in execute().

An additional benefit is that the side-effect logic of all the commands can be implemented within their own physical command nodes, instead of adding special cases inside SQLContext.toRdd and/or HiveContext.toRdd.

Did I miss something here?

On the other hand, although I think this solution can be more concise and clean, it may involve some non-trivial changes, so I think we'd better merge this PR as is and make those changes in another PR when appropriate.

Contributor:
Yeah, I merged this, but that sounds like a good plan to me.

Follow-up JIRA: https://issues.apache.org/jira/browse/SPARK-2094

Contributor (author):
@liancheng That sounds like a very clear & good solution!

// Only this case needs to be executed eagerly. The other cases will
// be taken care of when the actual results are being extracted.
// In the case of HiveContext, sqlConf is overridden to also pass the
// pair into its HiveConf.
if (key.isDefined && value.isDefined) {
set(key.get, value.get)
}
// It doesn't matter what we return here, since this is only used
// to force the evaluation to happen eagerly. To query the results,
// one must use SchemaRDD operations to extract them.
emptyResult
case _ => executedPlan.execute()
}

lazy val analyzed = analyzer(logical)
lazy val optimizedPlan = optimizer(analyzed)
// TODO: Don't just pick the first one...
lazy val sparkPlan = planner(optimizedPlan).next()
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[Row] = executedPlan.execute()
lazy val toRdd: RDD[Row] = {
logical match {
case s: SetCommand => eagerlyProcess(s)
case _ => executedPlan.execute()
}
}

protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
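To make the eager/lazy split concrete (a sketch, not code from the diff; it assumes toRdd is reached as soon as the statement is issued, as discussed in the review thread above):

// "SET key=value": toRdd matches SetCommand and calls set(key, value) right away,
// so the assignment takes effect without any action on the returned SchemaRDD.
sqlContext.sql("SET spark.sql.shuffle.partitions=5")

// Reading values back is lazy and goes through SetCommandPhysical like any other plan.
sqlContext.sql("SET spark.sql.shuffle.partitions").collect()
// => one Row containing "spark.sql.shuffle.partitions=5"

// In HiveContext, the SET handling is overridden so the pair is also passed into its HiveConf.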
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf}
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.{SQLConf, SQLContext, Row}
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions.{MutableProjection, RowOrdering}
import org.apache.spark.sql.catalyst.plans.physical._
@@ -86,9 +86,9 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
* [[catalyst.plans.physical.Distribution Distribution]] requirements for each operator by inserting
* [[Exchange]] Operators where required.
*/
private[sql] object AddExchange extends Rule[SparkPlan] {
private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] {
// TODO: Determine the number of partitions.
val numPartitions = 150
def numPartitions = sqlContext.numShufflePartitions

def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case operator: SparkPlan =>
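The practical effect of parameterizing AddExchange (a hedged sketch; spark.sql.shuffle.partitions is the key introduced by SQLConf in this patch):

// Previously every Exchange inserted by AddExchange used the hard-coded 150 partitions;
// now the count is read from the context's SQLConf at planning time.
sqlContext.set("spark.sql.shuffle.partitions", "8")
// Any query planned afterwards that requires a shuffle (e.g. a GROUP BY or a join)
// gets Exchange operators with 8 output partitions instead of the old constant.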
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.{SQLContext, execution}
import org.apache.spark.sql.{SQLConf, SQLContext, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
@@ -193,8 +193,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
// TODO: Set
val numPartitions = 200
def numPartitions = self.numPartitions

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Distinct(child) =>
execution.Aggregate(
@@ -234,11 +234,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

// TODO: this should be merged with SPARK-1508's SetCommandStrategy
case class CommandStrategy(context: SQLContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.SetCommand(key, value) =>
Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
case logical.ExplainCommand(child) =>
val qe = context.mkQueryExecution(child)
val qe = context.executePlan(child)
Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context))
case _ => Nil
}
@@ -17,10 +17,45 @@

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
(@transient context: SQLContext) extends LeafNode {
def execute(): RDD[Row] = (key, value) match {
// Set value for key k; the action itself would
// have been performed in QueryExecution eagerly.
case (Some(k), Some(v)) => context.emptyResult
// Query the value bound to key k.
case (Some(k), None) =>
val resultString = context.getOption(k) match {
case Some(v) => s"$k=$v"
case None => s"$k is undefined"
}
context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1)
// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
val pairs = context.getAll
val rows = pairs.map { case (k, v) =>
new GenericRow(Array[Any](s"$k=$v"))
}.toSeq
// Assume config parameters can fit into one split (machine) ;)
context.sparkContext.parallelize(rows, 1)
// The only other case is invalid semantics and is impossible.
case _ => context.emptyResult
}
}
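Putting the three branches together, the collected output looks roughly like this (illustrative expectations only, assuming a context where k1 has been set to v1; exact Row rendering aside):

sqlContext.sql("SET k1=v1").collect()    // one empty Row -- the assignment itself happened eagerly
sqlContext.sql("SET k1").collect()       // Array([k1=v1])
sqlContext.sql("SET missing").collect()  // Array([missing is undefined])
sqlContext.sql("SET").collect()          // one Row per configured key=value pair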

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
(@transient context: SQLContext) extends UnaryNode {
def execute(): RDD[Row] = {