
Conversation


@MaxGekk MaxGekk commented May 10, 2018

What changes were proposed in this pull request?

The uniVocity parser allows specifying only the required column names or indexes for parsing, for example:

// Here we select only the columns by their indexes.
// The parser just skips the values in other columns
parserSettings.selectIndexes(4, 0, 1);
CsvParser parser = new CsvParser(parserSettings);

In this PR, I propose to extract the indexes from the required schema and pass them to the CSV parser. Benchmarks show the following improvements in parsing 1000 columns:

Select 100 columns out of 1000: x1.76
Select 1 column out of 1000: x2

Note: Compared to the current implementation, the changes can return different results for malformed rows in the DROPMALFORMED and FAILFAST modes if only a subset of all columns is requested. To restore the previous behavior, set spark.sql.csv.parser.columnPruning.enabled to false.
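
A minimal sketch of that fallback in use (the SparkSession setup and the input path below are assumed for illustration; they are not part of this PR):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("csv-column-pruning-fallback")
  .getOrCreate()

// Opt out of parser-side column pruning to keep the previous handling
// of malformed rows in the DROPMALFORMED and FAILFAST modes.
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")

val df = spark.read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv("/path/to/data.csv")  // hypothetical input path

df.select("year").show()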

How was this patch tested?

It was tested by a new test which selects 3 columns out of 15, by existing tests, and by new benchmarks.

.load(testFile(carsFile))

assert(cars.select("year").collect().size === 2)
assert(cars.collect().size === 2)
Member Author

The cars.csv file has a header with 5 columns:

year,make,model,comment,blank

and 2 rows with 4 valid columns where the last one is blank:

"2012","Tesla","S","No comment",
1997,Ford,E350,"Go get one now they are going fast",

and one more row with only 3 columns:

2015,Chevy,Volt

The previous (current) implementation drops the last row in the DROPMALFORMED mode because it parses whole rows, and the last one is incorrect. If only the year column is selected, the uniVocity parser returns values for the first column (index 0) and doesn't check the correctness of the rest of the row. That is why cars.select("year").collect().size returns 3.
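
A short self-contained sketch of this scenario (the session setup and temp file are assumed; the commented counts reflect the behavior described in this thread):

import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Same content as the cars.csv file described above.
val csv =
  """year,make,model,comment,blank
    |"2012","Tesla","S","No comment",
    |1997,Ford,E350,"Go get one now they are going fast",
    |2015,Chevy,Volt""".stripMargin
val path = Files.createTempFile("cars", ".csv")
Files.write(path, csv.getBytes("UTF-8"))

val cars = spark.read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv(path.toString)

// Only column 0 is parsed with parser-side pruning, so the short row is kept.
cars.select("year").collect().length  // 3

// Here all columns are parsed, so the short row is dropped as malformed.
cars.collect().length                 // 2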

Member

This changes behaviour; it's intentionally parsed this way to keep backward compatibility. There was an issue about the different number of counts. I think you are basically saying cars.select("year").collect().size and cars.collect().size are different and they are both correct, right?

Member Author

it's intentionally parsed this way to keep backward compatibility.

Right, by selecting all columns I force UnivocityParser to fall into the case:
https://github.com/MaxGekk/spark-1/blob/a4a0a549156a15011c33c7877a35f244d75b7a4f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala#L193-L213
where the number of returned tokens is less than required.

In the case of cars.select("year"), the uniVocity parser returns only one token, as expected.

There was an issue about the different number of counts.

The PR changes behavior for some malformed inputs but I believe we could provide better performance for users who have correct inputs.

I think you are basically saying cars.select("year").collect().size and cars.collect().size are different and they are correct, right?

Yes, you can say that. You are right, it seems the PR proposes another interpretation of malformed rows. cars.select("year") is:

+----+
|year|
+----+
|2012|
|1997|
|2015|
+----+

and we should not reject 2015 only because there are problems in columns that were not requested. In this particular case, the last row consists of only one value at position 0, and it is correct.


MaxGekk commented May 10, 2018

@cloud-fan @hvanhovell Could you look at the PR, please.


SparkQA commented May 10, 2018

Test build #90471 has finished for PR 21296 at commit e3958b1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented May 10, 2018

Test build #90473 has finished for PR 21296 at commit a4a0a54.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

--------------------------------------------------------------------------------------------
Select 1000 columns 76910 / 78065 0.0 76909.8 1.0X
Select 100 columns 28625 / 32884 0.0 28625.1 2.7X
Select one column 22498 / 22669 0.0 22497.8 3.4X
Member

BTW, I think we are already doing column pruning by avoiding the casting cost, which is relatively expensive compared to the parsing logic.

Member Author

You are right, avoiding unnecessary casting speeds things up more than 2x. We can see that in this benchmark before my changes: without them, selecting only one string column takes 44.5 seconds but selecting all columns takes ~80 seconds.

... relatively expensive compared to the parsing logic.

As the benchmark shows, we can achieve performance improvements in parsing too. Selecting only 1 out of 1000 columns takes 22.5 seconds, but without the PR it takes 44.5 seconds:
8809cec

@HyukjinKwon
Member

Mind fixing the PR title? It sounds as if we have never implemented "column pruning" in CSV before.

val parserSetting = options.asParserSettings
if (requiredSchema.length < schema.length) {
val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(schema.indexOf(f)))
parserSetting.selectIndexes(tokenIndexArr: _*)
Member

@HyukjinKwon HyukjinKwon May 11, 2018

FWIW, I think I tried this locally a while ago, but I didn't submit a PR since the improvement was trivial and a test was broken.

Member Author

I tried the changes as an experiment because some of our clients have many columns (> 200 columns) in their input CSV files. The experiment shows that the improvements can have a significant impact on total execution time.

@MaxGekk MaxGekk changed the title [SPARK-24244][SQL] CSV column pruning [SPARK-24244][SQL] Passing only required columns to the CSV parser May 11, 2018
assert(sampled.count() == ds.count())
}

test("SPARK-24244: Select a little of many columns") {
Contributor

What does this test?

Member Author

I added the test to check that requesting only a subset of all columns works correctly, and to check the case when the ordering of fields in the required schema is different from the data schema. Previously I had a concern that if I select columns in a different order, like select('f15, 'f10, 'f5), I would get the required schema in that same field order. It seems the required schema has the same order as the data schema. That's why I removed https://github.com/apache/spark/pull/21296/files/a4a0a549156a15011c33c7877a35f244d75b7a4f#diff-d19881aceddcaa5c60620fdcda99b4c4L214
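
A tiny sketch of the index mapping being discussed (the field names and schemas are illustrative, not taken from the actual test):

import org.apache.spark.sql.types._

// Data schema, as read from the CSV header.
val dataSchema = StructType(Seq(
  StructField("f1", StringType),
  StructField("f5", StringType),
  StructField("f10", StringType),
  StructField("f15", StringType)))

// Required schema for a query like select('f15, 'f10): as discussed above,
// it arrives in the same field order as the data schema.
val requiredSchema = StructType(Seq(
  StructField("f10", StringType),
  StructField("f15", StringType)))

// The indexes that get handed to uniVocity's selectIndexes.
val tokenIndexArr = requiredSchema.map(f => dataSchema.indexOf(f))
// tokenIndexArr == Seq(2, 3)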

@cloud-fan
Contributor

I'm fine with breaking the CSV behavior for malformed input a little, as long as we document it well in the migration guide. cc @gatorsmile

new CsvParser(parserSetting)
}

private val row = new GenericInternalRow(requiredSchema.length)
Member

Seems we don't need to move this down.

@HyukjinKwon
Member

Can we update the migration guide then? I want to see if the note makes sense.


SparkQA commented May 13, 2018

Test build #90555 has finished for PR 21296 at commit f90daa7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

- In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behavior to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
- In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
- Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
- Since Spark 2.4, the handling of malformed rows in CSV files has changed. Previously, all column values of every row were parsed regardless of their future usage. A row was considered malformed if the CSV parser wasn't able to handle any column value in the row, even if that value wasn't requested. Starting from version 2.4, only the requested column values are parsed, and other values can be ignored. In this way, rows that were considered malformed in previous Spark versions only because of unrequested malformed values become correct in Spark 2.4.
Member

@HyukjinKwon HyukjinKwon May 14, 2018

Shall we add some more examples? For example, I guess df.count() with dropmalformed gives a different number now too.

Contributor

Can we follow the style of the other migration guides?

In version 2.3 and earlier, xxxx. Since Spark 2.4, xxxx. As an example, xxxx. (and talk about the flag to restore the previous behavior)

MaxGekk added 3 commits May 17, 2018 22:02
# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
#	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

SparkQA commented May 17, 2018

Test build #90751 has finished for PR 21296 at commit 7dcfc7a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WidenSetOperationTypes(conf: SQLConf) extends Rule[LogicalPlan]
  • case class FunctionArgumentConversion(conf: SQLConf) extends TypeCoercionRule
  • case class CaseWhenCoercion(conf: SQLConf) extends TypeCoercionRule
  • case class IfCoercion(conf: SQLConf) extends TypeCoercionRule
  • case class ImplicitTypeCasts(conf: SQLConf) extends TypeCoercionRule
  • class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)


SparkQA commented May 18, 2018

Test build #90752 has finished for PR 21296 at commit f89eeb7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

- In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behavior to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
- In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
- Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
- In version 2.3 and earlier, CSV rows are considered malformed if at least one column value in the row is malformed. The CSV parser drops such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, a CSV row is considered malformed only when it contains malformed column values requested from the CSV datasource; other values can be ignored. As an example, a CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selecting the id column returns a row with the single column value 1234, but in Spark 2.3 and earlier the result is empty in the DROPMALFORMED mode. To restore the previous behavior, all column values must be requested. This example demonstrates how to achieve that with filter in Scala: `spark.read.option("header", true).option("mode", "dropmalformed").csv("a.csv").filter(_ => true).select("id")`.
Contributor

Instead of using the filter(_ => true) trick, can we provide a config?

Member Author

I am going to add spark.sql.csv.parser.columnPruning.enabled to SQLConf. Is it ok for you?
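
For illustration, a sketch of what such an entry typically looks like in SQLConf (buildConf is SQLConf's own builder, so this only compiles inside that object; the exact name, doc text, and default are whatever ends up in the patch):

// Sketch only: lives inside object SQLConf, where buildConf is defined.
val CSV_PARSER_COLUMN_PRUNING = buildConf("spark.sql.csv.parser.columnPruning.enabled")
  .internal()
  .doc("If it is set to true, column names of the requested schema are passed to the CSV parser, " +
    "and other columns can be skipped during parsing even if they are malformed.")
  .booleanConf
  .createWithDefault(true)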

Contributor

Shall we follow the Parquet configs and just use spark.sql.csv.columnPruning.enabled?


SparkQA commented May 18, 2018

Test build #90797 has finished for PR 21296 at commit 6ff6d4f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


MaxGekk commented May 22, 2018

@cloud-fan @HyukjinKwon Could you look at the PR, please.

@cloud-fan
Contributor

LGTM except a minor comment about the config name

@HyukjinKwon
Member

Just for clarification, I'm okay.


MaxGekk commented May 22, 2018

I added the word parser to the config name because, as @HyukjinKwon wrote above, we already do pruning in type conversion. This PR enables column pruning by the CSV parser only.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 8086acc May 22, 2018
@gatorsmile
Member

This PR breaks the tests. Let me revert it. Please resubmit the PR with the fix. Thanks!

@koertkuipers
Contributor

If I do not specify a schema (and I use inferSchema), and I do a select of only a few columns, does this push down the column selection into the reading of the data (for schema inference and for the actual data read)?


MaxGekk commented Aug 15, 2018

... does this push down the column selection into the reading of the data

@koertkuipers Yes, it does.
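
A sketch of the usage asked about here (the path and column names are hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/path/to/wide.csv")  // hypothetical CSV with many columns

// Per the answer above, only the selected columns are handed to the
// uniVocity parser when the data is read.
df.select("col1", "col2").show()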

Halo9Pan pushed a commit to Halo9Pan/dive-spark that referenced this pull request Oct 12, 2018
## What changes were proposed in this pull request?

1. Move `CSVDataSource.makeSafeHeader` to `CSVUtils.makeSafeHeader` (as is).

    - Historically and at the first place of refactoring (which I did), I intended to put all CSV specific handling (like options), filtering, extracting header, etc.

    - See `JsonDataSource`. Now `CSVDataSource` is quite consistent with `JsonDataSource`. Since CSV's code path is quite complicated, we might as well match them as closely as we can.

2. Create `CSVHeaderChecker` and put `enforceSchema` logics into that.

    - The header checking and column pruning stuff were added (per apache#20894 and apache#21296), but some code, such as apache#22123, is duplicated.

    - Also, the header-checking code is scattered here and there. We'd better put it in a single place, since that was quite error-prone. See (apache#22656).

3. Move `CSVDataSource.checkHeaderColumnNames` to `CSVHeaderChecker.checkHeaderColumnNames` (as is).

    - Similar reasons as in 1 above.

## How was this patch tested?

Existing tests should cover this.

Closes apache#22676 from HyukjinKwon/refactoring-csv.

Authored-by: hyukjinkwon <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
@MaxGekk MaxGekk deleted the csv-column-pruning branch August 17, 2019 13:33