1818package org .apache .spark .sql .streaming
1919
2020import java .io .File
21- import java .util .UUID
21+ import java .util .{ Locale , UUID }
2222
2323import scala .util .Random
2424
@@ -420,6 +420,63 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
420420 AddData (input2, 1 .to(1000 ): _* ),
421421 CheckAnswer (1 .to(1000 ): _* ))
422422 }
423+
424+ test(" SPARK-26187 restore the stream-stream inner join query from Spark 2.4" ) {
425+ val inputStream = MemoryStream [(Int , Long )]
426+ val df = inputStream.toDS()
427+ .select(col(" _1" ).as(" value" ), col(" _2" ).cast(" timestamp" ).as(" timestamp" ))
428+
429+ val leftStream = df.select(col(" value" ).as(" leftId" ), col(" timestamp" ).as(" leftTime" ))
430+
431+ val rightStream = df
432+ // Introduce misses for ease of debugging
433+ .where(col(" value" ) % 2 === 0 )
434+ .select(col(" value" ).as(" rightId" ), col(" timestamp" ).as(" rightTime" ))
435+
436+ val query = leftStream
437+ .withWatermark(" leftTime" , " 5 seconds" )
438+ .join(
439+ rightStream.withWatermark(" rightTime" , " 5 seconds" ),
440+ expr(" rightId = leftId AND rightTime >= leftTime AND " +
441+ " rightTime <= leftTime + interval 5 seconds" ),
442+ joinType = " inner" )
443+ .select(col(" leftId" ), col(" leftTime" ).cast(" int" ),
444+ col(" rightId" ), col(" rightTime" ).cast(" int" ))
445+
446+ val resourceUri = this .getClass.getResource(
447+ " /structured-streaming/checkpoint-version-2.4.0-streaming-join/" ).toURI
448+ val checkpointDir = Utils .createTempDir().getCanonicalFile
449+ // Copy the checkpoint to a temp dir to prevent changes to the original.
450+ // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
451+ FileUtils .copyDirectory(new File (resourceUri), checkpointDir)
452+ inputStream.addData((1 , 1L ), (2 , 2L ), (3 , 3L ), (4 , 4L ), (5 , 5L ))
453+
454+ testStream(query)(
455+ StartStream (checkpointLocation = checkpointDir.getAbsolutePath),
456+ /*
457+ Note: The checkpoint was generated using the following input in Spark version 2.4.0
458+ AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
459+ // batch 1 - global watermark = 0
460+ // states
461+ // left: (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)
462+ // right: (2, 2L), (4, 4L)
463+ CheckNewAnswer((2, 2L, 2, 2L), (4, 4L, 4, 4L)),
464+ assertNumStateRows(7, 7),
465+ */
466+ AddData (inputStream, (6 , 6L ), (7 , 7L ), (8 , 8L ), (9 , 9L ), (10 , 10L )),
467+ // batch 2: same result as above test
468+ CheckNewAnswer ((6 , 6L , 6 , 6L ), (8 , 8L , 8 , 8L ), (10 , 10L , 10 , 10L )),
469+ assertNumStateRows(11 , 6 ),
470+ Execute { query =>
471+ // Verify state format = 1
472+ val f = query.lastExecution.executedPlan.collect {
473+ case f : StreamingSymmetricHashJoinExec => f
474+ }
475+ assert(f.size == 1 )
476+ assert(f.head.stateFormatVersion == 1 )
477+ }
478+ )
479+ }
423480}
424481
425482
@@ -822,7 +879,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
822879 )
823880 }
824881
825- test(" SPARK-26187 restore the stream-stream join query from Spark 2.4" ) {
882+ test(" SPARK-26187 restore the stream-stream outer join query from Spark 2.4" ) {
826883 val inputStream = MemoryStream [(Int , Long )]
827884 val df = inputStream.toDS()
828885 .select(col(" _1" ).as(" value" ), col(" _2" ).cast(" timestamp" ).as(" timestamp" ))
@@ -851,9 +908,8 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
851908 // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
852909 FileUtils .copyDirectory(new File (resourceUri), checkpointDir)
853910 inputStream.addData((1 , 1L ), (2 , 2L ), (3 , 3L ), (4 , 4L ), (5 , 5L ))
854- testStream(query)(
855- StartStream (checkpointLocation = checkpointDir.getAbsolutePath),
856- /*
911+
912+ /*
857913 Note: The checkpoint was generated using the following input in Spark version 2.4.0
858914 AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
859915 // batch 1 - global watermark = 0
@@ -863,28 +919,19 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
863919 CheckNewAnswer((2, 2L, 2, 2L), (4, 4L, 4, 4L)),
864920 assertNumStateRows(7, 7),
865921 */
866- AddData (inputStream, (6 , 6L ), (7 , 7L ), (8 , 8L ), (9 , 9L ), (10 , 10L )),
867- // batch 2: same result as above test
868- CheckNewAnswer ((6 , 6L , 6 , 6L ), (8 , 8L , 8 , 8L ), (10 , 10L , 10 , 10L )),
869- assertNumStateRows(13 , 8 ),
870- Execute { query =>
871- // Verify state format = 1
872- val f = query.lastExecution.executedPlan.collect {
873- case f : StreamingSymmetricHashJoinExec => f
874- }
875- assert(f.size == 1 )
876- assert(f.head.stateFormatVersion == 1 )
877- },
878- AddData (inputStream, (11 , 11L ), (12 , 12L ), (13 , 13L ), (14 , 14L ), (15 , 15L )),
879- // batch 3: global watermark, remaining rows in states, evicted rows are all same
880- // The query is running with old format, which SPARK-26187 is not fixed.
881- // Hence (2, 2L, null, null) is also emitted as output as well.
882- CheckNewAnswer (
883- Row (12 , 12L , 12 , 12L ), Row (14 , 14L , 14 , 14L ),
884- Row (1 , 1L , null , null ), Row (3 , 3L , null , null ),
885- Row (2 , 2L , null , null )),
886- assertNumStateRows(15 , 7 )
887- )
922+
923+ // we just fail the query if the checkpoint was create from less than Spark 3.0
924+ val e = intercept[StreamingQueryException ] {
925+ val writer = query.writeStream.format(" console" )
926+ .option(" checkpointLocation" , checkpointDir.getAbsolutePath).start()
927+ inputStream.addData((7 , 7L ), (8 , 8L ))
928+ eventually(timeout(streamingTimeout)) {
929+ assert(writer.exception.isDefined)
930+ }
931+ throw writer.exception.get
932+ }
933+ assert(e.getMessage.toLowerCase(Locale .ROOT )
934+ .contains(" the query is using stream-stream outer join with state format version 1" ))
888935 }
889936}
890937
0 commit comments