Skip to content

Commit 6ff6d4f

Browse files
committed
Adding spark.sql.csv.parser.columnPruning.enabled
1 parent f89eeb7 commit 6ff6d4f

File tree

5 files changed

+32
-19
lines changed

5 files changed

+32
-19
lines changed

docs/sql-programming-guide.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1814,7 +1814,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
18141814
- In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behaivor to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
18151815
- In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
18161816
- Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
1817-
- In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, all column values must be requested. This example demonstrates how to achieve that with filter in Scala `spark.read.option("header", true).option("mode", "dropmalformed").csv("a.csv").filter(_ => true).select("id")`.
1817+
- In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
18181818

18191819
## Upgrading From Spark SQL 2.2 to 2.3
18201820

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1266,6 +1266,13 @@ object SQLConf {
12661266
object Replaced {
12671267
val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces"
12681268
}
1269+
1270+
val CSV_PARSER_COLUMN_PRUNING = buildConf("spark.sql.csv.parser.columnPruning.enabled")
1271+
.internal()
1272+
.doc("If it is set to true, column names of the requested schema are passed to CSV parser. " +
1273+
"Other column values can be ignored during parsing even if they are malformed.")
1274+
.booleanConf
1275+
.createWithDefault(true)
12691276
}
12701277

12711278
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.time.FastDateFormat
2525

2626
import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql.catalyst.util._
28+
import org.apache.spark.sql.internal.SQLConf
2829

2930
class CSVOptions(
3031
@transient val parameters: CaseInsensitiveMap[String],
@@ -80,6 +81,8 @@ class CSVOptions(
8081
}
8182
}
8283

84+
private[csv] val columnPruning = SQLConf.get.getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
85+
8386
val delimiter = CSVUtils.toChar(
8487
parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
8588
val parseMode: ParseMode =

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ import org.apache.spark.sql.types._
3434
import org.apache.spark.unsafe.types.UTF8String
3535

3636
class UnivocityParser(
37-
schema: StructType,
37+
dataSchema: StructType,
3838
requiredSchema: StructType,
3939
val options: CSVOptions) extends Logging {
40-
require(requiredSchema.toSet.subsetOf(schema.toSet),
40+
require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
4141
"requiredSchema should be the subset of schema.")
4242

4343
def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
@@ -47,14 +47,15 @@ class UnivocityParser(
4747

4848
private val tokenizer = {
4949
val parserSetting = options.asParserSettings
50-
if (requiredSchema.length < schema.length) {
51-
val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(schema.indexOf(f)))
50+
if (options.columnPruning && requiredSchema.length < dataSchema.length) {
51+
val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f)))
5252
parserSetting.selectIndexes(tokenIndexArr: _*)
5353
}
5454
new CsvParser(parserSetting)
5555
}
56+
private val schema = if (options.columnPruning) requiredSchema else dataSchema
5657

57-
private val row = new GenericInternalRow(requiredSchema.length)
58+
private val row = new GenericInternalRow(schema.length)
5859

5960
// Retrieve the raw record string.
6061
private def getCurrentInput: UTF8String = {
@@ -81,7 +82,7 @@ class UnivocityParser(
8182
//
8283
// output row - ["A", 2]
8384
private val valueConverters: Array[ValueConverter] = {
84-
requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
85+
schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
8586
}
8687

8788
/**
@@ -189,14 +190,14 @@ class UnivocityParser(
189190
def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
190191

191192
private def convert(tokens: Array[String]): InternalRow = {
192-
if (tokens.length != requiredSchema.length) {
193+
if (tokens.length != schema.length) {
193194
// If the number of tokens doesn't match the schema, we should treat it as a malformed record.
194195
// However, we still have chance to parse some of the tokens, by adding extra null tokens in
195196
// the tail if the number is smaller, or by dropping extra tokens if the number is larger.
196-
val checkedTokens = if (requiredSchema.length > tokens.length) {
197-
tokens ++ new Array[String](requiredSchema.length - tokens.length)
197+
val checkedTokens = if (schema.length > tokens.length) {
198+
tokens ++ new Array[String](schema.length - tokens.length)
198199
} else {
199-
tokens.take(requiredSchema.length)
200+
tokens.take(schema.length)
200201
}
201202
def getPartialResult(): Option[InternalRow] = {
202203
try {
@@ -214,7 +215,7 @@ class UnivocityParser(
214215
} else {
215216
try {
216217
var i = 0
217-
while (i < requiredSchema.length) {
218+
while (i < schema.length) {
218219
row(i) = valueConverters(i).apply(tokens(i))
219220
i += 1
220221
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -260,14 +260,16 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
260260
}
261261

262262
test("test for DROPMALFORMED parsing mode") {
263-
Seq(false, true).foreach { multiLine =>
264-
val cars = spark.read
265-
.format("csv")
266-
.option("multiLine", multiLine)
267-
.options(Map("header" -> "true", "mode" -> "dropmalformed"))
268-
.load(testFile(carsFile))
263+
withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false") {
264+
Seq(false, true).foreach { multiLine =>
265+
val cars = spark.read
266+
.format("csv")
267+
.option("multiLine", multiLine)
268+
.options(Map("header" -> "true", "mode" -> "dropmalformed"))
269+
.load(testFile(carsFile))
269270

270-
assert(cars.collect().size === 2)
271+
assert(cars.select("year").collect().size === 2)
272+
}
271273
}
272274
}
273275

0 commit comments

Comments
 (0)