
Commit ffaba07

zsxwing authored and cmonkey committed
[SPARK-19603][SS] Fix StreamingQuery explain command
## What changes were proposed in this pull request?

`StreamingQuery.explain` doesn't show the correct streaming physical plan because `ExplainCommand` receives a runtime batch plan and its `logicalPlan.isStreaming` is always false. This PR adds a dedicated `StreamingExplainCommand` so that `StreamExecution` can explain the streaming plan of its latest `IncrementalExecution` directly.

Examples of the explain outputs:

- streaming DataFrame.explain()

```
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)])
+- StateStoreSave [value#518], OperatorStateId(<unknown>,0,0), Append, 0
   +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
      +- StateStoreRestore [value#518], OperatorStateId(<unknown>,0,0)
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
            +- Exchange hashpartitioning(value#518, 5)
               +- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                     +- *MapElements <function1>, obj#517: java.lang.String
                        +- *DeserializeToObject value#513.toString, obj#516: java.lang.String
                           +- StreamingRelation MemoryStream[value#513], [value#513]
```

- StreamingQuery.explain(extended = false)

```
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)])
+- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
   +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
      +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
            +- Exchange hashpartitioning(value#518, 5)
               +- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                     +- *MapElements <function1>, obj#517: java.lang.String
                        +- *DeserializeToObject value#543.toString, obj#516: java.lang.String
                           +- LocalTableScan [value#543]
```

- StreamingQuery.explain(extended = true)

```
== Parsed Logical Plan ==
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
      +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
         +- LocalRelation [value#543]

== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
      +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
         +- LocalRelation [value#543]

== Optimized Logical Plan ==
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
      +- DeserializeToObject value#543.toString, obj#516: java.lang.String
         +- LocalRelation [value#543]

== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)], output=[value#518, count(1)#524L])
+- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
   +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
      +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
            +- Exchange hashpartitioning(value#518, 5)
               +- *HashAggregate(keys=[value#518], functions=[partial_count(1)], output=[value#518, count#530L])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                     +- *MapElements <function1>, obj#517: java.lang.String
                        +- *DeserializeToObject value#543.toString, obj#516: java.lang.String
                           +- LocalTableScan [value#543]
```

## How was this patch tested?

The updated unit test.

Author: Shixiong Zhu <[email protected]>

Closes apache#16934 from zsxwing/SPARK-19603.
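The behavior described above can be reproduced end to end with a few lines of user code. The following is a minimal sketch, not part of the patch: it assumes a local `SparkSession`, uses the internal `MemoryStream` test source, and the application and query names are made up for illustration.

```scala
// Illustrative sketch only: reproduces the explain scenario from the PR description.
// Assumes a local SparkSession; MemoryStream is an internal streaming test source.
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._

object StreamingExplainDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("streaming-explain-demo")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    val input = MemoryStream[String]
    val counts = input.toDS().map(_ + "foo").groupBy("value").agg(count("*"))

    // Before the query starts: the streaming DataFrame's plan should contain
    // StreamingRelation and StateStore* operators, not a re-planned batch query.
    counts.explain()

    val query = counts.writeStream
      .queryName("memory_explain_demo")
      .outputMode("complete")
      .format("memory")
      .start()

    input.addData("a", "b", "a")
    query.processAllAvailable()

    // After at least one batch: shows the physical plan of the last executed batch,
    // optionally with the parsed/analyzed/optimized logical plans.
    query.explain()
    query.explain(extended = true)

    query.stop()
    spark.stop()
  }
}
```

With this patch, the last two `explain` calls print the streaming operators of the most recent batch (`StateStoreRestore`/`StateStoreSave`, ending in `LocalTableScan` over the batch's data) rather than a plan re-planned as an ordinary batch query.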
1 parent 8877cf1 commit ffaba07

File tree

3 files changed: +52 -11 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala

Lines changed: 25 additions & 3 deletions
@@ -86,18 +86,18 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
  * }}}
  *
  * @param logicalPlan plan to explain
- * @param output output schema
  * @param extended whether to do extended explain or not
  * @param codegen whether to output generated code from whole-stage codegen or not
  */
 case class ExplainCommand(
     logicalPlan: LogicalPlan,
-    override val output: Seq[Attribute] =
-      Seq(AttributeReference("plan", StringType, nullable = true)()),
     extended: Boolean = false,
     codegen: Boolean = false)
   extends RunnableCommand {
 
+  override val output: Seq[Attribute] =
+    Seq(AttributeReference("plan", StringType, nullable = true)())
+
   // Run through the optimizer to generate the physical plan.
   override def run(sparkSession: SparkSession): Seq[Row] = try {
     val queryExecution =
@@ -121,3 +121,25 @@ case class ExplainCommand(
     ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
   }
 }
+
+/** An explain command for users to see how a streaming batch is executed. */
+case class StreamingExplainCommand(
+    queryExecution: IncrementalExecution,
+    extended: Boolean) extends RunnableCommand {
+
+  override val output: Seq[Attribute] =
+    Seq(AttributeReference("plan", StringType, nullable = true)())
+
+  // Run through the optimizer to generate the physical plan.
+  override def run(sparkSession: SparkSession): Seq[Row] = try {
+    val outputString =
+      if (extended) {
+        queryExecution.toString
+      } else {
+        queryExecution.simpleString
+      }
+    Seq(Row(outputString))
+  } catch { case cause: TreeNodeException[_] =>
+    ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
+  }
+}
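As the body of `StreamingExplainCommand` shows, `extended` simply switches between `queryExecution.toString` and `queryExecution.simpleString`. `IncrementalExecution` extends `QueryExecution`, so the difference between the two strings can be seen on any batch query as well. The snippet below is an illustrative sketch, not code from this patch; the demo object name is invented.

```scala
// Illustrative only: shows what simpleString vs toString contain on a QueryExecution,
// which is exactly what StreamingExplainCommand surfaces for a streaming batch.
import org.apache.spark.sql.SparkSession

object ExplainStringsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("explain-strings")
      .getOrCreate()
    import spark.implicits._

    val qe = Seq("a", "b", "a").toDS().groupBy("value").count().queryExecution

    // What `extended = false` surfaces: the physical plan only.
    println(qe.simpleString)

    // What `extended = true` surfaces: parsed/analyzed/optimized logical plans + physical plan.
    println(qe.toString)

    spark.stop()
  }
}
```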

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 3 additions & 4 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.IOException
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.locks.ReentrantLock
@@ -33,7 +32,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
@@ -162,7 +161,7 @@ class StreamExecution(
   private var state: State = INITIALIZING
 
   @volatile
-  var lastExecution: QueryExecution = _
+  var lastExecution: IncrementalExecution = _
 
   /** Holds the most recent input data for each source. */
   protected var newData: Map[Source, DataFrame] = _
@@ -673,7 +672,7 @@ class StreamExecution(
     if (lastExecution == null) {
       "No physical plan. Waiting for data."
     } else {
-      val explain = ExplainCommand(lastExecution.logical, extended = extended)
+      val explain = StreamingExplainCommand(lastExecution, extended = extended)
       sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect()
         .map(_.getString(0)).mkString("\n")
     }
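Pulled out of `explainInternal`, the new code path amounts to the small helper below. This is a sketch, not an API added by the patch: the object and method names are invented, and it sits in the same package as `StreamExecution` because, depending on the Spark version, session internals such as `sessionState` may not be accessible to ordinary user code.

```scala
// Sketch only: mirrors the explainInternal fragment above; names are illustrative.
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.StreamingExplainCommand

object StreamingPlanRenderer {
  /** Renders the streaming plan of the last executed batch as a single string. */
  def render(
      sparkSession: SparkSession,
      lastExecution: IncrementalExecution,
      extended: Boolean): String = {
    if (lastExecution == null) {
      "No physical plan. Waiting for data."
    } else {
      val explain = StreamingExplainCommand(lastExecution, extended = extended)
      sparkSession.sessionState.executePlan(explain)
        .executedPlan
        .executeCollect()          // the command emits its output as rows of one string column
        .map(_.getString(0))       // column 0 is the "plan" attribute
        .mkString("\n")
    }
  }
}
```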

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 24 additions & 4 deletions
@@ -22,7 +22,9 @@ import scala.util.control.ControlThrowable
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
@@ -277,10 +279,24 @@
 
   test("explain") {
     val inputData = MemoryStream[String]
-    val df = inputData.toDS().map(_ + "foo")
-    // Test `explain` not throwing errors
-    df.explain()
-    val q = df.writeStream.queryName("memory_explain").format("memory").start()
+    val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))
+
+    // Test `df.explain`
+    val explain = ExplainCommand(df.queryExecution.logical, extended = false)
+    val explainString =
+      spark.sessionState
+        .executePlan(explain)
+        .executedPlan
+        .executeCollect()
+        .map(_.getString(0))
+        .mkString("\n")
+    assert(explainString.contains("StateStoreRestore"))
+    assert(explainString.contains("StreamingRelation"))
+    assert(!explainString.contains("LocalTableScan"))
+
+    // Test StreamingQuery.display
+    val q = df.writeStream.queryName("memory_explain").outputMode("complete").format("memory")
+      .start()
       .asInstanceOf[StreamingQueryWrapper]
       .streamingQuery
     try {
@@ -294,12 +310,16 @@
       // `extended = false` only displays the physical plan.
       assert("LocalRelation".r.findAllMatchIn(explainWithoutExtended).size === 0)
       assert("LocalTableScan".r.findAllMatchIn(explainWithoutExtended).size === 1)
+      // Use "StateStoreRestore" to verify that it does output a streaming physical plan
+      assert(explainWithoutExtended.contains("StateStoreRestore"))
 
       val explainWithExtended = q.explainInternal(true)
       // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
       // plan.
       assert("LocalRelation".r.findAllMatchIn(explainWithExtended).size === 3)
       assert("LocalTableScan".r.findAllMatchIn(explainWithExtended).size === 1)
+      // Use "StateStoreRestore" to verify that it does output a streaming physical plan
+      assert(explainWithExtended.contains("StateStoreRestore"))
     } finally {
       q.stop()
     }
