[SPARK-23426][SQL] Use hive ORC impl and disable PPD for Spark 2.3.0
#20610
Changes from all commits: 2d74b20, 93e6c7d, 7ff4ccf, 46c8697, 2769633, 183ec21, 19b50b1
@@ -1004,6 +1004,29 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession`

    </tr>
    </table>

    ## ORC Files

    Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files.
    To use it, the following configurations are newly added. The vectorized reader is used for
    native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl`
    is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For Hive ORC
    serde tables (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`),
    the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also set to `true`.

    <table class="table">
      <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
      <tr>
        <td><code>spark.sql.orc.impl</code></td>
        <td><code>hive</code></td>
        <td>The name of the ORC implementation. It can be one of <code>native</code> and <code>hive</code>. <code>native</code> means the native ORC support that is built on Apache ORC 1.4.1. <code>hive</code> means the ORC library in Hive 1.2.1.</td>
      </tr>
      <tr>
        <td><code>spark.sql.orc.enableVectorizedReader</code></td>
        <td><code>true</code></td>
        <td>Enables vectorized ORC decoding in the <code>native</code> implementation. If <code>false</code>, a new non-vectorized ORC reader is used in the <code>native</code> implementation. This is ignored by the <code>hive</code> implementation.</td>
      </tr>
    </table>
|
Member: The description of

Member (Author): Yes, it's been disabled again. @viirya
    ## JSON Datasets
    <div class="codetabs">
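The ORC properties documented in the new section above can also be set per session. A minimal sketch, assuming a running Spark 2.3 SQL session (property names are taken from the table above; the 2.3.0 default for `spark.sql.orc.impl` stays `hive` after this PR):

```
-- Opt in to the new native, vectorized ORC reader
SET spark.sql.orc.impl=native;
SET spark.sql.orc.enableVectorizedReader=true;
-- For Hive ORC serde tables, metastore conversion must also be enabled
SET spark.sql.hive.convertMetastoreOrc=true;
```

These are session-scoped configuration statements, so they only affect queries issued afterwards in the same session.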
@@ -1776,35 +1799,6 @@ working with timestamps in `pandas_udf`s to get the best performance, see

    ## Upgrading From Spark SQL 2.2 to 2.3

    - Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. To do that, the following configurations are newly added or change their default values. The vectorized reader is used for native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl` is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For Hive ORC serde tables (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`), the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also set to `true`.

    - New configurations

      <table class="table">
        <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
        <tr>
          <td><code>spark.sql.orc.impl</code></td>
          <td><code>native</code></td>
          <td>The name of the ORC implementation. It can be one of <code>native</code> and <code>hive</code>. <code>native</code> means the native ORC support that is built on Apache ORC 1.4.1. <code>hive</code> means the ORC library in Hive 1.2.1, which is used prior to Spark 2.3.</td>
        </tr>
        <tr>
          <td><code>spark.sql.orc.enableVectorizedReader</code></td>
          <td><code>true</code></td>
          <td>Enables vectorized ORC decoding in the <code>native</code> implementation. If <code>false</code>, a new non-vectorized ORC reader is used in the <code>native</code> implementation. This is ignored by the <code>hive</code> implementation.</td>
        </tr>
      </table>

    - Changed configurations

      <table class="table">
        <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
        <tr>
          <td><code>spark.sql.orc.filterPushdown</code></td>
          <td><code>true</code></td>
          <td>Enables filter pushdown for ORC files. It is <code>false</code> by default prior to Spark 2.3.</td>
        </tr>
      </table>

    - Since Spark 2.3, queries over raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then issue the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
    - The `percentile_approx` function previously accepted numeric type input and produced double type results. Now it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles.
    - Since Spark 2.3, a Join/Filter's deterministic predicates that come after the first non-deterministic predicate are also pushed down/through the child operators, if possible. In prior Spark versions, these filters were not eligible for predicate pushdown.
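The corrupt-record migration note above can be sketched end to end. This is a hedged Scala example built from the snippets in the note itself; it assumes a `SparkSession` named `spark`, a user-defined `schema` containing `_corrupt_record`, and a JSON `file` path, none of which are defined in this PR:

```scala
// Disallowed since Spark 2.3: the query references only the internal
// corrupt record column of a raw JSON read.
// spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()

// Workaround from the migration note: materialize the parsed result first,
// then query the corrupt record column from the cached DataFrame.
val df = spark.read.schema(schema).json(file).cache()
df.filter($"_corrupt_record".isNotNull).count()
df.select("_corrupt_record").show()
```

Caching forces a full parse of the file, so `_corrupt_record` is populated before the restricted projection runs.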
@@ -399,11 +399,11 @@ object SQLConf {

      val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
        .doc("When native, use the native version of ORC support instead of the ORC library in Hive " +
-         "1.2.1. It is 'hive' by default prior to Spark 2.3.")
+         "1.2.1. It is 'hive' by default.")
        .internal()
        .stringConf
        .checkValues(Set("hive", "native"))
-       .createWithDefault("native")
+       .createWithDefault("hive")
Member: We also need to disable the ORC pushdown, because the ORC reader of Hive 1.2.1 has a few bugs.

Member (Author): BTW, we don't have a test case for that, do we? Actually, I want to have a test case for that.
      val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader")
        .doc("Enables vectorized orc decoding.")

@@ -426,7 +426,7 @@ object SQLConf {
      val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
        .doc("When true, enable filter pushdown for ORC files.")
        .booleanConf
-       .createWithDefault(true)
+       .createWithDefault(false)

      val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
        .doc("When true, check all the partition paths under the table\'s root directory " +
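Because this PR flips both defaults (`spark.sql.orc.impl` back to `hive`, `spark.sql.orc.filterPushdown` to `false`), users who still want the native reader with pushdown must opt back in. A sketch of a `spark-defaults.conf` fragment; the exact deployment mechanism is an assumption, since the PR only changes the in-code defaults:

```
# Opt back in to the native ORC reader and ORC filter pushdown on Spark 2.3.0
spark.sql.orc.impl                      native
spark.sql.orc.enableVectorizedReader    true
spark.sql.orc.filterPushdown            true
```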
@@ -33,6 +33,19 @@ import org.apache.spark.util.Utils
    class FileStreamSinkSuite extends StreamTest {
      import testImplicits._

+     override def beforeAll(): Unit = {
Contributor: nit: a simpler way to fix this

Member (Author): Hi, @cloud-fan.
+       super.beforeAll()
+       spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+     }
+
+     override def afterAll(): Unit = {
+       try {
+         spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
+       } finally {
+         super.afterAll()
+       }
+     }

      test("unpartitioned writing and batch reading") {
        val inputData = MemoryStream[Int]
        val df = inputData.toDF()
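The reviewer's "simpler way" is cut off above, so the following is only a guess at the kind of alternative meant. Spark's SQL test utilities provide a `withSQLConf` helper that scopes a configuration change to a block and restores the previous value afterwards, which avoids the manual `beforeAll`/`afterAll` bookkeeping when the conf is only needed in specific tests:

```scala
// Hypothetical alternative (not necessarily what the reviewer suggested):
// scope the ORC implementation conf to a single test body.
test("unpartitioned writing and batch reading with native ORC") {
  withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
    // ... test body that needs the native ORC reader ...
  } // the previous value of spark.sql.orc.impl is restored here
}
```

The trade-off is per-test verbosity versus suite-wide setup; the `beforeAll`/`afterAll` approach in the diff applies the conf to every test in the suite.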
@gatorsmile. Now, this becomes a section.

Thanks!