-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-18726][SQL]resolveRelation for FileFormat DataSource don't need to listFiles twice #17081
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-18726][SQL]resolveRelation for FileFormat DataSource don't need to listFiles twice #17081
Conversation
…ed to listFiles twice
|
Test build #73500 has finished for PR 17081 at commit
|
|
Test build #73539 has started for PR 17081 at commit |
|
retest this please |
|
retest this please |
|
Test build #73715 has finished for PR 17081 at commit
|
|
Test build #73716 has finished for PR 17081 at commit
|
|
Test build #73724 has finished for PR 17081 at commit
|
|
cc @cloud-fan @gatorsmile @ericl could you help to review this?thanks :) |
| } else { | ||
| new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema)) | ||
| new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema), | ||
| fileStatusCache) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: indent issue
|
Test build #73726 has finished for PR 17081 at commit
|
|
Test build #73733 has started for PR 17081 at commit |
| globbedPaths, | ||
| options, | ||
| Some(partitionSchema), | ||
| fileStatusCache) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new InMemoryFileIndex(
sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)This is also valid
| SparkHadoopUtil.get.globPathIfNecessary(qualified) | ||
| }.toArray | ||
| new InMemoryFileIndex(sparkSession, globbedPaths, options, None) | ||
| new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also impacts the streaming code path. If it is fine to streaming, the code changes look good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have make it local only in the no streaming FileFormat match case~
| lazy val sourceInfo: SourceInfo = sourceSchema() | ||
| private val caseInsensitiveOptions = CaseInsensitiveMap(options) | ||
|
|
||
| private lazy val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the lifetime of this cache?
| catalogTable.get, | ||
| catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize)) | ||
| } else { | ||
| new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to create file status cache as a local variable, pass it to getOrInferFileFormatSchema, then use it here. It's much easier to reason about the lifetime of this cache by this way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I think it is more reasonable~ thanks~
|
Test build #73758 has finished for PR 17081 at commit
|
|
Test build #73759 has finished for PR 17081 at commit
|
|
Why you closed it? |
|
oh...sorry , I don't know when I close it... |
| private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = { | ||
| private def getOrInferFileFormatSchema( | ||
| format: FileFormat, | ||
| fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the function description with a new @parm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, thanks~
|
Please remove |
|
Test build #73791 has finished for PR 17081 at commit
|
| * be any further inference in any triggers. | ||
| * | ||
| * @param format the file format object for this DataSource | ||
| * @param fileStatusCache fileStatusCache for InMemoryFileIndex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@param fileStatusCache the shared cache for file statuses to speed up listing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok thanks!
|
Test build #73793 has finished for PR 17081 at commit
|
|
Test build #73798 has finished for PR 17081 at commit
|
|
LGTM |
|
thanks, merging to master! |
What changes were proposed in this pull request?
Currently when we resolveRelation for a
FileFormat DataSourcewithout providing user schema, it will executelistFilestwice inInMemoryFileIndexduringresolveRelation.This PR add a
FileStatusCachefor DataSource, this can avoid listFiles twice.But there is a bug in
InMemoryFileIndexsee:SPARK-19748
SPARK-19761,
so this pr should be after SPARK-19748/ SPARK-19761.
How was this patch tested?
unit test added