Skip to content

Commit 8889d78

Browse files
committed
[SPARK-24214][SS] Fix toJSON for StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation
## What changes were proposed in this pull request? We should overwrite "otherCopyArgs" to provide the SparkSession parameter otherwise TreeNode.toJSON cannot get the full constructor parameter list. ## How was this patch tested? The new unit test. Author: Shixiong Zhu <[email protected]> Closes #21275 from zsxwing/SPARK-24214. (cherry picked from commit fd1179c) Signed-off-by: Shixiong Zhu <[email protected]>
1 parent aba52f4 commit 8889d78

File tree

2 files changed

+18
-0
lines changed

2 files changed

+18
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ case class StreamingExecutionRelation(
6666
output: Seq[Attribute])(session: SparkSession)
6767
extends LeafNode with MultiInstanceRelation {
6868

69+
override def otherCopyArgs: Seq[AnyRef] = session :: Nil
6970
override def isStreaming: Boolean = true
7071
override def toString: String = source.toString
7172

@@ -97,6 +98,7 @@ case class StreamingRelationV2(
9798
output: Seq[Attribute],
9899
v1Relation: Option[StreamingRelation])(session: SparkSession)
99100
extends LeafNode with MultiInstanceRelation {
101+
override def otherCopyArgs: Seq[AnyRef] = session :: Nil
100102
override def isStreaming: Boolean = true
101103
override def toString: String = sourceName
102104

@@ -116,6 +118,7 @@ case class ContinuousExecutionRelation(
116118
output: Seq[Attribute])(session: SparkSession)
117119
extends LeafNode with MultiInstanceRelation {
118120

121+
override def otherCopyArgs: Seq[AnyRef] = session :: Nil
119122
override def isStreaming: Boolean = true
120123
override def toString: String = source.toString
121124

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,21 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
687687
CheckLastBatch(("A", 1)))
688688
}
689689

690+
test("StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON " +
691+
"should not fail") {
692+
val df = spark.readStream.format("rate").load()
693+
assert(df.logicalPlan.toJSON.contains("StreamingRelationV2"))
694+
695+
testStream(df)(
696+
AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingExecutionRelation"))
697+
)
698+
699+
testStream(df, useV2Sink = true)(
700+
StartStream(trigger = Trigger.Continuous(100)),
701+
AssertOnQuery(_.logicalPlan.toJSON.contains("ContinuousExecutionRelation"))
702+
)
703+
}
704+
690705
/** Create a streaming DF that only execute one batch in which it returns the given static DF */
691706
private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
692707
require(!triggerDF.isStreaming)

0 commit comments

Comments
 (0)