@@ -64,6 +64,32 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
6464 }
6565}
6666
67+ class RunningCountStatefulProcessorInt extends StatefulProcessor [String , String , (String , String )]
68+ with Logging {
69+ @ transient protected var _countState : ValueState [Int ] = _
70+
71+ override def init (
72+ outputMode : OutputMode ,
73+ timeMode : TimeMode ): Unit = {
74+ _countState = getHandle.getValueState[Int ](" countState" , Encoders .scalaInt)
75+ }
76+
77+ override def handleInputRows (
78+ key : String ,
79+ inputRows : Iterator [String ],
80+ timerValues : TimerValues ,
81+ expiredTimerInfo : ExpiredTimerInfo ): Iterator [(String , String )] = {
82+ val count = _countState.getOption().getOrElse(0 ) + 1
83+ if (count == 3 ) {
84+ _countState.clear()
85+ Iterator .empty
86+ } else {
87+ _countState.update(count)
88+ Iterator ((key, count.toString))
89+ }
90+ }
91+ }
92+
6793// Class to verify stateful processor usage with adding processing time timers
6894class RunningCountStatefulProcessorWithProcTimeTimer extends RunningCountStatefulProcessor {
6995 private def handleProcessingTimeBasedTimers (
@@ -1054,6 +1080,42 @@ class TransformWithStateSuite extends StateStoreMetricsTest
10541080 }
10551081 }
10561082 }
1083+
1084+ // TODO: Enable this test and expect error to be thrown when
1085+ // github.com/apache/spark/pull/47257 is merged
1086+ ignore(" test that invalid schema evolution fails query for column family" ) {
1087+ withSQLConf(SQLConf .STATE_STORE_PROVIDER_CLASS .key ->
1088+ classOf [RocksDBStateStoreProvider ].getName,
1089+ SQLConf .SHUFFLE_PARTITIONS .key ->
1090+ TransformWithStateSuiteUtils .NUM_SHUFFLE_PARTITIONS .toString) {
1091+ withTempDir { checkpointDir =>
1092+ val inputData = MemoryStream [String ]
1093+ val result1 = inputData.toDS()
1094+ .groupByKey(x => x)
1095+ .transformWithState(new RunningCountStatefulProcessor (),
1096+ TimeMode .None (),
1097+ OutputMode .Update ())
1098+
1099+ testStream(result1, OutputMode .Update ())(
1100+ StartStream (checkpointLocation = checkpointDir.getCanonicalPath),
1101+ AddData (inputData, " a" ),
1102+ CheckNewAnswer ((" a" , " 1" )),
1103+ StopStream
1104+ )
1105+ val result2 = inputData.toDS()
1106+ .groupByKey(x => x)
1107+ .transformWithState(new RunningCountStatefulProcessorInt (),
1108+ TimeMode .None (),
1109+ OutputMode .Update ())
1110+ testStream(result2, OutputMode .Update ())(
1111+ StartStream (checkpointLocation = checkpointDir.getCanonicalPath),
1112+ AddData (inputData, " a" ),
1113+ CheckNewAnswer ((" a" , " 2" )),
1114+ StopStream
1115+ )
1116+ }
1117+ }
1118+ }
10571119}
10581120
10591121class TransformWithStateValidationSuite extends StateStoreMetricsTest {
0 commit comments