diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 0f9f01e18682f..ea57a0c7dec0c 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1004,6 +1004,29 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
+## ORC Files
+
+Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format.
+The following configurations have been added for this purpose. The vectorized reader is used for
+native ORC tables (e.g., tables created using the clause `USING ORC`) when `spark.sql.orc.impl`
+is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For Hive ORC
+serde tables (e.g., tables created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`),
+the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also set to `true`.
+
+
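For illustration, here is a minimal, self-contained sketch of combining these settings from an application; the table name, path, and data are hypothetical, and the configuration keys are the ones described above.

```scala
import org.apache.spark.sql.SparkSession

object NativeOrcExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("NativeOrcExample")
      // Use the native ORC implementation and its vectorized reader,
      // as described in the section above.
      .config("spark.sql.orc.impl", "native")
      .config("spark.sql.orc.enableVectorizedReader", "true")
      .getOrCreate()
    import spark.implicits._

    // Write a small ORC data set (the path is illustrative).
    Seq((1, "a"), (2, "b")).toDF("id", "value")
      .write.mode("overwrite").orc("/tmp/orc_demo")

    // A table created with `USING ORC` is a native ORC table, so scans of it
    // can use the vectorized reader under the settings above.
    spark.sql("CREATE TABLE orc_demo (id INT, value STRING) USING ORC LOCATION '/tmp/orc_demo'")
    spark.sql("SELECT COUNT(*) FROM orc_demo").show()

    spark.stop()
  }
}
```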
@@ -1776,35 +1799,6 @@ working with timestamps in `pandas_udf`s to get the best performance, see
## Upgrading From Spark SQL 2.2 to 2.3
- - Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file format. To do that, the following configurations are newly added or have their default values changed. The vectorized reader is used for native ORC tables (e.g., tables created using the clause `USING ORC`) when `spark.sql.orc.impl` is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to `true`. For Hive ORC serde tables (e.g., tables created using the clause `USING HIVE OPTIONS (fileFormat 'ORC')`), the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also set to `true`.
-
-   - New configurations
-
-     | Property Name | Default | Meaning |
-     | --- | --- | --- |
-     | `spark.sql.orc.impl` | `native` | The name of the ORC implementation. It can be one of `native` and `hive`. `native` means the native ORC support that is built on Apache ORC 1.4.1. `hive` means the ORC library in Hive 1.2.1, which is used prior to Spark 2.3. |
-     | `spark.sql.orc.enableVectorizedReader` | `true` | Enables vectorized ORC decoding in the `native` implementation. If `false`, a new non-vectorized ORC reader is used in the `native` implementation. For the `hive` implementation, this is ignored. |
-
-   - Changed configurations
-
-     | Property Name | Default | Meaning |
-     | --- | --- | --- |
-     | `spark.sql.orc.filterPushdown` | `true` | Enables filter pushdown for ORC files. It is `false` by default prior to Spark 2.3. |
-
- Since Spark 2.3, queries on raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()` are both disallowed. Instead, cache or save the parsed results and then run the same query, for example `val df = spark.read.schema(schema).json(file).cache()` followed by `df.filter($"_corrupt_record".isNotNull).count()` (see the sketch after this list).
- The `percentile_approx` function previously accepted numeric type input and returned double type results. Now it supports date type, timestamp type and numeric types as input types. The result type has also been changed to match the input type, which is more reasonable for percentiles.
- Since Spark 2.3, deterministic predicates in a Join/Filter that appear after the first non-deterministic predicate are also pushed down/through the child operators, if possible. In prior Spark versions, these filters were not eligible for predicate pushdown.
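To make the workaround in the first item above concrete, here is a minimal sketch; the schema, column names, and input path are illustrative, and the calls follow the inline example in that item.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object CorruptRecordWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CorruptRecordWorkaround").getOrCreate()
    import spark.implicits._

    // Hypothetical schema; `_corrupt_record` collects malformed input lines.
    val schema = new StructType()
      .add("id", LongType)
      .add("name", StringType)
      .add("_corrupt_record", StringType)
    val file = "/tmp/input.json"  // illustrative path

    // Since Spark 2.3, filtering or selecting only `_corrupt_record` directly on
    // the raw file is disallowed, so cache the parsed result and query that instead.
    val df = spark.read.schema(schema).json(file).cache()
    df.filter($"_corrupt_record".isNotNull).count()
    df.select("_corrupt_record").show()

    spark.stop()
  }
}
```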
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7835dbaa58439..f24fd7ff74d3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -399,11 +399,11 @@ object SQLConf {
val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
.doc("When native, use the native version of ORC support instead of the ORC library in Hive " +
- "1.2.1. It is 'hive' by default prior to Spark 2.3.")
+ "1.2.1. It is 'hive' by default.")
.internal()
.stringConf
.checkValues(Set("hive", "native"))
- .createWithDefault("native")
+ .createWithDefault("hive")
val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader")
.doc("Enables vectorized orc decoding.")
@@ -426,7 +426,7 @@ object SQLConf {
val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.booleanConf
- .createWithDefault(true)
+ .createWithDefault(false)
val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
.doc("When true, check all the partition paths under the table\'s root directory " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 2e332362ea644..b5d4c558f0d3e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -20,14 +20,29 @@ package org.apache.spark.sql
import java.io.FileNotFoundException
import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
-class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
+
+class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {
import testImplicits._
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+ }
+
+ override def afterAll(): Unit = {
+ try {
+ spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
+ } finally {
+ super.afterAll()
+ }
+ }
+
private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text")
private val nameWithSpecialChars = "sp&cial%c hars"
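This suite and the ones below use the same suite-wide beforeAll/afterAll pattern. As an alternative, the shared SQL test utilities mixed into these suites provide a `withSQLConf` helper for scoping a setting to a single test; the test name, path handling, and data below are illustrative, not part of this change.

```scala
  test("native ORC round trip (illustrative)") {
    // Scope the ORC implementation setting to this test only; it is restored
    // automatically when the block exits.
    withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        spark.range(10).write.orc(path)
        checkAnswer(spark.read.orc(path), spark.range(10).toDF())
      }
    }
  }
```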
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 8c4e1fd00b0a2..ba48bc1ce0c4d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -33,6 +33,19 @@ import org.apache.spark.util.Utils
class FileStreamSinkSuite extends StreamTest {
import testImplicits._
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+ }
+
+ override def afterAll(): Unit = {
+ try {
+ spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
+ } finally {
+ super.afterAll()
+ }
+ }
+
test("unpartitioned writing and batch reading") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
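For context on what this suite exercises, here is a standalone sketch of a file-sink pipeline that writes a stream out as ORC files and then reads the result back in batch; the source choice, paths, trigger, and timeout are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("orc-file-sink-demo").getOrCreate()

// The built-in rate source generates (timestamp, value) rows for testing.
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("orc")
  .option("path", "/tmp/orc_sink_demo/data")
  .option("checkpointLocation", "/tmp/orc_sink_demo/checkpoint")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

// Let a few micro-batches complete, then stop and read the output back in batch.
query.awaitTermination(30000)
query.stop()
spark.read.orc("/tmp/orc_sink_demo/data").show()
```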
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 5bb0f4d643bbe..d4bd9c7987f2d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -207,6 +207,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
.collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head
}
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+ }
+
+ override def afterAll(): Unit = {
+ try {
+ spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
+ } finally {
+ super.afterAll()
+ }
+ }
+
// ============= Basic parameter exists tests ================
test("FileStreamSource schema: no path") {