Changes from all commits (35 commits)
578d167  make caseSensitive configurable (Dec 2, 2014)
f57f15c  add testcase (Dec 3, 2014)
91b1b96  make caseSensitive configurable in Analyzer (Dec 20, 2014)
e7bca31  make caseSensitive configuration in Analyzer and Catalog (Dec 20, 2014)
fcbf0d9  fix scalastyle check (Dec 20, 2014)
6332e0f  fix bug (Jan 3, 2015)
005c56d  make SQLContext caseSensitivity configurable (Jan 3, 2015)
9bf4cc7  fix bug in catalyst (Jan 3, 2015)
73c16b1  fix bug in sql/hive (Jan 3, 2015)
05b09a3  fix conflict base on the latest master branch (Jan 19, 2015)
dee56e9  fix test case failure (Jan 19, 2015)
39e369c  fix confilct after DataFrame PR (Feb 3, 2015)
12eca9a  solve conflict with master (Feb 21, 2015)
664d1e9  Merge branch 'master' of https://github.com/apache/spark into case (Feb 21, 2015)
56034ca  fix conflicts and improve for catalystconf (scwf, Apr 30, 2015)
5472b08  fix compile issue (scwf, Apr 30, 2015)
69b3b70  fix AnalysisSuite (scwf, Apr 30, 2015)
fd30e25  added override (scwf, Apr 30, 2015)
966e719  set CASE_SENSITIVE false in hivecontext (scwf, Apr 30, 2015)
5d7c456  set CASE_SENSITIVE false in TestHive (scwf, Apr 30, 2015)
6ef31cf  revert pom changes (scwf, Apr 30, 2015)
eee75ba  fix EmptyConf (scwf, Apr 30, 2015)
d5a9933  fix style (scwf, Apr 30, 2015)
7fc4a98  fix test case (scwf, Apr 30, 2015)
6db4bf5  also fix for HiveContext (scwf, Apr 30, 2015)
2a56515  fix conflicts (scwf, May 1, 2015)
a3f7659  remove unsed imports (scwf, May 3, 2015)
b35529e  minor style (scwf, May 3, 2015)
9e11752  improve SimpleCatalystConf (scwf, May 3, 2015)
b73df6c  style issue (scwf, May 3, 2015)
269cf21  fix conflicts (scwf, May 6, 2015)
4ef1be7  fix conflicts (scwf, May 8, 2015)
af512c7  fix conflicts (scwf, May 8, 2015)
d4b724f  address michael's comment (scwf, May 8, 2015)
cd51712  fix compile (scwf, May 8, 2015)
New file: CatalystConf.scala (package org.apache.spark.sql.catalyst)
@@ -0,0 +1,35 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst

private[spark] trait CatalystConf {
  def caseSensitiveAnalysis: Boolean
}

/**
 * A trivial conf that is empty. Used for testing when all
 * relations are already filled in and the analyzer needs only to resolve attribute references.
 */
object EmptyConf extends CatalystConf {
  override def caseSensitiveAnalysis: Boolean = {
    throw new UnsupportedOperationException
  }
}

/** A CatalystConf that can be used for local testing. */
case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf
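
To see how the pieces fit, here is a minimal wiring sketch, assuming only the classes touched by this PR (SimpleCatalystConf, SimpleCatalog, Analyzer) and written REPL-style. One conf object now drives both the catalog's identifier normalization and the analyzer's attribute resolution:

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}

// A case-insensitive analyzer for a local test: the same CatalystConf
// instance is shared by the catalog and the analyzer.
val conf = SimpleCatalystConf(caseSensitiveAnalysis = false)
val analyzer = new Analyzer(new SimpleCatalog(conf), EmptyFunctionRegistry, conf)
```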
Analyzer.scala (org.apache.spark.sql.catalyst.analysis)
@@ -19,19 +19,21 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.util.collection.OpenHashSet
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.OpenHashSet
 
 /**
  * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
  * when all relations are already filled in and the analyzer needs only to resolve attribute
  * references.
  */
-object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true)
+object SimpleAnalyzer
+  extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true))
 
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -41,11 +43,17 @@ object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true
 class Analyzer(
     catalog: Catalog,
     registry: FunctionRegistry,
-    caseSensitive: Boolean,
+    conf: CatalystConf,
     maxIterations: Int = 100)
   extends RuleExecutor[LogicalPlan] with HiveTypeCoercion with CheckAnalysis {
 
-  val resolver = if (caseSensitive) caseSensitiveResolution else caseInsensitiveResolution
+  def resolver: Resolver = {
+    if (conf.caseSensitiveAnalysis) {
+      caseSensitiveResolution
+    } else {
+      caseInsensitiveResolution
+    }
+  }
 
   val fixedPoint = FixedPoint(maxIterations)

[Review comment from a contributor on the new `def resolver`]
Don't need to change this into a function because it'll never change. I'm more of a fan of this:

    val resolver = caseSensitive ? caseSensitiveResolution : caseInsensitiveResolution;

[Reply from a contributor]
It actually can change, since you can set a SQL config at runtime, in between queries. Also, Scala does not have a ternary operator.
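
The reply above is the heart of the design question: spark.sql.caseSensitive can be SET between queries, so the resolution strategy must be re-read on every access. A standalone toy sketch (Conf and Toy are illustrative names, not Spark classes) of why a val would go stale where a def stays current:

```scala
object ResolverDemo extends App {
  class Conf { @volatile var caseSensitive: Boolean = true }

  class Toy(conf: Conf) {
    // A `val` is computed once, when the Toy is constructed.
    val resolverAtConstruction: String =
      if (conf.caseSensitive) "case-sensitive" else "case-insensitive"

    // A `def` re-reads the conf on every call, so runtime changes take effect.
    def resolverPerCall: String =
      if (conf.caseSensitive) "case-sensitive" else "case-insensitive"
  }

  val conf = new Conf
  val toy = new Toy(conf)
  conf.caseSensitive = false // simulate SET spark.sql.caseSensitive=false between queries

  assert(toy.resolverAtConstruction == "case-sensitive")  // stale: frozen at construction
  assert(toy.resolverPerCall == "case-insensitive")       // current: tracks the conf
}
```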
Catalog.scala (org.apache.spark.sql.catalyst.analysis)
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.EmptyConf
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
 
 /**
@@ -34,7 +36,7 @@ class NoSuchDatabaseException extends Exception
  */
 trait Catalog {
 
-  def caseSensitive: Boolean
+  val conf: CatalystConf
 
   def tableExists(tableIdentifier: Seq[String]): Boolean
 
@@ -57,10 +59,10 @@ trait Catalog {
   def unregisterAllTables(): Unit
 
   protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
-    if (!caseSensitive) {
-      tableIdentifier.map(_.toLowerCase)
-    } else {
+    if (conf.caseSensitiveAnalysis) {
       tableIdentifier
+    } else {
+      tableIdentifier.map(_.toLowerCase)
     }
   }
 
@@ -78,7 +80,7 @@
   }
 }
 
-class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
+class SimpleCatalog(val conf: CatalystConf) extends Catalog {
   val tables = new mutable.HashMap[String, LogicalPlan]()
 
   override def registerTable(
@@ -164,10 +166,10 @@ trait OverrideCatalog extends Catalog {
   }
 
   abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    val dbName = if (!caseSensitive) {
-      if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
-    } else {
+    val dbName = if (conf.caseSensitiveAnalysis) {
       databaseName
+    } else {
+      if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
     }
 
     val temporaryTables = overrides.filter {
@@ -207,7 +209,7 @@ trait OverrideCatalog extends Catalog {
  */
 object EmptyCatalog extends Catalog {
 
-  override val caseSensitive: Boolean = true
+  override val conf: CatalystConf = EmptyConf
 
   override def tableExists(tableIdentifier: Seq[String]): Boolean = {
     throw new UnsupportedOperationException
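
To make the normalization rule concrete: under case-insensitive analysis, every part of a table identifier is lowercased before lookup, which is why `SELECT * FROM TESTTABLE1` can match a table registered as `testTable1`. A self-contained sketch (a free-standing function mirroring processTableIdentifier above, not the Spark trait itself):

```scala
object IdentifierNormalization extends App {
  // Mirrors Catalog.processTableIdentifier: under case-insensitive analysis
  // every part of the identifier is lowercased before lookup.
  def processTableIdentifier(caseSensitiveAnalysis: Boolean)(
      tableIdentifier: Seq[String]): Seq[String] = {
    if (caseSensitiveAnalysis) tableIdentifier
    else tableIdentifier.map(_.toLowerCase)
  }

  // `DB1.TestTable` and `db1.testtable` hit the same catalog entry when insensitive...
  assert(processTableIdentifier(caseSensitiveAnalysis = false)(Seq("DB1", "TestTable")) ==
    Seq("db1", "testtable"))
  // ...but stay distinct identifiers when sensitive.
  assert(processTableIdentifier(caseSensitiveAnalysis = true)(Seq("DB1", "TestTable")) ==
    Seq("DB1", "TestTable"))
}
```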
AnalysisSuite.scala (org.apache.spark.sql.catalyst.analysis)
@@ -23,24 +23,26 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 
 class AnalysisSuite extends FunSuite with BeforeAndAfter {
-  val caseSensitiveCatalog = new SimpleCatalog(true)
-  val caseInsensitiveCatalog = new SimpleCatalog(false)
+  val caseSensitiveConf = new SimpleCatalystConf(true)
+  val caseInsensitiveConf = new SimpleCatalystConf(false)
+
+  val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
+  val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
 
   val caseSensitiveAnalyzer =
-    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true) {
+    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
       override val extendedResolutionRules = EliminateSubQueries :: Nil
     }
   val caseInsensitiveAnalyzer =
-    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false) {
+    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) {
       override val extendedResolutionRules = EliminateSubQueries :: Nil
     }
 
   def caseSensitiveAnalyze(plan: LogicalPlan): Unit =
     caseSensitiveAnalyzer.checkAnalysis(caseSensitiveAnalyzer.execute(plan))
DecimalPrecisionSuite.scala (org.apache.spark.sql.catalyst.analysis)
@@ -17,14 +17,17 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{Union, Project, LocalRelation}
 import org.apache.spark.sql.types._
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 
 class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
-  val catalog = new SimpleCatalog(false)
-  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = false)
+  val conf = new SimpleCatalystConf(true)
+  val catalog = new SimpleCatalog(conf)
+  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 
   val relation = LocalRelation(
     AttributeReference("i", IntegerType)(),
sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala (11 additions, 2 deletions)
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql
 
+import java.util.Properties
+
 import scala.collection.immutable
 import scala.collection.JavaConversions._
 
-import java.util.Properties
+import org.apache.spark.sql.catalyst.CatalystConf
 
 private[spark] object SQLConf {
   val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
@@ -32,6 +34,7 @@ private[spark] object SQLConf {
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
   val DIALECT = "spark.sql.dialect"
+  val CASE_SENSITIVE = "spark.sql.caseSensitive"

[Comment from the PR author on the CASE_SENSITIVE line]
This config is now placed in SQLConf.

 
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
@@ -89,7 +92,8 @@ private[spark] object SQLConf {
  *
  * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
-private[sql] class SQLConf extends Serializable {
+
+private[sql] class SQLConf extends Serializable with CatalystConf {
   import SQLConf._
 
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -158,6 +162,11 @@
    */
   private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean
 
+  /**
+   * Whether analysis is case sensitive (true by default).
+   */
+  def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, "true").toBoolean
+
   /**
    * When set to true, Spark SQL will use managed memory for certain operations. This option only
    * takes effect if codegen is enabled.
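
In user-facing terms, the new key behaves like any other SQL conf and can be flipped at runtime, between queries. A hedged usage sketch against the SQLContext API of this era, assuming a `sqlContext` (e.g. TestSQLContext) and the `testTable1` registered in the SQLQuerySuite test below:

```scala
// Assumes a SQLContext named `sqlContext` and a temp table `testTable1`
// with columns `key` and `value`, as in the SQLQuerySuite test below.
sqlContext.setConf("spark.sql.caseSensitive", "false")
sqlContext.sql("SELECT VALUE FROM TESTTABLE1 WHERE KEY = 1") // resolves against value/key

sqlContext.setConf("spark.sql.caseSensitive", "true")
// The same query would now fail analysis: VALUE, TESTTABLE1, and KEY no longer resolve.
```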
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala (2 additions, 2 deletions)
@@ -148,15 +148,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   // TODO how to handle the temp table per user session?
   @transient
-  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
+  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)
 
   // TODO how to handle the temp function per user session?
   @transient
   protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry(true)
 
   @transient
   protected[sql] lazy val analyzer: Analyzer =
-    new Analyzer(catalog, functionRegistry, caseSensitive = true) {
+    new Analyzer(catalog, functionRegistry, conf) {
       override val extendedResolutionRules =
         ExtractPythonUdfs ::
         sources.PreInsertCastAndRename ::
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (10 additions, 0 deletions)
@@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.execution.GeneratedAggregate
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}
@@ -1277,6 +1278,15 @@
     checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2))
   }
 
+  test("SPARK-4699 case sensitivity SQL query") {
+    setConf(SQLConf.CASE_SENSITIVE, "false")
+    val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
+    val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
+    rdd.toDF().registerTempTable("testTable1")
+    checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
+    setConf(SQLConf.CASE_SENSITIVE, "true")
+  }
+
   test("SPARK-6145: ORDER BY test for nested fields") {
     jsonRDD(sparkContext.makeRDD("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
       .registerTempTable("nestedOrder")
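
One fragile spot in the test above: if `checkAnswer` throws, the trailing `setConf(SQLConf.CASE_SENSITIVE, "true")` never runs and the case-insensitive flag leaks into later tests. A sketch of a try/finally guard (`withSQLConf` is a hypothetical helper, not part of this PR) that the suite could use instead:

```scala
// Hypothetical helper, not part of this PR: run `body` with one SQL conf key
// temporarily overridden, restoring the previous value even if `body` throws.
def withSQLConf(key: String, value: String)(body: => Unit): Unit = {
  val original = getConf(key, null) // null marks "was unset"
  setConf(key, value)
  try body
  finally {
    if (original != null) setConf(key, original) else unsetConf(key)
  }
}

// Usage inside the test:
// withSQLConf(SQLConf.CASE_SENSITIVE, "false") {
//   checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
// }
```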
DataSourceTest.scala (org.apache.spark.sql.sources)
@@ -18,25 +18,13 @@
 package org.apache.spark.sql.sources
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.test.TestSQLContext
 import org.scalatest.BeforeAndAfter
 
 abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
-  // Case sensitivity is not configurable yet, but we want to test some edge cases.
-  // TODO: Remove when it is configurable
-  implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext) {
-    @transient
-    override protected[sql] lazy val analyzer: Analyzer =
-      new Analyzer(catalog, functionRegistry, caseSensitive = false) {
-        override val extendedResolutionRules =
-          PreInsertCastAndRename ::
-          Nil
-
-        override val extendedCheckRules = Seq(
-          sources.PreWriteCheck(catalog)
-        )
-      }
-  }
+  // We want to test some edge cases.
+  implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext)
+
+  caseInsensisitiveContext.setConf(SQLConf.CASE_SENSITIVE, "false")
 }
HiveContext.scala (org.apache.spark.sql.hive)
@@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, Query
 
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
 import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -329,7 +330,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   /* An analyzer that uses the Hive metastore. */
   @transient
   override protected[sql] lazy val analyzer =
-    new Analyzer(catalog, functionRegistry, caseSensitive = false) {
+    new Analyzer(catalog, functionRegistry, conf) {
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.CreateTables ::
@@ -350,6 +351,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[hive] class SQLSession extends super.SQLSession {
     protected[sql] override lazy val conf: SQLConf = new SQLConf {
       override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+      override def caseSensitiveAnalysis: Boolean =
+        getConf(SQLConf.CASE_SENSITIVE, "false").toBoolean
     }
 
   /**
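
The override above changes only the default: under HiveContext, analysis is case insensitive unless the user sets the key explicitly, which matches Hive's own lowercasing of identifiers. A toy sketch of that default-override pattern (ToySQLConf is an illustrative name, not a Spark class):

```scala
object HiveDefaultDemo extends App {
  class ToySQLConf {
    private val settings = scala.collection.mutable.Map[String, String]()
    def setConf(k: String, v: String): Unit = settings(k) = v
    def getConf(k: String, default: String): String = settings.getOrElse(k, default)
    // Core default: case-sensitive analysis.
    def caseSensitiveAnalysis: Boolean = getConf("spark.sql.caseSensitive", "true").toBoolean
  }

  // The Hive session flips only the *default*; an explicit user setting still wins.
  val hiveConf = new ToySQLConf {
    override def caseSensitiveAnalysis: Boolean =
      getConf("spark.sql.caseSensitive", "false").toBoolean
  }

  assert(!hiveConf.caseSensitiveAnalysis)            // default is now false
  hiveConf.setConf("spark.sql.caseSensitive", "true")
  assert(hiveConf.caseSensitiveAnalysis)             // explicit setting still respected
}
```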