Commit 2c35d5f

creating operatorstatemetadata log
1 parent 32e73d0 commit 2c35d5f

8 files changed, +202 −25 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala

Lines changed: 25 additions & 9 deletions

@@ -31,8 +31,8 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceErrors
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.PATH
-import org.apache.spark.sql.execution.streaming.CheckpointFileManager
-import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, OperatorStateMetadataReader, OperatorStateMetadataV1}
+import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, OperatorStateMetadataLog}
+import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, OperatorStateMetadataReader, OperatorStateMetadataV1, OperatorStateMetadataV2}
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -46,6 +46,7 @@ case class StateMetadataTableEntry(
     numPartitions: Int,
     minBatchId: Long,
     maxBatchId: Long,
+    operatorPropertiesJson: String,
     numColsPrefixKey: Int) {
   def toRow(): InternalRow = {
     new GenericInternalRow(
@@ -55,6 +56,7 @@ case class StateMetadataTableEntry(
         numPartitions,
         minBatchId,
         maxBatchId,
+        UTF8String.fromString(operatorPropertiesJson),
         numColsPrefixKey))
   }
 }
@@ -68,6 +70,7 @@ object StateMetadataTableEntry {
       .add("numPartitions", IntegerType)
       .add("minBatchId", LongType)
       .add("maxBatchId", LongType)
+      .add("operatorProperties", StringType)
   }
 }

@@ -192,22 +195,35 @@ class StateMetadataPartitionReader(
     val stateDir = new Path(checkpointLocation, "state")
     val opIds = fileManager
       .list(stateDir, pathNameCanBeParsedAsLongFilter).map(f => pathToLong(f.getPath)).sorted
-    opIds.map { opId =>
-      new OperatorStateMetadataReader(new Path(stateDir, opId.toString), hadoopConf).read()
+    opIds.flatMap { opId =>
+      val operatorIdPath = new Path(stateDir, opId.toString)
+      // check all OperatorStateMetadataV2
+      val operatorStateMetadataV2Path = OperatorStateMetadataV2.metadataFilePath(operatorIdPath)
+      if (fileManager.exists(operatorStateMetadataV2Path)) {
+        val operatorStateMetadataLog = new OperatorStateMetadataLog(
+          hadoopConf, operatorStateMetadataV2Path.toString)
+        operatorStateMetadataLog.listBatchesOnDisk.flatMap(operatorStateMetadataLog.get)
+      } else {
+        Array(new OperatorStateMetadataReader(operatorIdPath, hadoopConf).read())
+      }
     }
   }

   private[state] lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {
     allOperatorStateMetadata.flatMap { operatorStateMetadata =>
-      require(operatorStateMetadata.version == 1)
-      val operatorStateMetadataV1 = operatorStateMetadata.asInstanceOf[OperatorStateMetadataV1]
-      operatorStateMetadataV1.stateStoreInfo.map { stateStoreMetadata =>
-        StateMetadataTableEntry(operatorStateMetadataV1.operatorInfo.operatorId,
-          operatorStateMetadataV1.operatorInfo.operatorName,
+      require(operatorStateMetadata.version == 1 || operatorStateMetadata.version == 2)
+      val operatorProperties = operatorStateMetadata match {
+        case _: OperatorStateMetadataV1 => ""
+        case v2: OperatorStateMetadataV2 => v2.operatorPropertiesJson
+      }
+      operatorStateMetadata.stateStoreInfo.map { stateStoreMetadata =>
+        StateMetadataTableEntry(operatorStateMetadata.operatorInfo.operatorId,
+          operatorStateMetadata.operatorInfo.operatorName,
           stateStoreMetadata.storeName,
           stateStoreMetadata.numPartitions,
           if (batchIds.nonEmpty) batchIds.head else -1,
           if (batchIds.nonEmpty) batchIds.last else -1,
+          operatorProperties,
          stateStoreMetadata.numColsPrefixKey
         )
       }
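
Note: the new operatorProperties column flows straight through to queries against the state metadata source. A minimal sketch, assuming the source keeps its existing "state-metadata" short name and that /tmp/checkpoint is a hypothetical checkpoint written by a streaming query:

// Query the state metadata for a checkpoint; operatorProperties is an empty string for
// V1 operators and carries the operator's JSON (e.g. timeMode/outputMode) for V2 operators.
val metadataDf = spark.read
  .format("state-metadata")
  .load("/tmp/checkpoint")

metadataDf
  .select("operatorId", "operatorName", "operatorProperties")
  .show(truncate = false)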

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala

Lines changed: 17 additions & 8 deletions

@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadat
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasWithStateExec
 import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1
-import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV1, OperatorStateMetadataWriter}
+import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV1, OperatorStateMetadataV2, OperatorStateMetadataWriter}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -189,6 +189,15 @@ class IncrementalExecution(

   object WriteStatefulOperatorMetadataRule extends SparkPlanPartialRule {
     override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+      case tws: TransformWithStateExec if isFirstBatch =>
+        val metadata = tws.operatorStateMetadata()
+        // use a subdirectory for v2
+        val metadataPath = OperatorStateMetadataV2.metadataFilePath(new Path(
+          checkpointLocation, tws.getStateInfo.operatorId.toString))
+        val operatorStateMetadataLog = new OperatorStateMetadataLog(sparkSession,
+          metadataPath.toString)
+        operatorStateMetadataLog.add(currentBatchId, metadata)
+        tws
       case stateStoreWriter: StateStoreWriter if isFirstBatch =>
         val metadata = stateStoreWriter.operatorStateMetadata()
         val metadataWriter = new OperatorStateMetadataWriter(new Path(
@@ -454,11 +463,11 @@ class IncrementalExecution(
         new Path(checkpointLocation).getParent.toString,
         new SerializableConfiguration(hadoopConf))
       val opMetadataList = reader.allOperatorStateMetadata
-      ret = opMetadataList.map { operatorMetadata =>
-        val metadataInfoV1 = operatorMetadata
-          .asInstanceOf[OperatorStateMetadataV1]
-          .operatorInfo
-        metadataInfoV1.operatorId -> metadataInfoV1.operatorName
+      ret = opMetadataList.map {
+        case OperatorStateMetadataV1(operatorInfo, _) =>
+          operatorInfo.operatorId -> operatorInfo.operatorName
+        case OperatorStateMetadataV2(operatorInfo, _, _) =>
+          operatorInfo.operatorId -> operatorInfo.operatorName
       }.toMap
     } catch {
       case e: Exception =>
@@ -495,8 +504,8 @@ class IncrementalExecution(

     // The two rules below don't change the plan but can cause the side effect that
     // metadata/schema is written in the checkpoint directory of stateful operator.
-    planWithStateOpId transform StateSchemaValidationRule.rule
-    planWithStateOpId transform WriteStatefulOperatorMetadataRule.rule
+    val schemaValidatedPlan = planWithStateOpId transform StateSchemaValidationRule.rule
+    schemaValidatedPlan transform WriteStatefulOperatorMetadataRule.rule

     simulateWatermarkPropagation(planWithStateOpId)
     planWithStateOpId transform WatermarkPropagationRule.rule
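
Note: to make the "use a subdirectory for v2" comment concrete, the layout follows from OperatorStateMetadataV2.metadataFilePath (see OperatorStateMetadata.scala below). A sketch with a hypothetical state checkpoint root and operator id 0:

import org.apache.hadoop.fs.Path

// V1: a single metadata file per operator, written once by OperatorStateMetadataWriter.
// V2: a per-batch HDFSMetadataLog rooted under a "v2/_metadata" subdirectory of the operator dir.
val operatorDir = new Path("/tmp/checkpoint/state/0")   // hypothetical state dir for operator 0
val v2LogPath = OperatorStateMetadataV2.metadataFilePath(operatorDir)
// => /tmp/checkpoint/state/0/v2/_metadata/metadata; WriteStatefulOperatorMetadataRule then calls
//    operatorStateMetadataLog.add(currentBatchId, metadata) against this path.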

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream}
+import java.nio.charset.StandardCharsets
+import java.nio.charset.StandardCharsets._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FSDataOutputStream
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, OperatorStateMetadataV1, OperatorStateMetadataV2}
+import org.apache.spark.sql.internal.SQLConf
+
+
+class OperatorStateMetadataLog(
+    hadoopConf: Configuration,
+    path: String,
+    metadataCacheEnabled: Boolean = false)
+  extends HDFSMetadataLog[OperatorStateMetadata](hadoopConf, path, metadataCacheEnabled) {
+
+  def this(sparkSession: SparkSession, path: String) = {
+    this(
+      sparkSession.sessionState.newHadoopConf(),
+      path,
+      metadataCacheEnabled = sparkSession.sessionState.conf.getConf(
+        SQLConf.STREAMING_METADATA_CACHE_ENABLED)
+    )
+  }
+
+  override protected def serialize(metadata: OperatorStateMetadata, out: OutputStream): Unit = {
+    val fsDataOutputStream = out.asInstanceOf[FSDataOutputStream]
+    fsDataOutputStream.write(s"v${metadata.version}\n".getBytes(StandardCharsets.UTF_8))
+    metadata.version match {
+      case 1 =>
+        OperatorStateMetadataV1.serialize(fsDataOutputStream, metadata)
+      case 2 =>
+        OperatorStateMetadataV2.serialize(fsDataOutputStream, metadata)
+    }
+  }
+
+  override protected def deserialize(in: InputStream): OperatorStateMetadata = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    // create buffered reader from input stream
+    val bufferedReader = new BufferedReader(new InputStreamReader(in, UTF_8))
+    // read first line for version number, in the format "v{version}"
+    val version = bufferedReader.readLine()
+    version match {
+      case "v1" => OperatorStateMetadataV1.deserialize(bufferedReader)
+      case "v2" => OperatorStateMetadataV2.deserialize(bufferedReader)
+    }
+  }
+}
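
Note: a short usage sketch of the new log. The path and the operatorMetadata value are hypothetical; in this change the log is written by WriteStatefulOperatorMetadataRule and read back by StateMetadataPartitionReader:

// operatorMetadata: an OperatorStateMetadataV2 produced by the operator (hypothetical value).
val log = new OperatorStateMetadataLog(spark, "/tmp/checkpoint/state/0/v2/_metadata/metadata")
log.add(0, operatorMetadata)                            // returns false if batch 0 was already written
val latest: Option[(Long, OperatorStateMetadata)] = log.getLatest()
val all: Array[OperatorStateMetadata] = log.listBatchesOnDisk.flatMap(log.get)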

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala

Lines changed: 2 additions & 1 deletion

@@ -230,7 +230,8 @@ case class StreamingSymmetricHashJoinExec(
   override def operatorStateMetadata(): OperatorStateMetadata = {
     val info = getStateInfo
     val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
-    val stateStoreInfo = stateStoreNames.map(StateStoreMetadataV1(_, 0, info.numPartitions)).toArray
+    val stateStoreInfo: Array[StateStoreMetadata] =
+      stateStoreNames.map(StateStoreMetadataV1(_, 0, info.numPartitions)).toArray
     OperatorStateMetadataV1(operatorInfo, stateStoreInfo)
   }
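
Note: the explicit Array[StateStoreMetadata] annotation appears to be needed because OperatorStateMetadataV1 now takes Array[StateStoreMetadata] (see OperatorStateMetadata.scala below) and Scala arrays are invariant, so an inferred Array[StateStoreMetadataV1] would no longer conform. A minimal illustration:

val v1Stores: Array[StateStoreMetadataV1] =
  Array(StateStoreMetadataV1("default", 0, 5))
// val widened: Array[StateStoreMetadata] = v1Stores    // does not compile: Array is invariant
val widened: Array[StateStoreMetadata] =
  Array(StateStoreMetadataV1("default", 0, 5))          // ascribe the element type up front instead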

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala

Lines changed: 25 additions & 1 deletion

@@ -23,6 +23,10 @@ import scala.jdk.CollectionConverters.CollectionHasAsScala

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.JString
+import org.json4s.jackson.JsonMethods.{compact, render}

 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -95,6 +99,8 @@ case class TransformWithStateExec(
     }
   }

+  override def operatorStateMetadataVersion: Int = 2
+
   private def getDriverProcessorHandle: DriverStatefulProcessorHandleImpl = {
     val driverProcessorHandle = new DriverStatefulProcessorHandleImpl
     statefulProcessor.setHandle(driverProcessorHandle)
@@ -382,7 +388,25 @@ case class TransformWithStateExec(
   private def validateSchemas(
       oldSchema: List[ColumnFamilySchema],
       newSchema: List[ColumnFamilySchema]): Unit = {
-    // TODO: Implement logic that allows for schema evolution
+    // TODO: Implement logic that allows for schema validation and evolution
+  }
+
+  /** Metadata of this stateful operator and its states stores. */
+  override def operatorStateMetadata(): OperatorStateMetadata = {
+    val info = getStateInfo
+    val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
+    // stateSchemaFilePath should be populated at this point
+    assert(info.stateSchemaPath.isDefined)
+    val stateStoreInfo: Array[StateStoreMetadata] =
+      Array(StateStoreMetadataV2(
+        StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions, info.stateSchemaPath.get))
+
+    val operatorPropertiesJson: JValue =
+      ("timeMode" -> JString(timeMode.toString)) ~
+      ("outputMode" -> JString(outputMode.toString))
+
+    val json = compact(render(operatorPropertiesJson))
+    OperatorStateMetadataV2(operatorInfo, stateStoreInfo, json)
   }

   private def stateSchemaFilePath(storeName: Option[String] = None): Path = {
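
Note: the operatorPropertiesJson built above renders to a compact JSON string. A sketch of the output for a hypothetical processing-time, append-mode query (the literal values depend on the query's TimeMode and OutputMode):

import org.json4s.JString
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// Same construction as in operatorStateMetadata(), with illustrative literals.
val props = ("timeMode" -> JString("ProcessingTime")) ~ ("outputMode" -> JString("Append"))
val json = compact(render(props))
// json: {"timeMode":"ProcessingTime","outputMode":"Append"}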

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala

Lines changed: 52 additions & 3 deletions

@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, Metadata
 /**
  * Metadata for a state store instance.
  */
-trait StateStoreMetadata {
+trait StateStoreMetadata extends Serializable {
   def storeName: String
   def numColsPrefixKey: Int
   def numPartitions: Int
@@ -42,6 +42,21 @@ trait StateStoreMetadata {
 case class StateStoreMetadataV1(storeName: String, numColsPrefixKey: Int, numPartitions: Int)
   extends StateStoreMetadata

+case class StateStoreMetadataV2(
+    storeName: String,
+    numColsPrefixKey: Int,
+    numPartitions: Int,
+    stateSchemaFilePath: String)
+  extends StateStoreMetadata
+
+object StateStoreMetadataV2 {
+  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+  @scala.annotation.nowarn
+  private implicit val manifest = Manifest
+    .classType[StateStoreMetadataV2](implicitly[ClassTag[StateStoreMetadataV2]].runtimeClass)
+}
+
 /**
  * Information about a stateful operator.
  */
@@ -54,14 +69,25 @@ case class OperatorInfoV1(operatorId: Long, operatorName: String) extends Operat

 trait OperatorStateMetadata {
   def version: Int
+
+  def operatorInfo: OperatorInfo
+
+  def stateStoreInfo: Array[StateStoreMetadata]
 }

 case class OperatorStateMetadataV1(
     operatorInfo: OperatorInfoV1,
-    stateStoreInfo: Array[StateStoreMetadataV1]) extends OperatorStateMetadata {
+    stateStoreInfo: Array[StateStoreMetadata]) extends OperatorStateMetadata {
   override def version: Int = 1
 }

+case class OperatorStateMetadataV2(
+    operatorInfo: OperatorInfoV1,
+    stateStoreInfo: Array[StateStoreMetadata],
+    operatorPropertiesJson: String) extends OperatorStateMetadata {
+  override def version: Int = 2
+}
+
 object OperatorStateMetadataV1 {

   private implicit val formats: Formats = Serialization.formats(NoTypeHints)
@@ -84,6 +110,27 @@ object OperatorStateMetadataV1 {
   }
 }

+object OperatorStateMetadataV2 {
+  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+  @scala.annotation.nowarn
+  private implicit val manifest = Manifest
+    .classType[OperatorStateMetadataV2](implicitly[ClassTag[OperatorStateMetadataV2]].runtimeClass)
+
+  def metadataFilePath(stateCheckpointPath: Path): Path =
+    new Path(new Path(new Path(stateCheckpointPath, "v2"), "_metadata"), "metadata")
+
+  def deserialize(in: BufferedReader): OperatorStateMetadata = {
+    Serialization.read[OperatorStateMetadataV2](in)
+  }
+
+  def serialize(
+      out: FSDataOutputStream,
+      operatorStateMetadata: OperatorStateMetadata): Unit = {
+    Serialization.write(operatorStateMetadata.asInstanceOf[OperatorStateMetadataV2], out)
+  }
+}
+
 /**
  * Write OperatorStateMetadata into the state checkpoint directory.
  */
@@ -114,7 +161,9 @@ class OperatorStateMetadataWriter(stateCheckpointPath: Path, hadoopConf: Configu
 }

 /**
- * Read OperatorStateMetadata from the state checkpoint directory.
+ * Read OperatorStateMetadata from the state checkpoint directory. This class will only be
+ * used to read OperatorStateMetadataV1.
+ * OperatorStateMetadataV2 will be read by the OperatorStateMetadataLog.
  */
 class OperatorStateMetadataReader(stateCheckpointPath: Path, hadoopConf: Configuration) {
