# [SPARK-20213][SQL] Fix DataFrameWriter operations in SQL UI tab #18064
## Conversation
Agreed, sorry I haven't updated it. I was out most of last week. I'll get this fixed up as soon as I can. Thanks for all your help!
Test build #77198 has finished for PR 18064 at commit

Test build #77200 has finished for PR 18064 at commit

Test build #77215 has finished for PR 18064 at commit

Test build #77219 has finished for PR 18064 at commit

Test build #77220 has finished for PR 18064 at commit
This also seems related to my PR #17998.
```diff
  val namedExpressions = expressions.map(e => Alias(e, e.toString)())
- val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()
+ val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation))
+   .queryExecution.executedPlan.executeTake(1).head
```
```scala
new QueryExecution(sparkSession, Aggregate(Nil, namedExpressions, relation)).executedPlan.executeTake(1).head
```
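For context, a minimal sketch of why this change matters, assuming the values and imports from the diff above and that the surrounding command already holds a SQL execution id:

```scala
// Sketch only, not the exact Spark source. Dataset.head() goes through
// SQLExecution.withNewExecutionId and would start a nested execution;
// executeTake on the physical plan runs inside the current execution.
val aggregated = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation))
val statsRow = aggregated.queryExecution.executedPlan.executeTake(1).head
```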
@viirya I'm not very clear about the purpose of the "write operator". Does my PR cover it?

Test build #77240 has finished for PR 18064 at commit
@cloud-fan The purpose of the operator is to record metrics of writing data. I said it's related because this PR is also going to show the write op in the UI, but it looks like this change doesn't do anything about the metrics. Many classes are covered by both this PR and #17998, e.g. the commands and how to trigger them. I'll rebase my PR once this is merged.
@cloud-fan Actually, if we decide to show individual writing commands in the UI like this, we may not need the "write operator". Having a write operator on top of those commands might look confusing. Then we can directly attach the metrics to the commands. @rxin What do you think?
That works too, if we can attach metrics to these commands.
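As a rough illustration of "attach the metrics to the commands", here is a minimal, self-contained sketch using the existing `SQLMetrics` helper; the metric name and the standalone setup are illustrative assumptions, not the actual implementation:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.metric.SQLMetrics

object CommandMetricsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    // A writing command would declare a metric like this...
    val numOutputRows = SQLMetrics.createMetric(spark.sparkContext, "number of output rows")
    // ...and update it while writing, so the SQL tab can show it on the
    // command's own node instead of on a separate "write operator".
    numOutputRows.add(42L)
    println(s"numOutputRows = ${numOutputRows.value}")
    spark.stop()
  }
}
```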
Test build #77258 has finished for PR 18064 at commit

Test build #77276 has finished for PR 18064 at commit

@rxin OK. I will rebase my PR after this gets merged.

cc @zsxwing for review
```diff
      topic: Option[String] = None): Unit = {
    val schema = queryExecution.analyzed.output
    validateQuery(queryExecution, kafkaParameters, topic)
-   SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
```
After removing this, can KafkaSink still track its streaming writes?
Yea, because it's used in KafkaProvider, which is run with SaveIntoDataSourceCommand, and all commands will be wrapped by SQLExecution.withNewExecutionId.
If you mean KafkaSourceProvider, is it the same code path as KafkaSink? In KafkaSink.addBatch, KafkaWriter.write is also called to write into Kafka.
KafkaSink.addBatch is also wrapped with SQLExecution.withNewExecutionId in StreamExecution: https://github.com/apache/spark/pull/18064/files#diff-6532dd3b63bdab0364fbcf2303e290e4R658
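The wrapping pattern referenced in this thread, sketched for clarity (assumes a `sparkSession` and a `queryExecution` in scope; this mirrors, but is not copied from, the Spark 2.2-era API):

```scala
import org.apache.spark.sql.execution.SQLExecution

// Everything run inside this block is reported under one execution id,
// so it shows up as a single query in the SQL tab.
val rows = SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
  queryExecution.executedPlan.executeTake(1)
}
```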
```scala
  }

  /**
   * Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
```
Comment needs to change: given [[LogicalPlan]].
```diff
   */
- private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
+ private def planForWritingFileFormat(
+     format: FileFormat, mode: SaveMode, data: LogicalPlan): LogicalPlan = {
```
comment: given [[LogicalPlan]].
```diff
  /**
-  * Writes the given [[DataFrame]] out to this [[DataSource]].
+  * Returns a logical plan to write the given [[DataFrame]] out to this [[DataSource]].
```
the given [[LogicalPlan]].
Test build #77539 has finished for PR 18064 at commit

Test build #77540 has finished for PR 18064 at commit

Test build #77550 has finished for PR 18064 at commit

Test build #77562 has finished for PR 18064 at commit
Thanks for the review, merging to master!
I just came across this PR. I have one piece of general feedback: it would be great if we could make a PR have a single purpose. This PR contains different kinds of changes in order to fix the UI. If refactoring is needed, I'd recommend a separate PR for the refactoring, then a different PR for the actual fix.
@yhuai thanks for the reminder! I'll send separate PRs for refactoring in the future :)
Refactor? I thought that was the problem with the original PR; it was too narrow and didn't unify the physical plans to get all metrics working.
My suggestion was about getting the changes to the interfaces of ExecutedCommandExec and SaveIntoDataSourceCommand into separate PRs. It helps code review (both speed and quality).
… INSERT AS SELECT [WIP]

### What changes were proposed in this pull request?

The input query schema of INSERT AS SELECT could be changed after optimization. For example, the following query's output schema is changed by the rules `SimplifyCasts` and `RemoveRedundantAliases`:

```SQL
SELECT word, length, cast(first as string) as first FROM view1
```

This PR fixes the issue in Spark 2.2. Instead of using the analyzed plan of the input query, this PR uses its executed plan to determine the attributes in `FileFormatWriter`. The related issue in the master branch has been fixed by #18064. After this PR is merged, I will submit a separate PR to merge the test case into master.

### How was this patch tested?

Added a test case.

Author: Xiao Li <[email protected]>
Author: gatorsmile <[email protected]>

Closes #18386 from gatorsmile/newRC5.
…utionId

## What changes were proposed in this pull request?

In apache#18064, to work around the nested SQL execution id issue, we introduced several internal methods in `Dataset`, like `collectInternal`, `countInternal`, `showInternal`, etc., to avoid nested execution ids. However, this approach has poor extensibility: when we hit other nested execution id cases, we may need to add more internal methods in `Dataset`.

Our goal is to ignore the nested execution id in some cases, and we can achieve this more cleanly by introducing `SQLExecution.ignoreNestedExecutionId`. Whenever we find a place that needs to ignore the nested execution, we can just wrap the action with `SQLExecution.ignoreNestedExecutionId`, which is more extensible than the previous approach. The idea comes from https://github.com/apache/spark/pull/17540/files#diff-ab49028253e599e6e74cc4f4dcb2e3a8R57 by rdblue.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes apache#18419 from cloud-fan/follow.
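A minimal usage sketch of the `SQLExecution.ignoreNestedExecutionId` API described above; the signature is assumed from this commit message, `df` is a hypothetical DataFrame, and the API was later removed (see the next commit):

```scala
import org.apache.spark.sql.execution.SQLExecution

// Wrap an action that would otherwise fail or nest because an execution id
// is already set by the surrounding command.
val count = SQLExecution.ignoreNestedExecutionId(sparkSession) {
  df.count()
}
```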
## What changes were proposed in this pull request?

This is another follow-up for apache#18064. In apache#18064, we wrap every SQL command with a SQL execution, which makes nested SQL executions very likely to happen. apache#18419 tried to improve this a little bit by introducing `SQLExecution.ignoreNestedExecutionId`. However, that is not friendly to data source developers, who may need to update their code to use the `ignoreNestedExecutionId` API.

This PR proposes a new solution: just allow nested execution. The downside is that we may have multiple executions for one query. We can improve this by updating the data organization in SQLListener to have a 1-n mapping from query to execution instead of a 1-1 mapping. This can be done in a follow-up.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes apache#18450 from cloud-fan/execution-id.
…hema

## What changes were proposed in this pull request?

In apache#18064, we allowed `RunnableCommand` to have children in order to fix some UI issues. Then we made the `InsertIntoXXX` commands take the input `query` as a child, and when we do the actual writing we just pass the physical plan to the writer (`FileFormatWriter.write`).

However, this is problematic. In Spark SQL, the optimizer and planner are allowed to change the schema names a little bit. e.g. the `ColumnPruning` rule will remove no-op `Project`s, like `Project("A", Scan("a"))`, and thus change the output schema from `<A: int>` to `<a: int>`. When it comes to writing, especially for self-describing data formats like Parquet, we may write the wrong schema to the file and cause null values at the read path.

Fortunately, in apache#18450, we decided to allow nested execution, and one query can map to multiple executions in the UI. This lifts the major restriction in apache#18604, and now we don't have to take the input `query` as a child of the `InsertIntoXXX` commands. So the fix is simple: this PR partially reverts apache#18064 and makes the `InsertIntoXXX` commands leaf nodes again.

## How was this patch tested?

New regression test.

Author: Wenchen Fan <[email protected]>

Closes apache#19474 from cloud-fan/bug.
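An illustrative reproduction sketch of that failure mode, assuming a SparkSession named `spark` and a hypothetical view and output path; whether the alias is actually dropped depends on the optimizer rules in a given build:

```scala
// The case-only alias below is the kind of no-op the optimizer may remove,
// so with the query as a child of the insert command, the file could be
// written with schema <a: bigint> instead of <A: bigint>.
spark.range(1).selectExpr("id AS a").createOrReplaceTempView("t")
spark.sql("SELECT a AS A FROM t").write.mode("overwrite").parquet("/tmp/schema_demo")
```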
## What changes were proposed in this pull request?

This is a minor follow-up of #19474. #19474 partially reverted #18064 but accidentally introduced a behavior change: `Command` extended `LogicalPlan` before #18064, but #19474 made it extend `LeafNode`. This is an internal behavior change, as now all `Command` subclasses can't define children and have to implement the `computeStatistic` method. This PR fixes this by making `Command` extend `LogicalPlan`.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #19493 from cloud-fan/minor.
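Roughly, the fix described above amounts to something like the following; this is a sketch inferred from the commit description, not verbatim Spark source:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Command goes back to extending LogicalPlan; subclasses may again define
// children, while the leaf-like defaults are provided explicitly.
trait Command extends LogicalPlan {
  override def output: Seq[Attribute] = Seq.empty
  override def children: Seq[LogicalPlan] = Seq.empty
}
```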
Currently the `DataFrameWriter` operations have several problems:

1. A non-file-format data source writing action doesn't show up in the SQL tab in the Spark UI.
2. A file-format data source writing action shows a scan node in the SQL tab, without saying anything about writing. (Streaming also has this issue, but it is not fixed in this PR.)
3. Spark SQL CLI actions don't show up in the SQL tab.

This PR fixes all of them by refactoring `ExecuteCommandExec` to make it have children.

close apache#17540

Existing tests. Also tested the UI manually. For a simple command: `Seq(1 -> "a").toDF("i", "j").write.parquet("/tmp/qwe")`

before this PR:

<img width="266" alt="qq20170523-035840 2x" src="https://cloud.githubusercontent.com/assets/3182036/26326050/24e18ba2-3f6c-11e7-8817-6dd275bf6ac5.png">

after this PR:

<img width="287" alt="qq20170523-035708 2x" src="https://cloud.githubusercontent.com/assets/3182036/26326054/2ad7f460-3f6c-11e7-8053-d68325beb28f.png">

Author: Wenchen Fan <[email protected]>

Closes apache#18064 from cloud-fan/execution.

This also includes the following commits:

0795c16 introduce SQLExecution.ignoreNestedExecutionId
cd6e3f0 address comments
## What changes were proposed in this pull request?

Currently the `DataFrameWriter` operations have several problems:

1. A non-file-format data source writing action doesn't show up in the SQL tab in the Spark UI.
2. A file-format data source writing action shows a scan node in the SQL tab, without saying anything about writing. (Streaming also has this issue, but it is not fixed in this PR.)
3. Spark SQL CLI actions don't show up in the SQL tab.

This PR fixes all of them by refactoring `ExecuteCommandExec` to make it have children.

close #17540

## How was this patch tested?

Existing tests.

Also tested the UI manually. For a simple command: `Seq(1 -> "a").toDF("i", "j").write.parquet("/tmp/qwe")`

before this PR:

![qq20170523-035840 2x](https://user-images.githubusercontent.com/3182036/26326050/24e18ba2-3f6c-11e7-8817-6dd275bf6ac5.png)

after this PR:

![qq20170523-035708 2x](https://user-images.githubusercontent.com/3182036/26326054/2ad7f460-3f6c-11e7-8053-d68325beb28f.png)