[SPARK-19603][SS]Fix StreamingQuery explain command #16934
Conversation
Test build #72909 has finished for PR 16934 at commit …
| "No physical plan. Waiting for data." | ||
| } else { | ||
| val explain = ExplainCommand(lastExecution.logical, extended = extended) | ||
| val explain = ExplainCommand(lastExecution.logical, extended = extended, streaming = true) |
So this means that this code will always return an updated plan for the last batch, showing which data files were read, instead of just referring to it as a `StreamingRelation`. We wouldn't have the bug if we had just used `logicalPlan` instead of `lastExecution.logicalPlan`, right? Then the problem would be that `logicalPlan` may contain errors, though?
This solution is okay. I'm just not sure why we use …

Good question. Made me realize my fix was wrong. We cannot use …
```scala
    .executeCollect()
    .map(_.getString(0))
    .mkString("\n")
assert(explainString.contains("StateStoreRestore"))
```
I would also check that this doesn't have a `LocalTableScan` but has a `StreamingRelation`.
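A minimal sketch of those extra assertions (`dfExplainString` is a hypothetical name for the explain output of the streaming Dataset itself, before the query starts):

```scala
// Suggested extra checks (sketch): the pre-start streaming plan should keep
// the source as a StreamingRelation leaf, not the LocalTableScan of a
// materialized batch.
assert(dfExplainString.contains("StreamingRelation"))
assert(!dfExplainString.contains("LocalTableScan"))
```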
```scala
    queryExecution: IncrementalExecution,
    extended: Boolean) extends RunnableCommand {

  override val output: Seq[Attribute] =
```
Is this required? Just asking because the one above doesn't have it.
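For context, a minimal sketch of the command this diff appears to introduce; the class name `StreamingExplainCommand` and the `run` body are assumptions inferred from the snippet above, not necessarily the merged code. `Command.output` defaults to `Seq.empty`, so a command that returns rows must override it:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.streaming.IncrementalExecution
import org.apache.spark.sql.types.StringType

// Sketch: a RunnableCommand that explains a streaming batch. The `output`
// field declares the command's result schema, here a single string column.
case class StreamingExplainCommand(
    queryExecution: IncrementalExecution,
    extended: Boolean) extends RunnableCommand {

  override val output: Seq[Attribute] =
    Seq(AttributeReference("plan", StringType, nullable = true)())

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // Render either the full QueryExecution dump or just the physical plan,
    // one output row per line of text.
    val outputString =
      if (extended) queryExecution.toString else queryExecution.simpleString
    outputString.split("\n").map(Row(_))
  }
}
```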
@zsxwing Left one comment for tests and a question.

LGTM pending tests

Test build #72957 has finished for PR 16934 at commit …

Test build #72962 has finished for PR 16934 at commit …

Thanks! Merging to master and 2.1.
## What changes were proposed in this pull request?
`StreamingQuery.explain` doesn't show the correct streaming physical plan right now because `ExplainCommand` receives a runtime batch plan and its `logicalPlan.isStreaming` is always false.
This PR adds `streaming` parameter to `ExplainCommand` to allow `StreamExecution` to specify that it's a streaming plan.
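Based on the diff in this PR, the call site in `StreamExecution` looks roughly like the following sketch (the surrounding method structure is assumed):

```scala
// Sketch of StreamExecution.explainInternal after the fix: once a batch has
// run, explain the last execution's logical plan and flag it as streaming so
// the incremental (stateful) physical plan is shown.
private def explainInternal(extended: Boolean): String = {
  if (lastExecution == null) {
    "No physical plan. Waiting for data."
  } else {
    val explain = ExplainCommand(lastExecution.logical, extended = extended, streaming = true)
    sparkSession.sessionState.executePlan(explain)
      .executedPlan.executeCollect()
      .map(_.getString(0))
      .mkString("\n")
  }
}
```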
Examples of the explain outputs:
- streaming `DataFrame.explain()`
```
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)])
+- StateStoreSave [value#518], OperatorStateId(<unknown>,0,0), Append, 0
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
+- StateStoreRestore [value#518], OperatorStateId(<unknown>,0,0)
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
+- Exchange hashpartitioning(value#518, 5)
+- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- *MapElements <function1>, obj#517: java.lang.String
+- *DeserializeToObject value#513.toString, obj#516: java.lang.String
+- StreamingRelation MemoryStream[value#513], [value#513]
```
- `StreamingQuery.explain(extended = false)`
```
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)])
+- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
+- StateStoreRestore [value#518], OperatorStateId(...,0,0)
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
+- Exchange hashpartitioning(value#518, 5)
+- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- *MapElements <function1>, obj#517: java.lang.String
+- *DeserializeToObject value#543.toString, obj#516: java.lang.String
+- LocalTableScan [value#543]
```
- `StreamingQuery.explain(extended = true)`
```
== Parsed Logical Plan ==
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
+- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
+- LocalRelation [value#543]
== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
+- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
+- LocalRelation [value#543]
== Optimized Logical Plan ==
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
+- DeserializeToObject value#543.toString, obj#516: java.lang.String
+- LocalRelation [value#543]
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)], output=[value#518, count(1)#524L])
+- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
+- StateStoreRestore [value#518], OperatorStateId(...,0,0)
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
+- Exchange hashpartitioning(value#518, 5)
+- *HashAggregate(keys=[value#518], functions=[partial_count(1)], output=[value#518, count#530L])
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- *MapElements <function1>, obj#517: java.lang.String
+- *DeserializeToObject value#543.toString, obj#516: java.lang.String
+- LocalTableScan [value#543]
```
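For reference, a hypothetical end-to-end snippet that produces plans of the shape shown above (`spark` is an active `SparkSession`; names like `explain_demo` are illustrative):

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

implicit val sqlContext = spark.sqlContext

// Hypothetical setup matching the examples: a MemoryStream source feeding a
// map followed by a streaming aggregation.
val inputData = MemoryStream[String]
val df = inputData.toDS().map(_.toString).groupBy("value").count()

df.explain()  // streaming DataFrame plan, ends in a StreamingRelation leaf

val query = df.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("explain_demo")
  .start()

inputData.addData("a", "b", "a")
query.processAllAvailable()

query.explain()                 // last batch's physical plan (LocalTableScan leaf)
query.explain(extended = true)  // parsed/analyzed/optimized/physical plans
```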
## How was this patch tested?
The updated unit test.
Author: Shixiong Zhu <[email protected]>
Closes #16934 from zsxwing/SPARK-19603.
(cherry picked from commit fc02ef9)
Signed-off-by: Shixiong Zhu <[email protected]>