
Commit feaf659

Dylan Wong authored and anishshri-db committed
[SPARK-53332][SS] Enable StateDataSource with state checkpoint v2 (only snapshotStartBatchId option)
### What changes were proposed in this pull request?

This PR enables StateDataSource support with the state checkpoint v2 format for the `snapshotStartBatchId` and related options, completing the StateDataSource checkpoint v2 integration.

The `replayStateFromSnapshot` method signature changes: it gains two parameters, `snapshotVersionStateStoreCkptId` and `endVersionStateStoreCkptId`. Both are needed, as `snapshotVersionStateStoreCkptId` is used when getting the snapshot and `endVersionStateStoreCkptId` is used for calculating the full lineage from the final version.

Before:

```
def replayStateFromSnapshot(
    snapshotVersion: Long,
    endVersion: Long,
    readOnly: Boolean = false): StateStore
```

After:

```
def replayStateFromSnapshot(
    snapshotVersion: Long,
    endVersion: Long,
    readOnly: Boolean = false,
    snapshotVersionStateStoreCkptId: Option[String] = None,
    endVersionStateStoreCkptId: Option[String] = None): StateStore
```

This is the final PR in the series following:

- #52047: Enable StateDataSource with state checkpoint v2 (only batchId option)
- #52148: Enable StateDataSource with state checkpoint v2 (only readChangeFeed)

NOTE: Reading checkpoint v2 state data sources requires `"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2`. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks, but this is left as a future change.

### Why are the changes needed?

State checkpoint v2 (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with the checkpoint v1 format, making it incompatible with streaming queries that use the newer checkpoint format. Only `batchId` was implemented in #52047, and only `readChangeFeed` in #52148.

### Does this PR introduce _any_ user-facing change?

Yes. The State Data Source will work when checkpoint v2 is used and the `snapshotStartBatchId` and related options are supplied.

### How was this patch tested?

In the previous PRs, test suites were added to parameterize the existing tests with checkpoint v2. All of those tests are now added back. All tests that previously intentionally exercised some feature of the State Data Source reader with checkpoint v1 should now be parameterized with checkpoint v2 (including the Python tests). `RocksDBWithCheckpointV2StateDataSourceReaderSnapshotSuite` is added, which uses the golden file approach similar to #46944, where `snapshotStartBatchId` was first added.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52202 from dylanwong250/SPARK-53332.

Authored-by: Dylan Wong <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
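For illustration, here is a minimal usage sketch (not part of the commit; the checkpoint path, batch IDs, and partition ID are made up) of exercising the `snapshotStartBatchId` option against a checkpoint v2 query once this change is in place:

```scala
// Sketch only: assumes a streaming query already wrote its checkpoint with
// checkpointFormatVersion = 2, and that batch 10 and a snapshot covering
// batch 5 exist. Per the NOTE above, the reading session must also set:
spark.conf.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")

val stateDf = spark.read
  .format("statestore")
  .option("path", "/tmp/checkpoint")    // hypothetical checkpoint location
  .option("snapshotStartBatchId", 5L)   // replay starts from the snapshot at batch 5
  .option("snapshotPartitionId", 0L)    // snapshot reads target a single partition
  .option("batchId", 10L)               // reconstruct state as of batch 10
  .load()
```

Under checkpoint v2, the reader additionally resolves the per-store unique checkpoint IDs from the commit log lineage and threads them through `replayStateFromSnapshot` via the two new parameters described above.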
1 parent 81823c2 · commit feaf659

File tree

856 files changed (+485, −139 lines)


python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py

Lines changed: 0 additions & 8 deletions

```diff
@@ -1923,10 +1923,6 @@ def conf(cls):
         cfg.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")
         return cfg
 
-    # TODO(SPARK-53332): Add test back when checkpoint v2 support exists for snapshotStartBatchId
-    def test_transform_with_value_state_metadata(self):
-        pass
-
 
 class TransformWithStateInPySparkWithCheckpointV2TestsMixin(TransformWithStateInPySparkTestsMixin):
     @classmethod
@@ -1935,10 +1931,6 @@ def conf(cls):
         cfg.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")
         return cfg
 
-    # TODO(SPARK-53332): Add test back when checkpoint v2 support exists for snapshotStartBatchId
-    def test_transform_with_value_state_metadata(self):
-        pass
-
 
 class TransformWithStateInPandasTests(TransformWithStateInPandasTestsMixin, ReusedSQLTestCase):
     pass
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala

Lines changed: 0 additions & 8 deletions

```diff
@@ -610,14 +610,6 @@ object StateSourceOptions extends DataSourceOptions {
       )
     }
 
-    if (startOperatorStateUniqueIds.isDefined) {
-      if (fromSnapshotOptions.isDefined) {
-        throw StateDataSourceErrors.invalidOptionValue(
-          SNAPSHOT_START_BATCH_ID,
-          "Snapshot reading is currently not supported with checkpoint v2.")
-      }
-    }
-
     StateSourceOptions(
       resolvedCpLocation, batchId.get, operatorId, storeName, joinSide,
       readChangeFeed, fromSnapshotOptions, readChangeFeedOptions,
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala

Lines changed: 3 additions & 1 deletion

```diff
@@ -206,7 +206,9 @@ class StatePartitionReader(
       provider.asInstanceOf[SupportsFineGrainedReplay]
         .replayReadStateFromSnapshot(
           fromSnapshotOptions.snapshotStartBatchId + 1,
-          partition.sourceOptions.batchId + 1)
+          partition.sourceOptions.batchId + 1,
+          getStartStoreUniqueId,
+          getEndStoreUniqueId)
     }
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala

Lines changed: 34 additions & 10 deletions

```diff
@@ -24,7 +24,7 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, Par
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
 import org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOperatorStateInfo
-import org.apache.spark.sql.execution.streaming.operators.stateful.join.{JoinStateManagerStoreGenerator, SymmetricHashJoinStateManager}
+import org.apache.spark.sql.execution.streaming.operators.stateful.join.{JoinStateManagerStoreGenerator, SnapshotOptions, SymmetricHashJoinStateManager}
 import org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinHelper.{JoinSide, LeftSide, RightSide}
 import org.apache.spark.sql.execution.streaming.state.StateStoreConf
 import org.apache.spark.sql.types.{BooleanType, StructType}
@@ -78,22 +78,40 @@
 
   private val startStateStoreCheckpointIds =
     SymmetricHashJoinStateManager.getStateStoreCheckpointIds(
-    partition.partition,
-    partition.sourceOptions.startOperatorStateUniqueIds,
-    usesVirtualColumnFamilies)
+      partition.partition,
+      partition.sourceOptions.startOperatorStateUniqueIds,
+      usesVirtualColumnFamilies)
 
-  private val keyToNumValuesStateStoreCkptId = if (joinSide == LeftSide) {
+  private val endStateStoreCheckpointIds =
+    SymmetricHashJoinStateManager.getStateStoreCheckpointIds(
+      partition.partition,
+      partition.sourceOptions.endOperatorStateUniqueIds,
+      usesVirtualColumnFamilies)
+
+  private val startKeyToNumValuesStateStoreCkptId = if (joinSide == LeftSide) {
     startStateStoreCheckpointIds.left.keyToNumValues
   } else {
     startStateStoreCheckpointIds.right.keyToNumValues
   }
 
-  private val keyWithIndexToValueStateStoreCkptId = if (joinSide == LeftSide) {
+  private val startKeyWithIndexToValueStateStoreCkptId = if (joinSide == LeftSide) {
     startStateStoreCheckpointIds.left.keyWithIndexToValue
   } else {
     startStateStoreCheckpointIds.right.keyWithIndexToValue
   }
 
+  private val endKeyToNumValuesStateStoreCkptId = if (joinSide == LeftSide) {
+    endStateStoreCheckpointIds.left.keyToNumValues
+  } else {
+    endStateStoreCheckpointIds.right.keyToNumValues
+  }
+
+  private val endKeyWithIndexToValueStateStoreCkptId = if (joinSide == LeftSide) {
+    endStateStoreCheckpointIds.left.keyWithIndexToValue
+  } else {
+    endStateStoreCheckpointIds.right.keyWithIndexToValue
+  }
+
   /*
    * This is to handle the difference of schema across state format versions. The major difference
    * is whether we have added new field(s) in addition to the fields from input schema.
@@ -150,13 +168,19 @@
       storeConf = storeConf,
       hadoopConf = hadoopConf.value,
       partitionId = partition.partition,
-      keyToNumValuesStateStoreCkptId = keyToNumValuesStateStoreCkptId,
-      keyWithIndexToValueStateStoreCkptId = keyWithIndexToValueStateStoreCkptId,
+      keyToNumValuesStateStoreCkptId = startKeyToNumValuesStateStoreCkptId,
+      keyWithIndexToValueStateStoreCkptId = startKeyWithIndexToValueStateStoreCkptId,
      formatVersion,
      skippedNullValueCount = None,
      useStateStoreCoordinator = false,
-      snapshotStartVersion =
-        partition.sourceOptions.fromSnapshotOptions.map(_.snapshotStartBatchId + 1),
+      snapshotOptions =
+        partition.sourceOptions.fromSnapshotOptions.map(opts => SnapshotOptions(
+          snapshotVersion = opts.snapshotStartBatchId + 1,
+          endVersion = partition.sourceOptions.batchId + 1,
+          startKeyToNumValuesStateStoreCkptId = startKeyToNumValuesStateStoreCkptId,
+          startKeyWithIndexToValueStateStoreCkptId = startKeyWithIndexToValueStateStoreCkptId,
+          endKeyToNumValuesStateStoreCkptId = endKeyToNumValuesStateStoreCkptId,
+          endKeyWithIndexToValueStateStoreCkptId = endKeyWithIndexToValueStateStoreCkptId)),
      joinStoreGenerator = new JoinStateManagerStoreGenerator()
     )
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala

Lines changed: 72 additions & 19 deletions

```diff
@@ -59,6 +59,8 @@ import org.apache.spark.util.NextIterator
  *                           store providers being used in this class. If true, Spark will
  *                           take care of management for state store providers, e.g. running
  *                           maintenance task for these providers.
+ * @param snapshotOptions Options controlling snapshot-based state replay for the state data
+ *                        source reader.
  * @param joinStoreGenerator The generator to create state store instances, re-using the same
  *                           instance when the join implementation uses virtual column families
  *                           for join version 3.
@@ -95,15 +97,20 @@ abstract class SymmetricHashJoinStateManager(
     stateFormatVersion: Int,
     skippedNullValueCount: Option[SQLMetric] = None,
     useStateStoreCoordinator: Boolean = true,
-    snapshotStartVersion: Option[Long] = None,
+    snapshotOptions: Option[SnapshotOptions] = None,
     joinStoreGenerator: JoinStateManagerStoreGenerator) extends Logging {
   import SymmetricHashJoinStateManager._
 
   protected val keySchema = StructType(
     joinKeys.zipWithIndex.map { case (k, i) => StructField(s"field$i", k.dataType, k.nullable) })
   protected val keyAttributes = toAttributes(keySchema)
-  protected val keyToNumValues = new KeyToNumValuesStore(stateFormatVersion)
-  protected val keyWithIndexToValue = new KeyWithIndexToValueStore(stateFormatVersion)
+
+  protected val keyToNumValues = new KeyToNumValuesStore(
+    stateFormatVersion,
+    snapshotOptions.map(_.getKeyToNumValuesHandlerOpts()))
+  protected val keyWithIndexToValue = new KeyWithIndexToValueStore(
+    stateFormatVersion,
+    snapshotOptions.map(_.getKeyWithIndexToValueHandlerOpts()))
 
   /*
   =====================================================
@@ -456,7 +463,8 @@
   /** Helper trait for invoking common functionalities of a state store. */
   protected abstract class StateStoreHandler(
       stateStoreType: StateStoreType,
-      stateStoreCkptId: Option[String]) extends Logging {
+      stateStoreCkptId: Option[String],
+      handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None) extends Logging {
     private var stateStoreProvider: StateStoreProvider = _
 
     /** StateStore that the subclasses of this class is going to operate on */
@@ -497,7 +505,7 @@
       }
       val storeProviderId = StateStoreProviderId(stateInfo.get, partitionId, storeName)
       val store = if (useStateStoreCoordinator) {
-        assert(snapshotStartVersion.isEmpty, "Should not use state store coordinator " +
+        assert(handlerSnapshotOptions.isEmpty, "Should not use state store coordinator " +
           "when reading state as data source.")
         joinStoreGenerator.getStore(
           storeProviderId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
@@ -509,13 +517,19 @@
           storeProviderId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
           useColumnFamilies = useVirtualColumnFamilies, storeConf, hadoopConf,
           useMultipleValuesPerKey = false, stateSchemaProvider = None)
-        if (snapshotStartVersion.isDefined) {
+        if (handlerSnapshotOptions.isDefined) {
           if (!stateStoreProvider.isInstanceOf[SupportsFineGrainedReplay]) {
             throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
              stateStoreProvider.getClass.toString)
           }
+          val opts = handlerSnapshotOptions.get
           stateStoreProvider.asInstanceOf[SupportsFineGrainedReplay]
-            .replayStateFromSnapshot(snapshotStartVersion.get, stateInfo.get.storeVersion)
+            .replayStateFromSnapshot(
+              opts.snapshotVersion,
+              opts.endVersion,
+              readOnly = true,
+              opts.startStateStoreCkptId,
+              opts.endStateStoreCkptId)
         } else {
           stateStoreProvider.getStore(stateInfo.get.storeVersion, stateStoreCkptId)
         }
@@ -539,9 +553,12 @@
 
 
   /** A wrapper around a [[StateStore]] that stores [key -> number of values]. */
-  protected class KeyToNumValuesStore(val stateFormatVersion: Int)
-    extends StateStoreHandler(KeyToNumValuesType, keyToNumValuesStateStoreCkptId) {
-
+  protected class KeyToNumValuesStore(
+      val stateFormatVersion: Int,
+      val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None)
+    extends StateStoreHandler(
+      KeyToNumValuesType, keyToNumValuesStateStoreCkptId, handlerSnapshotOptions) {
+  SnapshotOptions
     private val useVirtualColumnFamilies = stateFormatVersion == 3
     private val longValueSchema = new StructType().add("value", "long")
     private val longToUnsafeRow = UnsafeProjection.create(longValueSchema)
@@ -707,8 +724,11 @@
    * A wrapper around a [[StateStore]] that stores the mapping; the mapping depends on the
    * state format version - please refer implementations of [[KeyWithIndexToValueRowConverter]].
    */
-  protected class KeyWithIndexToValueStore(stateFormatVersion: Int)
-    extends StateStoreHandler(KeyWithIndexToValueType, keyWithIndexToValueStateStoreCkptId) {
+  protected class KeyWithIndexToValueStore(
+      stateFormatVersion: Int,
+      handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None)
+    extends StateStoreHandler(
+      KeyWithIndexToValueType, keyWithIndexToValueStateStoreCkptId, handlerSnapshotOptions) {
 
     private val useVirtualColumnFamilies = stateFormatVersion == 3
     private val keyWithIndexExprs = keyAttributes :+ Literal(1L)
@@ -848,11 +868,11 @@ class SymmetricHashJoinStateManagerV1(
     stateFormatVersion: Int,
     skippedNullValueCount: Option[SQLMetric] = None,
     useStateStoreCoordinator: Boolean = true,
-    snapshotStartVersion: Option[Long] = None,
+    snapshotOptions: Option[SnapshotOptions] = None,
     joinStoreGenerator: JoinStateManagerStoreGenerator) extends SymmetricHashJoinStateManager(
   joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, hadoopConf,
   partitionId, keyToNumValuesStateStoreCkptId, keyWithIndexToValueStateStoreCkptId,
-  stateFormatVersion, skippedNullValueCount, useStateStoreCoordinator, snapshotStartVersion,
+  stateFormatVersion, skippedNullValueCount, useStateStoreCoordinator, snapshotOptions,
   joinStoreGenerator) {
 
   /** Commit all the changes to all the state stores */
@@ -927,11 +947,11 @@ class SymmetricHashJoinStateManagerV2(
     stateFormatVersion: Int,
     skippedNullValueCount: Option[SQLMetric] = None,
     useStateStoreCoordinator: Boolean = true,
-    snapshotStartVersion: Option[Long] = None,
+    snapshotOptions: Option[SnapshotOptions] = None,
     joinStoreGenerator: JoinStateManagerStoreGenerator) extends SymmetricHashJoinStateManager(
   joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, hadoopConf,
   partitionId, keyToNumValuesStateStoreCkptId, keyWithIndexToValueStateStoreCkptId,
-  stateFormatVersion, skippedNullValueCount, useStateStoreCoordinator, snapshotStartVersion,
+  stateFormatVersion, skippedNullValueCount, useStateStoreCoordinator, snapshotOptions,
   joinStoreGenerator) {
 
   /** Commit all the changes to the state store */
@@ -1034,20 +1054,20 @@ object SymmetricHashJoinStateManager {
       stateFormatVersion: Int,
       skippedNullValueCount: Option[SQLMetric] = None,
       useStateStoreCoordinator: Boolean = true,
-      snapshotStartVersion: Option[Long] = None,
+      snapshotOptions: Option[SnapshotOptions] = None,
       joinStoreGenerator: JoinStateManagerStoreGenerator): SymmetricHashJoinStateManager = {
     if (stateFormatVersion == 3) {
       new SymmetricHashJoinStateManagerV2(
         joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, hadoopConf,
         partitionId, keyToNumValuesStateStoreCkptId, keyWithIndexToValueStateStoreCkptId,
-        stateFormatVersion, skippedNullValueCount, useStateStoreCoordinator, snapshotStartVersion,
+        stateFormatVersion, skippedNullValueCount, useStateStoreCoordinator, snapshotOptions,
         joinStoreGenerator
       )
     } else {
       new SymmetricHashJoinStateManagerV1(
         joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, hadoopConf,
         partitionId, keyToNumValuesStateStoreCkptId, keyWithIndexToValueStateStoreCkptId,
-        stateFormatVersion, skippedNullValueCount, useStateStoreCoordinator, snapshotStartVersion,
+        stateFormatVersion, skippedNullValueCount, useStateStoreCoordinator, snapshotOptions,
         joinStoreGenerator
       )
     }
@@ -1280,3 +1300,36 @@ object SymmetricHashJoinStateManager {
     }
   }
 }
+
+/**
+ * Options controlling snapshot-based state replay for state data source reader.
+ */
+case class SnapshotOptions(
+    snapshotVersion: Long,
+    endVersion: Long,
+    startKeyToNumValuesStateStoreCkptId: Option[String] = None,
+    startKeyWithIndexToValueStateStoreCkptId: Option[String] = None,
+    endKeyToNumValuesStateStoreCkptId: Option[String] = None,
+    endKeyWithIndexToValueStateStoreCkptId: Option[String] = None) {
+
+  def getKeyToNumValuesHandlerOpts(): HandlerSnapshotOptions =
+    HandlerSnapshotOptions(
+      snapshotVersion = snapshotVersion,
+      endVersion = endVersion,
+      startStateStoreCkptId = startKeyToNumValuesStateStoreCkptId,
+      endStateStoreCkptId = endKeyToNumValuesStateStoreCkptId)
+
+  def getKeyWithIndexToValueHandlerOpts(): HandlerSnapshotOptions =
+    HandlerSnapshotOptions(
+      snapshotVersion = snapshotVersion,
+      endVersion = endVersion,
+      startStateStoreCkptId = startKeyWithIndexToValueStateStoreCkptId,
+      endStateStoreCkptId = endKeyWithIndexToValueStateStoreCkptId)
+}
+
+/** Snapshot options specialized for a single state store handler. */
+private[join] case class HandlerSnapshotOptions(
+    snapshotVersion: Long,
+    endVersion: Long,
+    startStateStoreCkptId: Option[String],
+    endStateStoreCkptId: Option[String])
```
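As a reading aid (not part of the commit; the version numbers and checkpoint IDs are made up), this is how the two option classes above compose: `StreamStreamJoinStatePartitionReader` builds one `SnapshotOptions` per partition, and each store handler derives the subset it needs:

```scala
// Illustrative values only.
val opts = SnapshotOptions(
  snapshotVersion = 6L,  // snapshotStartBatchId + 1
  endVersion = 11L,      // batchId + 1
  startKeyToNumValuesStateStoreCkptId = Some("ckpt-a"),
  startKeyWithIndexToValueStateStoreCkptId = Some("ckpt-b"),
  endKeyToNumValuesStateStoreCkptId = Some("ckpt-c"),
  endKeyWithIndexToValueStateStoreCkptId = Some("ckpt-d"))

// Each handler receives only the checkpoint IDs for its own state store:
opts.getKeyToNumValuesHandlerOpts()
// => HandlerSnapshotOptions(6, 11, Some("ckpt-a"), Some("ckpt-c"))
opts.getKeyWithIndexToValueHandlerOpts()
// => HandlerSnapshotOptions(6, 11, Some("ckpt-b"), Some("ckpt-d"))
```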

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 25 additions & 2 deletions

```diff
@@ -972,10 +972,22 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
    *
    * @param snapshotVersion checkpoint version of the snapshot to start with
    * @param endVersion checkpoint version to end with
+   * @param readOnly whether the state store should be read-only
+   * @param snapshotVersionStateStoreCkptId state store checkpoint ID of the snapshot version
+   * @param endVersionStateStoreCkptId state store checkpoint ID of the end version
    * @return [[HDFSBackedStateStore]]
    */
   override def replayStateFromSnapshot(
-      snapshotVersion: Long, endVersion: Long, readOnly: Boolean): StateStore = {
+      snapshotVersion: Long,
+      endVersion: Long,
+      readOnly: Boolean,
+      snapshotVersionStateStoreCkptId: Option[String] = None,
+      endVersionStateStoreCkptId: Option[String] = None): StateStore = {
+    if (snapshotVersionStateStoreCkptId.isDefined || endVersionStateStoreCkptId.isDefined) {
+      throw StateStoreErrors.stateStoreCheckpointIdsNotSupported(
+        "HDFSBackedStateStoreProvider does not support checkpointFormatVersion > 1 " +
+          "but a state store checkpointID is passed in")
+    }
     val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
     logInfo(log"Retrieved snapshot at version " +
       log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
@@ -990,10 +1002,21 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
    *
    * @param snapshotVersion checkpoint version of the snapshot to start with
    * @param endVersion checkpoint version to end with
+   * @param snapshotVersionStateStoreCkptId state store checkpoint ID of the snapshot version
+   * @param endVersionStateStoreCkptId state store checkpoint ID of the end version
    * @return [[HDFSBackedReadStateStore]]
    */
-  override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long):
+  override def replayReadStateFromSnapshot(
+      snapshotVersion: Long,
+      endVersion: Long,
+      snapshotVersionStateStoreCkptId: Option[String] = None,
+      endVersionStateStoreCkptId: Option[String] = None):
     ReadStateStore = {
+    if (snapshotVersionStateStoreCkptId.isDefined || endVersionStateStoreCkptId.isDefined) {
+      throw StateStoreErrors.stateStoreCheckpointIdsNotSupported(
+        "HDFSBackedStateStoreProvider does not support checkpointFormatVersion > 1 " +
+          "but a state store checkpointID is passed in")
+    }
     val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
     logInfo(log"Retrieved snapshot at version " +
       log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
```
