Conversation

@rxin (Contributor) commented Jun 29, 2014

No description provided.

@rxin (Contributor, Author) commented Jun 29, 2014

Can you two take a look at this? @mattf, @cmccabe Thanks!

@AmplabJenkins: Merged build triggered.

@AmplabJenkins: Merged build started.

@AmplabJenkins: Merged build finished.

@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16236/

@AmplabJenkins: Merged build triggered.

@AmplabJenkins: Merged build started.

@AmplabJenkins: Merged build finished.

@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16237/

@rxin (Contributor, Author) commented Jun 29, 2014

Jenkins, retest this please.

@AmplabJenkins: Merged build triggered.

@AmplabJenkins: Merged build started.

@AmplabJenkins: Merged build finished. All automated tests passed.

@AmplabJenkins: All automated tests passed. Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16239/

@cmccabe commented Jun 30, 2014

Looks good. +1

@vanzin (Contributor) commented Jun 30, 2014

Nice, this will be very helpful. Could this also be added to YARN's ApplicationMaster.scala (both of them)?

@rxin (Contributor, Author) commented Jun 30, 2014

OK, I pushed a new change to add the signal handler to the YARN ApplicationMaster as well. Thanks for looking at this.
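
For context, a hedged sketch of what such a call site can look like: the daemon registers the logger as the very first step of `main()`, so any later SIGTERM, SIGHUP, or SIGINT is logged with its name before the process exits. The object name below is hypothetical, and the package paths and exact `register` signature are assumptions rather than the merged code.

```scala
import org.apache.spark.Logging
import org.apache.spark.util.SignalLogger

// Hypothetical daemon object, sketching only the registration step.
object ApplicationMasterSketch extends Logging {
  def main(args: Array[String]): Unit = {
    // Assumed call shape: register(org.slf4j.Logger). Installing the handler first
    // means a signal arriving at any later point is logged with its name.
    SignalLogger.register(log)
    // ... the rest of the ApplicationMaster startup would follow here ...
  }
}
```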

@AmplabJenkins: Merged build triggered.

@AmplabJenkins: Merged build started.

@AmplabJenkins: Merged build finished.

@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16268/

@rxin (Contributor, Author) commented Jun 30, 2014

Jenkins, retest this please.

@AmplabJenkins: Merged build triggered.

@AmplabJenkins: Merged build started.

@AmplabJenkins: Merged build finished.

@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16269/

@aarondav (Contributor) commented:

Can we have HistoryServer too?

Review comment from a Contributor:

Could you indulge me by wrapping this in a `synchronized`? Also, that `if` seems a bit too important / unexpected to keep hidden on the same line, so this will help with that too.

@rxin (Author) replied:

done
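
To make the shape of that request concrete, here is a hedged sketch (not the merged Spark code) of a `register` method that is synchronized, idempotent, and keeps the guarding `if` on its own line. It is built on `sun.misc.Signal`; the object name, field names, and signal list are assumptions for illustration only.

```scala
import org.slf4j.Logger
import sun.misc.{Signal, SignalHandler}

// Illustrative sketch of a signal logger: logs the signal name, then chains to
// the previously installed handler so the default behavior (exit) still happens.
object SignalLoggerSketch {
  private var registered = false

  /** Register handlers for common termination signals; safe to call more than once. */
  def register(log: Logger): Unit = synchronized {
    if (!registered) {
      registered = true
      Seq("TERM", "HUP", "INT").foreach { name =>
        var prev: SignalHandler = null
        prev = Signal.handle(new Signal(name), new SignalHandler {
          override def handle(sig: Signal): Unit = {
            // e.g. "RECEIVED SIGNAL 15: SIGTERM"
            log.error("RECEIVED SIGNAL " + sig.getNumber + ": SIG" + sig.getName)
            prev.handle(sig) // hand off to the original handler
          }
        })
      }
    }
  }
}
```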

@aarondav (Contributor) commented:

LGTM, just two very minor comments.

@AmplabJenkins: Merged build triggered.

@AmplabJenkins: Merged build started.

@AmplabJenkins: Merged build finished.

@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16270/

@rxin (Contributor, Author) commented Jun 30, 2014

Jenkins, retest this please.

@AmplabJenkins: Merged build triggered.

@AmplabJenkins: Merged build started.

@AmplabJenkins: Merged build finished. All automated tests passed.

@AmplabJenkins: All automated tests passed. Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16271/

@rxin (Contributor, Author) commented Jun 30, 2014

I'm merging this into master. Thanks to everybody for taking a look!

@asfgit closed this in 5fccb56 on Jun 30, 2014
@aarondav (Contributor) commented Jul 1, 2014

You forgot to add HistoryServer. Do you want to make pandas cry?

@rxin deleted the signalhandler1 branch on July 4, 2014 at 07:32
asfgit pushed a commit that referenced this pull request Jul 5, 2014
This was omitted in #1260. @aarondav

Author: Reynold Xin <[email protected]>

Closes #1300 from rxin/historyServer and squashes the following commits:

af720a3 [Reynold Xin] Added SignalLogger to HistoryServer.
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
Author: Reynold Xin <[email protected]>

Closes apache#1260 from rxin/signalhandler1 and squashes the following commits:

8e73552 [Reynold Xin] Uh add Logging back in ApplicationMaster.
0402ba8 [Reynold Xin] Synchronize SignalLogger.register.
dc70705 [Reynold Xin] Added SignalLogger to YARN ApplicationMaster.
79a21b4 [Reynold Xin] Added license header.
0da052c [Reynold Xin] Added the SignalLogger itself.
e587d2e [Reynold Xin] [SPARK-2318] When exiting on a signal, print the signal name first.
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
This was omitted in apache#1260. @aarondav

Author: Reynold Xin <[email protected]>

Closes apache#1300 from rxin/historyServer and squashes the following commits:

af720a3 [Reynold Xin] Added SignalLogger to HistoryServer.
sunchao added a commit to sunchao/spark that referenced this pull request Dec 8, 2021
…reader (apache#1239) (apache#1260)

### What changes were proposed in this pull request?

This PR adds support for complex types (e.g., list, map, array) to Spark's vectorized Parquet reader. In particular, it introduces the following changes:
1. Added a new class `ParquetType` which binds a Spark type with its corresponding Parquet definition & repetition level. This is used when Spark assembles a vector of a complex type from Parquet.
2. Changed `ParquetSchemaConverter` and added a new method `convertTypeInfo` which converts a Parquet `MessageType` to a `ParquetType` as above. The existing conversion logic in the class remains the same but now operates on `ParquetType` instead of `DataType`, and annotates the former with extra information such as definition & repetition level, column path, column descriptor, etc.
3. Added a new class `ParquetColumn` which encapsulates all the information needed when reading a Parquet column, including the `ParquetType` for the column, the repetition & definition levels (only allocated for a leaf node of a complex type), as well as the reader for the column. In addition, it contains the logic for assembling nested columnar batches by interpreting Parquet repetition & definition levels.
4. Changed `VectorizedParquetRecordReader` to initialize a list of `ParquetColumn` for the columns being read.
5. `VectorizedColumnReader` now also creates a reader for the repetition level column. Depending on whether the maximum repetition level is 0, the batch read is split into two code paths: `readBatch` versus `readBatchNested`.
6. Added logic to handle complex types in `VectorizedRleValuesReader`. For data types involving only structs or primitive types, it still goes through the old `readBatch` method, which now also saves definition levels into a vector for later assembly. Otherwise, for data types involving arrays or maps, a separate code path `readBatchNested` is introduced to handle repetition levels.
7. Added a new config `spark.sql.parquet.enableNestedColumnVectorizedReader` to turn the feature on or off. It defaults to true (see the usage sketch right after this list).
8. Modified `WritableColumnVector` to better support null structs. Currently it requires populating null entries in all child vectors when there is a null struct; however, this wastes space and also doesn't work well with the Parquet scan. This change adds an extra field `structOffsets` which records the mapping from a row ID to the position of that row in the child vector, so that child vectors only need to store real null elements.
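
As referenced in item 7, here is a spark-shell-style sketch of exercising the nested-column path through the public DataFrame API. Only the config name comes from this description; the path, case classes, and column names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Returns the active session in spark-shell, or creates a local one otherwise.
val spark = SparkSession.builder().appName("nested-parquet-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical nested schema: array of structs plus a map column.
case class Item(id: Long, tags: Seq[String])
case class Record(key: Int, items: Seq[Item], props: Map[String, Long])

// Write a Parquet file containing array, struct, and map columns.
Seq(
  Record(1, Seq(Item(10L, Seq("a", "b"))), Map("x" -> 1L)),
  Record(2, Nil, Map.empty)
).toDF().write.mode("overwrite").parquet("/tmp/nested_parquet_sketch")

// With the config enabled (the default per the description above), this scan can use
// the vectorized reader instead of falling back to parquet-mr.
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")
val df = spark.read.parquet("/tmp/nested_parquet_sketch")
df.selectExpr("key", "items[0].tags", "props['x']").show()
```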

To test this, the PR introduces an interface `ParquetRowGroupReader` in `SpecificParquetRecordReaderBase` to abstract the Parquet file reading logic. The bulk of the tests are in `ParquetVectorizedSuite`, which covers different batch sizes & page sizes, column index, first row index, nulls, etc.

`DataSourceReadBenchmark` is extended with two more cases: reading struct fields of primitive types, and reading an array of structs & a map field.

### Why are the changes needed?

Whenever the read schema contains complex types, Spark currently falls back to the row-based reader in parquet-mr, which is much slower. As the benchmark shows, adding this support to the vectorized reader yields roughly a 15x average speedup when reading struct fields, and roughly 1.5x when reading arrays of struct and map.

Micro benchmark of reading primitive fields from a struct, over 400m rows:
```
================================================================================================
SQL Single Numeric Column Scan in Struct
================================================================================================

OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
SQL Single TINYINT Column Scan in Struct:        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------
SQL ORC Vectorized (Disabled Nested Column)              77684          78174         692          5.4         185.2       1.0X
SQL ORC Vectorized (Enabled Nested Column)                4137           4226         126        101.4           9.9      18.8X
SQL Parquet Vectorized (Disabled Nested Column)          42095          42193         138         10.0         100.4       1.8X
SQL Parquet Vectorized (Enabled Nested Column)            3317           4147        1174        126.4           7.9      23.4X

OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
SQL Single SMALLINT Column Scan in Struct:       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------
SQL ORC Vectorized (Disabled Nested Column)              82438          82443           7          5.1         196.5       1.0X
SQL ORC Vectorized (Enabled Nested Column)                4746           5022         391         88.4          11.3      17.4X
SQL Parquet Vectorized (Disabled Nested Column)          43689          43761         102          9.6         104.2       1.9X
SQL Parquet Vectorized (Enabled Nested Column)            2894           2986         130        144.9           6.9      28.5X

OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
SQL Single INT Column Scan in Struct:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------
SQL ORC Vectorized (Disabled Nested Column)              82749          82774          34          5.1         197.3       1.0X
SQL ORC Vectorized (Enabled Nested Column)                4848           4869          30         86.5          11.6      17.1X
SQL Parquet Vectorized (Disabled Nested Column)          47718          47957         338          8.8         113.8       1.7X
SQL Parquet Vectorized (Enabled Nested Column)            3055           3056           2        137.3           7.3      27.1X

OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
SQL Single BIGINT Column Scan in Struct:         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------
SQL ORC Vectorized (Disabled Nested Column)              82398          82416          25          5.1         196.5       1.0X
SQL ORC Vectorized (Enabled Nested Column)                6562           7010         634         63.9          15.6      12.6X
SQL Parquet Vectorized (Disabled Nested Column)          51007          51032          35          8.2         121.6       1.6X
SQL Parquet Vectorized (Enabled Nested Column)            4300           4358          82         97.6          10.3      19.2X

OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
SQL Single FLOAT Column Scan in Struct:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------
SQL ORC Vectorized (Disabled Nested Column)              85791          86323         753          4.9         204.5       1.0X
SQL ORC Vectorized (Enabled Nested Column)                7231           7246          21         58.0          17.2      11.9X
SQL Parquet Vectorized (Disabled Nested Column)          48381          48476         134          8.7         115.3       1.8X
SQL Parquet Vectorized (Enabled Nested Column)            2770           2791          29        151.4           6.6      31.0X

OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
SQL Single DOUBLE Column Scan in Struct:         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------
SQL ORC Vectorized (Disabled Nested Column)              85566          85598          45          4.9         204.0       1.0X
SQL ORC Vectorized (Enabled Nested Column)                8579           8591          17         48.9          20.5      10.0X
SQL Parquet Vectorized (Disabled Nested Column)          56052          56106          77          7.5         133.6       1.5X
SQL Parquet Vectorized (Enabled Nested Column)            4135           4185          70        101.4           9.9      20.7X
```

### Does this PR introduce _any_ user-facing change?

With this PR, Spark supports reading complex types in its vectorized Parquet reader. A new config `spark.sql.parquet.enableNestedColumnVectorizedReader` is introduced to turn the feature on or off.
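
If the new path ever needs to be bypassed, e.g. while investigating a correctness or performance issue, the config can be flipped at the session level. A minimal sketch, assuming an existing `spark` session; only the config name comes from this description, the rest is the standard session API.

```scala
// Disable the nested-column vectorized path for the current session
// (falls back to the row-based parquet-mr reader for nested types).
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false")

// The same toggle expressed as a SQL command.
spark.sql("SET spark.sql.parquet.enableNestedColumnVectorizedReader=false")
```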

### How was this patch tested?

Added new unit tests.