[SPARK-18120][SPARK-19557][SQL] Call QueryExecutionListener callback methods for DataFrameWriter methods #16962
Conversation
vanzin left a comment:
Looks good as far as I can tell. Thanks!
dataSource.write(mode, df)
runCommand(df.sparkSession, "save") {
  SaveIntoDataSourceCommand(
Does this also cover SPARK-19557? If so, might as well mention that in the PR (or close the bug as a duplicate or related or something).
val qe = session.sessionState.executePlan(command)
try {
  qe.executedPlan.foreach { plan =>
    plan.resetMetrics()
Just realized that in this code path we pass in a logical plan and will always get a new physical plan, so we don't need to reset metrics here. Let me remove it.
Test build #73009 has finished for PR 16962 at commit
Test build #73012 has finished for PR 16962 at commit
className = provider,
partitionColumns = partitionColumns,
options = options).write(mode, Dataset.ofRows(sparkSession, query))
Do we need to invalidate the cache to be consistent with InsertIntoDataSourceCommand?
sparkSession.sharedState.cacheManager.invalidateCache(query)
We don't have a LogicalRelation to use as the cache key here.
: )
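For readers skimming the thread: the point about the missing LogicalRelation matters because cached data is keyed by logical plan. Below is a rough sketch, paraphrased from memory rather than copied from this diff (the class name, the `overwrite: Boolean` field, and the elided insert logic are simplifications), of how the insert path can use its existing relation as the cache key:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation

// Sketch only: the insert command already holds the LogicalRelation of the target table,
// so after writing it can drop any cached data built on that relation. SaveIntoDataSourceCommand
// creates the relation as part of the write, so there is no pre-existing plan to use as a key,
// which is what the comment above refers to.
case class InsertIntoDataSourceCommandSketch(
    logicalRelation: LogicalRelation,
    query: LogicalPlan,
    overwrite: Boolean) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // ... insert the rows of `query` into the underlying relation here ...

    // Cached plans are keyed by their logical plan, so the existing relation is the key.
    sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)
    Seq.empty[Row]
  }
}
```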
format("csv").save(path)
}

private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
Add a function description like?
/**
* Wrap a DataFrameWriter action to track the QueryExecution and time cost, then report to the
* user-registered callback functions.
 */

val commands = ArrayBuffer.empty[(String, LogicalPlan)]
val exceptions = ArrayBuffer.empty[(String, Exception)]
val listener = new QueryExecutionListener {
  // Only test successful case here, so no need to implement `onFailure`
invalid comment?
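The diff context above cuts off right after the listener declaration. A minimal sketch of how such a test listener could record writer commands (the method signatures follow the `QueryExecutionListener` trait; the surrounding test harness and the `spark` session are assumed):

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

val commands = ArrayBuffer.empty[(String, LogicalPlan)]
val exceptions = ArrayBuffer.empty[(String, Exception)]

val listener = new QueryExecutionListener {
  // Record the writer action name ("save", "insertInto", ...) and its logical plan.
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    commands += funcName -> qe.logical
  }
  // The exceptions buffer suggests failures are recorded too, which is why the
  // "no need to implement onFailure" comment drew the question above.
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    exceptions += funcName -> exception
  }
}

spark.listenerManager.register(listener)
```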
LGTM except three minor comments.

LGTM
Test build #73023 has finished for PR 16962 at commit
thanks for the review, merging to master!
format("csv").save(path)
}

private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
Why don't we use SQLExecution instead of this?
The problem is that some commands, like InsertIntoHiveTable, already use SQLExecution.newExecution internally, so we can't use it again to wrap these commands. In the future we should figure out a central place to put SQLExecution.newExecution.
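To make the discussion concrete, here is a minimal sketch of the kind of wrapper being discussed, as it would sit inside DataFrameWriter: it builds a QueryExecution for the command, times the execution, and reports to the registered listeners. This is an illustration rather than the exact merged code, and note that `listenerManager.onSuccess`/`onFailure` are package-private to `org.apache.spark.sql`, so a method like this only compiles inside that package.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
 * Wrap a DataFrameWriter action to track the QueryExecution and time cost, then report to the
 * user-registered callback functions.
 */
private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
  val qe = session.sessionState.executePlan(command)
  try {
    val start = System.nanoTime()
    // `toRdd` triggers execution of the command. SQLExecution is not used to wrap it here
    // because some commands (e.g. InsertIntoHiveTable) already start their own execution
    // internally, as discussed above.
    qe.toRdd
    val end = System.nanoTime()
    session.listenerManager.onSuccess(name, qe, end - start)
  } catch {
    case e: Exception =>
      session.listenerManager.onFailure(name, qe, e)
      throw e
  }
}
```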
Merged commit: [SPARK-18120][SPARK-19557][SQL] Call QueryExecutionListener callback methods for DataFrameWriter methods

We only notify `QueryExecutionListener` for several `Dataset` operations, e.g. collect, take, etc. We should also do the notification for `DataFrameWriter` operations.

new regression test

close apache#16664

Author: Wenchen Fan <[email protected]>

Closes apache#16962 from cloud-fan/insert.
What changes were proposed in this pull request?
We only notify `QueryExecutionListener` for several `Dataset` operations, e.g. collect, take, etc. We should also do the notification for `DataFrameWriter` operations.

How was this patch tested?
new regression test
close #16664
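For completeness, the user-visible effect described above can be sketched as follows (assuming a `SparkSession` named `spark` and an output directory `path`, both hypothetical here): before this patch only Dataset actions such as `collect` or `take` reached the listener; with it, a `DataFrameWriter` operation such as `save` does too.

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName finished in ${durationNs / 1e6} ms")  // e.g. funcName = "save"
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
})

// With this patch the listener above is notified for writer operations as well.
spark.range(10).write.format("csv").save(path)
```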