Commit 2d8838d

rxin authored and gatorsmile committed
[SPARK-26368][SQL] Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex
## What changes were proposed in this pull request?

Looking at the code, it was difficult to follow the life cycle of the InMemoryFileIndex passed into getOrInferFileFormatSchema: sometimes it is passed in, and other times it is created inside getOrInferFileFormatSchema itself. The life cycle is easier to understand if we move the creation out of the function.

## How was this patch tested?

This is a simple code move and should be covered by existing tests.

Closes #23317 from rxin/SPARK-26368.

Authored-by: Reynold Xin <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
1 parent 93139af commit 2d8838d

File tree: 1 file changed (+13, -12 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 13 additions & 12 deletions
@@ -122,21 +122,14 @@ case class DataSource(
    * be any further inference in any triggers.
    *
    * @param format the file format object for this DataSource
-   * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
+   * @param getFileIndex [[InMemoryFileIndex]] for getting partition schema and file list
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */
   private def getOrInferFileFormatSchema(
       format: FileFormat,
-      fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
-    // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
-    // in streaming mode, we have already inferred and registered partition columns, we will
-    // never have to materialize the lazy val below
-    lazy val tempFileIndex = fileIndex.getOrElse {
-      val globbedPaths =
-        checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
-      createInMemoryFileIndex(globbedPaths)
-    }
+      getFileIndex: () => InMemoryFileIndex): (StructType, StructType) = {
+    lazy val tempFileIndex = getFileIndex()

     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
@@ -236,7 +229,15 @@ case class DataSource(
           "you may be able to create a static DataFrame on that directory with " +
           "'spark.read.load(directory)' and infer schema from it.")
       }
-    val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+
+    val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, () => {
+      // The operations below are expensive therefore try not to do them if we don't need to,
+      // e.g., in streaming mode, we have already inferred and registered partition columns,
+      // we will never have to materialize the lazy val below
+      val globbedPaths =
+        checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
+      createInMemoryFileIndex(globbedPaths)
+    })
     SourceInfo(
       s"FileSource[$path]",
       StructType(dataSchema ++ partitionSchema),
@@ -370,7 +371,7 @@ case class DataSource(
     } else {
       val index = createInMemoryFileIndex(globbedPaths)
       val (resultDataSchema, resultPartitionSchema) =
-        getOrInferFileFormatSchema(format, Some(index))
+        getOrInferFileFormatSchema(format, () => index)
       (index, resultDataSchema, resultPartitionSchema)
     }
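The core of the change is replacing an `Option[InMemoryFileIndex]` default parameter with an explicit `() => InMemoryFileIndex` thunk, which the callee binds to a `lazy val` so the expensive index is built at most once, and only if it is actually needed. A minimal standalone sketch of that pattern (hypothetical names and a plain `List` standing in for the file index; not Spark code):

```scala
// Sketch of the thunk-plus-lazy-val pattern this commit adopts.
object ThunkSketch {

  // Stand-in for the expensive InMemoryFileIndex construction,
  // with a counter so we can observe how often it actually runs.
  final class CountingSource {
    var builds = 0
    def expensiveIndex(): List[String] = {
      builds += 1
      List("part=1", "part=2") // pretend partition directories
    }
  }

  // Analogue of getOrInferFileFormatSchema after the change: it takes
  // `getFileIndex: () => ...` instead of `Option[...] = None`. The lazy
  // val defers invoking the thunk until (unless) the value is touched.
  def inferSchema(
      partitionColumns: Seq[String],
      getFileIndex: () => List[String]): Int = {
    lazy val tempIndex = getFileIndex() // materialized only on demand
    if (partitionColumns.isEmpty) tempIndex.size // must build the index
    else partitionColumns.size                   // index never built
  }

  def main(args: Array[String]): Unit = {
    val src = new CountingSource
    // Partition columns already known: the thunk never fires.
    println(inferSchema(Seq("year"), () => src.expensiveIndex()))
    println(src.builds)
    // No partition columns: the thunk fires exactly once.
    println(inferSchema(Seq.empty, () => src.expensiveIndex()))
    println(src.builds)
  }
}
```

The caller decides how the index is produced (reuse an existing one via `() => index`, or build one inside the closure), which makes the index's life cycle visible at the call site instead of being hidden behind a default argument.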
