From 534459bb87ede3ccafc9689de1dce03b27e8f112 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 9 Jun 2021 00:12:58 -0700 Subject: [PATCH] Add plan check for stream-stream join unit tewt --- .../spark/sql/streaming/StreamingJoinSuite.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala index 40131e822c5ce..020d7fe75d1fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec, StreamingSymmetricHashJoinHelper} import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreProviderId} import org.apache.spark.sql.functions._ @@ -574,7 +575,14 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { testStream(joined)( AddData(input1, 1.to(1000): _*), AddData(input2, 1.to(1000): _*), - CheckAnswer(1.to(1000): _*)) + CheckAnswer(1.to(1000): _*), + Execute { query => + // Verify the query plan + assert(query.lastExecution.executedPlan.collect { + case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, + _: ShuffleExchangeExec, ShuffleExchangeExec(_, _: ShuffleExchangeExec, _)) => j + }.size == 1) + }) } test("SPARK-26187 restore the stream-stream inner join query from Spark 2.4") {