diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
index 254c09001f7e..e22d6a6d399a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
@@ -35,7 +35,7 @@ class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPl
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoTable(d @DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) =>
       val v1FileFormat = new OrcFileFormat
-      val relation = HadoopFsRelation(table.getFileIndex, table.getFileIndex.partitionSchema,
+      val relation = HadoopFsRelation(table.fileIndex, table.fileIndex.partitionSchema,
         table.schema(), None, v1FileFormat, d.options)(sparkSession)
       i.copy(table = LogicalRelation(relation))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index a0c932cbb0e0..06c57066aa24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -16,13 +16,10 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, SupportsBatchRead, TableProvider}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.sources.v2.TableProvider
 
 /**
  * A base interface for data source v2 implementations of the built-in file-based data sources.
@@ -38,17 +35,4 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
   def fallBackFileFormat: Class[_ <: FileFormat]
 
   lazy val sparkSession = SparkSession.active
-
-  def getFileIndex(
-      options: DataSourceOptions,
-      userSpecifiedSchema: Option[StructType]): PartitioningAwareFileIndex = {
-    val filePaths = options.paths()
-    val hadoopConf =
-      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
-    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
-      checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
-    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-    new InMemoryFileIndex(sparkSession, rootPathsSpecified,
-      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 0dbef145f732..21d3e5e29cfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -16,19 +16,30 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, SupportsBatchWrite, Table}
 import org.apache.spark.sql.types.StructType
 
 abstract class FileTable(
     sparkSession: SparkSession,
-    fileIndex: PartitioningAwareFileIndex,
+    options: DataSourceOptions,
     userSpecifiedSchema: Option[StructType])
   extends Table with SupportsBatchRead with SupportsBatchWrite {
-  def getFileIndex: PartitioningAwareFileIndex = this.fileIndex
+  lazy val fileIndex: PartitioningAwareFileIndex = {
+    val filePaths = options.paths()
+    val hadoopConf =
+      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
+      checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(sparkSession, rootPathsSpecified,
+      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
+  }
 
   lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
     inferSchema(fileIndex.allFiles())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
index db1f2f793422..74739b4fe2d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
@@ -34,13 +34,11 @@ class OrcDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: DataSourceOptions): Table = {
     val tableName = getTableName(options)
-    val fileIndex = getFileIndex(options, None)
-    OrcTable(tableName, sparkSession, fileIndex, None)
+    OrcTable(tableName, sparkSession, options, None)
   }
 
   override def getTable(options: DataSourceOptions, schema: StructType): Table = {
     val tableName = getTableName(options)
-    val fileIndex = getFileIndex(options, Some(schema))
-    OrcTable(tableName, sparkSession, fileIndex, Some(schema))
+    OrcTable(tableName, sparkSession, options, Some(schema))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index b467e505f1ba..249df8b8622f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.v2.orc
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.orc.OrcUtils
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.sources.v2.DataSourceOptions
@@ -29,9 +28,9 @@ import org.apache.spark.sql.types.StructType
 case class OrcTable(
     name: String,
     sparkSession: SparkSession,
-    fileIndex: PartitioningAwareFileIndex,
+    options: DataSourceOptions,
     userSpecifiedSchema: Option[StructType])
-  extends FileTable(sparkSession, fileIndex, userSpecifiedSchema) {
+  extends FileTable(sparkSession, options, userSpecifiedSchema) {
 
   override def newScanBuilder(options: DataSourceOptions): OrcScanBuilder =
     new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
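Reviewer note: below is a minimal usage sketch of the behavior change, not part of the patch. It assumes the internal DSv2 APIs as they exist at this commit (DataSourceOptions, OrcDataSourceV2.getTable, Table.schema()); the object name LazyFileIndexSketch and the path /tmp/orc-data are placeholders introduced only for illustration.

import java.util.Collections

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Hypothetical driver program exercising the refactored code path.
object LazyFileIndexSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("lazy-file-index-sketch")
      .getOrCreate()

    // Placeholder path; point it at any directory of ORC files.
    val options = new DataSourceOptions(
      Collections.singletonMap("path", "/tmp/orc-data"))

    // With this patch, getTable only captures the options; no path
    // globbing or file listing happens here.
    val table = new OrcDataSourceV2().getTable(options)

    // FileTable.fileIndex is a lazy val, so the InMemoryFileIndex is
    // built on first access, e.g. when schema inference touches it here.
    println(table.schema())

    spark.stop()
  }
}

With getFileIndex removed from FileDataSourceV2, both getTable overloads become cheap constructors, and the listing work moved into FileTable.fileIndex runs at most once per table instance, only when something actually needs the index.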