Commit adbfabe

gengliangwang authored and Jackey Lee committed
[SPARK-26871][SQL] File Source V2: avoid creating unnecessary FileIndex in the write path

## What changes were proposed in this pull request?

In apache#23383, the file source V2 framework was implemented. In that PR, `FileIndex` is created as a member of `FileTable`, so that we can implement partition pruning like apache@0f9fcab in the future. (As the data source V2 catalog is under development, partition pruning was removed from that PR.)

However, after the write path of file source V2 was implemented, I found that a simple write creates an unnecessary `FileIndex`, which is required by `FileTable`. This is a sort of regression, and we can see a warning message when writing to ORC files:

```
WARN InMemoryFileIndex: The directory file:/tmp/foo was not found. Was it deleted very recently?
```

This PR makes `FileIndex` a lazy value in `FileTable`, so that we avoid creating an unnecessary `FileIndex` in the write path.

## How was this patch tested?

Existing unit tests.

Closes apache#23774 from gengliangwang/moveFileIndexInV2.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 04feb6b commit adbfabe
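
The fix relies on the standard Scala `lazy val` idiom: a field whose initializer runs only on first access, so code paths that never touch it (here, a plain write) never trigger the expensive, side-effecting file listing. Below is a minimal, self-contained sketch of that idiom; `ExpensiveIndex`, `Table`, and the path are illustrative stand-ins, not Spark APIs.

```scala
// Minimal sketch of the lazy-initialization idiom this commit applies to FileTable.
// ExpensiveIndex and Table are made-up stand-ins, not Spark classes.
class ExpensiveIndex(path: String) {
  // Side effect we want to avoid on a pure write (analogous to listing files
  // under an output directory that may not exist yet).
  println(s"listing files under $path")
  val files: Seq[String] = Seq.empty // stub for the actual listing
}

class Table(path: String) {
  // Built only on first access; a write-only caller never pays for it.
  lazy val index: ExpensiveIndex = new ExpensiveIndex(path)

  def write(rows: Seq[String]): Unit = println(s"writing ${rows.size} rows to $path")
  def read(): Seq[String] = index.files // forces the lazy val on first read
}

object LazyIndexDemo extends App {
  val t = new Table("/tmp/foo")
  t.write(Seq("a", "b")) // prints only the "writing ..." line
  t.read()               // "listing files under /tmp/foo" appears here, not earlier
}
```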

File tree

5 files changed, +21 -29 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPl
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoTable(d @ DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) =>
       val v1FileFormat = new OrcFileFormat
-      val relation = HadoopFsRelation(table.getFileIndex, table.getFileIndex.partitionSchema,
+      val relation = HadoopFsRelation(table.fileIndex, table.fileIndex.partitionSchema,
         table.schema(), None, v1FileFormat, d.options)(sparkSession)
       i.copy(table = LogicalRelation(relation))
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala

Lines changed: 2 additions & 18 deletions
@@ -16,13 +16,10 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, SupportsBatchRead, TableProvider}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.sources.v2.TableProvider
 
 /**
  * A base interface for data source v2 implementations of the built-in file-based data sources.
@@ -38,17 +35,4 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
   def fallBackFileFormat: Class[_ <: FileFormat]
 
   lazy val sparkSession = SparkSession.active
-
-  def getFileIndex(
-      options: DataSourceOptions,
-      userSpecifiedSchema: Option[StructType]): PartitioningAwareFileIndex = {
-    val filePaths = options.paths()
-    val hadoopConf =
-      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
-    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
-      checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
-    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-    new InMemoryFileIndex(sparkSession, rootPathsSpecified,
-      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
-  }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala

Lines changed: 14 additions & 3 deletions
@@ -16,19 +16,30 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, SupportsBatchWrite, Table}
 import org.apache.spark.sql.types.StructType
 
 abstract class FileTable(
     sparkSession: SparkSession,
-    fileIndex: PartitioningAwareFileIndex,
+    options: DataSourceOptions,
     userSpecifiedSchema: Option[StructType])
   extends Table with SupportsBatchRead with SupportsBatchWrite {
-  def getFileIndex: PartitioningAwareFileIndex = this.fileIndex
+  lazy val fileIndex: PartitioningAwareFileIndex = {
+    val filePaths = options.paths()
+    val hadoopConf =
+      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
+      checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(sparkSession, rootPathsSpecified,
+      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
+  }
 
   lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
     inferSchema(fileIndex.allFiles())
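
For context, the user-visible effect of this lazy `fileIndex` can be sketched from a spark-shell session. This assumes the ORC write actually goes through the file source V2 path (which depends on the session's configuration), and `/tmp/foo` is just an example path:

```scala
// Hedged spark-shell sketch; `spark` is the SparkSession the shell provides.

// A pure write no longer needs a listing of the (possibly nonexistent) output
// directory, so the lazy fileIndex is never materialized and the
// "InMemoryFileIndex: The directory file:/tmp/foo was not found" warning is avoided.
spark.range(10).write.mode("overwrite").orc("/tmp/foo")

// A read does need the listing, so fileIndex is built on first access here.
spark.read.orc("/tmp/foo").count()
```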

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala

Lines changed: 2 additions & 4 deletions
@@ -34,13 +34,11 @@ class OrcDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: DataSourceOptions): Table = {
     val tableName = getTableName(options)
-    val fileIndex = getFileIndex(options, None)
-    OrcTable(tableName, sparkSession, fileIndex, None)
+    OrcTable(tableName, sparkSession, options, None)
   }
 
   override def getTable(options: DataSourceOptions, schema: StructType): Table = {
     val tableName = getTableName(options)
-    val fileIndex = getFileIndex(options, Some(schema))
-    OrcTable(tableName, sparkSession, fileIndex, Some(schema))
+    OrcTable(tableName, sparkSession, options, Some(schema))
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala

Lines changed: 2 additions & 3 deletions
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.v2.orc
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.orc.OrcUtils
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.sources.v2.DataSourceOptions
@@ -29,9 +28,9 @@ import org.apache.spark.sql.types.StructType
 case class OrcTable(
     name: String,
     sparkSession: SparkSession,
-    fileIndex: PartitioningAwareFileIndex,
+    options: DataSourceOptions,
     userSpecifiedSchema: Option[StructType])
-  extends FileTable(sparkSession, fileIndex, userSpecifiedSchema) {
+  extends FileTable(sparkSession, options, userSpecifiedSchema) {
   override def newScanBuilder(options: DataSourceOptions): OrcScanBuilder =
     new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
