[SPARK-53063][CORE] Implement and call new APIs in FileCommitProtocol instead of the deprecated #51772
...main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala
  override def newTaskTempFileAbsPath(
-     taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+     taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
nit: wrong indentation.
      taskAttemptContext,
      None,
-     f"-c$fileCounter%03d" + ext)
+     FileNameSpec("", f"-c$fileCounter%03d" + ext))
This looks like a new independent change, too.
Calling the newer newTaskTempFile with spec is the goal of this PR
The PR title says something different:
Remove inner references of deprecated APIs in FileCommitProtocol
If that is the goal of this PR, please remove the thrown exceptions in the deprecated APIs.
Calling the newer newTaskTempFile with spec is the goal of this PR
@dongjoon-hyun, I got your point. I've changed the title to Implement and call new APIs in FileCommitProtocol instead of deprecated. Does the change here look reasonable to you with this more positive framing? If not, I will split the overrides and the callers into two PRs.
Thank you. I trust your decision. Initially, I wanted to remove the default implementation (throw SparkException) from this PR. But this could be a migration step too, so I agree with your decision.
Thank you @dongjoon-hyun
  @deprecated("use newTaskTempFile(..., spec: FileNameSpec) instead", "3.3.0")
- def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
+ def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+   throw SparkException.mustOverrideOneMethodError("newTaskTempFile")
In other words, I don't think this is required to achieve your goal.
I agree, it is not clear why these changes are being made.
I got confused with the intent as well - though now I realize what is being attempted.
  def newTaskTempFileAbsPath(
-     taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+     taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+   throw SparkException.mustOverrideOneMethodError("newTaskTempFileAbsPath")
ditto.
sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
Merged to master, thank you @dongjoon-hyun again.
- new SparkRuntimeException(
-   "INTERNAL_ERROR",
-   Map("message" -> msg))
+ SparkException.mustOverrideOneMethodError(msg)
This should be SparkException.mustOverrideOneMethodError(methodName)? Or, better still, remove mustOverrideOneMethodError here and delegate to SparkException.mustOverrideOneMethodError instead?
It is unclear what the benefits of this PR are.
Hi @mridulm, if we implement a new custom FileCommitProtocol, we no longer need to override the deprecated newTaskTempFile. This makes the future removal of these APIs safer.
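To illustrate the point above, here is a minimal sketch of the "override at least one overload" pattern. The types are simplified, hypothetical stand-ins (no TaskAttemptContext, a plain UnsupportedOperationException instead of SparkException), and the fallback from the spec-based overload to the deprecated one is an assumption about how the base class behaves, not a copy of Spark's code.

```scala
// Hypothetical, simplified stand-ins; these are NOT Spark's real classes.
final case class FileNameSpec(prefix: String, suffix: String)

abstract class SketchCommitProtocol {
  // Deprecated overload: no longer abstract, so new implementations can
  // ignore it. It fails only if nobody overrides either overload.
  @deprecated("use newTaskTempFile(dir, spec) instead", "sketch")
  def newTaskTempFile(dir: Option[String], ext: String): String =
    throw new UnsupportedOperationException(
      "must override one of the newTaskTempFile overloads")

  // New overload: assumed to fall back to the deprecated one so that
  // legacy subclasses keep working when no prefix is requested.
  def newTaskTempFile(dir: Option[String], spec: FileNameSpec): String =
    if (spec.prefix.isEmpty) newTaskTempFile(dir, spec.suffix)
    else throw new UnsupportedOperationException(
      s"file name prefix is not supported: ${spec.prefix}")
}

// A new-style implementation overrides only the spec-based overload.
class NewStyleProtocol extends SketchCommitProtocol {
  override def newTaskTempFile(dir: Option[String], spec: FileNameSpec): String =
    dir.map(_ + "/").getOrElse("") + spec.prefix + "part-00000" + spec.suffix
}

// A legacy implementation overrides only the deprecated overload.
class LegacyProtocol extends SketchCommitProtocol {
  override def newTaskTempFile(dir: Option[String], ext: String): String =
    dir.map(_ + "/").getOrElse("") + "part-00000" + ext
}
```

In this sketch, callers that always use the spec-based overload reach both kinds of implementation, which is the property that would make a later removal of the deprecated overload less risky.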
Unfortunately, there is a nontrivial number of user/library implementations that extend these public classes (a GitHub search shows a lot; even excluding forks, etc., the count appears significant) and that depend on the way they are currently written (for better or for worse).
As long as the base interface can satisfy the current built-in implementations, there's no difference for third-party implementations.
What changes were proposed in this pull request?
This PR implements and calls the new APIs in FileCommitProtocol instead of the deprecated ones.
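As a rough illustration of the caller-side change, the sketch below wraps the old suffix-only argument in a FileNameSpec with an empty prefix, mirroring the `FileNameSpec("", f"-c$fileCounter%03d" + ext)` call in the diff. The `FileNameSpec` case class, the `fileName` helper, and the sample values are hypothetical and only show how a prefix and suffix might be composed into a file name.

```scala
object CallSiteSketch {
  // Hypothetical stand-in for the spec type; not Spark's real class.
  final case class FileNameSpec(prefix: String, suffix: String)

  // Hypothetical helper showing one way a committer could build a name
  // from a task split number, a job id, and the spec.
  def fileName(split: Int, jobId: String, spec: FileNameSpec): String =
    f"${spec.prefix}part-$split%05d-$jobId${spec.suffix}"

  val fileCounter = 2          // sample value
  val ext = ".snappy.parquet"  // sample value

  // Old call shape: only the suffix string was passed.
  val oldSuffix: String = f"-c$fileCounter%03d" + ext

  // New call shape: the same suffix wrapped in a spec with an empty prefix.
  val spec: FileNameSpec = FileNameSpec("", f"-c$fileCounter%03d" + ext)
}
```

With an empty prefix the resulting name is the same as with the suffix-only call, so in this sketch the migration changes the call shape without changing the output.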
Why are the changes needed?
FileCommitProtocol and related classes are complicated, as they play a lot of tricks for tasks like file naming and config setting/propagation. Removing these references can simplify the call stack a bit, and it also makes the deprecated methods ignorable.
Does this PR introduce any user-facing change?
No, nothing changes for existing implementations or end-users
How was this patch tested?
Pass existing CIs
Was this patch authored or co-authored using generative AI tooling?
no