FallbackOrcDataSourceV2.scala
@@ -35,7 +35,7 @@ class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoTable(d @DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) =>
       val v1FileFormat = new OrcFileFormat
-      val relation = HadoopFsRelation(table.getFileIndex, table.getFileIndex.partitionSchema,
+      val relation = HadoopFsRelation(table.fileIndex, table.fileIndex.partitionSchema,
         table.schema(), None, v1FileFormat, d.options)(sparkSession)
       i.copy(table = LogicalRelation(relation))
   }
FileDataSourceV2.scala
@@ -16,13 +16,10 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, SupportsBatchRead, TableProvider}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.sources.v2.TableProvider
 
 /**
  * A base interface for data source v2 implementations of the built-in file-based data sources.
@@ -38,17 +35,4 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
   def fallBackFileFormat: Class[_ <: FileFormat]
 
   lazy val sparkSession = SparkSession.active
-
-  def getFileIndex(
-      options: DataSourceOptions,
-      userSpecifiedSchema: Option[StructType]): PartitioningAwareFileIndex = {
-    val filePaths = options.paths()
-    val hadoopConf =
-      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
-    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
-      checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
-    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-    new InMemoryFileIndex(sparkSession, rootPathsSpecified,
-      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
-  }
 }
FileTable.scala
@@ -16,19 +16,30 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, SupportsBatchWrite, Table}
 import org.apache.spark.sql.types.StructType
 
 abstract class FileTable(
     sparkSession: SparkSession,
-    fileIndex: PartitioningAwareFileIndex,
+    options: DataSourceOptions,
     userSpecifiedSchema: Option[StructType])
   extends Table with SupportsBatchRead with SupportsBatchWrite {
-  def getFileIndex: PartitioningAwareFileIndex = this.fileIndex
+  lazy val fileIndex: PartitioningAwareFileIndex = {
+    val filePaths = options.paths()
+    val hadoopConf =
+      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
+      checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(sparkSession, rootPathsSpecified,
+      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
+  }
 
   lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
     inferSchema(fileIndex.allFiles())
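The effect of this hunk: file listing is no longer done eagerly by the provider and handed to the table; the table builds its PartitioningAwareFileIndex lazily from the options it already holds, so constructing a table stays cheap until the index is actually needed. A minimal standalone sketch of that pattern in plain Scala (LazyIndexedTable and its members are illustrative names, not part of this PR or of Spark):

    import java.nio.file.{Files, Path, Paths}
    import scala.collection.JavaConverters._

    // The "index" is computed from the constructor argument on first access only,
    // mirroring how FileTable.fileIndex is now a lazy val built from options.
    final class LazyIndexedTable(rootPath: String) {
      lazy val index: Seq[Path] = {
        val root = Paths.get(rootPath)
        if (Files.exists(root)) Files.walk(root).iterator().asScala.toSeq
        else Seq.empty
      }
    }

    object LazyIndexedTableExample extends App {
      val table = new LazyIndexedTable("/tmp/data") // cheap: nothing is listed yet
      println(table.index.size)                     // the directory walk happens here, once
    }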
OrcDataSourceV2.scala
@@ -34,13 +34,11 @@ class OrcDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: DataSourceOptions): Table = {
     val tableName = getTableName(options)
-    val fileIndex = getFileIndex(options, None)
-    OrcTable(tableName, sparkSession, fileIndex, None)
+    OrcTable(tableName, sparkSession, options, None)
   }
 
   override def getTable(options: DataSourceOptions, schema: StructType): Table = {
     val tableName = getTableName(options)
-    val fileIndex = getFileIndex(options, Some(schema))
-    OrcTable(tableName, sparkSession, fileIndex, Some(schema))
+    OrcTable(tableName, sparkSession, options, Some(schema))
   }
 }
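With the file index gone from these methods, getTable only forwards the options; no globbing or directory listing should happen here, since OrcTable.fileIndex is lazy and is not forced by the constructor. A hedged, spark-shell style sketch of exercising the provider directly on this dev branch (the path, session setup, and the use of the internal DataSourceOptions map constructor are assumptions for illustration, not part of the PR):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
    import org.apache.spark.sql.sources.v2.DataSourceOptions

    // An active session is needed because FileDataSourceV2 uses SparkSession.active.
    val spark = SparkSession.builder().master("local[1]").appName("orc-v2-sketch").getOrCreate()
    val options = new DataSourceOptions(Map("path" -> "/tmp/orc/data").asJava)

    val table = new OrcDataSourceV2().getTable(options) // nothing listed yet: fileIndex is lazy
    // Listing and partition discovery are deferred until fileIndex is forced,
    // e.g. by schema inference or by building a scan.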
OrcTable.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.v2.orc
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.orc.OrcUtils
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.sources.v2.DataSourceOptions
@@ -29,9 +28,9 @@ import org.apache.spark.sql.types.StructType
 case class OrcTable(
     name: String,
     sparkSession: SparkSession,
-    fileIndex: PartitioningAwareFileIndex,
+    options: DataSourceOptions,
     userSpecifiedSchema: Option[StructType])
-  extends FileTable(sparkSession, fileIndex, userSpecifiedSchema) {
+  extends FileTable(sparkSession, options, userSpecifiedSchema) {
   override def newScanBuilder(options: DataSourceOptions): OrcScanBuilder =
     new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
 