@@ -134,7 +134,7 @@ case class AttributeReference(
val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] {

override def equals(other: Any) = other match {
-    case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
+    case ar: AttributeReference => name == ar.name && exprId == ar.exprId && dataType == ar.dataType
case _ => false
}

@@ -17,6 +17,10 @@

package org.apache.spark.sql

+/**
+ * Catalyst is a library for manipulating relational query plans. All classes in catalyst are
+ * considered an internal API to Spark SQL and are subject to change between minor releases.
+ */
package object catalyst {
/**
* A JVM-global lock that should be used to prevent thread safety issues when using things in
@@ -21,6 +21,15 @@ import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode

+/**
+ * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
+ * be used for execution. If this strategy does not apply to the given logical operation then an
+ * empty list should be returned.
+ */
+abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
+  def apply(plan: LogicalPlan): Seq[PhysicalPlan]
+}

/**
* Abstract class for transforming [[plans.logical.LogicalPlan LogicalPlan]]s into physical plans.
* Child classes are responsible for specifying a list of [[Strategy]] objects, each of which
@@ -35,16 +44,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
*/
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
/** A list of execution strategies that can be used by the planner */
-  def strategies: Seq[Strategy]
-
-  /**
-   * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
-   * be used for execution. If this strategy does not apply to the give logical operation then an
-   * empty list should be returned.
-   */
-  abstract protected class Strategy extends Logging {
-    def apply(plan: LogicalPlan): Seq[PhysicalPlan]
-  }
+  def strategies: Seq[GenericStrategy[PhysicalPlan]]

/**
* Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
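Hoisting Strategy out of QueryPlanner as the standalone GenericStrategy is what makes externally-defined strategies possible (see extraStrategies in SQLContext.scala below). A minimal sketch of the contract, assuming hypothetical MyLogicalLeaf/MyLeafExec operators and the imports already present in this file:

object MyStrategy extends GenericStrategy[SparkPlan] {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // MyLogicalLeaf / MyLeafExec are placeholder operator names, not part of this patch.
    case MyLogicalLeaf(output) => MyLeafExec(output) :: Nil
    case _ => Nil  // does not apply: return an empty list so the planner moves on
  }
}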
@@ -455,7 +455,7 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
case class StructField(
name: String,
dataType: DataType,
-    nullable: Boolean,
+    nullable: Boolean = true,
metadata: Metadata = Metadata.empty) {

private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
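With nullable now defaulting to true, the common case drops an argument; for illustration:

StructField("name", StringType)                   // nullable defaults to true
StructField("id", IntegerType, nullable = false)  // explicit, as before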
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala (25 changes: 23 additions & 2 deletions)
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution.{SparkStrategies, _}
import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.sources.{DataSourceStrategy, BaseRelation, DDLParser, LogicalRelation}

/**
* :: AlphaComponent ::
@@ -68,13 +69,19 @@ class SQLContext(@transient val sparkContext: SparkContext)
@transient
protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer

+  @transient
+  protected[sql] val ddlParser = new DDLParser

@transient
protected[sql] val sqlParser = {
val fallback = new catalyst.SqlParser
new catalyst.SparkSQLParser(fallback(_))
}

-  protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser(sql)
+  protected[sql] def parseSql(sql: String): LogicalPlan = {
+    ddlParser(sql).getOrElse(sqlParser(sql))
+  }

protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }
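The new parseSql gives the DDL parser first look at every statement: ddlParser returns an Option[LogicalPlan], and anything it does not recognize falls through to the regular SQL parser. A hedged sketch of the routing (the DDL grammar lives in DDLParser, which is not shown in this excerpt; the CREATE TEMPORARY TABLE ... USING ... OPTIONS form is assumed):

// Claimed by ddlParser (data source DDL, assumed form):
sqlContext.sql(
  """CREATE TEMPORARY TABLE jsonTable
    |USING org.apache.spark.sql.json
    |OPTIONS (path 'data.json')""".stripMargin)

// Plain SQL: ddlParser yields None, so sqlParser handles it.
sqlContext.sql("SELECT * FROM jsonTable")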
@@ -104,6 +111,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
LogicalRDD(ScalaReflection.attributesFor[A], RDDConversions.productToRowRdd(rdd))(self))
}

+  implicit def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
+    logicalPlanToSparkQuery(LogicalRelation(baseRelation))
+  }
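Since the conversion is implicit, any BaseRelation can now be used where a SchemaRDD is expected once the context's implicits are imported. A small sketch using the JSONRelation added later in this patch (the path is arbitrary):

import sqlContext._

// samplingRatio = 1.0 scans the whole file during schema inference.
val relation = JSONRelation("data.json", samplingRatio = 1.0)(sqlContext)
val rows: SchemaRDD = relation  // converted by baseRelationToSchemaRDD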

/**
* :: DeveloperApi ::
* Creates a [[SchemaRDD]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
@@ -283,6 +294,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
def table(tableName: String): SchemaRDD =
new SchemaRDD(this, catalog.lookupRelation(None, tableName))

+  /**
+   * :: DeveloperApi ::
+   * Allows extra strategies to be injected into the query planner at runtime. Note this API
+   * should be considered experimental and is not intended to be stable across releases.
+   */
+  @DeveloperApi
+  var extraStrategies: Seq[Strategy] = Nil
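Because extraStrategies is prepended to the built-in list below, injected strategies are consulted first. Reusing the hypothetical MyStrategy sketched earlier:

// Planning is unaffected for plans MyStrategy does not match, since it returns Nil.
sqlContext.extraStrategies = MyStrategy :: Nil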

protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext: SparkContext = self.sparkContext

@@ -293,7 +312,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
def numPartitions = self.numShufflePartitions

val strategies: Seq[Strategy] =
+      extraStrategies ++ (
CommandStrategy(self) ::
+      DataSourceStrategy ::
TakeOrdered ::
HashAggregation ::
LeftSemiJoin ::
@@ -302,7 +323,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
ParquetOperations ::
BasicOperators ::
CartesianProduct ::
-      BroadcastNestedLoopJoin :: Nil
+      BroadcastNestedLoopJoin :: Nil)

/**
* Used to build table scan operators where complex projection and filtering are done using
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.json.JsonRDD
+import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation}
import org.apache.spark.sql.types.util.DataTypeConversions
import org.apache.spark.sql.{SQLContext, StructType => SStructType}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -39,6 +40,10 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {

def this(sparkContext: JavaSparkContext) = this(new SQLContext(sparkContext.sc))

+  def baseRelationToSchemaRDD(baseRelation: BaseRelation): JavaSchemaRDD = {
+    new JavaSchemaRDD(sqlContext, LogicalRelation(baseRelation))
+  }

/**
* Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is
* used for SQL parsing can be configured with 'spark.sql.dialect'.
@@ -50,12 +50,6 @@ object RDDConversions {
}
}
}

-  /*
-  def toLogicalPlan[A <: Product : TypeTag](productRdd: RDD[A]): LogicalPlan = {
-    LogicalRDD(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
-  }
-  */
}

case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLContext)
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

-import org.apache.spark.sql.{SQLContext, execution}
+import org.apache.spark.sql.{SQLContext, Strategy, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
@@ -304,6 +304,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

case class CommandStrategy(context: SQLContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case r: RunnableCommand => ExecutedCommand(r) :: Nil
case logical.SetCommand(kv) =>
Seq(execution.SetCommand(kv, plan.output)(context))
case logical.ExplainCommand(logicalPlan, extended) =>
@@ -21,10 +21,12 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.sql.{SQLConf, SQLContext}

+// TODO: DELETE ME...

Review comment (Contributor): hm?

Reply (Author): This is part of an internal refactor that starts below. RunnableCommand will remove the need to have two classes for every single command, as well as a special handler in the query planner for each.

It's pretty big though, so I decided not to finish it in this PR.

trait Command {
this: SparkPlan =>

@@ -44,6 +46,35 @@ trait Command {
override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}

+// TODO: Replace command with runnable command.
+trait RunnableCommand extends logical.Command {
+  self: Product =>
+
+  def output: Seq[Attribute]
+  def run(sqlContext: SQLContext): Seq[Row]
+}
+
+case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
+  /**
+   * A concrete command should override this lazy field to wrap up any side effects caused by the
+   * command or any other computation that should be evaluated exactly once. The value of this
+   * field can be used as the contents of the corresponding RDD generated from the physical plan
+   * of this command.
+   *
+   * The `execute()` method of all the physical command classes should reference `sideEffectResult`
+   * so that the command can be executed eagerly right after the command query is created.
+   */
+  protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)
+
+  override def output = cmd.output
+
+  override def children = Nil
+
+  override def executeCollect(): Array[Row] = sideEffectResult.toArray
+
+  override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+}
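To make the TODO's intent concrete: with RunnableCommand, a single class carries both the logical node and the execution logic, while ExecutedCommand (plus the new CommandStrategy case above) supplies the physical-plan plumbing. A hedged sketch, assuming a hypothetical command that is not part of this patch:

// Hypothetical: set a configuration property as a one-time side effect.
case class SetConfCommand(key: String, value: String) extends RunnableCommand {
  def output: Seq[Attribute] = Nil

  def run(sqlContext: SQLContext): Seq[Row] = {
    sqlContext.setConf(key, value)  // runs exactly once, via ExecutedCommand.sideEffectResult
    Seq.empty[Row]
  }
}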

/**
* :: DeveloperApi ::
*/
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.json

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources._

private[sql] class DefaultSource extends RelationProvider {
  /** Returns a new base relation with the given parameters. */
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
    val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

    JSONRelation(fileName, samplingRatio)(sqlContext)
  }
}

private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(
    @transient val sqlContext: SQLContext)
  extends TableScan {

  private def baseRDD = sqlContext.sparkContext.textFile(fileName)

  override val schema =
    JsonRDD.inferSchema(
      baseRDD,
      samplingRatio,
      sqlContext.columnNameOfCorruptRecord)

  override def buildScan() =
    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord)
}
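Tying it together: the class name given to USING resolves to this package's DefaultSource, and the OPTIONS pairs arrive as the parameters map (the resolution code is outside this excerpt). What the DDL path boils down to, for illustration:

val relation = new DefaultSource().createRelation(
  sqlContext,
  Map("path" -> "data.json", "samplingRatio" -> "0.5"))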
sql/core/src/main/scala/org/apache/spark/sql/package.scala (9 changes: 9 additions & 0 deletions)
@@ -18,6 +18,7 @@
package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.execution.SparkPlan

/**
* Allows the execution of relational queries, including those expressed in SQL using Spark.
@@ -432,6 +433,12 @@ package object sql {
@DeveloperApi
val StructField = catalyst.types.StructField

+  /**
+   * Converts a logical plan into zero or more SparkPlans.
+   */
+  @DeveloperApi
+  type Strategy = org.apache.spark.sql.catalyst.planning.GenericStrategy[SparkPlan]
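The alias spares user code the catalyst package path: after import org.apache.spark.sql._, a strategy can be declared directly. A trivial example:

// Equivalent to extending GenericStrategy[SparkPlan].
object NoOpStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil  // applies to nothing
}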

/**
* :: DeveloperApi ::
*
@@ -448,7 +455,9 @@
type Metadata = catalyst.util.Metadata

/**
+   * :: DeveloperApi ::
* Builder for [[Metadata]]. If there is a key collision, the latter will overwrite the former.
*/
+  @DeveloperApi
type MetadataBuilder = catalyst.util.MetadataBuilder
}