
Commit 74f5c21

dongjoon-hyun authored and cloud-fan committed
[SPARK-18433][SQL] Improve DataSource option keys to be more case-insensitive
## What changes were proposed in this pull request?

This PR aims to make DataSource option keys more case-insensitive. DataSource only partially uses CaseInsensitiveMap in its code paths. For example, the following fails to find `url`:

```scala
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
df.write.format("jdbc")
  .option("UrL", url1)
  .option("dbtable", "TEST.SAVETEST")
  .options(properties.asScala)
  .save()
```

This PR makes DataSource options use CaseInsensitiveMap internally, and makes DataSource use CaseInsensitiveMap generally, except for `InMemoryFileIndex` and `InsertIntoHadoopFsRelationCommand`. We cannot pass them a CaseInsensitiveMap because they create new case-sensitive HadoopConfs by calling `newHadoopConfWithOptions(options)` internally.

## How was this patch tested?

Pass the Jenkins tests with the newly added test cases.

Author: Dongjoon Hyun <[email protected]>

Closes #15884 from dongjoon-hyun/SPARK-18433.
1 parent 95eb06b commit 74f5c21

File tree

18 files changed, +133 -49 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

Lines changed: 4 additions & 2 deletions

@@ -23,17 +23,19 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.commons.lang3.time.FastDateFormat

 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}

 /**
  * Options for parsing JSON data into Spark SQL rows.
  *
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-    @transient private val parameters: Map[String, String])
+    @transient private val parameters: CaseInsensitiveMap)
   extends Logging with Serializable {

+  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
   val samplingRatio =
     parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
   val primitivesAsString =
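As a rough sketch of what the new auxiliary constructor buys (not part of the commit; `JSONOptions` is `private[sql]`, so this would have to sit under the `org.apache.spark.sql` package, e.g. in a test):

```scala
import org.apache.spark.sql.catalyst.json.JSONOptions

// The Map-based constructor wraps the keys in a CaseInsensitiveMap, so a
// differently cased key still resolves to the documented option name.
val jsonOptions = new JSONOptions(Map("SamplingRatio" -> "0.5"))
assert(jsonOptions.samplingRatio == 0.5)
```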
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+/**
+ * Builds a map in which keys are case insensitive
+ */
+class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
+  with Serializable {
+
+  val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
+
+  override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
+
+  override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
+    baseMap + kv.copy(_1 = kv._1.toLowerCase)
+
+  override def iterator: Iterator[(String, String)] = baseMap.iterator
+
+  override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
+}
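A minimal behavior sketch of the relocated class (keys are lowercased on construction and on lookup; values and their casing are untouched). This is illustration only, with made-up option values:

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

val m = new CaseInsensitiveMap(Map("UrL" -> "jdbc:h2:mem:testdb", "DBTABLE" -> "TEST.SAVETEST"))

assert(m.get("url") == Some("jdbc:h2:mem:testdb"))   // lookup lowercases the query key
assert(m("dbtable") == "TEST.SAVETEST")              // apply() goes through get()
assert(m.keySet == Set("url", "dbtable"))            // stored keys are lowercased

// '+' also lowercases the incoming key, but it returns the underlying plain Map,
// so later additions on the result are case-sensitive again.
val m2 = m + ("NumPartitions" -> "4")
assert(m2.get("numpartitions") == Some("4"))
```

One design consequence worth noting: `+` and `-` return the underlying `Map` rather than another `CaseInsensitiveMap`, so the case-insensitive behavior does not survive further transformations; callers are evidently expected to wrap once and then only read.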

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
-import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 16 additions & 14 deletions

@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat

@@ -80,13 +81,13 @@ case class DataSource(

   lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo = sourceSchema()
+  private val caseInsensitiveOptions = new CaseInsensitiveMap(options)

   /**
    * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
    */
   private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
     userSpecifiedSchema.map(_ -> partitionColumns).orElse {
-      val caseInsensitiveOptions = new CaseInsensitiveMap(options)
       val allPaths = caseInsensitiveOptions.get("path")
       val globbedPaths = allPaths.toSeq.flatMap { path =>
         val hdfsPath = new Path(path)

@@ -114,11 +115,10 @@ case class DataSource(
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
         val (name, schema) = s.sourceSchema(
-          sparkSession.sqlContext, userSpecifiedSchema, className, options)
+          sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions)
         SourceInfo(name, schema, Nil)

       case format: FileFormat =>
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })

@@ -158,10 +158,14 @@ case class DataSource(
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
         s.createSource(
-          sparkSession.sqlContext, metadataPath, userSpecifiedSchema, className, options)
+          sparkSession.sqlContext,
+          metadataPath,
+          userSpecifiedSchema,
+          className,
+          caseInsensitiveOptions)

       case format: FileFormat =>
-        val path = new CaseInsensitiveMap(options).getOrElse("path", {
+        val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
         new FileStreamSource(

@@ -171,7 +175,7 @@ case class DataSource(
           schema = sourceInfo.schema,
           partitionColumns = sourceInfo.partitionColumns,
           metadataPath = metadataPath,
-          options = options)
+          options = caseInsensitiveOptions)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed reading")

@@ -182,18 +186,17 @@ case class DataSource(
   def createSink(outputMode: OutputMode): Sink = {
     providingClass.newInstance() match {
       case s: StreamSinkProvider =>
-        s.createSink(sparkSession.sqlContext, options, partitionColumns, outputMode)
+        s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)

       case fileFormat: FileFormat =>
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
         if (outputMode != OutputMode.Append) {
           throw new IllegalArgumentException(
             s"Data source $className does not support $outputMode output mode")
         }
-        new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, options)
+        new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, caseInsensitiveOptions)

       case _ =>
         throw new UnsupportedOperationException(

@@ -234,7 +237,6 @@ case class DataSource(
   * that files already exist, we don't need to check them again.
   */
  def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
-    val caseInsensitiveOptions = new CaseInsensitiveMap(options)
    val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
      // TODO: Throw when too much is given.
      case (dataSource: SchemaRelationProvider, Some(schema)) =>

@@ -274,7 +276,7 @@ case class DataSource(
          dataSchema = dataSchema,
          bucketSpec = None,
          format,
-          options)(sparkSession)
+          caseInsensitiveOptions)(sparkSession)

      // This is a non-streaming file based datasource.
      case (format: FileFormat, _) =>

@@ -358,13 +360,13 @@ case class DataSource(

    providingClass.newInstance() match {
      case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sparkSession.sqlContext, mode, options, data)
+        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
      case format: FileFormat =>
        // Don't glob path for the write path. The contracts here are:
        // 1. Only one output path can be specified on the write path;
        // 2. Output path must be a legal HDFS style file system path;
        // 3. It's OK that the output path doesn't exist yet;
-        val allPaths = paths ++ new CaseInsensitiveMap(options).get("path")
+        val allPaths = paths ++ caseInsensitiveOptions.get("path")
        val outputPath = if (allPaths.length == 1) {
          val path = new Path(allPaths.head)
          val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())

@@ -391,7 +393,7 @@ case class DataSource(
      // TODO: Case sensitivity.
      val sameColumns =
        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-      if (existingPartitionColumns.size > 0 && !sameColumns) {
+      if (existingPartitionColumns.nonEmpty && !sameColumns) {
        throw new AnalysisException(
          s"""Requested partitioning does not match existing partitioning.
             |Existing partitioning columns:
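Hoisting `caseInsensitiveOptions` into a single private val means every lookup in `DataSource` (stream source/sink creation, `path` resolution, `createRelation`) now sees the same case-insensitive view. A hedged, end-to-end sketch of the read-path effect, assuming a local SparkSession and a hypothetical input file:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SPARK-18433-sketch")
  .getOrCreate()

// "PATH" is cased differently from the "path" key DataSource looks up
// internally, but the CaseInsensitiveMap view makes the lookup succeed.
val df = spark.read
  .format("json")
  .option("PATH", "/tmp/people.json")  // hypothetical input path
  .load()
```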

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala

Lines changed: 5 additions & 3 deletions

@@ -23,11 +23,13 @@ import java.util.Locale
 import org.apache.commons.lang3.time.FastDateFormat

 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}

-private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
+private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap)
   extends Logging with Serializable {

+  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
   private def getChar(paramName: String, default: Char): Char = {
     val paramValue = parameters.get(paramName)
     paramValue match {

@@ -128,7 +130,7 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str

 object CSVOptions {

-  def apply(): CSVOptions = new CSVOptions(Map.empty)
+  def apply(): CSVOptions = new CSVOptions(new CaseInsensitiveMap(Map.empty))

   def apply(paramName: String, paramValue: String): CSVOptions = {
     new CSVOptions(Map(paramName -> paramValue))
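`CSVOptions` is `private[csv]`, so the user-visible effect surfaces through the reader API; a hedged sketch, reusing the `spark` session from the earlier sketch and a hypothetical CSV file:

```scala
// Mixed-case CSV option keys now resolve to their canonical lowercase names.
val people = spark.read
  .option("Header", "true")        // resolves to "header"
  .option("InferSchema", "true")   // resolves to "inferSchema"
  .csv("/tmp/people.csv")          // hypothetical input path
```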

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala

Lines changed: 0 additions & 18 deletions

@@ -96,21 +96,3 @@ case class RefreshResource(path: String)
     Seq.empty[Row]
   }
 }
-
-/**
- * Builds a map in which keys are case insensitive
- */
-class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
-  with Serializable {
-
-  val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
-
-  override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
-
-  override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
-    baseMap + kv.copy(_1 = kv._1.toLowerCase)
-
-  override def iterator: Iterator[(String, String)] = baseMap.iterator
-
-  override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
-}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

Lines changed: 7 additions & 3 deletions

@@ -22,19 +22,23 @@ import java.util.Properties

 import scala.collection.mutable.ArrayBuffer

+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
 /**
  * Options for the JDBC data source.
  */
 class JDBCOptions(
-    @transient private val parameters: Map[String, String])
+    @transient private val parameters: CaseInsensitiveMap)
   extends Serializable {

   import JDBCOptions._

+  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
   def this(url: String, table: String, parameters: Map[String, String]) = {
-    this(parameters ++ Map(
+    this(new CaseInsensitiveMap(parameters ++ Map(
       JDBCOptions.JDBC_URL -> url,
-      JDBCOptions.JDBC_TABLE_NAME -> table))
+      JDBCOptions.JDBC_TABLE_NAME -> table)))
   }

   val asConnectionProperties: Properties = {
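This is the constructor the failing example in the PR description goes through; with `JDBCOptions` backed by `CaseInsensitiveMap`, a sketch of the now-working write looks like the following (URL and table values are placeholders, `df` is any DataFrame):

```scala
// "UrL" now resolves to the "url" key required by the JDBC relation provider.
df.write
  .format("jdbc")
  .option("UrL", "jdbc:h2:mem:testdb;USER=testUser")  // placeholder JDBC URL
  .option("dbtable", "TEST.SAVETEST")
  .save()
```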

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala

Lines changed: 5 additions & 1 deletion

@@ -19,18 +19,22 @@ package org.apache.spark.sql.execution.datasources.parquet

 import org.apache.parquet.hadoop.metadata.CompressionCodecName

+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf

 /**
  * Options for the Parquet data source.
  */
 private[parquet] class ParquetOptions(
-    @transient private val parameters: Map[String, String],
+    @transient private val parameters: CaseInsensitiveMap,
     @transient private val sqlConf: SQLConf)
   extends Serializable {

   import ParquetOptions._

+  def this(parameters: Map[String, String], sqlConf: SQLConf) =
+    this(new CaseInsensitiveMap(parameters), sqlConf)
+
   /**
    * Compression codec to use. By default use the value specified in SQLConf.
    * Acceptable values are defined in [[shortParquetCompressionCodecNames]].
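The same wrap-once pattern, sketched for Parquet. This is illustration only: `ParquetOptions` is `private[parquet]`, so it would have to sit in code or tests under that package, and `new SQLConf` is used here purely as a stand-in configuration.

```scala
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.internal.SQLConf

// The auxiliary constructor wraps the map, so the codec lookup on the
// "compression" key (not shown in this hunk) sees the lowercased key even
// though the caller wrote "Compression".
val parquetOptions = new ParquetOptions(Map("Compression" -> "snappy"), new SQLConf)
```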

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala

Lines changed: 5 additions & 3 deletions

@@ -20,13 +20,15 @@ package org.apache.spark.sql.execution.streaming
 import scala.util.Try

 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.util.Utils

 /**
  * User specified options for file streams.
  */
-class FileStreamOptions(parameters: Map[String, String]) extends Logging {
+class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging {
+
+  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))

   val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
     Try(str.toInt).toOption.filter(_ > 0).getOrElse {

@@ -50,5 +52,5 @@ class FileStreamOptions(parameters: Map[String, String]) extends Logging {

   /** Options as specified by the user, in a case-insensitive map, without "path" set. */
   val optionMapWithoutPath: Map[String, String] =
-    new CaseInsensitiveMap(parameters).filterKeys(_ != "path")
+    parameters.filterKeys(_ != "path")
 }
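Because the primary constructor now receives a `CaseInsensitiveMap`, the extra wrapping previously done inside `optionMapWithoutPath` becomes redundant and is dropped. A hedged, user-level sketch of the effect on the streaming read path, reusing the `spark` session from the earlier sketch and a hypothetical input directory:

```scala
// A mixed-case streaming option key is honored; this only defines the stream,
// nothing runs until a query is started on it.
val lines = spark.readStream
  .format("text")
  .option("MaxFilesPerTrigger", "1")  // resolves to "maxFilesPerTrigger"
  .load("/tmp/stream-in")             // hypothetical directory
```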

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala

Lines changed: 5 additions & 0 deletions

@@ -109,4 +109,9 @@ class CSVInferSchemaSuite extends SparkFunSuite {
     val mergedNullTypes = CSVInferSchema.mergeRowTypes(Array(NullType), Array(NullType))
     assert(mergedNullTypes.deep == Array(NullType).deep)
   }
+
+  test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"))
+    assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
+  }
 }
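The new test covers the CSV path; the other option classes gained the same auxiliary constructor, so an analogous check, sketched here for `FileStreamOptions` and not part of this commit, would be:

```scala
import org.apache.spark.sql.execution.streaming.FileStreamOptions

// Hypothetical companion check mirroring the CSV test above: the mixed-case
// key resolves to maxFilesPerTrigger via the new Map-based constructor.
val streamOptions = new FileStreamOptions(Map("MaxFilesPerTrigger" -> "1"))
assert(streamOptions.maxFilesPerTrigger == Some(1))
```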
