From 37e240cf12ea7463c2e0ea56501b812e12745869 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 3 Dec 2017 15:33:55 -0800 Subject: [PATCH 1/9] [SPARK-20728][SQL] Make ORCFileFormat configurable between sql/hive and sql/core --- .../apache/spark/sql/internal/SQLConf.scala | 5 ++ .../apache/spark/sql/DataFrameReader.scala | 2 +- .../apache/spark/sql/DataFrameWriter.scala | 2 +- .../spark/sql/execution/command/tables.scala | 5 +- .../execution/datasources/DataSource.scala | 13 ++- .../sql/execution/datasources/rules.scala | 4 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 21 +++++ .../sql/sources/DDLSourceLoadSuite.scala | 15 +++- .../sql/test/DataFrameReaderWriterSuite.scala | 79 +++++++++++-------- .../spark/sql/hive/HiveStrategies.scala | 14 +++- .../sql/hive/execution/SQLQuerySuite.scala | 17 ++++ 11 files changed, 129 insertions(+), 48 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8abb4262d735..0a5b4ff3e40b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -363,6 +363,11 @@ object SQLConf { .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo")) .createWithDefault("snappy") + val ORC_ENABLED = buildConf("spark.sql.orc.enabled") + .doc("When true, use OrcFileFormat in sql/core module instead of the one in sql/hive module.") + .booleanConf + .createWithDefault(false) + val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") .doc("When true, enable filter pushdown for ORC files.") .booleanConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 17966eecfc05..47c2f52e08b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -182,7 +182,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { "read files of Hive data source directly.") } - val cls = DataSource.lookupDataSource(source) + val cls = DataSource.lookupDataSource(sparkSession, source) if (classOf[DataSourceV2].isAssignableFrom(cls)) { val options = new DataSourceV2Options(extraOptions.asJava) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 35abeccfd514..f74629496125 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -234,7 +234,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotBucketed("save") - val cls = DataSource.lookupDataSource(source) + val cls = DataSource.lookupDataSource(df.sparkSession, source) if (classOf[DataSourceV2].isAssignableFrom(cls)) { cls.newInstance() match { case ds: WriteSupport => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index c42e6c3257fa..91bd8ab3f62d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -190,7 +190,7 @@ case class AlterTableAddColumnsCommand( colsToAdd: Seq[StructField]) extends RunnableCommand { 
override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - val catalogTable = verifyAlterTableAddColumn(catalog, table) + val catalogTable = verifyAlterTableAddColumn(sparkSession, catalog, table) try { sparkSession.catalog.uncacheTable(table.quotedString) @@ -216,6 +216,7 @@ case class AlterTableAddColumnsCommand( * For datasource table, it currently only supports parquet, json, csv. */ private def verifyAlterTableAddColumn( + sparkSession: SparkSession, catalog: SessionCatalog, table: TableIdentifier): CatalogTable = { val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table) @@ -229,7 +230,7 @@ case class AlterTableAddColumnsCommand( } if (DDLUtils.isDatasourceTable(catalogTable)) { - DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match { + DataSource.lookupDataSource(sparkSession, catalogTable.provider.get).newInstance() match { // For datasource table, this command can only support the following File format. // TextFileFormat only default to one column "value" // Hive type is already considered as hive serde table, so the logic will not diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index b43d282bd434..545d889e4063 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -36,8 +36,10 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider import org.apache.spark.sql.execution.datasources.json.JsonFileFormat +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{CalendarIntervalType, StructType} @@ -85,7 +87,7 @@ case class DataSource( case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String]) - lazy val providingClass: Class[_] = DataSource.lookupDataSource(className) + lazy val providingClass: Class[_] = DataSource.lookupDataSource(sparkSession, className) lazy val sourceInfo: SourceInfo = sourceSchema() private val caseInsensitiveOptions = CaseInsensitiveMap(options) private val equality = sparkSession.sessionState.conf.resolver @@ -568,8 +570,13 @@ object DataSource extends Logging { "org.apache.spark.Logging") /** Given a provider name, look up the data source class definition. 
*/ - def lookupDataSource(provider: String): Class[_] = { - val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) + def lookupDataSource(sparkSession: SparkSession, provider: String): Class[_] = { + var provider1 = backwardCompatibilityMap.getOrElse(provider, provider) + if (Seq("orc", "org.apache.spark.sql.hive.orc.OrcFileFormat").contains(provider1.toLowerCase) && + sparkSession.conf.get(SQLConf.ORC_ENABLED)) { + logInfo(s"$provider1 is replaced with ${classOf[OrcFileFormat].getCanonicalName}") + provider1 = classOf[OrcFileFormat].getCanonicalName + } val provider2 = s"$provider1.DefaultSource" val loader = Utils.getContextOrSparkClassLoader val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 60c430bcfece..2b47e8b30cda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -108,8 +108,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi } // Check if the specified data source match the data source of the existing table. - val existingProvider = DataSource.lookupDataSource(existingTable.provider.get) - val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get) + val existingProvider = DataSource.lookupDataSource(sparkSession, existingTable.provider.get) + val specifiedProvider = DataSource.lookupDataSource(sparkSession, tableDesc.provider.get) // TODO: Check that options from the resolved relation match the relation that we are // inserting into (i.e. using the same compression). 
if (existingProvider != specifiedProvider) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 31d9b909ad46..a96924a6314d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -28,6 +28,8 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -2782,4 +2784,23 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { assert(spark.read.format(orc).load(path).collect().length == 2) } } + + test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") { + withSQLConf(SQLConf.ORC_ENABLED.key -> "false") { + val e = intercept[AnalysisException] { + sql("CREATE TABLE spark_20728(a INT) USING ORC") + } + assert(e.message.contains("The ORC data source must be used with Hive support enabled")) + } + + withSQLConf(SQLConf.ORC_ENABLED.key -> "true") { + withTable("spark_20728") { + sql("CREATE TABLE spark_20728(a INT) USING ORC") + val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst { + case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass + } + assert(fileFormat == Some(classOf[OrcFileFormat])) + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala index 3ce6ae3c5292..0a40b48dff14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.sources import org.apache.spark.sql.{AnalysisException, SQLContext} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -54,11 +55,17 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext { .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))) } - test("should fail to load ORC without Hive Support") { - val e = intercept[AnalysisException] { - spark.read.format("orc").load() + test("should fail to load ORC only if spark.sql.orc.enabled=false and without Hive Support") { + Seq( + (true, "Unable to infer schema for ORC. 
It must be specified manually"), + (false, "The ORC data source must be used with Hive support")).foreach { case (value, m) => + withSQLConf(SQLConf.ORC_ENABLED.key -> s"$value") { + val e = intercept[AnalysisException] { + spark.read.format("orc").load() + } + assert(e.message.contains(m)) + } } - assert(e.message.contains("The ORC data source must be used with Hive support enabled")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index a5d7e6257a6d..356682498f79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -155,7 +155,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } } - test("resolve default source") { spark.read .format("org.apache.spark.sql.test") @@ -478,42 +477,56 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be spark.read.schema(userSchema).parquet(Seq(dir, dir): _*), expData ++ expData, userSchema) } - /** - * This only tests whether API compiles, but does not run it as orc() - * cannot be run without Hive classes. - */ - ignore("orc - API") { - // Reader, with user specified schema - // Refer to csv-specific test suites for behavior without user specified schema - spark.read.schema(userSchema).orc() - spark.read.schema(userSchema).orc(dir) - spark.read.schema(userSchema).orc(dir, dir, dir) - spark.read.schema(userSchema).orc(Seq(dir, dir): _*) - Option(dir).map(spark.read.schema(userSchema).orc) + test("orc - API and behavior regarding schema") { + withSQLConf(SQLConf.ORC_ENABLED.key -> "true") { + // Writer + spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).orc(dir) + val df = spark.read.orc(dir) + checkAnswer(df, spark.createDataset(data).toDF()) + val schema = df.schema - // Writer - spark.range(10).write.orc(dir) + // Reader, without user specified schema + intercept[AnalysisException] { + testRead(spark.read.orc(), Seq.empty, schema) + } + testRead(spark.read.orc(dir), data, schema) + testRead(spark.read.orc(dir, dir), data ++ data, schema) + testRead(spark.read.orc(Seq(dir, dir): _*), data ++ data, schema) + // Test explicit calls to single arg method - SPARK-16009 + testRead(Option(dir).map(spark.read.orc).get, data, schema) + + // Reader, with user specified schema, data should be nulls as schema in file different + // from user schema + val expData = Seq[String](null, null, null) + testRead(spark.read.schema(userSchema).orc(), Seq.empty, userSchema) + testRead(spark.read.schema(userSchema).orc(dir), expData, userSchema) + testRead(spark.read.schema(userSchema).orc(dir, dir), expData ++ expData, userSchema) + testRead( + spark.read.schema(userSchema).orc(Seq(dir, dir): _*), expData ++ expData, userSchema) + } } test("column nullability and comment - write and then read") { - Seq("json", "parquet", "csv").foreach { format => - val schema = StructType( - StructField("cl1", IntegerType, nullable = false).withComment("test") :: - StructField("cl2", IntegerType, nullable = true) :: - StructField("cl3", IntegerType, nullable = true) :: Nil) - val row = Row(3, null, 4) - val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema) - - val tableName = "tab" - withTable(tableName) { - df.write.format(format).mode("overwrite").saveAsTable(tableName) - // Verify the DDL command result: DESCRIBE TABLE 
- checkAnswer( - sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"), - Row("cl1", "test") :: Nil) - // Verify the schema - val expectedFields = schema.fields.map(f => f.copy(nullable = true)) - assert(spark.table(tableName).schema == schema.copy(fields = expectedFields)) + withSQLConf(SQLConf.ORC_ENABLED.key -> "true") { + Seq("json", "orc", "parquet", "csv").foreach { format => + val schema = StructType( + StructField("cl1", IntegerType, nullable = false).withComment("test") :: + StructField("cl2", IntegerType, nullable = true) :: + StructField("cl3", IntegerType, nullable = true) :: Nil) + val row = Row(3, null, 4) + val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema) + + val tableName = "tab" + withTable(tableName) { + df.write.format(format).mode("overwrite").saveAsTable(tableName) + // Verify the DDL command result: DESCRIBE TABLE + checkAnswer( + sql(s"desc $tableName").select("col_name", "comment").where($"comment" === "test"), + Row("cl1", "test") :: Nil) + // Verify the schema + val expectedFields = schema.fields.map(f => f.copy(nullable = true)) + assert(spark.table(tableName).schema == schema.copy(fields = expectedFields)) + } } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index ee1f6ee17306..e8b426f8a6d8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -195,8 +195,18 @@ case class RelationConversions( .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet") } else { val options = relation.tableMeta.storage.properties - sessionCatalog.metastoreCatalog - .convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc") + if (conf.getConf(SQLConf.ORC_ENABLED)) { + sessionCatalog.metastoreCatalog.convertToLogicalRelation( + relation, + options, + classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat], + "orc") + } else { + sessionCatalog.metastoreCatalog.convertToLogicalRelation( + relation, + options, + classOf[org.apache.spark.sql.hive.orc.OrcFileFormat], "orc") + } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index c11e37a51664..0e0e4dee09ff 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2153,4 +2153,21 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } } + + test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") { + Seq( + (true, classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]), + (false, classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (v, format) => + + withSQLConf(SQLConf.ORC_ENABLED.key -> s"$v") { + withTable("spark_20728") { + sql("CREATE TABLE spark_20728(a INT) USING ORC") + val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst { + case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass + } + assert(fileFormat == Some(format)) + } + } + } + } } From e7beb02ef8b1f9761c77952fc69d087c7cc92d3f Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 3 Dec 2017 22:16:24 -0800 Subject: [PATCH 2/9] Address 
comments. --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 9 ++++++--- .../scala/org/apache/spark/sql/DataFrameReader.scala | 2 +- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../org/apache/spark/sql/execution/command/tables.scala | 7 ++++--- .../spark/sql/execution/datasources/DataSource.scala | 8 ++++---- .../apache/spark/sql/execution/datasources/rules.scala | 5 +++-- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++-- .../apache/spark/sql/sources/DDLSourceLoadSuite.scala | 2 +- .../spark/sql/test/DataFrameReaderWriterSuite.scala | 4 ++-- .../scala/org/apache/spark/sql/hive/HiveStrategies.scala | 2 +- .../apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2 +- 11 files changed, 26 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0a5b4ff3e40b..2694bbd822c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -363,10 +363,13 @@ object SQLConf { .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo")) .createWithDefault("snappy") - val ORC_ENABLED = buildConf("spark.sql.orc.enabled") - .doc("When true, use OrcFileFormat in sql/core module instead of the one in sql/hive module.") + val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion") + .doc("When true, use new OrcFileFormat in sql/core module instead of the one in sql/hive. " + + "Since new OrcFileFormat uses Apache ORC library instead of ORC library Hive 1.2.1, it is " + + "more stable and faster.") + .internal() .booleanConf - .createWithDefault(false) + .createWithDefault(true) val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") .doc("When true, enable filter pushdown for ORC files.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 47c2f52e08b5..830545c33596 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -182,7 +182,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { "read files of Hive data source directly.") } - val cls = DataSource.lookupDataSource(sparkSession, source) + val cls = DataSource.lookupDataSource(sparkSession.sessionState.conf, source) if (classOf[DataSourceV2].isAssignableFrom(cls)) { val options = new DataSourceV2Options(extraOptions.asJava) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index f74629496125..2905e8aa9e9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -234,7 +234,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotBucketed("save") - val cls = DataSource.lookupDataSource(df.sparkSession, source) + val cls = DataSource.lookupDataSource(df.sparkSession.sessionState.conf, source) if (classOf[DataSourceV2].isAssignableFrom(cls)) { cls.newInstance() match { case ds: WriteSupport => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 91bd8ab3f62d..cc1cd8a20bdd 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils import org.apache.spark.util.Utils @@ -190,7 +191,7 @@ case class AlterTableAddColumnsCommand( colsToAdd: Seq[StructField]) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - val catalogTable = verifyAlterTableAddColumn(sparkSession, catalog, table) + val catalogTable = verifyAlterTableAddColumn(sparkSession.sessionState.conf, catalog, table) try { sparkSession.catalog.uncacheTable(table.quotedString) @@ -216,7 +217,7 @@ case class AlterTableAddColumnsCommand( * For datasource table, it currently only supports parquet, json, csv. */ private def verifyAlterTableAddColumn( - sparkSession: SparkSession, + conf: SQLConf, catalog: SessionCatalog, table: TableIdentifier): CatalogTable = { val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table) @@ -230,7 +231,7 @@ case class AlterTableAddColumnsCommand( } if (DDLUtils.isDatasourceTable(catalogTable)) { - DataSource.lookupDataSource(sparkSession, catalogTable.provider.get).newInstance() match { + DataSource.lookupDataSource(conf, catalogTable.provider.get).newInstance() match { // For datasource table, this command can only support the following File format. // TextFileFormat only default to one column "value" // Hive type is already considered as hive serde table, so the logic will not diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 545d889e4063..1a03bd5220c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -87,7 +87,8 @@ case class DataSource( case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String]) - lazy val providingClass: Class[_] = DataSource.lookupDataSource(sparkSession, className) + lazy val providingClass: Class[_] = + DataSource.lookupDataSource(sparkSession.sessionState.conf, className) lazy val sourceInfo: SourceInfo = sourceSchema() private val caseInsensitiveOptions = CaseInsensitiveMap(options) private val equality = sparkSession.sessionState.conf.resolver @@ -570,10 +571,9 @@ object DataSource extends Logging { "org.apache.spark.Logging") /** Given a provider name, look up the data source class definition. 
*/ - def lookupDataSource(sparkSession: SparkSession, provider: String): Class[_] = { + def lookupDataSource(conf: SQLConf, provider: String): Class[_] = { var provider1 = backwardCompatibilityMap.getOrElse(provider, provider) - if (Seq("orc", "org.apache.spark.sql.hive.orc.OrcFileFormat").contains(provider1.toLowerCase) && - sparkSession.conf.get(SQLConf.ORC_ENABLED)) { + if (Seq("orc").contains(provider1.toLowerCase) && conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) { logInfo(s"$provider1 is replaced with ${classOf[OrcFileFormat].getCanonicalName}") provider1 = classOf[OrcFileFormat].getCanonicalName } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 2b47e8b30cda..9149b90e9251 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -108,8 +108,9 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi } // Check if the specified data source match the data source of the existing table. - val existingProvider = DataSource.lookupDataSource(sparkSession, existingTable.provider.get) - val specifiedProvider = DataSource.lookupDataSource(sparkSession, tableDesc.provider.get) + val conf = sparkSession.sessionState.conf + val existingProvider = DataSource.lookupDataSource(conf, existingTable.provider.get) + val specifiedProvider = DataSource.lookupDataSource(conf, tableDesc.provider.get) // TODO: Check that options from the resolved relation match the relation that we are // inserting into (i.e. using the same compression). if (existingProvider != specifiedProvider) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index a96924a6314d..331e9dac8b32 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2786,14 +2786,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") { - withSQLConf(SQLConf.ORC_ENABLED.key -> "false") { + withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "false") { val e = intercept[AnalysisException] { sql("CREATE TABLE spark_20728(a INT) USING ORC") } assert(e.message.contains("The ORC data source must be used with Hive support enabled")) } - withSQLConf(SQLConf.ORC_ENABLED.key -> "true") { + withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") { withTable("spark_20728") { sql("CREATE TABLE spark_20728(a INT) USING ORC") val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala index 0a40b48dff14..3f288bed14ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala @@ -59,7 +59,7 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext { Seq( (true, "Unable to infer schema for ORC. 
It must be specified manually"), (false, "The ORC data source must be used with Hive support")).foreach { case (value, m) => - withSQLConf(SQLConf.ORC_ENABLED.key -> s"$value") { + withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> s"$value") { val e = intercept[AnalysisException] { spark.read.format("orc").load() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 356682498f79..210988548751 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -478,7 +478,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } test("orc - API and behavior regarding schema") { - withSQLConf(SQLConf.ORC_ENABLED.key -> "true") { + withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") { // Writer spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).orc(dir) val df = spark.read.orc(dir) @@ -507,7 +507,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } test("column nullability and comment - write and then read") { - withSQLConf(SQLConf.ORC_ENABLED.key -> "true") { + withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") { Seq("json", "orc", "parquet", "csv").foreach { format => val schema = StructType( StructField("cl1", IntegerType, nullable = false).withComment("test") :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index e8b426f8a6d8..6a3850f4739d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -195,7 +195,7 @@ case class RelationConversions( .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet") } else { val options = relation.tableMeta.storage.properties - if (conf.getConf(SQLConf.ORC_ENABLED)) { + if (conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) { sessionCatalog.metastoreCatalog.convertToLogicalRelation( relation, options, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 0e0e4dee09ff..655f7091f360 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2159,7 +2159,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { (true, classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]), (false, classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (v, format) => - withSQLConf(SQLConf.ORC_ENABLED.key -> s"$v") { + withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> s"$v") { withTable("spark_20728") { sql("CREATE TABLE spark_20728(a INT) USING ORC") val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst { From 8b7e88a833adf1d3138ce426142b6bb6abe057df Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 3 Dec 2017 22:24:31 -0800 Subject: [PATCH 3/9] fix --- .../apache/spark/sql/execution/datasources/DataSource.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 1a03bd5220c3..b717c05ff957 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -573,8 +573,7 @@ object DataSource extends Logging { /** Given a provider name, look up the data source class definition. */ def lookupDataSource(conf: SQLConf, provider: String): Class[_] = { var provider1 = backwardCompatibilityMap.getOrElse(provider, provider) - if (Seq("orc").contains(provider1.toLowerCase) && conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) { - logInfo(s"$provider1 is replaced with ${classOf[OrcFileFormat].getCanonicalName}") + if ("orc".equalsIgnoreCase(provider1) && conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) { provider1 = classOf[OrcFileFormat].getCanonicalName } val provider2 = s"$provider1.DefaultSource" From 2e498f9c1a748bdb648705c16305f9f34e638ff8 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 3 Dec 2017 22:34:58 -0800 Subject: [PATCH 4/9] Remove redundant test case. --- .../spark/sql/sources/DDLSourceLoadSuite.scala | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala index 3f288bed14ac..f22d843bfabd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.sources import org.apache.spark.sql.{AnalysisException, SQLContext} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -54,19 +53,6 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext { assert(spark.read.format("org.apache.spark.sql.sources.FakeSourceOne") .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))) } - - test("should fail to load ORC only if spark.sql.orc.enabled=false and without Hive Support") { - Seq( - (true, "Unable to infer schema for ORC. It must be specified manually"), - (false, "The ORC data source must be used with Hive support")).foreach { case (value, m) => - withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> s"$value") { - val e = intercept[AnalysisException] { - spark.read.format("orc").load() - } - assert(e.message.contains(m)) - } - } - } } From 5474a070ada64be7467a64fcd849064b063e2e42 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 4 Dec 2017 00:33:45 -0800 Subject: [PATCH 5/9] Address comments. 
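This commit registers the fully-qualified names of the new sql/core OrcFileFormat in the backward-compatibility map, rewrites the provider lookup as a pattern match, and moves the SPARK-20728 end-to-end test into OrcQuerySuite (see the diffstat below). A minimal sketch of the alias resolution being extended here, using hypothetical names (aliases, resolveProvider) rather than the patch's actual code; the class-name strings are the ones added in the DataSource.scala hunk:

    // Old or shorthand provider names map to a canonical class name;
    // anything unrecognized passes through unchanged and is later resolved
    // via "<name>.DefaultSource" and the DataSourceRegister ServiceLoader.
    val aliases: Map[String, String] = Map(
      "org.apache.spark.sql.hive.orc" ->
        "org.apache.spark.sql.hive.orc.OrcFileFormat",
      "org.apache.spark.sql.execution.datasources.orc" ->
        "org.apache.spark.sql.execution.datasources.orc.OrcFileFormat")

    def resolveProvider(name: String): String = aliases.getOrElse(name, name)

Because unknown names fall through unchanged, third-party providers are unaffected by the new entries.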
--- .../execution/datasources/DataSource.scala | 10 +++++++--- .../spark/sql/hive/HiveStrategies.scala | 4 ++-- .../sql/hive/execution/SQLQuerySuite.scala | 17 ----------------- .../spark/sql/hive/orc/OrcQuerySuite.scala | 19 ++++++++++++++++++- 4 files changed, 27 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index b717c05ff957..78593e398158 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -540,6 +540,7 @@ object DataSource extends Logging { val csv = classOf[CSVFileFormat].getCanonicalName val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat" val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat" + val newOrc = classOf[OrcFileFormat].getCanonicalName Map( "org.apache.spark.sql.jdbc" -> jdbc, @@ -556,6 +557,8 @@ object DataSource extends Logging { "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet, "org.apache.spark.sql.hive.orc.DefaultSource" -> orc, "org.apache.spark.sql.hive.orc" -> orc, + "org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> newOrc, + "org.apache.spark.sql.execution.datasources.orc" -> newOrc, "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm, "org.apache.spark.ml.source.libsvm" -> libsvm, "com.databricks.spark.csv" -> csv @@ -572,9 +575,10 @@ object DataSource extends Logging { /** Given a provider name, look up the data source class definition. */ def lookupDataSource(conf: SQLConf, provider: String): Class[_] = { - var provider1 = backwardCompatibilityMap.getOrElse(provider, provider) - if ("orc".equalsIgnoreCase(provider1) && conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) { - provider1 = classOf[OrcFileFormat].getCanonicalName + val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match { + case name if name.equalsIgnoreCase("orc") && conf.getConf(SQLConf.ORC_USE_NEW_VERSION) => + classOf[OrcFileFormat].getCanonicalName + case name => name } val provider2 = s"$provider1.DefaultSource" val loader = Utils.getContextOrSparkClassLoader diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 6a3850f4739d..d4e4a4406f34 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.execution._ -import org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -205,7 +204,8 @@ case class RelationConversions( sessionCatalog.metastoreCatalog.convertToLogicalRelation( relation, options, - classOf[org.apache.spark.sql.hive.orc.OrcFileFormat], "orc") + classOf[org.apache.spark.sql.hive.orc.OrcFileFormat], + "orc") } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 655f7091f360..c11e37a51664 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2153,21 +2153,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } } - - test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") { - Seq( - (true, classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]), - (false, classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (v, format) => - - withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> s"$v") { - withTable("spark_20728") { - sql("CREATE TABLE spark_20728(a INT) USING ORC") - val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst { - case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass - } - assert(fileFormat == Some(format)) - } - } - } - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 1fa9091f967a..467fcd6c44cb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.HiveTableRelation -import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator} import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ @@ -621,4 +621,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { makeOrcFile((1 to 10).map(Tuple1.apply), path2) assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count()) } + + test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") { + Seq( + (true, classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]), + (false, classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (v, format) => + + withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> s"$v") { + withTable("spark_20728") { + sql("CREATE TABLE spark_20728(a INT) USING ORC") + val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst { + case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass + } + assert(fileFormat == Some(format)) + } + } + } + } } From 2393e1de729441b27bc5cdd83804071f14d77a4b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 4 Dec 2017 09:02:23 -0800 Subject: [PATCH 6/9] Change function signature. 
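Only the argument order of lookupDataSource changes here: the provider name now comes first and the SQLConf second, so call sites read "what, then how it is configured". A signature-only sketch (DataSourceSketch is a hypothetical wrapper; the real method lives in object DataSource and its body is untouched by this commit):

    import org.apache.spark.sql.internal.SQLConf

    object DataSourceSketch {
      // Resolution logic is unchanged; only the parameter order differs.
      def lookupDataSource(provider: String, conf: SQLConf): Class[_] = ???
    }

    // A typical call site after this commit:
    //   DataSource.lookupDataSource(source, sparkSession.sessionState.conf)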
--- .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 +- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../scala/org/apache/spark/sql/execution/command/tables.scala | 2 +- .../apache/spark/sql/execution/datasources/DataSource.scala | 4 ++-- .../org/apache/spark/sql/execution/datasources/rules.scala | 4 ++-- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 830545c33596..ea1cf6677523 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -182,7 +182,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { "read files of Hive data source directly.") } - val cls = DataSource.lookupDataSource(sparkSession.sessionState.conf, source) + val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf) if (classOf[DataSourceV2].isAssignableFrom(cls)) { val options = new DataSourceV2Options(extraOptions.asJava) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 2905e8aa9e9f..59a01e61124f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -234,7 +234,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotBucketed("save") - val cls = DataSource.lookupDataSource(df.sparkSession.sessionState.conf, source) + val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) if (classOf[DataSourceV2].isAssignableFrom(cls)) { cls.newInstance() match { case ds: WriteSupport => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index cc1cd8a20bdd..e400975f1970 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -231,7 +231,7 @@ case class AlterTableAddColumnsCommand( } if (DDLUtils.isDatasourceTable(catalogTable)) { - DataSource.lookupDataSource(conf, catalogTable.provider.get).newInstance() match { + DataSource.lookupDataSource(catalogTable.provider.get, conf).newInstance() match { // For datasource table, this command can only support the following File format. 
// TextFileFormat only default to one column "value" // Hive type is already considered as hive serde table, so the logic will not diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 78593e398158..1f910cd90481 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -88,7 +88,7 @@ case class DataSource( case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String]) lazy val providingClass: Class[_] = - DataSource.lookupDataSource(sparkSession.sessionState.conf, className) + DataSource.lookupDataSource(className, sparkSession.sessionState.conf) lazy val sourceInfo: SourceInfo = sourceSchema() private val caseInsensitiveOptions = CaseInsensitiveMap(options) private val equality = sparkSession.sessionState.conf.resolver @@ -574,7 +574,7 @@ object DataSource extends Logging { "org.apache.spark.Logging") /** Given a provider name, look up the data source class definition. */ - def lookupDataSource(conf: SQLConf, provider: String): Class[_] = { + def lookupDataSource(provider: String, conf: SQLConf): Class[_] = { val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match { case name if name.equalsIgnoreCase("orc") && conf.getConf(SQLConf.ORC_USE_NEW_VERSION) => classOf[OrcFileFormat].getCanonicalName diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 9149b90e9251..6e08df75b8a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -109,8 +109,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi // Check if the specified data source match the data source of the existing table. val conf = sparkSession.sessionState.conf - val existingProvider = DataSource.lookupDataSource(conf, existingTable.provider.get) - val specifiedProvider = DataSource.lookupDataSource(conf, tableDesc.provider.get) + val existingProvider = DataSource.lookupDataSource(existingTable.provider.get, conf) + val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get, conf) // TODO: Check that options from the resolved relation match the relation that we are // inserting into (i.e. using the same compression). if (existingProvider != specifiedProvider) { From 8bc420ab6a085360f3996759819ad44dd40f9703 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 4 Dec 2017 11:52:04 -0800 Subject: [PATCH 7/9] Use spark.sql.orc.impl. 
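This commit replaces the boolean spark.sql.orc.useNewVersion with the string conf spark.sql.orc.impl, which accepts "native" (the sql/core OrcFileFormat backed by Apache ORC) or "hive" (the sql/hive OrcFileFormat backed by Hive 1.2.1) and defaults to "native". A hedged usage sketch, assuming an active SparkSession named spark and a placeholder output path:

    // Pick the ORC implementation for this session before touching ORC data.
    spark.conf.set("spark.sql.orc.impl", "native")   // new sql/core reader/writer
    // spark.conf.set("spark.sql.orc.impl", "hive")  // original sql/hive implementation

    // Both settings are reached through the same provider name "orc".
    spark.range(10).write.mode("overwrite").orc("/tmp/spark_20728_orc")
    spark.read.format("orc").load("/tmp/spark_20728_orc").show()

Since checkValues restricts the conf to the two known values, a typo such as "nativ" is rejected when the conf is set rather than silently falling back.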
--- .../org/apache/spark/sql/internal/SQLConf.scala | 12 ++++++------ .../spark/sql/execution/datasources/DataSource.scala | 9 +++++---- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++-- .../spark/sql/test/DataFrameReaderWriterSuite.scala | 4 ++-- .../org/apache/spark/sql/hive/HiveStrategies.scala | 2 +- .../apache/spark/sql/hive/orc/OrcQuerySuite.scala | 6 +++--- 6 files changed, 19 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2694bbd822c6..ce9cc562b220 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -363,13 +363,13 @@ object SQLConf { .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo")) .createWithDefault("snappy") - val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion") - .doc("When true, use new OrcFileFormat in sql/core module instead of the one in sql/hive. " + - "Since new OrcFileFormat uses Apache ORC library instead of ORC library Hive 1.2.1, it is " + - "more stable and faster.") + val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl") + .doc("When native, use the native version of ORC support instead of the ORC library in Hive " + + "1.2.1. It is 'hive' by default prior to Spark 2.3.") .internal() - .booleanConf - .createWithDefault(true) + .stringConf + .checkValues(Set("hive", "native")) + .createWithDefault("native") val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") .doc("When true, enable filter pushdown for ORC files.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 1f910cd90481..bb9cd1ef8079 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -540,7 +540,7 @@ object DataSource extends Logging { val csv = classOf[CSVFileFormat].getCanonicalName val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat" val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat" - val newOrc = classOf[OrcFileFormat].getCanonicalName + val nativeOrc = classOf[OrcFileFormat].getCanonicalName Map( "org.apache.spark.sql.jdbc" -> jdbc, @@ -557,8 +557,8 @@ object DataSource extends Logging { "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet, "org.apache.spark.sql.hive.orc.DefaultSource" -> orc, "org.apache.spark.sql.hive.orc" -> orc, - "org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> newOrc, - "org.apache.spark.sql.execution.datasources.orc" -> newOrc, + "org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc, + "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc, "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm, "org.apache.spark.ml.source.libsvm" -> libsvm, "com.databricks.spark.csv" -> csv @@ -576,7 +576,8 @@ object DataSource extends Logging { /** Given a provider name, look up the data source class definition. 
*/ def lookupDataSource(provider: String, conf: SQLConf): Class[_] = { val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match { - case name if name.equalsIgnoreCase("orc") && conf.getConf(SQLConf.ORC_USE_NEW_VERSION) => + case name if name.equalsIgnoreCase("orc") && + conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" => classOf[OrcFileFormat].getCanonicalName case name => name } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 331e9dac8b32..5d63708913bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2786,14 +2786,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") { - withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "false") { + withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") { val e = intercept[AnalysisException] { sql("CREATE TABLE spark_20728(a INT) USING ORC") } assert(e.message.contains("The ORC data source must be used with Hive support enabled")) } - withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") { + withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") { withTable("spark_20728") { sql("CREATE TABLE spark_20728(a INT) USING ORC") val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 210988548751..8c9bb7d56a35 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -478,7 +478,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } test("orc - API and behavior regarding schema") { - withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") { + withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") { // Writer spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).orc(dir) val df = spark.read.orc(dir) @@ -507,7 +507,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } test("column nullability and comment - write and then read") { - withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> "true") { + withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") { Seq("json", "orc", "parquet", "csv").foreach { format => val schema = StructType( StructField("cl1", IntegerType, nullable = false).withComment("test") :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index d4e4a4406f34..3018c0642f06 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -194,7 +194,7 @@ case class RelationConversions( .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet") } else { val options = relation.tableMeta.storage.properties - if (conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) { + if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") { sessionCatalog.metastoreCatalog.convertToLogicalRelation( relation, options, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 467fcd6c44cb..1ffaf3031103 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -624,10 +624,10 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") { Seq( - (true, classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]), - (false, classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (v, format) => + ("native", classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]), + ("hive", classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (i, format) => - withSQLConf(SQLConf.ORC_USE_NEW_VERSION.key -> s"$v") { + withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> i) { withTable("spark_20728") { sql("CREATE TABLE spark_20728(a INT) USING ORC") val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst { From e3f6f75084006a0ceb1c2e50c47819bc9ae8e463 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 4 Dec 2017 23:45:22 -0800 Subject: [PATCH 8/9] Add new OrcFileFormat into DataSourceRegister. --- .../org.apache.spark.sql.sources.DataSourceRegister | 1 + .../apache/spark/sql/execution/datasources/DataSource.scala | 6 +++++- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 0c5f3f22e31e..6cdfe2fae564 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -1,6 +1,7 @@ org.apache.spark.sql.execution.datasources.csv.CSVFileFormat org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider org.apache.spark.sql.execution.datasources.json.JsonFileFormat +org.apache.spark.sql.execution.datasources.orc.OrcFileFormat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat org.apache.spark.sql.execution.datasources.text.TextFileFormat org.apache.spark.sql.execution.streaming.ConsoleSinkProvider diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index bb9cd1ef8079..52437b84abe9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -579,6 +579,9 @@ object DataSource extends Logging { case name if name.equalsIgnoreCase("orc") && conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" => classOf[OrcFileFormat].getCanonicalName + case name if name.equalsIgnoreCase("orc") && + conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" => + "org.apache.spark.sql.hive.orc.OrcFileFormat" case name => name } val provider2 = s"$provider1.DefaultSource" @@ -598,7 +601,8 @@ object DataSource extends Logging { if (provider1.toLowerCase(Locale.ROOT) == "orc" || provider1.startsWith("org.apache.spark.sql.hive.orc")) { throw new AnalysisException( - "The ORC data source must be used with Hive support enabled") + "Hive-based ORC data source 
must be used with Hive support enabled. " + + "Please use native ORC data source instead") } else if (provider1.toLowerCase(Locale.ROOT) == "avro" || provider1 == "com.databricks.spark.avro") { throw new AnalysisException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 5d63708913bf..86bd9b95bca6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1666,7 +1666,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { e = intercept[AnalysisException] { sql(s"select id from `org.apache.spark.sql.hive.orc`.`file_path`") } - assert(e.message.contains("The ORC data source must be used with Hive support enabled")) + assert(e.message.contains("Hive-based ORC data source must be used with Hive support")) e = intercept[AnalysisException] { sql(s"select id from `com.databricks.spark.avro`.`file_path`") @@ -2790,7 +2790,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { val e = intercept[AnalysisException] { sql("CREATE TABLE spark_20728(a INT) USING ORC") } - assert(e.message.contains("The ORC data source must be used with Hive support enabled")) + assert(e.message.contains("Hive-based ORC data source must be used with Hive support")) } withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") { From 7fac88f3a25122d4445f2491dcfc3ebc34749186 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 4 Dec 2017 23:47:07 -0800 Subject: [PATCH 9/9] fix indent --- .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 52437b84abe9..5f12d5f93a35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -580,7 +580,7 @@ object DataSource extends Logging { conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" => classOf[OrcFileFormat].getCanonicalName case name if name.equalsIgnoreCase("orc") && - conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" => + conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" => "org.apache.spark.sql.hive.orc.OrcFileFormat" case name => name }
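
Taken together, the series keys ORC provider resolution off spark.sql.orc.impl and softens the old Hive-only error into a hint toward the native implementation. A simplified recap of the final dispatch, wrapped in a hypothetical OrcDispatchSketch object rather than quoting the exact Spark source; the class names come from the hunks above:

    object OrcDispatchSketch {
      // "native" -> the new sql/core implementation backed by Apache ORC.
      // "hive"   -> the original sql/hive implementation backed by Hive 1.2.1.
      def orcFormatFor(impl: String): String = impl match {
        case "native" => "org.apache.spark.sql.execution.datasources.orc.OrcFileFormat"
        case "hive"   => "org.apache.spark.sql.hive.orc.OrcFileFormat"
        case other    => throw new IllegalArgumentException(
          s"Unsupported spark.sql.orc.impl: $other")
      }
    }

In Spark itself the invalid-value branch never fires, because checkValues on the conf already limits it to "hive" or "native".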