
Commit bae4449

dongjoon-hyun authored and gatorsmile committed
[SPARK-23426][SQL] Use hive ORC impl and disable PPD for Spark 2.3.0
## What changes were proposed in this pull request?

To prevent any regressions, this PR changes the ORC implementation back to `hive` by default, as in Spark 2.2.X; users can still opt in to the `native` implementation. ORC predicate pushdown (PPD) is likewise restored to its Spark 2.2.X default of `false`.

![orc_section](https://user-images.githubusercontent.com/9700541/36221575-57a1d702-1173-11e8-89fe-dca5842f4ca7.png)

## How was this patch tested?

Pass all test cases.

Author: Dongjoon Hyun <[email protected]>

Closes #20610 from dongjoon-hyun/SPARK-ORC-DISABLE.

(cherry picked from commit 2f0498d)
Signed-off-by: gatorsmile <[email protected]>
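For reference, the pre-change behavior can still be enabled per session. A minimal sketch, assuming an existing `SparkSession`; the app name and path are placeholders, not part of this PR:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session; any existing SparkSession works the same way.
val spark = SparkSession.builder()
  .appName("native-orc-opt-in")
  .getOrCreate()

// Opt back in to the native ORC implementation and ORC filter pushdown,
// which this PR turns off by default for Spark 2.3.0.
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.filterPushdown", "true")

// Placeholder path, for illustration only.
spark.read.orc("/path/to/orc/data").show()
```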
1 parent d24d131 commit bae4449

File tree

5 files changed (+68, -33 lines)

docs/sql-programming-guide.md

Lines changed: 23 additions & 29 deletions
@@ -1004,6 +1004,29 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
   </tr>
 </table>
 
+## ORC Files
+
+Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files.
+To do that, the following configurations are newly added. The vectorized reader is used for the
+native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl`
+is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For the Hive ORC
+serde tables (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`),
+the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also set to `true`.
+
+<table class="table">
+  <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
+  <tr>
+    <td><code>spark.sql.orc.impl</code></td>
+    <td><code>hive</code></td>
+    <td>The name of ORC implementation. It can be one of <code>native</code> and <code>hive</code>. <code>native</code> means the native ORC support that is built on Apache ORC 1.4.1. <code>hive</code> means the ORC library in Hive 1.2.1.</td>
+  </tr>
+  <tr>
+    <td><code>spark.sql.orc.enableVectorizedReader</code></td>
+    <td><code>true</code></td>
+    <td>Enables vectorized orc decoding in <code>native</code> implementation. If <code>false</code>, a new non-vectorized ORC reader is used in <code>native</code> implementation. For <code>hive</code> implementation, this is ignored.</td>
+  </tr>
+</table>
+
 ## JSON Datasets
 <div class="codetabs">

@@ -1776,35 +1799,6 @@ working with timestamps in `pandas_udf`s to get the best performance, see
 
 ## Upgrading From Spark SQL 2.2 to 2.3
 
-  - Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format for ORC files. To do that, the following configurations are newly added or change their default values. The vectorized reader is used for the native ORC tables (e.g., the ones created using the clause `USING ORC`) when `spark.sql.orc.impl` is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For the Hive ORC serde table (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`), the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is set to `true`.
-
-    - New configurations
-
-    <table class="table">
-      <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
-      <tr>
-        <td><code>spark.sql.orc.impl</code></td>
-        <td><code>native</code></td>
-        <td>The name of ORC implementation. It can be one of <code>native</code> and <code>hive</code>. <code>native</code> means the native ORC support that is built on Apache ORC 1.4.1. <code>hive</code> means the ORC library in Hive 1.2.1 which is used prior to Spark 2.3.</td>
-      </tr>
-      <tr>
-        <td><code>spark.sql.orc.enableVectorizedReader</code></td>
-        <td><code>true</code></td>
-        <td>Enables vectorized orc decoding in <code>native</code> implementation. If <code>false</code>, a new non-vectorized ORC reader is used in <code>native</code> implementation. For <code>hive</code> implementation, this is ignored.</td>
-      </tr>
-    </table>
-
-    - Changed configurations
-
-    <table class="table">
-      <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
-      <tr>
-        <td><code>spark.sql.orc.filterPushdown</code></td>
-        <td><code>true</code></td>
-        <td>Enables filter pushdown for ORC files. It is <code>false</code> by default prior to Spark 2.3.</td>
-      </tr>
-    </table>
-
   - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
   - The `percentile_approx` function previously accepted numeric type input and output double type results. Now it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles.
   - Since Spark 2.3, the Join/Filter's deterministic predicates that are after the first non-deterministic predicates are also pushed down/through the child operators, if possible. In prior Spark versions, these filters are not eligible for predicate pushdown.
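To make the conditions described in the new `ORC Files` section concrete, here is a hedged sketch of the two table flavors it mentions; the session setup and table names are illustrative, assume a Hive-enabled Spark build, and are not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("orc-table-flavors") // placeholder
  .enableHiveSupport()
  .getOrCreate()

// Native ORC data source table: read with the vectorized reader when
// spark.sql.orc.impl=native and spark.sql.orc.enableVectorizedReader=true.
spark.conf.set("spark.sql.orc.impl", "native")
spark.sql("CREATE TABLE orc_native (id BIGINT, name STRING) USING ORC")

// Hive ORC serde table: additionally requires
// spark.sql.hive.convertMetastoreOrc=true for vectorized reads.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.sql("CREATE TABLE orc_hive (id BIGINT, name STRING) USING HIVE OPTIONS (fileFormat 'ORC')")
```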

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 3 deletions
@@ -393,11 +393,11 @@ object SQLConf {
 
   val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
     .doc("When native, use the native version of ORC support instead of the ORC library in Hive " +
-      "1.2.1. It is 'hive' by default prior to Spark 2.3.")
+      "1.2.1. It is 'hive' by default.")
     .internal()
     .stringConf
     .checkValues(Set("hive", "native"))
-    .createWithDefault("native")
+    .createWithDefault("hive")
 
   val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader")
     .doc("Enables vectorized orc decoding.")

@@ -414,7 +414,7 @@ object SQLConf {
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
-    .createWithDefault(true)
+    .createWithDefault(false)
 
   val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
     .doc("When true, check all the partition paths under the table\'s root directory " +

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 16 additions & 1 deletion
@@ -20,14 +20,29 @@ package org.apache.spark.sql
 import java.io.FileNotFoundException
 
 import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
-class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
+
+class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {
   import testImplicits._
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
+    } finally {
+      super.afterAll()
+    }
+  }
+
   private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text")
   private val nameWithSpecialChars = "sp&cial%c hars"
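When a single test (rather than a whole suite) needs the native reader, Spark's SQL test utilities offer a narrower scope. A hedged fragment, assuming it lives inside a suite like the one above so that `withSQLConf`, `withTempPath`, and `spark` are in scope; it is not part of this diff:

```scala
test("ORC round trip with the native reader") {
  // Scope the override to this test only, instead of beforeAll()/afterAll().
  withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
    withTempPath { dir =>
      spark.range(10).write.orc(dir.getCanonicalPath)
      assert(spark.read.orc(dir.getCanonicalPath).count() == 10)
    }
  }
}
```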

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 13 additions & 0 deletions
@@ -33,6 +33,19 @@ import org.apache.spark.util.Utils
 class FileStreamSinkSuite extends StreamTest {
   import testImplicits._
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
+    } finally {
+      super.afterAll()
+    }
+  }
+
   test("unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 13 additions & 0 deletions
@@ -207,6 +207,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head
   }
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
+    } finally {
+      super.afterAll()
+    }
+  }
+
   // ============= Basic parameter exists tests ================
 
   test("FileStreamSource schema: no path") {
