@@ -55,7 +55,9 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
typeCoercionRules :_*)
typeCoercionRules :_*),
Batch("AnalysisOperators", fixedPoint,
EliminateAnalysisOperators)
)

/**
@@ -80,6 +82,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
case s: Star => s.copy(table = s.table.map(_.toLowerCase))
case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)
case Alias(c, name) => Alias(c, name.toLowerCase)()
case GetField(c, name) => GetField(c, name.toLowerCase)
}
}
}
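Note (illustration, not part of the patch): with the new GetField case, references to nested struct fields are lowercased along with top-level attributes when the analyzer is built with caseSensitive = false, so they can resolve against a schema rewritten by LowerCaseSchema. A minimal sketch, assuming a hypothetical struct-typed column `Person` with a field `Age`:

    // Unresolved expression as the parser might produce it
    val expr = GetField(UnresolvedAttribute("Person"), "Age")
    // After the lowercasing rule runs, it is equivalent to
    //   GetField(UnresolvedAttribute("person"), "age")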
@@ -184,3 +187,17 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
exprs.collect { case _: Star => true }.nonEmpty
}
}

/**
* Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are
* only required to provide scoping information for attributes and can be removed once analysis is
* complete. Similarly, this rule also removes
* [[catalyst.plans.logical.LowerCaseSchema LowerCaseSchema]] operators.
*/
object EliminateAnalysisOperators extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Subquery(_, child) => child
case LowerCaseSchema(child) => child
}
}
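Note (illustration, not part of the patch): this rule simply unwraps the scoping operators once analysis no longer needs them; because the Analyzer registers it in a fixedPoint batch (see the batches change above), nested wrappers are removed as well. A minimal sketch, assuming a hypothetical LocalRelation named `people`:

    val scoped = Subquery("p", people)   // scoping operator introduced during analysis
    EliminateAnalysisOperators(scoped)   // returns `people`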

@@ -27,30 +27,16 @@ import org.apache.spark.sql.catalyst.types._

object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubqueries) ::
Batch("ConstantFolding", Once,
ConstantFolding,
BooleanSimplification,
SimplifyCasts) ::
Batch("Filter Pushdown", Once,
EliminateSubqueries,
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughInnerJoin) :: Nil
}

/**
* Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are
* only required to provide scoping information for attributes and can be removed once analysis is
* complete.
*/
object EliminateSubqueries extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Subquery(_, child) => child
}
}

/**
* Replaces [[catalyst.expressions.Expression Expressions]] that can be statically evaluated with
* equivalent [[catalyst.expressions.Literal Literal]] values.
@@ -21,6 +21,7 @@ package plans
package logical

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._

case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
def output = projectList.map(_.toAttribute)
@@ -86,7 +87,7 @@ case class Join(
}

case class InsertIntoTable(
table: BaseRelation,
table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
overwrite: Boolean)
@@ -141,6 +142,33 @@ case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
def references = Set.empty
}

/**
* Converts the schema of `child` to all lowercase. Together with LowercaseAttributeReferences,
* this allows for optional case-insensitive attribute resolution. This node can be elided after
* analysis.
*/
case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
protected def lowerCaseSchema(dataType: DataType): DataType = dataType match {
case StructType(fields) =>
StructType(fields.map(f =>
StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable)))
case ArrayType(elemType) => ArrayType(lowerCaseSchema(elemType))
case otherType => otherType
}

val output = child.output.map {
case a: AttributeReference =>
AttributeReference(
a.name.toLowerCase,
lowerCaseSchema(a.dataType),
a.nullable)(
a.exprId,
a.qualifiers)
}

def references = Set.empty
}
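Note (illustration, not part of the patch): LowerCaseSchema rewrites only the names; expression ids, nullability, and qualifiers are preserved, and struct fields are lowercased recursively. A minimal sketch, assuming a hypothetical relation whose single attribute is `Name` of type StructType(StructField("First", StringType, true) :: Nil):

    val lowered = LowerCaseSchema(relation)
    // lowered.output.head.name == "name"
    // lowered.output.head.dataType ==
    //   StructType(StructField("first", StringType, true) :: Nil)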

case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: LogicalPlan)
extends UnaryNode {

@@ -19,21 +19,22 @@ package org.apache.spark.sql
package catalyst
package optimizer

import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.types.IntegerType

// For implicit conversions
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._

class ConstantFoldingSuite extends OptimizerTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubqueries) ::
Batch("AnalysisNodes", Once,
EliminateAnalysisOperators) ::
Batch("ConstantFolding", Once,
ConstantFolding,
BooleanSimplification) :: Nil
@@ -2,6 +2,8 @@ package org.apache.spark.sql
package catalyst
package optimizer


import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

@@ -14,9 +16,8 @@ class FilterPushdownSuite extends OptimizerTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubqueries) ::
EliminateAnalysisOperators) ::
Batch("Filter Pushdown", Once,
EliminateSubqueries,
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughInnerJoin) :: Nil
@@ -155,7 +156,7 @@ class FilterPushdownSuite extends OptimizerTest {
}
val optimized = Optimize(originalQuery.analyze)

comparePlans(optimizer.EliminateSubqueries(originalQuery.analyze), optimized)
comparePlans(analysis.EliminateAnalysisOperators(originalQuery.analyze), optimized)
}

test("joins: conjunctive predicates") {
@@ -174,7 +175,7 @@ class FilterPushdownSuite extends OptimizerTest {
left.join(right, condition = Some("x.b".attr === "y.b".attr))
.analyze

comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
}

test("joins: conjunctive predicates #2") {
Expand All @@ -193,7 +194,7 @@ class FilterPushdownSuite extends OptimizerTest {
left.join(right, condition = Some("x.b".attr === "y.b".attr))
.analyze

comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
}

test("joins: conjunctive predicates #3") {
@@ -216,6 +217,6 @@ class FilterPushdownSuite extends OptimizerTest {
condition = Some("z.a".attr === "x.b".attr))
.analyze

comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
}
}
@@ -32,7 +32,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution._
@@ -108,18 +108,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

/* A catalyst metadata catalog that points to the Hive Metastore. */
@transient
override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
override def lookupRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None): LogicalPlan = {

LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
}
}
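Note (illustration, not part of the patch): every relation looked up through the Hive metastore is now wrapped in LowerCaseSchema, so the case-insensitive analyzer can resolve attributes however they are written in the query. Roughly, assuming a hypothetical Hive table `src`:

    val plan = catalog.lookupRelation(None, "src", None)
    // plan is approximately LowerCaseSchema(MetastoreRelation(...)), not the bare relation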

/* An analyzer that uses the Hive metastore. */
@transient
override lazy val analyzer = new Analyzer(catalog, HiveFunctionRegistry, caseSensitive = false)

def tables: Seq[BaseRelation] = {
// TODO: Move this functionality to Catalog. Make client protected.
val allTables = catalog.client.getAllTables("default")
allTables.map(catalog.lookupRelation(None, _, None)).collect { case b: BaseRelation => b }
}

/**
* Runs the specified SQL query using Hive.
*/
@@ -27,7 +27,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.Deserializer

import org.apache.spark.sql.catalyst.analysis.Catalog

import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateAnalysisOperators}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
@@ -96,7 +97,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
createTable(databaseName, tableName, child.output)

InsertIntoTable(
lookupRelation(Some(databaseName), tableName, None).asInstanceOf[BaseRelation],
EliminateAnalysisOperators(
lookupRelation(Some(databaseName), tableName, None)),
Map.empty,
child,
overwrite = false)
@@ -73,11 +73,11 @@ trait HiveStrategies {
case p @ FilteredOperation(predicates, relation: MetastoreRelation)
if relation.isPartitioned =>

val partitionKeyIds = relation.partitionKeys.map(_.id).toSet
val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet

// Filter out all predicates that only deal with partition keys
val (pruningPredicates, otherPredicates) = predicates.partition {
_.references.map(_.id).subsetOf(partitionKeyIds)
_.references.map(_.exprId).subsetOf(partitionKeyIds)
}

val scan = HiveTableScan(
@@ -19,6 +19,11 @@ package org.apache.spark.sql
package hive
package execution

import TestHive._

case class Data(a: Int, B: Int, n: Nested)
case class Nested(a: Int, B: Int)

/**
* A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
*/
@@ -47,6 +52,14 @@ class HiveResolutionSuite extends HiveComparisonTest {
createQueryTest("alias.*",
"SELECT a.* FROM src a ORDER BY key LIMIT 1")

test("case insensitivity with scala reflection") {
// Test resolution with Scala Reflection
TestHive.sparkContext.parallelize(Data(1, 2, Nested(1,2)) :: Nil)
.registerAsTable("caseSensitivityTest")

sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest")
}

/**
* Negative examples. Currently only left here for documentation purposes.
* TODO(marmbrus): Test that catalyst fails on these queries.