diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c1b885a72ad3..ebc8c3705ea2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1419,8 +1419,15 @@ object SQLConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(100) - val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers") + val USE_V1_SOURCE_READER_LIST = buildConf("spark.sql.sources.read.useV1SourceList") .internal() + .doc("A comma-separated list of data source short names or fully qualified data source" + + " register class names for which data source V2 read paths are disabled. Reads from these" + + " sources will fall back to the V1 sources.") + .stringConf + .createWithDefault("") + + val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers") .doc("A comma-separated list of fully qualified data source register class names for which" + " StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.") .stringConf @@ -2002,6 +2009,8 @@ class SQLConf extends Serializable with Logging { def continuousStreamingExecutorPollIntervalMs: Long = getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS) + def userV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST) + def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS) def disabledV2StreamingMicroBatchReaders: String = diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java index 1c5e3a0cd31e..00af0bf1b172 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java @@ -163,6 +163,11 @@ public double getDouble(String key, double defaultValue) { */ public static final String DATABASE_KEY = "database"; + /** + * The option key for whether to check existence of files for a table. + */ + public static final String CHECK_FILES_EXIST_KEY = "check_files_exist"; + /** * Returns all the paths specified by both the singular path option and the multiple * paths option. 
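A minimal, hypothetical usage sketch for the new `spark.sql.sources.read.useV1SourceList` option added above: it shows how a user would force reads of a file source back onto the V1 path. The app name and input path are placeholders, not part of this patch.

```scala
import org.apache.spark.sql.SparkSession

object UseV1SourceListExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("v1-read-fallback")
      // Accepts data source short names ("orc") or fully qualified register class names.
      .config("spark.sql.sources.read.useV1SourceList", "orc")
      .getOrCreate()

    // With the ORC V2 read path disabled, this read falls back to the V1 OrcFileFormat.
    spark.read.format("orc").load("/tmp/example/orc").show()

    spark.stop()
  }
}
```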
@@ -197,4 +202,9 @@ public Optional tableName() { public Optional databaseName() { return get(DATABASE_KEY); } + + public Boolean checkFilesExist() { + Optional result = get(CHECK_FILES_EXIST_KEY); + return result.isPresent() && result.get().equals("true"); + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index af369a5bca46..2b1521730bc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -37,8 +37,7 @@ import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.csv._ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, FileTable} import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String @@ -193,7 +192,16 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { "read files of Hive data source directly.") } - val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf) + val useV1Sources = + sparkSession.sessionState.conf.userV1SourceReaderList.toLowerCase(Locale.ROOT).split(",") + val lookupCls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf) + val cls = lookupCls.newInstance() match { + case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) || + useV1Sources.contains(lookupCls.getCanonicalName.toLowerCase(Locale.ROOT)) => + f.fallBackFileFormat + case _ => lookupCls + } + if (classOf[TableProvider].isAssignableFrom(cls)) { val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] val sessionOptions = DataSourceV2Utils.extractSessionConfigs( @@ -202,7 +210,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val objectMapper = new ObjectMapper() DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray) } - val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption + val checkFilesExistsOption = DataSourceOptions.CHECK_FILES_EXIST_KEY -> "true" + val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption + checkFilesExistsOption val dsOptions = new DataSourceOptions(finalOptions.asJava) val table = userSpecifiedSchema match { case Some(schema) => provider.getTable(dsOptions, schema) @@ -211,6 +220,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { table match { case _: SupportsBatchRead => Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, finalOptions)) + case _ => loadV1Source(paths: _*) } } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 228dcb94b9ac..d9404cd92992 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -248,7 +248,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] val sessionOptions = 
DataSourceV2Utils.extractSessionConfigs( provider, session.sessionState.conf) - val options = sessionOptions ++ extraOptions + val checkFilesExistsOption = DataSourceOptions.CHECK_FILES_EXIST_KEY -> "false" + val options = sessionOptions ++ extraOptions + checkFilesExistsOption val dsOptions = new DataSourceOptions(options.asJava) provider.getTable(dsOptions) match { case table: SupportsBatchWrite => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 8b84eda36103..f6f3fb18c97f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -373,8 +373,7 @@ case class FileSourceScanExec( val filesGroupedToBuckets = selectedPartitions.flatMap { p => p.files.filter(_.getLen > 0).map { f => - val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen) - PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts) + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) } }.groupBy { f => BucketingUtils @@ -410,107 +409,35 @@ case class FileSourceScanExec( readFile: (PartitionedFile) => Iterator[InternalRow], selectedPartitions: Seq[PartitionDirectory], fsRelation: HadoopFsRelation): RDD[InternalRow] = { - val defaultMaxSplitBytes = - fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes - val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism - val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum - val bytesPerCore = totalBytes / defaultParallelism - - val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + val maxSplitBytes = + FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions) logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") val splitFiles = selectedPartitions.flatMap { partition => partition.files.filter(_.getLen > 0).flatMap { file => - val blockLocations = getBlockLocations(file) - if (fsRelation.fileFormat.isSplitable( - fsRelation.sparkSession, fsRelation.options, file.getPath)) { - (0L until file.getLen by maxSplitBytes).map { offset => - val remaining = file.getLen - offset - val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining - val hosts = getBlockHosts(blockLocations, offset, size) - PartitionedFile( - partition.values, file.getPath.toUri.toString, offset, size, hosts) - } - } else { - val hosts = getBlockHosts(blockLocations, 0, file.getLen) - Seq(PartitionedFile( - partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts)) - } + // getPath() is very expensive so we only want to call it once in this block: + val filePath = file.getPath + val isSplitable = relation.fileFormat.isSplitable( + relation.sparkSession, relation.options, filePath) + PartitionedFileUtil.splitFiles( + sparkSession = relation.sparkSession, + file = file, + filePath = filePath, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values + ) } }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) - val partitions = new ArrayBuffer[FilePartition] - val currentFiles = new ArrayBuffer[PartitionedFile] - var currentSize = 0L - - /** Close the current partition and move 
to the next. */ - def closePartition(): Unit = { - if (currentFiles.nonEmpty) { - val newPartition = - FilePartition( - partitions.size, - currentFiles.toArray.toSeq) // Copy to a new Array. - partitions += newPartition - } - currentFiles.clear() - currentSize = 0 - } - - // Assign files to partitions using "Next Fit Decreasing" - splitFiles.foreach { file => - if (currentSize + file.length > maxSplitBytes) { - closePartition() - } - // Add the given file to the current partition. - currentSize += file.length + openCostInBytes - currentFiles += file - } - closePartition() + val partitions = + FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) new FileScanRDD(fsRelation.sparkSession, readFile, partitions) } - private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match { - case f: LocatedFileStatus => f.getBlockLocations - case f => Array.empty[BlockLocation] - } - - // Given locations of all blocks of a single file, `blockLocations`, and an `(offset, length)` - // pair that represents a segment of the same file, find out the block that contains the largest - // fraction the segment, and returns location hosts of that block. If no such block can be found, - // returns an empty array. - private def getBlockHosts( - blockLocations: Array[BlockLocation], offset: Long, length: Long): Array[String] = { - val candidates = blockLocations.map { - // The fragment starts from a position within this block - case b if b.getOffset <= offset && offset < b.getOffset + b.getLength => - b.getHosts -> (b.getOffset + b.getLength - offset).min(length) - - // The fragment ends at a position within this block - case b if offset <= b.getOffset && offset + length < b.getLength => - b.getHosts -> (offset + length - b.getOffset).min(length) - - // The fragment fully contains this block - case b if offset <= b.getOffset && b.getOffset + b.getLength <= offset + length => - b.getHosts -> b.getLength - - // The fragment doesn't intersect with this block - case b => - b.getHosts -> 0L - }.filter { case (hosts, size) => - size > 0L - } - - if (candidates.isEmpty) { - Array.empty[String] - } else { - val (hosts, _) = candidates.maxBy { case (_, size) => size } - hosts - } - } - override def doCanonicalize(): FileSourceScanExec = { FileSourceScanExec( relation, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala new file mode 100644 index 000000000000..3196624f7c7c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ + +object PartitionedFileUtil { + def splitFiles( + sparkSession: SparkSession, + file: FileStatus, + filePath: Path, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + if (isSplitable) { + (0L until file.getLen by maxSplitBytes).map { offset => + val remaining = file.getLen - offset + val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining + val hosts = getBlockHosts(getBlockLocations(file), offset, size) + PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts) + } + } else { + Seq(getPartitionedFile(file, filePath, partitionValues)) + } + } + + def getPartitionedFile( + file: FileStatus, + filePath: Path, + partitionValues: InternalRow): PartitionedFile = { + val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen) + PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts) + } + + private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match { + case f: LocatedFileStatus => f.getBlockLocations + case f => Array.empty[BlockLocation] + } + // Given locations of all blocks of a single file, `blockLocations`, and an `(offset, length)` + // pair that represents a segment of the same file, find out the block that contains the largest + // fraction the segment, and returns location hosts of that block. If no such block can be found, + // returns an empty array. + private def getBlockHosts( + blockLocations: Array[BlockLocation], + offset: Long, + length: Long): Array[String] = { + val candidates = blockLocations.map { + // The fragment starts from a position within this block + case b if b.getOffset <= offset && offset < b.getOffset + b.getLength => + b.getHosts -> (b.getOffset + b.getLength - offset).min(length) + + // The fragment ends at a position within this block + case b if offset <= b.getOffset && offset + length < b.getLength => + b.getHosts -> (offset + length - b.getOffset).min(length) + + // The fragment fully contains this block + case b if offset <= b.getOffset && b.getOffset + b.getLength <= offset + length => + b.getHosts -> b.getLength + + // The fragment doesn't intersect with this block + case b => + b.getHosts -> 0L + }.filter { case (hosts, size) => + size > 0L + } + + if (candidates.isEmpty) { + Array.empty[String] + } else { + val (hosts, _) = candidates.maxBy { case (_, size) => size } + hosts + } + } +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index e2cd40906f40..d24e66e58385 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import 
org.apache.spark.sql.util.SchemaUtils @@ -214,7 +215,7 @@ case class AlterTableAddColumnsCommand( /** * ALTER TABLE ADD COLUMNS command does not support temporary view/table, * view, or datasource table with text, orc formats or external provider. - * For datasource table, it currently only supports parquet, json, csv. + * For datasource table, it currently only supports parquet, json, csv, orc. */ private def verifyAlterTableAddColumn( conf: SQLConf, @@ -237,7 +238,7 @@ case class AlterTableAddColumnsCommand( // TextFileFormat only default to one column "value" // Hive type is already considered as hive serde table, so the logic will not // come in here. - case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat => + case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat | _: OrcDataSourceV2 => case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") => case s => throw new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5dad784e45af..d48261e783dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.language.{existentials, implicitConversions} import scala.util.{Failure, Success, Try} +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil @@ -40,6 +41,8 @@ import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 +import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider} import org.apache.spark.sql.internal.SQLConf @@ -90,8 +93,19 @@ case class DataSource( case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String]) - lazy val providingClass: Class[_] = - DataSource.lookupDataSource(className, sparkSession.sessionState.conf) + lazy val providingClass: Class[_] = { + val cls = DataSource.lookupDataSource(className, sparkSession.sessionState.conf) + // `providingClass` is used for resolving data source relation for catalog tables. + // As now catalog for data source V2 is under development, here we fall back all the + // [[FileDataSourceV2]] to [[FileFormat]] to guarantee the current catalog works. + // [[FileDataSourceV2]] will still be used if we call the load()/save() method in + // [[DataFrameReader]]/[[DataFrameWriter]], since they use method `lookupDataSource` + // instead of `providingClass`. 
+ cls.newInstance() match { + case f: FileDataSourceV2 => f.fallBackFileFormat + case _ => cls + } + } lazy val sourceInfo: SourceInfo = sourceSchema() private val caseInsensitiveOptions = CaseInsensitiveMap(options) private val equality = sparkSession.sessionState.conf.resolver @@ -543,40 +557,9 @@ case class DataSource( checkFilesExist: Boolean): Seq[Path] = { val allPaths = caseInsensitiveOptions.get("path") ++ paths val hadoopConf = sparkSession.sessionState.newHadoopConf() - val allGlobPath = allPaths.flatMap { path => - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(hadoopConf) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) - - if (checkEmptyGlobPath && globPath.isEmpty) { - throw new AnalysisException(s"Path does not exist: $qualified") - } - - // Sufficient to check head of the globPath seq for non-glob scenario - // Don't need to check once again if files exist in streaming mode - if (checkFilesExist && !fs.exists(globPath.head)) { - throw new AnalysisException(s"Path does not exist: ${globPath.head}") - } - globPath - }.toSeq - if (checkFilesExist) { - val (filteredOut, filteredIn) = allGlobPath.partition { path => - InMemoryFileIndex.shouldFilterOut(path.getName) - } - if (filteredOut.nonEmpty) { - if (filteredIn.isEmpty) { - logWarning( - s"All paths were ignored:\n ${filteredOut.mkString("\n ")}") - } else { - logDebug( - s"Some paths were ignored:\n ${filteredOut.mkString("\n ")}") - } - } - } - - allGlobPath + DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf, + checkEmptyGlobPath, checkFilesExist) } } @@ -632,7 +615,7 @@ object DataSource extends Logging { val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match { case name if name.equalsIgnoreCase("orc") && conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" => - classOf[OrcFileFormat].getCanonicalName + classOf[OrcDataSourceV2].getCanonicalName case name if name.equalsIgnoreCase("orc") && conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" => "org.apache.spark.sql.hive.orc.OrcFileFormat" @@ -721,6 +704,48 @@ object DataSource extends Logging { } } + /** + * Checks and returns files in all the paths. 
+ */ + private[sql] def checkAndGlobPathIfNecessary( + paths: Seq[String], + hadoopConf: Configuration, + checkEmptyGlobPath: Boolean, + checkFilesExist: Boolean): Seq[Path] = { + val allGlobPath = paths.flatMap { path => + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(hadoopConf) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) + + if (checkEmptyGlobPath && globPath.isEmpty) { + throw new AnalysisException(s"Path does not exist: $qualified") + } + + // Sufficient to check head of the globPath seq for non-glob scenario + // Don't need to check once again if files exist in streaming mode + if (checkFilesExist && !fs.exists(globPath.head)) { + throw new AnalysisException(s"Path does not exist: ${globPath.head}") + } + globPath + } + + if (checkFilesExist) { + val (filteredOut, filteredIn) = allGlobPath.partition { path => + InMemoryFileIndex.shouldFilterOut(path.getName) + } + if (filteredIn.isEmpty) { + logWarning( + s"All paths were ignored:\n ${filteredOut.mkString("\n ")}") + } else { + logDebug( + s"Some paths were ignored:\n ${filteredOut.mkString("\n ")}") + } + } + + allGlobPath + } + /** * When creating a data source table, the `path` option has a special meaning: the table location. * This method extracts the `path` option and treat it as table location to build a diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala new file mode 100644 index 000000000000..cefdfd14b3b3 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileDataSourceV2} +import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable + +/** + * Replace the ORC V2 data source of table in [[InsertIntoTable]] to V1 [[FileFormat]]. + * E.g, with temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails + * since there is no corresponding physical plan. + * SPARK-23817: This is a temporary hack for making current data source V2 work. It should be + * removed when write path of file data source v2 is finished. 
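For reference, a self-contained sketch of what `checkAndGlobPathIfNecessary` above does, written against plain Hadoop `FileSystem` APIs rather than Spark's internal `SparkHadoopUtil` helper; the glob pattern is a placeholder and the exception type is illustrative.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object GlobCheckSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val hdfsPath = new Path("/tmp/data/*.orc")
    val fs = hdfsPath.getFileSystem(hadoopConf)
    val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

    // globStatus returns null when nothing matches; treat that as "path does not exist".
    val matches = Option(fs.globStatus(qualified)).map(_.toSeq).getOrElse(Seq.empty)
    if (matches.isEmpty) {
      throw new IllegalArgumentException(s"Path does not exist: $qualified")
    }
    matches.map(_.getPath).foreach(println)
  }
}
```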
+ */ +class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case i @ InsertIntoTable(d @DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) => + val v1FileFormat = new OrcFileFormat + val relation = HadoopFsRelation(table.getFileIndex, table.getFileIndex.partitionSchema, + table.schema(), None, v1FileFormat, d.options)(sparkSession) + i.copy(table = LogicalRelation(relation)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala new file mode 100644 index 000000000000..4b1ade8e2957 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.Partition +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.sources.v2.reader.InputPartition + +/** + * A collection of file blocks that should be read as a single task + * (possibly from multiple partitioned directories). + */ +case class FilePartition(index: Int, files: Seq[PartitionedFile]) + extends Partition with InputPartition { + override def preferredLocations(): Array[String] = { + // Computes total number of bytes can be retrieved from each host. + val hostToNumBytes = mutable.HashMap.empty[String, Long] + files.foreach { file => + file.locations.filter(_ != "localhost").foreach { host => + hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + file.length + } + } + + // Takes the first 3 hosts with the most data to be retrieved + hostToNumBytes.toSeq.sortBy { + case (host, numBytes) => numBytes + }.reverse.take(3).map { + case (host, numBytes) => host + }.toArray + } +} + +object FilePartition extends Logging { + + def getFilePartitions( + sparkSession: SparkSession, + partitionedFiles: Seq[PartitionedFile], + maxSplitBytes: Long): Seq[FilePartition] = { + val partitions = new ArrayBuffer[FilePartition] + val currentFiles = new ArrayBuffer[PartitionedFile] + var currentSize = 0L + + /** Close the current partition and move to the next. */ + def closePartition(): Unit = { + if (currentFiles.nonEmpty) { + // Copy to a new Array. 
+ val newPartition = FilePartition(partitions.size, currentFiles.toArray.toSeq) + partitions += newPartition + } + currentFiles.clear() + currentSize = 0 + } + + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + // Assign files to partitions using "Next Fit Decreasing" + partitionedFiles.foreach { file => + if (currentSize + file.length > maxSplitBytes) { + closePartition() + } + // Add the given file to the current partition. + currentSize += file.length + openCostInBytes + currentFiles += file + } + closePartition() + partitions + } + + def maxSplitBytes( + sparkSession: SparkSession, + selectedPartitions: Seq[PartitionDirectory]): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + val defaultParallelism = sparkSession.sparkContext.defaultParallelism + val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum + val bytesPerCore = totalBytes / defaultParallelism + + Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index ffea33c08ef9..d92ea2e06895 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -29,6 +29,7 @@ import org.apache.spark.rdd.{InputFileBlockHolder, RDD} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.QueryExecutionException +import org.apache.spark.sql.sources.v2.reader.InputPartition import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.NextIterator @@ -53,12 +54,6 @@ case class PartitionedFile( } } -/** - * A collection of file blocks that should be read as a single task - * (possibly from multiple partitioned directories). - */ -case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends RDDPartition - /** * An RDD that scans a list of file partitions. */ @@ -216,21 +211,6 @@ class FileScanRDD( override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray override protected def getPreferredLocations(split: RDDPartition): Seq[String] = { - val files = split.asInstanceOf[FilePartition].files - - // Computes total number of bytes can be retrieved from each host. 
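A worked example (made-up file sizes) of the `maxSplitBytes` formula that `FilePartition.maxSplitBytes` above now owns; it shows why many small files end up packed several per task. The two defaults quoted in the comments are the stock values of `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes`.

```scala
object MaxSplitBytesExample {
  def main(args: Array[String]): Unit = {
    val filesMaxPartitionBytes = 128L * 1024 * 1024 // spark.sql.files.maxPartitionBytes default
    val filesOpenCostInBytes   = 4L * 1024 * 1024   // spark.sql.files.openCostInBytes default
    val defaultParallelism     = 8

    // Ten 40 MB files; each file is padded with the open cost before summing.
    val fileSizes    = Seq.fill(10)(40L * 1024 * 1024)
    val totalBytes   = fileSizes.map(_ + filesOpenCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism

    val maxSplitBytes =
      math.min(filesMaxPartitionBytes, math.max(filesOpenCostInBytes, bytesPerCore))

    // 440 MB / 8 cores = 55 MB per core, below the 128 MB cap,
    // so splits (and therefore read tasks) are sized at roughly 55 MB each.
    println(s"maxSplitBytes = $maxSplitBytes bytes")
  }
}
```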
- val hostToNumBytes = mutable.HashMap.empty[String, Long] - files.foreach { file => - file.locations.filter(_ != "localhost").foreach { host => - hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + file.length - } - } - - // Takes the first 3 hosts with the most data to be retrieved - hostToNumBytes.toSeq.sortBy { - case (host, numBytes) => numBytes - }.reverse.take(3).map { - case (host, numBytes) => host - } + split.asInstanceOf[FilePartition].preferredLocations() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index b2f73b7f8d1f..d278802e6c9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -52,28 +52,12 @@ case class HadoopFsRelation( override def sqlContext: SQLContext = sparkSession.sqlContext - private def getColName(f: StructField): String = { - if (sparkSession.sessionState.conf.caseSensitiveAnalysis) { - f.name - } else { - f.name.toLowerCase(Locale.ROOT) - } - } - - val overlappedPartCols = mutable.Map.empty[String, StructField] - partitionSchema.foreach { partitionField => - if (dataSchema.exists(getColName(_) == getColName(partitionField))) { - overlappedPartCols += getColName(partitionField) -> partitionField - } - } - // When data and partition schemas have overlapping columns, the output // schema respects the order of the data schema for the overlapping columns, and it // respects the data types of the partition schema. - val schema: StructType = { - StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++ - partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f)))) - } + val (schema: StructType, overlappedPartCols: Map[String, StructField]) = + PartitioningUtils.mergeDataAndPartitionSchema(dataSchema, + partitionSchema, sparkSession.sessionState.conf.caseSensitiveAnalysis) def partitionSchemaOption: Option[StructType] = if (partitionSchema.isEmpty) None else Some(partitionSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index ee770426e61f..a2e08180cc50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -21,6 +21,7 @@ import java.lang.{Double => JDouble, Long => JLong} import java.math.{BigDecimal => JBigDecimal} import java.util.{Locale, TimeZone} +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.Try @@ -30,7 +31,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal} import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils @@ -65,9 +66,7 @@ object PartitioningUtils { 
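A quick, illustrative check of the host-preference logic that `getPreferredLocations` now delegates to `FilePartition.preferredLocations()`: hosts are ranked by the bytes they can serve locally, "localhost" is ignored, and the top three are kept. File paths, sizes, and host names below are invented.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

object PreferredLocationsSketch {
  def main(args: Array[String]): Unit = {
    val files = Seq(
      PartitionedFile(InternalRow.empty, "/data/f1", 0, 64L * 1024 * 1024, Array("hostA", "hostB")),
      PartitionedFile(InternalRow.empty, "/data/f2", 0, 32L * 1024 * 1024, Array("hostB", "hostC")),
      PartitionedFile(InternalRow.empty, "/data/f3", 0, 8L * 1024 * 1024, Array("hostC", "hostD")))

    // hostB can serve 96 MB, hostA 64 MB, hostC 40 MB, hostD 8 MB,
    // so the preferred locations come out as hostB, hostA, hostC.
    println(FilePartition(0, files).preferredLocations().mkString(", "))
  }
}
```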
require(columnNames.size == literals.size) } - import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME - import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName - import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName + import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.{escapePathName, unescapePathName, DEFAULT_PARTITION_NAME} /** * Given a group of qualified paths, tries to parse them and returns a partition specification. @@ -558,6 +557,35 @@ object PartitioningUtils { }).asNullable } + def mergeDataAndPartitionSchema( + dataSchema: StructType, + partitionSchema: StructType, + caseSensitive: Boolean): (StructType, Map[String, StructField]) = { + val overlappedPartCols = mutable.Map.empty[String, StructField] + partitionSchema.foreach { partitionField => + val partitionFieldName = getColName(partitionField, caseSensitive) + if (dataSchema.exists(getColName(_, caseSensitive) == partitionFieldName)) { + overlappedPartCols += partitionFieldName -> partitionField + } + } + + // When data and partition schemas have overlapping columns, the output + // schema respects the order of the data schema for the overlapping columns, and it + // respects the data types of the partition schema. + val fullSchema = + StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f, caseSensitive), f)) ++ + partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f, caseSensitive)))) + (fullSchema, overlappedPartCols.toMap) + } + + def getColName(f: StructField, caseSensitive: Boolean): String = { + if (caseSensitive) { + f.name + } else { + f.name.toLowerCase(Locale.ROOT) + } + } + private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = { if (caseSensitive) { org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 0a64981b421c..cd2a68a53bab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -82,22 +82,24 @@ private[sql] object OrcFilters { */ def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = { val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap - - // First, tries to convert each filter individually to see whether it's convertible, and then - // collect all convertible ones to build the final `SearchArgument`. - val convertibleFilters = for { - filter <- filters - _ <- buildSearchArgument(dataTypeMap, filter, newBuilder) - } yield filter - for { // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- buildTree(convertibleFilters) + conjunction <- buildTree(convertibleFilters(schema, dataTypeMap, filters)) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) } yield builder.build() } + def convertibleFilters( + schema: StructType, + dataTypeMap: Map[String, DataType], + filters: Seq[Filter]): Seq[Filter] = { + for { + filter <- filters + _ <- buildSearchArgument(dataTypeMap, filter, newBuilder()) + } yield filter + } + /** * Return true if this is a searchable type in ORC. * Both CharType and VarcharType are cleaned at AstBuilder. 
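To make the semantics of `mergeDataAndPartitionSchema` above concrete, here is a standalone restatement (not a call into the internal helper) with a small example: an overlapping column keeps its position from the data schema but takes the partition schema's field, and hence its type.

```scala
import java.util.Locale

import org.apache.spark.sql.types._

object MergeSchemasSketch {
  def merge(
      dataSchema: StructType,
      partitionSchema: StructType,
      caseSensitive: Boolean): StructType = {
    def colName(f: StructField): String =
      if (caseSensitive) f.name else f.name.toLowerCase(Locale.ROOT)
    val overlapped = partitionSchema
      .filter(p => dataSchema.exists(d => colName(d) == colName(p)))
      .map(p => colName(p) -> p).toMap
    StructType(
      dataSchema.map(f => overlapped.getOrElse(colName(f), f)) ++
        partitionSchema.filterNot(f => overlapped.contains(colName(f))))
  }

  def main(args: Array[String]): Unit = {
    val dataSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("p", StringType)))   // also appears as a partition column below
    val partitionSchema = StructType(Seq(
      StructField("P", IntegerType),   // same column, different case and type
      StructField("dt", StringType)))

    // With case-insensitive analysis, the overlapping column stays in the data schema's
    // position (second) but becomes the partition field `P: int`; `dt` is appended last.
    println(merge(dataSchema, partitionSchema, caseSensitive = false).treeString)
  }
}
```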
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/EmptyPartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/EmptyPartitionReader.scala new file mode 100644 index 000000000000..b177d15e1fe3 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/EmptyPartitionReader.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import java.io.IOException + +import org.apache.spark.sql.sources.v2.reader.PartitionReader + +/** + * A [[PartitionReader]] with empty output. + */ +class EmptyPartitionReader[T] extends PartitionReader[T] { + override def next(): Boolean = false + + override def get(): T = + throw new IOException("No records should be returned from EmptyDataReader") + + override def close(): Unit = {} +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala new file mode 100644 index 000000000000..a0c932cbb0e0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2 + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, SupportsBatchRead, TableProvider} +import org.apache.spark.sql.types.StructType + +/** + * A base interface for data source v2 implementations of the built-in file-based data sources. + */ +trait FileDataSourceV2 extends TableProvider with DataSourceRegister { + /** + * Returns a V1 [[FileFormat]] class of the same file data source. 
+ * This is a solution for the following cases: + * 1. File datasource V2 implementations cause regression. Users can disable the problematic data + * source via SQL configuration and fall back to FileFormat. + * 2. Catalog support is required, which is still under development for data source V2. + */ + def fallBackFileFormat: Class[_ <: FileFormat] + + lazy val sparkSession = SparkSession.active + + def getFileIndex( + options: DataSourceOptions, + userSpecifiedSchema: Option[StructType]): PartitioningAwareFileIndex = { + val filePaths = options.paths() + val hadoopConf = + sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap) + val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf, + checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist()) + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + new InMemoryFileIndex(sparkSession, rootPathsSpecified, + options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala new file mode 100644 index 000000000000..d76d69dba31d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2 + +import java.io.{FileNotFoundException, IOException} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.reader.PartitionReader + +class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]]) + extends PartitionReader[T] with Logging { + private var currentReader: PartitionedFileReader[T] = null + + private val sqlConf = SQLConf.get + private def ignoreMissingFiles = sqlConf.ignoreMissingFiles + private def ignoreCorruptFiles = sqlConf.ignoreCorruptFiles + + override def next(): Boolean = { + if (currentReader == null) { + if (readers.hasNext) { + if (ignoreMissingFiles || ignoreCorruptFiles) { + try { + currentReader = readers.next() + logInfo(s"Reading file $currentReader") + } catch { + case e: FileNotFoundException if ignoreMissingFiles => + logWarning(s"Skipped missing file: $currentReader", e) + currentReader = null + return false + // Throw FileNotFoundException even if `ignoreCorruptFiles` is true + case e: FileNotFoundException if !ignoreMissingFiles => throw e + case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => + logWarning( + s"Skipped the rest of the content in the corrupted file: $currentReader", e) + currentReader = null + return false + } + } else { + currentReader = readers.next() + logInfo(s"Reading file $currentReader") + } + } else { + return false + } + } + if (currentReader.next()) { + true + } else { + close() + currentReader = null + next() + } + } + + override def get(): T = currentReader.get() + + override def close(): Unit = { + if (currentReader != null) { + currentReader.close() + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala new file mode 100644 index 000000000000..101a70ee92ce --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory} +import org.apache.spark.sql.vectorized.ColumnarBatch + +abstract class FilePartitionReaderFactory extends PartitionReaderFactory { + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + assert(partition.isInstanceOf[FilePartition]) + val filePartition = partition.asInstanceOf[FilePartition] + val iter = filePartition.files.toIterator.map { file => + new PartitionedFileReader(file, buildReader(file)) + } + new FilePartitionReader[InternalRow](iter) + } + + override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = { + assert(partition.isInstanceOf[FilePartition]) + val filePartition = partition.asInstanceOf[FilePartition] + val iter = filePartition.files.toIterator.map { file => + new PartitionedFileReader(file, buildColumnarReader(file)) + } + new FilePartitionReader[ColumnarBatch](iter) + } + + def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] + + def buildColumnarReader(partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = { + throw new UnsupportedOperationException("Cannot create columnar reader.") + } +} + +// A compound class for combining file and its corresponding reader. +private[v2] class PartitionedFileReader[T]( + file: PartitionedFile, + reader: PartitionReader[T]) extends PartitionReader[T] { + override def next(): Boolean = reader.next() + + override def get(): T = reader.get() + + override def close(): Unit = reader.close() + + override def toString: String = file.toString +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala new file mode 100644 index 000000000000..3615b15be6fd --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
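As a hypothetical illustration of the `FilePartitionReaderFactory` extension point above, a factory that returns an empty reader for every file; only the wiring is interesting here. A real implementation would open `file.filePath` and honor `file.start`/`file.length`, as the ORC factory later in this patch does.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory}
import org.apache.spark.sql.sources.v2.reader.PartitionReader

// Emits no rows for any input file.
case class NoopPartitionReaderFactory() extends FilePartitionReaderFactory {
  override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] =
    new EmptyPartitionReader[InternalRow]
}
```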
+ */ +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan} +import org.apache.spark.sql.types.StructType + +abstract class FileScan( + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex) extends Scan with Batch { + /** + * Returns whether a file with `path` could be split or not. + */ + def isSplitable(path: Path): Boolean = { + false + } + + protected def partitions: Seq[FilePartition] = { + val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty) + val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) + val splitFiles = selectedPartitions.flatMap { partition => + partition.files.flatMap { file => + val filePath = file.getPath + PartitionedFileUtil.splitFiles( + sparkSession = sparkSession, + file = file, + filePath = filePath, + isSplitable = isSplitable(filePath), + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values + ) + }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) + } + FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes) + } + + override def planInputPartitions(): Array[InputPartition] = { + partitions.toArray + } + + override def toBatch: Batch = this +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala new file mode 100644 index 000000000000..5dd343ba44b6 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.types.StructType + +abstract class FileScanBuilder(schema: StructType) + extends ScanBuilder + with SupportsPushDownRequiredColumns + with SupportsPushDownFilters { + protected var readSchema = schema + + override def pruneColumns(requiredSchema: StructType): Unit = { + this.readSchema = requiredSchema + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala new file mode 100644 index 000000000000..b1786541a805 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.hadoop.fs.FileStatus + +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources.v2.{SupportsBatchRead, Table} +import org.apache.spark.sql.types.StructType + +abstract class FileTable( + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + userSpecifiedSchema: Option[StructType]) extends Table with SupportsBatchRead { + def getFileIndex: PartitioningAwareFileIndex = this.fileIndex + + lazy val dataSchema: StructType = userSpecifiedSchema.orElse { + inferSchema(fileIndex.allFiles()) + }.getOrElse { + throw new AnalysisException( + s"Unable to infer schema for $name. It must be specified manually.") + }.asNullable + + override def schema(): StructType = { + val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis + PartitioningUtils.mergeDataAndPartitionSchema(dataSchema, + fileIndex.partitionSchema, caseSensitive)._1 + } + + /** + * When possible, this method should return the schema of the given `files`. When the format + * does not support inference, or no valid files are given should return None. In these cases + * Spark will require that user specify the schema manually. 
+ */ + def inferSchema(files: Seq[FileStatus]): Option[StructType] +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionRecordReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionRecordReader.scala new file mode 100644 index 000000000000..ff78ef3220c1 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionRecordReader.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.hadoop.mapreduce.RecordReader + +import org.apache.spark.sql.sources.v2.reader.PartitionReader + +class PartitionRecordReader[T]( + private[this] var rowReader: RecordReader[_, T]) extends PartitionReader[T] { + override def next(): Boolean = rowReader.nextKeyValue() + + override def get(): T = rowReader.getCurrentValue + + override def close(): Unit = rowReader.close() +} + +class PartitionRecordReaderWithProject[X, T]( + private[this] var rowReader: RecordReader[_, X], + project: X => T) extends PartitionReader[T] { + override def next(): Boolean = rowReader.nextKeyValue() + + override def get(): T = project(rowReader.getCurrentValue) + + override def close(): Unit = rowReader.close() +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala new file mode 100644 index 000000000000..db1f2f793422 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
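A small, made-up `RecordReader` to show how `PartitionRecordReaderWithProject` above adapts a Hadoop record reader plus a projection function to the DSv2 `PartitionReader` interface; the ORC factory below uses the same pattern with `OrcInputFormat` and an unsafe projection.

```scala
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}

import org.apache.spark.sql.execution.datasources.v2.PartitionRecordReaderWithProject

// A toy RecordReader that yields the integers 0 until n as its values.
class IntRangeRecordReader(n: Int) extends RecordReader[Void, Int] {
  private var i = -1
  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {}
  override def nextKeyValue(): Boolean = { i += 1; i < n }
  override def getCurrentKey: Void = null
  override def getCurrentValue: Int = i
  override def getProgress: Float = if (n == 0) 1.0f else (i + 1).toFloat / n
  override def close(): Unit = {}
}

object ProjectingReaderSketch {
  def main(args: Array[String]): Unit = {
    val reader = new PartitionRecordReaderWithProject[Int, String](
      new IntRangeRecordReader(3), (v: Int) => s"row-$v")
    while (reader.next()) {
      println(reader.get()) // row-0, row-1, row-2
    }
    reader.close()
  }
}
```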
+ */ +package org.apache.spark.sql.execution.datasources.v2.orc + +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.v2._ +import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table} +import org.apache.spark.sql.types.StructType + +class OrcDataSourceV2 extends FileDataSourceV2 { + + override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[OrcFileFormat] + + override def shortName(): String = "orc" + + private def getTableName(options: DataSourceOptions): String = { + shortName() + ":" + options.paths().mkString(";") + } + + override def getTable(options: DataSourceOptions): Table = { + val tableName = getTableName(options) + val fileIndex = getFileIndex(options, None) + OrcTable(tableName, sparkSession, fileIndex, None) + } + + override def getTable(options: DataSourceOptions, schema: StructType): Table = { + val tableName = getTableName(options) + val fileIndex = getFileIndex(options, Some(schema)) + OrcTable(tableName, sparkSession, fileIndex, Some(schema)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala new file mode 100644 index 000000000000..f6fc0ca83908 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2.orc + +import java.net.URI + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.orc.{OrcConf, OrcFile} +import org.apache.orc.mapred.OrcStruct +import org.apache.orc.mapreduce.OrcInputFormat + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.datasources.{PartitionedFile, PartitioningUtils} +import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcUtils} +import org.apache.spark.sql.execution.datasources.v2._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader} +import org.apache.spark.sql.types.{AtomicType, StructType} +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration + +/** + * A factory used to create Orc readers. + * + * @param sqlConf SQL configuration. + * @param broadcastedConf Broadcast serializable Hadoop Configuration. + * @param dataSchema Schema of orc files. + * @param partitionSchema Schema of partitions. + * @param readSchema Required schema in the batch scan. + */ +case class OrcPartitionReaderFactory( + sqlConf: SQLConf, + broadcastedConf: Broadcast[SerializableConfiguration], + dataSchema: StructType, + partitionSchema: StructType, + readSchema: StructType) extends FilePartitionReaderFactory { + private val isCaseSensitive = sqlConf.caseSensitiveAnalysis + private val capacity = sqlConf.orcVectorizedReaderBatchSize + + override def supportColumnarReads(partition: InputPartition): Boolean = { + sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled && + readSchema.length <= sqlConf.wholeStageMaxNumFields && + readSchema.forall(_.dataType.isInstanceOf[AtomicType]) + } + + override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = { + val conf = broadcastedConf.value.value + + val filePath = new Path(new URI(file.filePath)) + + val fs = filePath.getFileSystem(conf) + val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) + val reader = OrcFile.createReader(filePath, readerOptions) + + val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds( + isCaseSensitive, dataSchema, readSchema, reader, conf) + + if (requestedColIdsOrEmptyFile.isEmpty) { + new EmptyPartitionReader[InternalRow] + } else { + val requestedColIds = requestedColIdsOrEmptyFile.get + assert(requestedColIds.length == readSchema.length, + "[BUG] requested column IDs do not match required schema") + val taskConf = new Configuration(conf) + taskConf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, + requestedColIds.filter(_ != -1).sorted.mkString(",")) + + val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty) + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId) + + val requiredDataSchema = subtractSchema(readSchema, partitionSchema) + val orcRecordReader = new OrcInputFormat[OrcStruct] + .createRecordReader(fileSplit, taskAttemptContext) + + val fullSchema = 
requiredDataSchema.toAttributes ++ partitionSchema.toAttributes + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + val deserializer = new OrcDeserializer(dataSchema, requiredDataSchema, requestedColIds) + + val projection = if (partitionSchema.length == 0) { + (value: OrcStruct) => unsafeProjection(deserializer.deserialize(value)) + } else { + val joinedRow = new JoinedRow() + (value: OrcStruct) => + unsafeProjection(joinedRow(deserializer.deserialize(value), file.partitionValues)) + } + new PartitionRecordReaderWithProject(orcRecordReader, projection) + } + } + + override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = { + val conf = broadcastedConf.value.value + + val filePath = new Path(new URI(file.filePath)) + + val fs = filePath.getFileSystem(conf) + val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) + val reader = OrcFile.createReader(filePath, readerOptions) + + val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds( + isCaseSensitive, dataSchema, readSchema, reader, conf) + + if (requestedColIdsOrEmptyFile.isEmpty) { + new EmptyPartitionReader + } else { + val requestedColIds = requestedColIdsOrEmptyFile.get + assert(requestedColIds.length == readSchema.length, + "[BUG] requested column IDs do not match required schema") + val taskConf = new Configuration(conf) + taskConf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, + requestedColIds.filter(_ != -1).sorted.mkString(",")) + + val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty) + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId) + + val batchReader = new OrcColumnarBatchReader(capacity) + batchReader.initialize(fileSplit, taskAttemptContext) + val columnNameMap = partitionSchema.fields.map( + PartitioningUtils.getColName(_, isCaseSensitive)).zipWithIndex.toMap + val requestedPartitionColIds = readSchema.fields.map { field => + columnNameMap.getOrElse(PartitioningUtils.getColName(field, isCaseSensitive), -1) + } + + batchReader.initBatch( + reader.getSchema, + readSchema.fields, + requestedColIds, + requestedPartitionColIds, + file.partitionValues) + new PartitionRecordReader(batchReader) + } + } + + /** + * Returns a new StructType that is a copy of the original StructType, removing any items that + * also appear in other StructType. The order is preserved from the original StructType. + */ + private def subtractSchema(original: StructType, other: StructType): StructType = { + val otherNameSet = other.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet + val fields = original.fields.filterNot { field => + otherNameSet.contains(PartitioningUtils.getColName(field, isCaseSensitive)) + } + + StructType(fields) + } + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala new file mode 100644 index 000000000000..a792ad318b39 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.orc + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.datasources.v2.FileScan +import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +case class OrcScan( + sparkSession: SparkSession, + hadoopConf: Configuration, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType, + readSchema: StructType) extends FileScan(sparkSession, fileIndex) { + override def isSplitable(path: Path): Boolean = true + + override def createReaderFactory(): PartitionReaderFactory = { + val broadcastedConf = sparkSession.sparkContext.broadcast( + new SerializableConfiguration(hadoopConf)) + OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, + dataSchema, fileIndex.partitionSchema, readSchema) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala new file mode 100644 index 000000000000..eb27bbd3abea --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2.orc + +import scala.collection.JavaConverters._ + +import org.apache.orc.mapreduce.OrcInputFormat + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.datasources.orc.OrcFilters +import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.Scan +import org.apache.spark.sql.types.StructType + +case class OrcScanBuilder( + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + schema: StructType, + dataSchema: StructType, + options: DataSourceOptions) extends FileScanBuilder(schema) { + lazy val hadoopConf = + sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap) + + override def build(): Scan = { + OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema) + } + + private var _pushedFilters: Array[Filter] = Array.empty + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + if (sparkSession.sessionState.conf.orcFilterPushDown) { + OrcFilters.createFilter(schema, filters).foreach { f => + // The pushed filters will be set in `hadoopConf`. After that, we can simply use the + // changed `hadoopConf` in executors. + OrcInputFormat.setSearchArgument(hadoopConf, f, schema.fieldNames) + } + val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap + _pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap, filters).toArray + } + filters + } + + override def pushedFilters(): Array[Filter] = _pushedFilters +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala new file mode 100644 index 000000000000..719e757c33cb --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2.orc + +import org.apache.hadoop.fs.FileStatus + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.datasources.orc.OrcUtils +import org.apache.spark.sql.execution.datasources.v2.FileTable +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.types.StructType + +case class OrcTable( + name: String, + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + userSpecifiedSchema: Option[StructType]) + extends FileTable(sparkSession, fileIndex, userSpecifiedSchema) { + override def newScanBuilder(options: DataSourceOptions): OrcScanBuilder = + new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) + + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = + OrcUtils.readSchema(sparkSession, files) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 319c2649592f..a605dc640dc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -159,6 +159,7 @@ abstract class BaseSessionStateBuilder( override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: + new FallbackOrcDataSourceV2(session) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index cf25f1ce910d..d83deb17a090 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.columnar.InMemoryRelation @@ -234,7 +235,9 @@ object QueryTest { checkToRDD: Boolean = true): Option[String] = { val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty if (checkToRDD) { - df.rdd.count() // Also attempt to deserialize as an RDD [SPARK-15791] + SQLExecution.withSQLConfPropagated(df.sparkSession) { + df.rdd.count() // Also attempt to deserialize as an RDD [SPARK-15791] + } } val sparkAnswer = try df.collect().toSeq catch { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index ee12f3089243..cccd8e9ee8bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -24,11 +24,15 @@ import scala.collection.JavaConverters._ import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} -import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.{AnalysisException, Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import 
org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -41,7 +45,7 @@ import org.apache.spark.sql.types._ */ class OrcFilterSuite extends OrcTest with SharedSQLContext { - private def checkFilterPredicate( + protected def checkFilterPredicate( df: DataFrame, predicate: Predicate, checker: (SearchArgument) => Unit): Unit = { @@ -50,24 +54,24 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { .select(output.map(e => Column(e)): _*) .where(Column(predicate)) - var maybeRelation: Option[HadoopFsRelation] = None - val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) => - maybeRelation = Some(orcRelation) - filters - }.flatten.reduceLeftOption(_ && _) - assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") - - val (_, selectedFilters, _) = - DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) - assert(selectedFilters.nonEmpty, "No filter is pushed down") - - val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters) - assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters") - checker(maybeFilter.get) + query.queryExecution.optimizedPlan match { + case PhysicalOperation(_, filters, + DataSourceV2Relation(orcTable: OrcTable, _, options)) => + assert(filters.nonEmpty, "No filter is analyzed from the given query") + val scanBuilder = orcTable.newScanBuilder(new DataSourceOptions(options.asJava)) + scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray) + val pushedFilters = scanBuilder.pushedFilters() + assert(pushedFilters.nonEmpty, "No filter is pushed down") + val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters) + assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pushedFilters") + checker(maybeFilter.get) + + case _ => + throw new AnalysisException("Can not match OrcTable in the query.") + } } - private def checkFilterPredicate + protected def checkFilterPredicate (predicate: Predicate, filterOperator: PredicateLeaf.Operator) (implicit df: DataFrame): Unit = { def checkComparisonOperator(filter: SearchArgument) = { @@ -77,7 +81,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { checkFilterPredicate(df, predicate, checkComparisonOperator) } - private def checkFilterPredicate + protected def checkFilterPredicate (predicate: Predicate, stringExpr: String) (implicit df: DataFrame): Unit = { def checkLogicalOperator(filter: SearchArgument) = { @@ -86,28 +90,32 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { checkFilterPredicate(df, predicate, checkLogicalOperator) } - private def checkNoFilterPredicate - (predicate: Predicate) + protected def checkNoFilterPredicate + (predicate: Predicate, noneSupported: Boolean = false) (implicit df: DataFrame): Unit = { val output = predicate.collect { case a: Attribute => a }.distinct val query = df .select(output.map(e => Column(e)): _*) .where(Column(predicate)) - var maybeRelation: Option[HadoopFsRelation] = None - val maybeAnalyzedPredicate = 
query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) => - maybeRelation = Some(orcRelation) - filters - }.flatten.reduceLeftOption(_ && _) - assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") - - val (_, selectedFilters, _) = - DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) - assert(selectedFilters.nonEmpty, "No filter is pushed down") - - val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters) - assert(maybeFilter.isEmpty, s"Could generate filter predicate for $selectedFilters") + query.queryExecution.optimizedPlan match { + case PhysicalOperation(_, filters, + DataSourceV2Relation(orcTable: OrcTable, _, options)) => + assert(filters.nonEmpty, "No filter is analyzed from the given query") + val scanBuilder = orcTable.newScanBuilder(new DataSourceOptions(options.asJava)) + scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray) + val pushedFilters = scanBuilder.pushedFilters() + if (noneSupported) { + assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters") + } else { + assert(pushedFilters.nonEmpty, "No filter is pushed down") + val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters) + assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters") + } + + case _ => + throw new AnalysisException("Can not match OrcTable in the query.") + } } test("filter pushdown - integer") { @@ -346,15 +354,15 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { } // ArrayType withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df => - checkNoFilterPredicate('_1.isNull) + checkNoFilterPredicate('_1.isNull, noneSupported = true) } // BinaryType withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df => - checkNoFilterPredicate('_1 <=> 1.b) + checkNoFilterPredicate('_1 <=> 1.b, noneSupported = true) } // MapType withOrcDataFrame((1 to 4).map(i => Tuple1(Map(i -> i)))) { implicit df => - checkNoFilterPredicate('_1.isNotNull) + checkNoFilterPredicate('_1.isNotNull, noneSupported = true) } } @@ -419,3 +427,4 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { } } } + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala index d1911ea7f32a..4a695ac74c47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.datasources.orc import java.io.File +import org.apache.spark.SparkConf import org.apache.spark.sql._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext // The data where the partitioning key exists only in the directory structure. 
@@ -227,3 +229,8 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest { } class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext + +class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext { + override protected def sparkConf: SparkConf = + super.sparkConf.set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc") +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 918dbcdfa1cc..d0b386b88c59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -31,7 +31,7 @@ import org.apache.orc.OrcConf.COMPRESS import org.apache.orc.mapred.OrcStruct import org.apache.orc.mapreduce.OrcInputFormat -import org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator} @@ -574,7 +574,7 @@ abstract class OrcQueryTest extends OrcTest { val m1 = intercept[AnalysisException] { testAllCorruptFiles() }.getMessage - assert(m1.contains("Unable to infer schema for ORC")) + assert(m1.contains("Unable to infer schema")) testAllCorruptFilesWithoutSchemaInfer() } @@ -681,3 +681,8 @@ class OrcQuerySuite extends OrcQueryTest with SharedSQLContext { } } } + +class OrcV1QuerySuite extends OrcQuerySuite { + override protected def sparkConf: SparkConf = + super.sparkConf.set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc") +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala new file mode 100644 index 000000000000..cf5bbb3fff70 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.orc + +import scala.collection.JavaConverters._ + +import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +class OrcV1FilterSuite extends OrcFilterSuite { + + override protected def sparkConf: SparkConf = + super.sparkConf.set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc") + + override def checkFilterPredicate( + df: DataFrame, + predicate: Predicate, + checker: (SearchArgument) => Unit): Unit = { + val output = predicate.collect { case a: Attribute => a }.distinct + val query = df + .select(output.map(e => Column(e)): _*) + .where(Column(predicate)) + + var maybeRelation: Option[HadoopFsRelation] = None + val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { + case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) => + maybeRelation = Some(orcRelation) + filters + }.flatten.reduceLeftOption(_ && _) + assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") + + val (_, selectedFilters, _) = + DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) + assert(selectedFilters.nonEmpty, "No filter is pushed down") + + val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters) + assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters") + checker(maybeFilter.get) + } + + override def checkFilterPredicate + (predicate: Predicate, filterOperator: PredicateLeaf.Operator) + (implicit df: DataFrame): Unit = { + def checkComparisonOperator(filter: SearchArgument) = { + val operator = filter.getLeaves.asScala + assert(operator.map(_.getOperator).contains(filterOperator)) + } + checkFilterPredicate(df, predicate, checkComparisonOperator) + } + + override def checkFilterPredicate + (predicate: Predicate, stringExpr: String) + (implicit df: DataFrame): Unit = { + def checkLogicalOperator(filter: SearchArgument) = { + assert(filter.toString == stringExpr) + } + checkFilterPredicate(df, predicate, checkLogicalOperator) + } + + override def checkNoFilterPredicate + (predicate: Predicate, noneSupported: Boolean = false) + (implicit df: DataFrame): Unit = { + val output = predicate.collect { case a: Attribute => a }.distinct + val query = df + .select(output.map(e => Column(e)): _*) + .where(Column(predicate)) + + var maybeRelation: Option[HadoopFsRelation] = None + val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { + case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) => + maybeRelation = Some(orcRelation) + filters + }.flatten.reduceLeftOption(_ && _) + assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") + + val (_, selectedFilters, _) = + DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) + assert(selectedFilters.nonEmpty, "No filter is pushed down") + + val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters) + assert(maybeFilter.isEmpty, s"Could generate filter predicate for $selectedFilters") + } +} diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala new file mode 100644 index 000000000000..f57c581fd800 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2 + +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetTest} +import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.reader.ScanBuilder +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType + +class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 { + + override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat] + + override def shortName(): String = "parquet" + + override def getTable(options: DataSourceOptions): Table = { + new DummyReadOnlyFileTable + } +} + +class DummyReadOnlyFileTable extends Table with SupportsBatchRead { + override def name(): String = "dummy" + + override def schema(): StructType = StructType(Nil) + + override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + throw new AnalysisException("Dummy file reader") + } +} + +class FileDataSourceV2FallBackSuite extends QueryTest with ParquetTest with SharedSQLContext { + + private val dummyParquetReaderV2 = classOf[DummyReadOnlyFileDataSourceV2].getName + + test("Fall back to v1 when writing to file with read only FileDataSourceV2") { + val df = spark.range(10).toDF() + withTempPath { file => + val path = file.getCanonicalPath + // Writing file should fall back to v1 and succeed. + df.write.format(dummyParquetReaderV2).save(path) + + // Validate write result with [[ParquetFileFormat]]. + checkAnswer(spark.read.parquet(path), df) + + // Dummy File reader should fail as expected. 
+      val exception = intercept[AnalysisException] {
+        spark.read.format(dummyParquetReaderV2).load(path).collect()
+      }
+      assert(exception.message.equals("Dummy file reader"))
+    }
+  }
+
+  test("Fall back read path to v1 with configuration USE_V1_SOURCE_READER_LIST") {
+    val df = spark.range(10).toDF()
+    withTempPath { file =>
+      val path = file.getCanonicalPath
+      df.write.parquet(path)
+      Seq(
+        "foo,parquet,bar",
+        "ParQuet,bar,foo",
+        s"foobar,$dummyParquetReaderV2"
+      ).foreach { fallbackReaders =>
+        withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> fallbackReaders) {
+          // Reading file should fall back to v1 and succeed.
+          checkAnswer(spark.read.format(dummyParquetReaderV2).load(path), df)
+          checkAnswer(sql(s"SELECT * FROM parquet.`$path`"), df)
+        }
+      }
+
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "foo,bar") {
+        // Dummy File reader should fail as USE_V1_SOURCE_READER_LIST doesn't include it.
+        val exception = intercept[AnalysisException] {
+          spark.read.format(dummyParquetReaderV2).load(path).collect()
+        }
+        assert(exception.message.equals("Dummy file reader"))
+      }
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 4f3914740ec2..132b0e4db0d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -71,6 +71,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
       new ResolveHiveSerdeTable(session) +:
       new FindDataSourceTable(session) +:
       new ResolveSQLOnFile(session) +:
+      new FallbackOrcDataSourceV2(session) +:
       customResolutionRules

   override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d93215fefb81..3402ed240f8b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -908,7 +908,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       }
       assert(e.getMessage.contains(
         "The format of the existing table default.appendOrcToParquet is `ParquetFileFormat`. " +
-          "It doesn't match the specified format `OrcFileFormat`"))
+          "It doesn't match the specified format"))
     }

     withTable("appendParquetToJson") {
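Reviewer note (illustrative, not part of the patch): the OrcV1PartitionDiscoverySuite and OrcV1QuerySuite hunks above show the intended way to opt a format out of the new V2 read path in tests. A minimal standalone sketch of the same pattern, reusing the existing test helpers; the suite name is hypothetical.

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.QueryTest
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.test.SharedSQLContext

    // Hypothetical suite: routes every "orc" read through the V1 OrcFileFormat
    // instead of OrcDataSourceV2 by listing the short name in the fallback conf.
    class ExampleOrcV1FallbackSuite extends QueryTest with SharedSQLContext {
      override protected def sparkConf: SparkConf =
        super.sparkConf.set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")

      test("orc reads take the V1 path") {
        withTempPath { dir =>
          val path = dir.getCanonicalPath
          spark.range(10).write.orc(path)
          // With the fallback in place, the read resolves to a LogicalRelation over
          // HadoopFsRelation rather than a DataSourceV2Relation over OrcTable.
          checkAnswer(spark.read.orc(path), spark.range(10).toDF())
        }
      }
    }

As FileDataSourceV2FallBackSuite exercises above, the list accepts either data source short names or fully qualified register class names, and matching is case-insensitive.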