From 14ff8f5d3e921970b3d7cf8a288b1d6b1a74c4d8 Mon Sep 17 00:00:00 2001 From: jingz-db Date: Wed, 10 Jul 2024 10:59:27 -0700 Subject: [PATCH 01/23] resolve conflicts --- .../execution/streaming/StatefulProcessorHandleImplBase.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImplBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImplBase.scala index 64d87073ccf9f..12f9ce768e42b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImplBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImplBase.scala @@ -23,7 +23,8 @@ import org.apache.spark.sql.execution.streaming.state.StateStoreErrors import org.apache.spark.sql.streaming.{StatefulProcessorHandle, TimeMode} abstract class StatefulProcessorHandleImplBase( - timeMode: TimeMode, keyExprEnc: ExpressionEncoder[Any]) extends StatefulProcessorHandle { + timeMode: TimeMode, keyExprEnc: ExpressionEncoder[Any]) + extends StatefulProcessorHandle { protected var currState: StatefulProcessorHandleState = PRE_INIT From 5ef175c85cd2546a088671ac7975ce814c2138a6 Mon Sep 17 00:00:00 2001 From: jingz-db Date: Wed, 10 Jul 2024 11:01:25 -0700 Subject: [PATCH 02/23] resolve --- .../execution/streaming/StatefulProcessorHandleImplBase.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImplBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImplBase.scala index 12f9ce768e42b..64d87073ccf9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImplBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImplBase.scala @@ -23,8 +23,7 @@ import org.apache.spark.sql.execution.streaming.state.StateStoreErrors import org.apache.spark.sql.streaming.{StatefulProcessorHandle, TimeMode} abstract class StatefulProcessorHandleImplBase( - timeMode: TimeMode, keyExprEnc: ExpressionEncoder[Any]) - extends StatefulProcessorHandle { + timeMode: TimeMode, keyExprEnc: ExpressionEncoder[Any]) extends StatefulProcessorHandle { protected var currState: StatefulProcessorHandleState = PRE_INIT From 8bf0576b1ff6ada87d055bdbe590dba138295ff8 Mon Sep 17 00:00:00 2001 From: jingz-db Date: Mon, 8 Jul 2024 14:31:03 -0700 Subject: [PATCH 03/23] a suite with composite type, why key encoder spec overwritten --- .../sql/execution/streaming/StatefulProcessorHandleImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala index 3031faa35b2d1..65b435b5c692c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala @@ -365,7 +365,7 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi ttlConfig: TTLConfig): MapState[K, V] = { verifyStateVarOperations("get_map_state", PRE_INIT) val colFamilySchema = columnFamilySchemaUtils. 
- getMapStateSchema(stateName, keyExprEnc, userKeyEnc, valEncoder, true) + getMapStateSchema(stateName, keyExprEnc, valEncoder, userKeyEnc, true) columnFamilySchemas.put(stateName, colFamilySchema) null.asInstanceOf[MapState[K, V]] } From 0b5e9458b09986f8185c1aa122775c59160c0507 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 25 Jun 2024 15:40:18 -0700 Subject: [PATCH 04/23] writing schema --- .../execution/streaming/ListStateImpl.scala | 13 ++- .../streaming/ListStateImplWithTTL.scala | 12 ++- .../execution/streaming/MapStateImpl.scala | 13 ++- .../streaming/MapStateImplWithTTL.scala | 12 ++- .../streaming/StateSchemaV3File.scala | 100 ++++++++++++++++++ .../execution/streaming/ValueStateImpl.scala | 12 ++- .../streaming/ValueStateImplWithTTL.scala | 13 ++- 7 files changed, 169 insertions(+), 6 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala index 56c9d2664d9e2..6931576a2c4f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala @@ -20,9 +20,20 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA} -import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors} import org.apache.spark.sql.streaming.ListState +object ListStateImpl { + def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( + stateName, + KEY_ROW_SCHEMA, + VALUE_ROW_SCHEMA, + NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), + true) + } +} + /** * Provides concrete implementation for list of values associated with a state variable * used in the streaming transformWithState operator. 
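For context, and not taken from the patches themselves: PATCH 04 moves each state variable's column family schema into a companion object such as `ListStateImpl.columnFamilySchema`, and the driver-side handle collects these into a name-to-schema map via `columnFamilySchemas.put(stateName, colFamilySchema)` (visible in PATCH 03). A minimal sketch of that registration pattern, assuming the `ColumnFamilySchemaV1` constructor and helper shown in the diff above; the registry below is illustrative:

```scala
// Sketch only: mirrors how a driver-side handle could register one column
// family schema per state variable, using the helper added in PATCH 04.
// ColumnFamilySchemaV1 and ListStateImpl are the classes touched by this diff.
import scala.collection.mutable

import org.apache.spark.sql.execution.streaming.ListStateImpl
import org.apache.spark.sql.execution.streaming.state.ColumnFamilySchemaV1

val columnFamilySchemas = mutable.Map.empty[String, ColumnFamilySchemaV1]

def registerListState(stateName: String): Unit = {
  // Key/value row schemas, a NoPrefixKeyStateEncoderSpec, and the trailing
  // boolean that PATCH 04 sets to true for list state (false for value state).
  val colFamilySchema = ListStateImpl.columnFamilySchema(stateName)
  columnFamilySchemas.put(stateName, colFamilySchema)
}
```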
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImplWithTTL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImplWithTTL.scala index dc72f8bcd5600..416ce32088016 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImplWithTTL.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImplWithTTL.scala @@ -19,10 +19,20 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} -import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors} import org.apache.spark.sql.streaming.{ListState, TTLConfig} import org.apache.spark.util.NextIterator +object ListStateImplWithTTL { + def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( + stateName, + KEY_ROW_SCHEMA, + VALUE_ROW_SCHEMA_WITH_TTL, + NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), + true) + } +} /** * Class that provides a concrete implementation for a list state state associated with state * variables (with ttl expiration support) used in the streaming transformWithState operator. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala index c58f32ed756db..5d861a9de9244 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala @@ -19,10 +19,21 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.internal.Logging import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.execution.streaming.state.{PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors, UnsafeRowPair} +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors, UnsafeRowPair} import org.apache.spark.sql.streaming.MapState import org.apache.spark.sql.types.{BinaryType, StructType} +object MapStateImpl { + def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( + stateName, + COMPOSITE_KEY_ROW_SCHEMA, + VALUE_ROW_SCHEMA, + PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1), false) + } +} + class MapStateImpl[K, V]( store: StateStore, stateName: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala index 2ab06f36dd5f7..ce58a495baf08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala @@ -20,10 +20,20 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.Encoder import 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} -import org.apache.spark.sql.execution.streaming.state.{PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors} import org.apache.spark.sql.streaming.{MapState, TTLConfig} import org.apache.spark.util.NextIterator +object MapStateImplWithTTL { + def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( + stateName, + COMPOSITE_KEY_ROW_SCHEMA, + VALUE_ROW_SCHEMA_WITH_TTL, + PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1), false) + } +} + /** * Class that provides a concrete implementation for map state associated with state * variables (with ttl expiration support) used in the streaming transformWithState operator. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala new file mode 100644 index 0000000000000..82bab9a5301f0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.{InputStream, OutputStream, StringReader} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream} +import org.json4s.JValue +import org.json4s.jackson.JsonMethods +import org.json4s.jackson.JsonMethods.{compact, render} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf + +class StateSchemaV3File( + hadoopConf: Configuration, + path: String, + metadataCacheEnabled: Boolean = false) + extends HDFSMetadataLog[JValue](hadoopConf, path, metadataCacheEnabled) { + + final val MAX_UTF_CHUNK_SIZE = 65535 + def this(sparkSession: SparkSession, path: String) = { + this( + sparkSession.sessionState.newHadoopConf(), + path, + metadataCacheEnabled = sparkSession.sessionState.conf.getConf( + SQLConf.STREAMING_METADATA_CACHE_ENABLED) + ) + } + + override protected def serialize(schema: JValue, out: OutputStream): Unit = { + val json = compact(render(schema)) + val buf = new Array[Char](MAX_UTF_CHUNK_SIZE) + + val outputStream = out.asInstanceOf[FSDataOutputStream] + // DataOutputStream.writeUTF can't write a string at once + // if the size exceeds 65535 (2^16 - 1) bytes. + // Each metadata consists of multiple chunks in schema version 3. 
+ try { + val numMetadataChunks = (json.length - 1) / MAX_UTF_CHUNK_SIZE + 1 + val metadataStringReader = new StringReader(json) + outputStream.writeInt(numMetadataChunks) + (0 until numMetadataChunks).foreach { _ => + val numRead = metadataStringReader.read(buf, 0, MAX_UTF_CHUNK_SIZE) + outputStream.writeUTF(new String(buf, 0, numRead)) + } + outputStream.close() + } catch { + case e: Throwable => + throw e + } + } + + override protected def deserialize(in: InputStream): JValue = { + val buf = new StringBuilder + val inputStream = in.asInstanceOf[FSDataInputStream] + val numKeyChunks = inputStream.readInt() + (0 until numKeyChunks).foreach(_ => buf.append(inputStream.readUTF())) + val json = buf.toString() + JsonMethods.parse(json) + } + + override def add(batchId: Long, metadata: JValue): Boolean = { + require(metadata != null, "'null' metadata cannot written to a metadata log") + val batchMetadataFile = batchIdToPath(batchId) + if (fileManager.exists(batchMetadataFile)) { + fileManager.delete(batchMetadataFile) + } + val res = addNewBatchByStream(batchId) { output => serialize(metadata, output) } + if (metadataCacheEnabled && res) batchCache.put(batchId, metadata) + res + } + + override def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = { + val batchMetadataFile = batchIdToPath(batchId) + + if (metadataCacheEnabled && batchCache.containsKey(batchId)) { + false + } else { + write(batchMetadataFile, fn) + true + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala index d916011245c00..7e6af3db38d26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala @@ -20,9 +20,19 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA} -import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore} import org.apache.spark.sql.streaming.ValueState +object ValueStateImpl { + def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( + stateName, + KEY_ROW_SCHEMA, + VALUE_ROW_SCHEMA, + NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), + false) + } +} /** * Class that provides a concrete implementation for a single value state associated with state * variables used in the streaming transformWithState operator. 
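For context, and not part of the patch content: the new `StateSchemaV3File` added earlier in this commit cannot write the rendered schema JSON with a single `writeUTF` call, because `DataOutputStream.writeUTF` caps each write at 65535 (2^16 - 1) bytes, so it writes a chunk count followed by that many UTF chunks. A self-contained sketch of the same round trip, using only `java.io` and assuming a non-empty JSON string:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream,
  DataOutputStream, StringReader}

val MAX_UTF_CHUNK_SIZE = 65535

// Write path: chunk count first, then ceil(json.length / MAX_UTF_CHUNK_SIZE)
// UTF chunks, mirroring the loop in StateSchemaV3File.serialize.
def writeChunked(json: String, out: DataOutputStream): Unit = {
  val buf = new Array[Char](MAX_UTF_CHUNK_SIZE)
  val numChunks = (json.length - 1) / MAX_UTF_CHUNK_SIZE + 1
  out.writeInt(numChunks)
  val reader = new StringReader(json)
  (0 until numChunks).foreach { _ =>
    val numRead = reader.read(buf, 0, MAX_UTF_CHUNK_SIZE)
    out.writeUTF(new String(buf, 0, numRead))
  }
}

// Read path: the mirror image of the loop in StateSchemaV3File.deserialize.
def readChunked(in: DataInputStream): String = {
  val sb = new StringBuilder
  (0 until in.readInt()).foreach(_ => sb.append(in.readUTF()))
  sb.toString()
}

// Round trip through in-memory streams.
val out = new ByteArrayOutputStream()
writeChunked("""{"columnFamilyName": "countState"}""", new DataOutputStream(out))
val in = new DataInputStream(new ByteArrayInputStream(out.toByteArray))
assert(readChunked(in) == """{"columnFamilyName": "countState"}""")
```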
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala index 0ed5a6f29a984..c634c5840706d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala @@ -19,9 +19,20 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} -import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore} import org.apache.spark.sql.streaming.{TTLConfig, ValueState} +object ValueStateImplWithTTL { + def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( + stateName, + KEY_ROW_SCHEMA, + VALUE_ROW_SCHEMA_WITH_TTL, + NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), + false) + } +} + /** * Class that provides a concrete implementation for a single value state associated with state * variables (with ttl expiration support) used in the streaming transformWithState operator. From cd045376ec24d52978796765db992856c7a2b924 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 1 Jul 2024 09:40:11 -0700 Subject: [PATCH 05/23] combining rules --- .../streaming/StateSchemaV3File.scala | 100 ------------------ 1 file changed, 100 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala deleted file mode 100644 index 82bab9a5301f0..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.streaming - -import java.io.{InputStream, OutputStream, StringReader} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream} -import org.json4s.JValue -import org.json4s.jackson.JsonMethods -import org.json4s.jackson.JsonMethods.{compact, render} - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.SQLConf - -class StateSchemaV3File( - hadoopConf: Configuration, - path: String, - metadataCacheEnabled: Boolean = false) - extends HDFSMetadataLog[JValue](hadoopConf, path, metadataCacheEnabled) { - - final val MAX_UTF_CHUNK_SIZE = 65535 - def this(sparkSession: SparkSession, path: String) = { - this( - sparkSession.sessionState.newHadoopConf(), - path, - metadataCacheEnabled = sparkSession.sessionState.conf.getConf( - SQLConf.STREAMING_METADATA_CACHE_ENABLED) - ) - } - - override protected def serialize(schema: JValue, out: OutputStream): Unit = { - val json = compact(render(schema)) - val buf = new Array[Char](MAX_UTF_CHUNK_SIZE) - - val outputStream = out.asInstanceOf[FSDataOutputStream] - // DataOutputStream.writeUTF can't write a string at once - // if the size exceeds 65535 (2^16 - 1) bytes. - // Each metadata consists of multiple chunks in schema version 3. - try { - val numMetadataChunks = (json.length - 1) / MAX_UTF_CHUNK_SIZE + 1 - val metadataStringReader = new StringReader(json) - outputStream.writeInt(numMetadataChunks) - (0 until numMetadataChunks).foreach { _ => - val numRead = metadataStringReader.read(buf, 0, MAX_UTF_CHUNK_SIZE) - outputStream.writeUTF(new String(buf, 0, numRead)) - } - outputStream.close() - } catch { - case e: Throwable => - throw e - } - } - - override protected def deserialize(in: InputStream): JValue = { - val buf = new StringBuilder - val inputStream = in.asInstanceOf[FSDataInputStream] - val numKeyChunks = inputStream.readInt() - (0 until numKeyChunks).foreach(_ => buf.append(inputStream.readUTF())) - val json = buf.toString() - JsonMethods.parse(json) - } - - override def add(batchId: Long, metadata: JValue): Boolean = { - require(metadata != null, "'null' metadata cannot written to a metadata log") - val batchMetadataFile = batchIdToPath(batchId) - if (fileManager.exists(batchMetadataFile)) { - fileManager.delete(batchMetadataFile) - } - val res = addNewBatchByStream(batchId) { output => serialize(metadata, output) } - if (metadataCacheEnabled && res) batchCache.put(batchId, metadata) - res - } - - override def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = { - val batchMetadataFile = batchIdToPath(batchId) - - if (metadataCacheEnabled && batchCache.containsKey(batchId)) { - false - } else { - write(batchMetadataFile, fn) - true - } - } -} From de9404ae455e9619f18c9568da27e584d8c258d6 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 1 Jul 2024 17:09:54 -0700 Subject: [PATCH 06/23] Feedback --- .../streaming/TransformWithStateExec.scala | 50 +++++++++++++++++++ .../streaming/state/SchemaHelper.scala | 7 +++ .../streaming/TransformWithStateSuite.scala | 4 ++ 3 files changed, 61 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index c42d58ad67eac..80d3777183687 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -116,8 +116,14 @@ case class TransformWithStateExec( * Fetching the columnFamilySchemas from the StatefulProcessorHandle * after init is called. */ +<<<<<<< HEAD private def getColFamilySchemas(): Map[String, ColumnFamilySchema] = { val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas +======= + def getColFamilySchemas(): List[ColumnFamilySchema] = { + val driverProcessorHandle = getDriverProcessorHandle + val columnFamilySchemas = driverProcessorHandle.columnFamilySchemas +>>>>>>> 6768eea5a98 (Feedback) closeProcessorHandle() columnFamilySchemas } @@ -380,6 +386,7 @@ case class TransformWithStateExec( ) } +<<<<<<< HEAD private def fetchOperatorStateMetadataLog( hadoopConf: Configuration, checkpointDir: String, @@ -395,6 +402,11 @@ case class TransformWithStateExec( stateSchemaVersion: Int): Array[String] = { assert(stateSchemaVersion >= 3) val newSchemas = getColFamilySchemas() +======= + override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration, batchId: Long): + Array[String] = { + val newColumnFamilySchemas = getColFamilySchemas() +>>>>>>> 6768eea5a98 (Feedback) val schemaFile = new StateSchemaV3File( hadoopConf, stateSchemaDirPath(StateStoreId.DEFAULT_STORE_NAME).toString) // TODO: [SPARK-48849] Read the schema path from the OperatorStateMetadata file @@ -415,6 +427,7 @@ case class TransformWithStateExec( } private def validateSchemas( +<<<<<<< HEAD oldSchemas: List[ColumnFamilySchema], newSchemas: Map[String, ColumnFamilySchema]): Unit = { oldSchemas.foreach { case oldSchema: ColumnFamilySchemaV1 => @@ -427,6 +440,43 @@ case class TransformWithStateExec( ) } } +======= + oldSchema: List[ColumnFamilySchema], + newSchema: List[ColumnFamilySchema]): Unit = { + if (oldSchema.size != newSchema.size) { + throw new RuntimeException + } + // turn oldSchema to map + val oldSchemaMap = oldSchema.map(schema => + (schema.columnFamilyName, schema)).toMap + newSchema.foreach { case newSchema: ColumnFamilySchemaV1 => + val oldSchema = oldSchemaMap.get(newSchema.columnFamilyName) + if (oldSchema.isEmpty) { + throw new RuntimeException + } + if (oldSchema.get != newSchema) { + throw new RuntimeException + } + } + } + + /** Metadata of this stateful operator and its states stores. 
*/ + override def operatorStateMetadata( + stateSchemaPaths: Array[String] = Array.empty): OperatorStateMetadata = { + val info = getStateInfo + val operatorInfo = OperatorInfoV1(info.operatorId, shortName) + // stateSchemaFilePath should be populated at this point + val stateStoreInfo = + Array(StateStoreMetadataV2( + StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions, stateSchemaPaths.head)) + + val operatorPropertiesJson: JValue = + ("timeMode" -> JString(timeMode.toString)) ~ + ("outputMode" -> JString(outputMode.toString)) + + val json = compact(render(operatorPropertiesJson)) + OperatorStateMetadataV2(operatorInfo, stateStoreInfo, json) +>>>>>>> 6768eea5a98 (Feedback) } private def stateSchemaDirPath(storeName: String): Path = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala index 0a8021ab3de2b..2c71f9cb91604 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala @@ -72,11 +72,18 @@ object ColumnFamilySchemaV1 { assert(colFamilyMap.isInstanceOf[Map[_, _]], s"Expected Map but got ${colFamilyMap.getClass}") val keySchema = StructType.fromString(colFamilyMap("keySchema").asInstanceOf[String]) +<<<<<<< HEAD val valueSchema = StructType.fromString(colFamilyMap("valueSchema").asInstanceOf[String]) ColumnFamilySchemaV1( colFamilyMap("columnFamilyName").asInstanceOf[String], keySchema, valueSchema, +======= + new ColumnFamilySchemaV1( + colFamilyMap("columnFamilyName").asInstanceOf[String], + keySchema, + StructType.fromString(colFamilyMap("valueSchema").asInstanceOf[String]), +>>>>>>> 6768eea5a98 (Feedback) KeyStateEncoderSpec.fromJson(keySchema, colFamilyMap("keyStateEncoderSpec") .asInstanceOf[Map[String, Any]]), colFamilyMap.get("userKeyEncoder").map(_.asInstanceOf[String]).map(StructType.fromString) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index 31a3ae648c054..3ad151da7da5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -844,12 +844,16 @@ class TransformWithStateSuite extends StateStoreMetricsTest private def fetchColumnFamilySchemas( checkpointDir: String, operatorId: Int): List[ColumnFamilySchema] = { +<<<<<<< HEAD val operatorStateMetadataLog = fetchOperatorStateMetadataLog(checkpointDir, operatorId) val stateSchemaFilePath = operatorStateMetadataLog. getLatest().get._2. asInstanceOf[OperatorStateMetadataV2]. 
stateStoreInfo.head.stateSchemaFilePath fetchStateSchemaV3File(checkpointDir, operatorId).getWithPath(new Path(stateSchemaFilePath)) +======= + fetchStateSchemaV3File(checkpointDir, operatorId).getLatest().get._2 +>>>>>>> 6768eea5a98 (Feedback) } private def fetchStateSchemaV3File( From 40ba1e99aa5507884f06ccb413ba99e2aeda6aae Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 1 Jul 2024 17:35:33 -0700 Subject: [PATCH 07/23] rebase --- .../streaming/TransformWithStateExec.scala | 50 ------------------- 1 file changed, 50 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index 80d3777183687..7f5f9bc9c76a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -116,14 +116,8 @@ case class TransformWithStateExec( * Fetching the columnFamilySchemas from the StatefulProcessorHandle * after init is called. */ -<<<<<<< HEAD private def getColFamilySchemas(): Map[String, ColumnFamilySchema] = { val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas -======= - def getColFamilySchemas(): List[ColumnFamilySchema] = { - val driverProcessorHandle = getDriverProcessorHandle - val columnFamilySchemas = driverProcessorHandle.columnFamilySchemas ->>>>>>> 6768eea5a98 (Feedback) closeProcessorHandle() columnFamilySchemas } @@ -386,7 +380,6 @@ case class TransformWithStateExec( ) } -<<<<<<< HEAD private def fetchOperatorStateMetadataLog( hadoopConf: Configuration, checkpointDir: String, @@ -402,11 +395,6 @@ case class TransformWithStateExec( stateSchemaVersion: Int): Array[String] = { assert(stateSchemaVersion >= 3) val newSchemas = getColFamilySchemas() -======= - override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration, batchId: Long): - Array[String] = { - val newColumnFamilySchemas = getColFamilySchemas() ->>>>>>> 6768eea5a98 (Feedback) val schemaFile = new StateSchemaV3File( hadoopConf, stateSchemaDirPath(StateStoreId.DEFAULT_STORE_NAME).toString) // TODO: [SPARK-48849] Read the schema path from the OperatorStateMetadata file @@ -427,7 +415,6 @@ case class TransformWithStateExec( } private def validateSchemas( -<<<<<<< HEAD oldSchemas: List[ColumnFamilySchema], newSchemas: Map[String, ColumnFamilySchema]): Unit = { oldSchemas.foreach { case oldSchema: ColumnFamilySchemaV1 => @@ -440,24 +427,6 @@ case class TransformWithStateExec( ) } } -======= - oldSchema: List[ColumnFamilySchema], - newSchema: List[ColumnFamilySchema]): Unit = { - if (oldSchema.size != newSchema.size) { - throw new RuntimeException - } - // turn oldSchema to map - val oldSchemaMap = oldSchema.map(schema => - (schema.columnFamilyName, schema)).toMap - newSchema.foreach { case newSchema: ColumnFamilySchemaV1 => - val oldSchema = oldSchemaMap.get(newSchema.columnFamilyName) - if (oldSchema.isEmpty) { - throw new RuntimeException - } - if (oldSchema.get != newSchema) { - throw new RuntimeException - } - } } /** Metadata of this stateful operator and its states stores. 
*/ @@ -476,7 +445,6 @@ case class TransformWithStateExec( val json = compact(render(operatorPropertiesJson)) OperatorStateMetadataV2(operatorInfo, stateStoreInfo, json) ->>>>>>> 6768eea5a98 (Feedback) } private def stateSchemaDirPath(storeName: String): Path = { @@ -490,24 +458,6 @@ case class TransformWithStateExec( new Path(new Path(storeNamePath, "_metadata"), "schema") } - /** Metadata of this stateful operator and its states stores. */ - override def operatorStateMetadata( - stateSchemaPaths: Array[String] = Array.empty): OperatorStateMetadata = { - val info = getStateInfo - val operatorInfo = OperatorInfoV1(info.operatorId, shortName) - // stateSchemaFilePath should be populated at this point - val stateStoreInfo = - Array(StateStoreMetadataV2( - StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions, stateSchemaPaths.head)) - - val operatorPropertiesJson: JValue = - ("timeMode" -> JString(timeMode.toString)) ~ - ("outputMode" -> JString(outputMode.toString)) - - val json = compact(render(operatorPropertiesJson)) - OperatorStateMetadataV2(operatorInfo, stateStoreInfo, json) - } - override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver From 0a0e9e5c8dd6ccbe21f8b860e1014b1e140c266e Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 1 Jul 2024 18:23:07 -0700 Subject: [PATCH 08/23] refactors --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 9 +++++++++ .../sql/execution/streaming/statefulOperators.scala | 2 ++ 2 files changed, 11 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6ca831f99304b..143b43eef1376 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2081,6 +2081,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STATE_SCHEMA_FILE_VERSION = + buildConf("spark.sql.streaming.stateStore.stateSchemaVersion") + .doc("The version of the state schema. This is used to check if the schema of the state " + + "store is compatible with the schema of the state data. If the schema is not compatible, " + + "the query will fail.") + .version("4.0.0") + .intConf + .createWithDefault(2) + val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT = buildConf("spark.sql.streaming.stateStore.minDeltasForSnapshot") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 492ba37865d42..69699473fb8b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -211,6 +211,8 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp /** Records the duration of running `body` for the next query progress update. */ protected def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2 + def stateSchemaVersion: Int = 2 + /** Metadata of this stateful operator and its states stores. 
*/ def operatorStateMetadata( stateSchemaPaths: Array[String] = Array.empty): OperatorStateMetadata = { From b2674aebde9657e8e1f76fd55ac71a46d1517eda Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 2 Jul 2024 13:14:05 -0700 Subject: [PATCH 09/23] feedback --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 9 --------- .../sql/execution/streaming/statefulOperators.scala | 2 -- 2 files changed, 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 143b43eef1376..6ca831f99304b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2081,15 +2081,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - val STATE_SCHEMA_FILE_VERSION = - buildConf("spark.sql.streaming.stateStore.stateSchemaVersion") - .doc("The version of the state schema. This is used to check if the schema of the state " + - "store is compatible with the schema of the state data. If the schema is not compatible, " + - "the query will fail.") - .version("4.0.0") - .intConf - .createWithDefault(2) - val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT = buildConf("spark.sql.streaming.stateStore.minDeltasForSnapshot") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 69699473fb8b1..492ba37865d42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -211,8 +211,6 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp /** Records the duration of running `body` for the next query progress update. */ protected def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2 - def stateSchemaVersion: Int = 2 - /** Metadata of this stateful operator and its states stores. 
*/ def operatorStateMetadata( stateSchemaPaths: Array[String] = Array.empty): OperatorStateMetadata = { From fd83cf810856492ce250ab7a85fe775118164972 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 9 Jul 2024 09:17:47 -0700 Subject: [PATCH 10/23] feedback --- .../sql/execution/streaming/state/StateSchemaV3File.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala index 482e802b7d87e..f6bccc08a80cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala @@ -97,3 +97,7 @@ class StateSchemaV3File( object StateSchemaV3File { val VERSION = 3 } + +object StateSchemaV3File { + val VERSION = 3 +} From 6a2e10683d80e2ffdec0cd514bf354f93c9562dc Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Wed, 10 Jul 2024 15:52:15 -0700 Subject: [PATCH 11/23] rebase --- .../sql/execution/streaming/state/StateSchemaV3File.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala index f6bccc08a80cb..482e802b7d87e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala @@ -97,7 +97,3 @@ class StateSchemaV3File( object StateSchemaV3File { val VERSION = 3 } - -object StateSchemaV3File { - val VERSION = 3 -} From 3a1e5648a7cb0674ed6b7fcac96d834fc2932d22 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 25 Jun 2024 15:40:18 -0700 Subject: [PATCH 12/23] writing schema --- .../streaming/StateSchemaV3File.scala | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala new file mode 100644 index 0000000000000..82bab9a5301f0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.io.{InputStream, OutputStream, StringReader} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream} +import org.json4s.JValue +import org.json4s.jackson.JsonMethods +import org.json4s.jackson.JsonMethods.{compact, render} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf + +class StateSchemaV3File( + hadoopConf: Configuration, + path: String, + metadataCacheEnabled: Boolean = false) + extends HDFSMetadataLog[JValue](hadoopConf, path, metadataCacheEnabled) { + + final val MAX_UTF_CHUNK_SIZE = 65535 + def this(sparkSession: SparkSession, path: String) = { + this( + sparkSession.sessionState.newHadoopConf(), + path, + metadataCacheEnabled = sparkSession.sessionState.conf.getConf( + SQLConf.STREAMING_METADATA_CACHE_ENABLED) + ) + } + + override protected def serialize(schema: JValue, out: OutputStream): Unit = { + val json = compact(render(schema)) + val buf = new Array[Char](MAX_UTF_CHUNK_SIZE) + + val outputStream = out.asInstanceOf[FSDataOutputStream] + // DataOutputStream.writeUTF can't write a string at once + // if the size exceeds 65535 (2^16 - 1) bytes. + // Each metadata consists of multiple chunks in schema version 3. + try { + val numMetadataChunks = (json.length - 1) / MAX_UTF_CHUNK_SIZE + 1 + val metadataStringReader = new StringReader(json) + outputStream.writeInt(numMetadataChunks) + (0 until numMetadataChunks).foreach { _ => + val numRead = metadataStringReader.read(buf, 0, MAX_UTF_CHUNK_SIZE) + outputStream.writeUTF(new String(buf, 0, numRead)) + } + outputStream.close() + } catch { + case e: Throwable => + throw e + } + } + + override protected def deserialize(in: InputStream): JValue = { + val buf = new StringBuilder + val inputStream = in.asInstanceOf[FSDataInputStream] + val numKeyChunks = inputStream.readInt() + (0 until numKeyChunks).foreach(_ => buf.append(inputStream.readUTF())) + val json = buf.toString() + JsonMethods.parse(json) + } + + override def add(batchId: Long, metadata: JValue): Boolean = { + require(metadata != null, "'null' metadata cannot written to a metadata log") + val batchMetadataFile = batchIdToPath(batchId) + if (fileManager.exists(batchMetadataFile)) { + fileManager.delete(batchMetadataFile) + } + val res = addNewBatchByStream(batchId) { output => serialize(metadata, output) } + if (metadataCacheEnabled && res) batchCache.put(batchId, metadata) + res + } + + override def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = { + val batchMetadataFile = batchIdToPath(batchId) + + if (metadataCacheEnabled && batchCache.containsKey(batchId)) { + false + } else { + write(batchMetadataFile, fn) + true + } + } +} From d83ff7ba771251dc0fe01481152c69b8d228fd26 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 8 Jul 2024 11:32:14 -0700 Subject: [PATCH 13/23] checking the OperatorStateMetadata log for the state schema file --- .../spark/sql/execution/streaming/IncrementalExecution.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 3fe5aeae5f637..32ef5ae4e8a97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -213,6 
+213,7 @@ class IncrementalExecution( val metadata = stateStoreWriter.operatorStateMetadata(stateSchemaPaths) stateStoreWriter match { case tws: TransformWithStateExec => + logError(s"### checkpointLocation: $checkpointLocation") val metadataPath = OperatorStateMetadataV2.metadataFilePath(new Path( checkpointLocation, tws.getStateInfo.operatorId.toString)) val operatorStateMetadataLog = new OperatorStateMetadataLog(sparkSession, From 2d4db497975826040b4824e8c97c8f94d98774e5 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Wed, 26 Jun 2024 17:00:35 -0700 Subject: [PATCH 14/23] creating operatorstatemetadata log --- .../spark/sql/execution/streaming/statefulOperators.scala | 4 ++-- .../streaming/state/OperatorStateMetadataSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 492ba37865d42..4dc4e1f70b388 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -216,7 +216,7 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp stateSchemaPaths: Array[String] = Array.empty): OperatorStateMetadata = { val info = getStateInfo val operatorInfo = OperatorInfoV1(info.operatorId, shortName) - val stateStoreInfo = + val stateStoreInfo: Array[StateStoreMetadata] = Array(StateStoreMetadataV1(StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions)) OperatorStateMetadataV1(operatorInfo, stateStoreInfo) } @@ -938,7 +938,7 @@ case class SessionWindowStateStoreSaveExec( stateSchemaPaths: Array[String] = Array.empty): OperatorStateMetadata = { val info = getStateInfo val operatorInfo = OperatorInfoV1(info.operatorId, shortName) - val stateStoreInfo = Array(StateStoreMetadataV1( + val stateStoreInfo: Array[StateStoreMetadata] = Array(StateStoreMetadataV1( StateStoreId.DEFAULT_STORE_NAME, stateManager.getNumColsForPrefixKey, info.numPartitions)) OperatorStateMetadataV1(operatorInfo, stateStoreInfo) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala index dd8f7aab51dd0..b55bf63054ab8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala @@ -106,7 +106,7 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession { StopStream ) - val expectedStateStoreInfo = Array( + val expectedStateStoreInfo: Array[StateStoreMetadata] = Array( StateStoreMetadataV1("left-keyToNumValues", 0, numShufflePartitions), StateStoreMetadataV1("left-keyWithIndexToValue", 0, numShufflePartitions), StateStoreMetadataV1("right-keyToNumValues", 0, numShufflePartitions), From e824f724207b6148db48cfc0959f03dd1e5e3e26 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 27 Jun 2024 15:14:16 -0700 Subject: [PATCH 15/23] removing ': Array[StateStoreMetadata]' --- .../spark/sql/execution/streaming/statefulOperators.scala | 4 ++-- .../streaming/state/OperatorStateMetadataSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 4dc4e1f70b388..492ba37865d42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -216,7 +216,7 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp stateSchemaPaths: Array[String] = Array.empty): OperatorStateMetadata = { val info = getStateInfo val operatorInfo = OperatorInfoV1(info.operatorId, shortName) - val stateStoreInfo: Array[StateStoreMetadata] = + val stateStoreInfo = Array(StateStoreMetadataV1(StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions)) OperatorStateMetadataV1(operatorInfo, stateStoreInfo) } @@ -938,7 +938,7 @@ case class SessionWindowStateStoreSaveExec( stateSchemaPaths: Array[String] = Array.empty): OperatorStateMetadata = { val info = getStateInfo val operatorInfo = OperatorInfoV1(info.operatorId, shortName) - val stateStoreInfo: Array[StateStoreMetadata] = Array(StateStoreMetadataV1( + val stateStoreInfo = Array(StateStoreMetadataV1( StateStoreId.DEFAULT_STORE_NAME, stateManager.getNumColsForPrefixKey, info.numPartitions)) OperatorStateMetadataV1(operatorInfo, stateStoreInfo) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala index b55bf63054ab8..dd8f7aab51dd0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala @@ -106,7 +106,7 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession { StopStream ) - val expectedStateStoreInfo: Array[StateStoreMetadata] = Array( + val expectedStateStoreInfo = Array( StateStoreMetadataV1("left-keyToNumValues", 0, numShufflePartitions), StateStoreMetadataV1("left-keyWithIndexToValue", 0, numShufflePartitions), StateStoreMetadataV1("right-keyToNumValues", 0, numShufflePartitions), From 16c781dd6c48f2d4ae76cedf91b82e431a28c308 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 8 Jul 2024 11:32:14 -0700 Subject: [PATCH 16/23] checking the OperatorStateMetadata log for the state schema file --- .../streaming/OperatorStateMetadataLog.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala index 09b4b65e3c3a3..e5e4ecf0e0740 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala @@ -47,7 +47,18 @@ class OperatorStateMetadataLog( override protected def serialize(metadata: OperatorStateMetadata, out: OutputStream): Unit = { val fsDataOutputStream = out.asInstanceOf[FSDataOutputStream] fsDataOutputStream.write(s"v${metadata.version}\n".getBytes(StandardCharsets.UTF_8)) +<<<<<<< HEAD OperatorStateMetadataUtils.serialize(fsDataOutputStream, metadata) +======= + metadata.version match { + case 1 => + OperatorStateMetadataV1.serialize(fsDataOutputStream, 
metadata) + case 2 => + logError(s"### stateSchemaPath: ${metadata.asInstanceOf[OperatorStateMetadataV2]. + stateStoreInfo.head.stateSchemaFilePath}") + OperatorStateMetadataV2.serialize(fsDataOutputStream, metadata) + } +>>>>>>> 0acb7055d11 (checking the OperatorStateMetadata log for the state schema file) } override protected def deserialize(in: InputStream): OperatorStateMetadata = { From 192bb8f317096abacb444ddb49c2598ba6bcc19b Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 8 Jul 2024 11:48:38 -0700 Subject: [PATCH 17/23] adding todo --- .../spark/sql/execution/streaming/IncrementalExecution.scala | 1 - .../sql/execution/streaming/OperatorStateMetadataLog.scala | 2 -- 2 files changed, 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 32ef5ae4e8a97..3fe5aeae5f637 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -213,7 +213,6 @@ class IncrementalExecution( val metadata = stateStoreWriter.operatorStateMetadata(stateSchemaPaths) stateStoreWriter match { case tws: TransformWithStateExec => - logError(s"### checkpointLocation: $checkpointLocation") val metadataPath = OperatorStateMetadataV2.metadataFilePath(new Path( checkpointLocation, tws.getStateInfo.operatorId.toString)) val operatorStateMetadataLog = new OperatorStateMetadataLog(sparkSession, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala index e5e4ecf0e0740..85432c6fc682f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala @@ -54,8 +54,6 @@ class OperatorStateMetadataLog( case 1 => OperatorStateMetadataV1.serialize(fsDataOutputStream, metadata) case 2 => - logError(s"### stateSchemaPath: ${metadata.asInstanceOf[OperatorStateMetadataV2]. 
- stateStoreInfo.head.stateSchemaFilePath}") OperatorStateMetadataV2.serialize(fsDataOutputStream, metadata) } >>>>>>> 0acb7055d11 (checking the OperatorStateMetadata log for the state schema file) From fbd16860ce45cc87094498aa9eb17743d69d3e06 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 9 Jul 2024 13:15:55 -0700 Subject: [PATCH 18/23] purging --- .../execution/streaming/AsyncLogPurge.scala | 23 ++++++++++++++ .../streaming/IncrementalExecution.scala | 30 ++++++++++++++++++- .../streaming/MicroBatchExecution.scala | 6 +++- .../streaming/OperatorStateMetadataLog.scala | 16 ++++++++++ .../streaming/TransformWithStateExec.scala | 5 ++++ 5 files changed, 78 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala index aa393211a1c15..2461e63e98e56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.ThreadUtils @@ -40,8 +41,12 @@ trait AsyncLogPurge extends Logging { private val purgeRunning = new AtomicBoolean(false) + private val purgeOldestRunning = new AtomicBoolean(false) + protected def purge(threshold: Long): Unit + protected def purgeOldest(plan: SparkPlan): Unit + protected lazy val useAsyncPurge: Boolean = sparkSession.conf.get(SQLConf.ASYNC_LOG_PURGE) protected def purgeAsync(batchId: Long): Unit = { @@ -62,6 +67,24 @@ trait AsyncLogPurge extends Logging { } } + protected def purgeOldestAsync(plan: SparkPlan): Unit = { + if (purgeOldestRunning.compareAndSet(false, true)) { + asyncPurgeExecutorService.execute(() => { + try { + purgeOldest(plan) + } catch { + case throwable: Throwable => + logError("Encountered error while performing async log purge", throwable) + errorNotifier.markError(throwable) + } finally { + purgeOldestRunning.set(false) + } + }) + } else { + log.debug("Skipped log purging since there is already one in progress.") + } + } + protected def asyncLogPurgeShutdown(): Unit = { ThreadUtils.shutdown(asyncPurgeExecutorService) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 3fe5aeae5f637..413b5de6ea89e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -58,7 +58,7 @@ class IncrementalExecution( val offsetSeqMetadata: OffsetSeqMetadata, val watermarkPropagator: WatermarkPropagator, val isFirstBatch: Boolean) - extends QueryExecution(sparkSession, logicalPlan) with Logging { + extends QueryExecution(sparkSession, logicalPlan) with Logging with AsyncLogPurge { // Modified planner with stateful operations. 
override val planner: SparkPlanner = new SparkPlanner( @@ -79,6 +79,25 @@ class IncrementalExecution( StreamingTransformWithStateStrategy :: Nil } + // Methods to enable the use of AsyncLogPurge + protected val minLogEntriesToMaintain: Int = + sparkSession.sessionState.conf.minBatchesToRetain + + val errorNotifier: ErrorNotifier = new ErrorNotifier() + + override protected def purge(threshold: Long): Unit = {} + + override protected def purgeOldest(planWithStateOpId: SparkPlan): Unit = { + planWithStateOpId.collect { + case tws: TransformWithStateExec => + val metadataPath = OperatorStateMetadataV2.metadataFilePath(new Path( + checkpointLocation, tws.getStateInfo.operatorId.toString)) + val operatorStateMetadataLog = new OperatorStateMetadataLog(sparkSession, + metadataPath.toString) + operatorStateMetadataLog.purge(minLogEntriesToMaintain) + } + } + private lazy val hadoopConf = sparkSession.sessionState.newHadoopConf() private[sql] val numStateStores = offsetSeqMetadata.conf.get(SQLConf.SHUFFLE_PARTITIONS.key) @@ -497,6 +516,14 @@ class IncrementalExecution( } } + def purgeMetadataFiles(planWithStateOpId: SparkPlan): Unit = { + if (useAsyncPurge) { + purgeOldestAsync(planWithStateOpId) + } else { + purgeOldest(planWithStateOpId) + } + } + override def apply(plan: SparkPlan): SparkPlan = { val planWithStateOpId = plan transform composedRule // Need to check before write to metadata because we need to detect add operator @@ -508,6 +535,7 @@ class IncrementalExecution( // The rule below doesn't change the plan but can cause the side effect that // metadata/schema is written in the checkpoint directory of stateful operator. planWithStateOpId transform StateSchemaAndOperatorMetadataRule.rule + purgeMetadataFiles(planWithStateOpId) simulateWatermarkPropagation(planWithStateOpId) planWithStateOpId transform WatermarkPropagationRule.rule diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index f636413f7c518..8fb1739f65fab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability} import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl, SupportsTriggerAvailableNow} import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.sources.{WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1} @@ -929,6 +929,10 @@ class MicroBatchExecution( awaitProgressLock.unlock() } } + + override protected def purgeOldest(plan: SparkPlan): Unit = { + + } } object MicroBatchExecution { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala index 85432c6fc682f..63a9a4b77e4b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala @@ -70,4 +70,20 @@ class OperatorStateMetadataLog( case "v2" => OperatorStateMetadataUtils.deserialize(2, bufferedReader) } } + + /** + * Purges all log entries so that we only keep minLogEntriesToMaintain log files + */ + override def purge(minLogEntriesToMaintain: Long): Unit = { + val batches = listBatches.sorted + if (batches.length > minLogEntriesToMaintain) { + val batchesToRemove = batches.take(batches.length - minLogEntriesToMaintain.toInt) + batchesToRemove.foreach { batchId => + val batchPath = batchIdToPath(batchId) + if (fileManager.exists(batchPath)) { + fileManager.delete(batchPath) + } + } + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index 7f5f9bc9c76a2..e16ab0de01713 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -389,6 +389,11 @@ case class TransformWithStateExec( new OperatorStateMetadataLog(hadoopConf, operatorStateMetadataPath.toString) } + override def metadataFilePath(): Path = { + OperatorStateMetadataV2.metadataFilePath( + new Path(getStateInfo.checkpointLocation, getStateInfo.operatorId.toString)) + } + override def validateAndMaybeEvolveStateSchema( hadoopConf: Configuration, batchId: Long, From 128615c48e0b5153273fe6e6f4df725474837ff4 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 9 Jul 2024 14:17:57 -0700 Subject: [PATCH 19/23] removing conflict --- .../execution/streaming/OperatorStateMetadataLog.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala index 63a9a4b77e4b4..479dbca662230 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala @@ -47,16 +47,7 @@ class OperatorStateMetadataLog( override protected def serialize(metadata: OperatorStateMetadata, out: OutputStream): Unit = { val fsDataOutputStream = out.asInstanceOf[FSDataOutputStream] fsDataOutputStream.write(s"v${metadata.version}\n".getBytes(StandardCharsets.UTF_8)) -<<<<<<< HEAD OperatorStateMetadataUtils.serialize(fsDataOutputStream, metadata) -======= - metadata.version match { - case 1 => - OperatorStateMetadataV1.serialize(fsDataOutputStream, metadata) - case 2 => - OperatorStateMetadataV2.serialize(fsDataOutputStream, metadata) - } ->>>>>>> 0acb7055d11 (checking the OperatorStateMetadata log for the state schema file) } override protected def deserialize(in: InputStream): OperatorStateMetadata = { From 0e84fcd7ac26493d62b6ae84c1f15461a18e25df Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Tue, 9 Jul 2024 14:26:57 -0700 Subject: [PATCH 20/23] removing unnecessary stuff --- .../sql/execution/streaming/ListStateImpl.scala | 13 +------------ 
.../execution/streaming/ListStateImplWithTTL.scala | 12 +----------- .../sql/execution/streaming/MapStateImpl.scala | 13 +------------ .../execution/streaming/MapStateImplWithTTL.scala | 12 +----------- .../sql/execution/streaming/ValueStateImpl.scala | 12 +----------- .../execution/streaming/ValueStateImplWithTTL.scala | 13 +------------ 6 files changed, 6 insertions(+), 69 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala index 6931576a2c4f4..56c9d2664d9e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala @@ -20,20 +20,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA} -import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors} +import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors} import org.apache.spark.sql.streaming.ListState -object ListStateImpl { - def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { - new ColumnFamilySchemaV1( - stateName, - KEY_ROW_SCHEMA, - VALUE_ROW_SCHEMA, - NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), - true) - } -} - /** * Provides concrete implementation for list of values associated with a state variable * used in the streaming transformWithState operator. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImplWithTTL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImplWithTTL.scala index 416ce32088016..dc72f8bcd5600 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImplWithTTL.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImplWithTTL.scala @@ -19,20 +19,10 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} -import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors} +import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors} import org.apache.spark.sql.streaming.{ListState, TTLConfig} import org.apache.spark.util.NextIterator -object ListStateImplWithTTL { - def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { - new ColumnFamilySchemaV1( - stateName, - KEY_ROW_SCHEMA, - VALUE_ROW_SCHEMA_WITH_TTL, - NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), - true) - } -} /** * Class that provides a concrete implementation for a list state state associated with state * variables (with ttl expiration support) used in the streaming transformWithState operator. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala index 5d861a9de9244..c58f32ed756db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala @@ -19,21 +19,10 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.internal.Logging import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA} -import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors, UnsafeRowPair} +import org.apache.spark.sql.execution.streaming.state.{PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors, UnsafeRowPair} import org.apache.spark.sql.streaming.MapState import org.apache.spark.sql.types.{BinaryType, StructType} -object MapStateImpl { - def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { - new ColumnFamilySchemaV1( - stateName, - COMPOSITE_KEY_ROW_SCHEMA, - VALUE_ROW_SCHEMA, - PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1), false) - } -} - class MapStateImpl[K, V]( store: StateStore, stateName: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala index ce58a495baf08..2ab06f36dd5f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala @@ -20,20 +20,10 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} -import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors} +import org.apache.spark.sql.execution.streaming.state.{PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors} import org.apache.spark.sql.streaming.{MapState, TTLConfig} import org.apache.spark.util.NextIterator -object MapStateImplWithTTL { - def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { - new ColumnFamilySchemaV1( - stateName, - COMPOSITE_KEY_ROW_SCHEMA, - VALUE_ROW_SCHEMA_WITH_TTL, - PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1), false) - } -} - /** * Class that provides a concrete implementation for map state associated with state * variables (with ttl expiration support) used in the streaming transformWithState operator. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala index 7e6af3db38d26..d916011245c00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala @@ -20,19 +20,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA} -import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore} +import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore} import org.apache.spark.sql.streaming.ValueState -object ValueStateImpl { - def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { - new ColumnFamilySchemaV1( - stateName, - KEY_ROW_SCHEMA, - VALUE_ROW_SCHEMA, - NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), - false) - } -} /** * Class that provides a concrete implementation for a single value state associated with state * variables used in the streaming transformWithState operator. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala index c634c5840706d..0ed5a6f29a984 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala @@ -19,20 +19,9 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} -import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore} +import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore} import org.apache.spark.sql.streaming.{TTLConfig, ValueState} -object ValueStateImplWithTTL { - def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = { - new ColumnFamilySchemaV1( - stateName, - KEY_ROW_SCHEMA, - VALUE_ROW_SCHEMA_WITH_TTL, - NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA), - false) - } -} - /** * Class that provides a concrete implementation for a single value state associated with state * variables (with ttl expiration support) used in the streaming transformWithState operator. 
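Note on the purge pattern introduced in PATCH 18: purgeOldestAsync mirrors the contract of the existing purgeAsync — an AtomicBoolean makes the purge single-flight (a request arriving while one is already running is skipped rather than queued), the work runs on the shared async purge executor, and a failure is reported through the ErrorNotifier instead of failing the current micro-batch. Below is a minimal standalone sketch of that guard; the names AsyncPurgeSketch and lastError and the body of purgeOldest are illustrative placeholders, not the Spark classes touched by this patch.

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

object AsyncPurgeSketch {
  // Single-flight guard: true while a purge task is running.
  private val purgeRunning = new AtomicBoolean(false)
  private val executor = Executors.newSingleThreadExecutor()
  // Stand-in for an error notifier: remember the failure instead of throwing it to the caller.
  @volatile private var lastError: Option[Throwable] = None

  // Placeholder purge body; in the patch this would delete old metadata/schema log files.
  private def purgeOldest(): Unit = {
    Thread.sleep(100)
  }

  def purgeOldestAsync(): Unit = {
    if (purgeRunning.compareAndSet(false, true)) {
      executor.execute(() => {
        try {
          purgeOldest()
        } catch {
          case t: Throwable => lastError = Some(t) // record, don't fail the batch
        } finally {
          purgeRunning.set(false)
        }
      })
    } else {
      // A purge is already in flight; skip rather than queue another one.
    }
  }
}

Skipping a concurrent request (rather than queueing it) appears acceptable here because a later micro-batch will trigger another purge attempt against the same retained log entries.
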
From fba3f87b7283084d726525a5bdc581e38895f62f Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Wed, 10 Jul 2024 10:56:58 -0700 Subject: [PATCH 21/23] purging --- .../streaming/IncrementalExecution.scala | 1 + .../streaming/state/StateSchemaV3File.scala | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 413b5de6ea89e..3438b0a0b810a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -95,6 +95,7 @@ class IncrementalExecution( val operatorStateMetadataLog = new OperatorStateMetadataLog(sparkSession, metadataPath.toString) operatorStateMetadataLog.purge(minLogEntriesToMaintain) + case _ => } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala index 482e802b7d87e..b61522e3fe2fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala @@ -26,6 +26,7 @@ import scala.io.{Source => IOSource} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.streaming.CheckpointFileManager import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion @@ -39,7 +40,7 @@ import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVers */ class StateSchemaV3File( hadoopConf: Configuration, - path: String) { + path: String) extends Logging { val metadataPath = new Path(path) @@ -92,6 +93,19 @@ class StateSchemaV3File( throw e } } + + // list all the files in the metadata directory + // sort by the batchId + def listFiles(): Seq[Path] = { + fileManager.list(metadataPath).sorted.map(_.getPath) + } + + def listFilesBeforeBatch(batchId: Long): Seq[Path] = { + listFiles().filter { path => + val batchIdInPath = path.getName.split("_").head.toLong + batchIdInPath < batchId + } + } } object StateSchemaV3File { From f9d16bdeed0099597a12f371cb58fe61dc680c04 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Wed, 10 Jul 2024 14:12:50 -0700 Subject: [PATCH 22/23] purging works --- .../streaming/IncrementalExecution.scala | 10 +- .../streaming/OperatorStateMetadataLog.scala | 19 ++-- .../streaming/StateSchemaV3File.scala | 100 ------------------ .../streaming/TransformWithStateExec.scala | 3 - .../streaming/state/StateSchemaV3File.scala | 22 +++- .../streaming/TransformWithStateSuite.scala | 46 +++++++- 6 files changed, 78 insertions(+), 122 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 3438b0a0b810a..4ce762714e864 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadat import 
org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasWithStateExec import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1 -import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV1, OperatorStateMetadataV2, OperatorStateMetadataWriter} +import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV1, OperatorStateMetadataV2, OperatorStateMetadataWriter, StateSchemaV3File, StateStoreId} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -94,7 +94,13 @@ class IncrementalExecution( checkpointLocation, tws.getStateInfo.operatorId.toString)) val operatorStateMetadataLog = new OperatorStateMetadataLog(sparkSession, metadataPath.toString) - operatorStateMetadataLog.purge(minLogEntriesToMaintain) + val thresholdBatchId = + operatorStateMetadataLog.findThresholdBatchId(minLogEntriesToMaintain) + operatorStateMetadataLog.purge(thresholdBatchId) + val stateSchemaV3File = new StateSchemaV3File( + sparkSession.sessionState.newHadoopConf(), + path = tws.stateSchemaFilePath(Some(StateStoreId.DEFAULT_STORE_NAME)).toString) + stateSchemaV3File.purge(thresholdBatchId) case _ => } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala index 479dbca662230..19cd96eb59e96 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OperatorStateMetadataLog.scala @@ -62,19 +62,12 @@ class OperatorStateMetadataLog( } } - /** - * Purges all log entries so that we only keep minLogEntriesToMaintain log files - */ - override def purge(minLogEntriesToMaintain: Long): Unit = { - val batches = listBatches.sorted - if (batches.length > minLogEntriesToMaintain) { - val batchesToRemove = batches.take(batches.length - minLogEntriesToMaintain.toInt) - batchesToRemove.foreach { batchId => - val batchPath = batchIdToPath(batchId) - if (fileManager.exists(batchPath)) { - fileManager.delete(batchPath) - } - } + def findThresholdBatchId(minLogEntriesToMaintain: Int): Long = { + val metadataFiles = listBatches + if (metadataFiles.length > minLogEntriesToMaintain) { + metadataFiles.sorted.take(metadataFiles.length - minLogEntriesToMaintain).last + 1 + } else { + -1 } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala index 82bab9a5301f0..e69de29bb2d1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import java.io.{InputStream, OutputStream, StringReader} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream} -import org.json4s.JValue -import org.json4s.jackson.JsonMethods -import org.json4s.jackson.JsonMethods.{compact, render} - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.SQLConf - -class StateSchemaV3File( - hadoopConf: Configuration, - path: String, - metadataCacheEnabled: Boolean = false) - extends HDFSMetadataLog[JValue](hadoopConf, path, metadataCacheEnabled) { - - final val MAX_UTF_CHUNK_SIZE = 65535 - def this(sparkSession: SparkSession, path: String) = { - this( - sparkSession.sessionState.newHadoopConf(), - path, - metadataCacheEnabled = sparkSession.sessionState.conf.getConf( - SQLConf.STREAMING_METADATA_CACHE_ENABLED) - ) - } - - override protected def serialize(schema: JValue, out: OutputStream): Unit = { - val json = compact(render(schema)) - val buf = new Array[Char](MAX_UTF_CHUNK_SIZE) - - val outputStream = out.asInstanceOf[FSDataOutputStream] - // DataOutputStream.writeUTF can't write a string at once - // if the size exceeds 65535 (2^16 - 1) bytes. - // Each metadata consists of multiple chunks in schema version 3. - try { - val numMetadataChunks = (json.length - 1) / MAX_UTF_CHUNK_SIZE + 1 - val metadataStringReader = new StringReader(json) - outputStream.writeInt(numMetadataChunks) - (0 until numMetadataChunks).foreach { _ => - val numRead = metadataStringReader.read(buf, 0, MAX_UTF_CHUNK_SIZE) - outputStream.writeUTF(new String(buf, 0, numRead)) - } - outputStream.close() - } catch { - case e: Throwable => - throw e - } - } - - override protected def deserialize(in: InputStream): JValue = { - val buf = new StringBuilder - val inputStream = in.asInstanceOf[FSDataInputStream] - val numKeyChunks = inputStream.readInt() - (0 until numKeyChunks).foreach(_ => buf.append(inputStream.readUTF())) - val json = buf.toString() - JsonMethods.parse(json) - } - - override def add(batchId: Long, metadata: JValue): Boolean = { - require(metadata != null, "'null' metadata cannot written to a metadata log") - val batchMetadataFile = batchIdToPath(batchId) - if (fileManager.exists(batchMetadataFile)) { - fileManager.delete(batchMetadataFile) - } - val res = addNewBatchByStream(batchId) { output => serialize(metadata, output) } - if (metadataCacheEnabled && res) batchCache.put(batchId, metadata) - res - } - - override def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = { - val batchMetadataFile = batchIdToPath(batchId) - - if (metadataCacheEnabled && batchCache.containsKey(batchId)) { - false - } else { - write(batchMetadataFile, fn) - true - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index e16ab0de01713..0f43256e3b6c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala 
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -402,9 +402,6 @@ case class TransformWithStateExec( val newSchemas = getColFamilySchemas() val schemaFile = new StateSchemaV3File( hadoopConf, stateSchemaDirPath(StateStoreId.DEFAULT_STORE_NAME).toString) - // TODO: [SPARK-48849] Read the schema path from the OperatorStateMetadata file - // and validate it with the new schema - val operatorStateMetadataLog = fetchOperatorStateMetadataLog( hadoopConf, getStateInfo.checkpointLocation, getStateInfo.operatorId) val mostRecentLog = operatorStateMetadataLog.getLatest() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala index b61522e3fe2fc..e7e94ae193664 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala @@ -96,18 +96,34 @@ class StateSchemaV3File( // list all the files in the metadata directory // sort by the batchId - def listFiles(): Seq[Path] = { - fileManager.list(metadataPath).sorted.map(_.getPath) + private[sql] def listFiles(): Seq[Path] = { + fileManager.list(metadataPath).sorted.map(_.getPath).toSeq } - def listFilesBeforeBatch(batchId: Long): Seq[Path] = { + private[sql] def listFilesBeforeBatch(batchId: Long): Seq[Path] = { listFiles().filter { path => val batchIdInPath = path.getName.split("_").head.toLong batchIdInPath < batchId } } + + /** + * purge schema files that are before thresholdBatchId, exclusive + */ + def purge(thresholdBatchId: Long): Unit = { + if (thresholdBatchId != -1) { + listFilesBeforeBatch(thresholdBatchId).foreach { + schemaFilePath => + fileManager.delete(schemaFilePath) + } + } + } } object StateSchemaV3File { val VERSION = 3 + + private[sql] def getBatchFromPath(path: Path): Long = { + path.getName.split("_").head.toLong + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index 3ad151da7da5a..92492bcc044e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -861,7 +861,8 @@ class TransformWithStateSuite extends StateStoreMetricsTest operatorId: Int): StateSchemaV3File = { val hadoopConf = spark.sessionState.newHadoopConf() val stateChkptPath = new Path(checkpointDir, s"state/$operatorId") - val stateSchemaPath = new Path(new Path(stateChkptPath, "_metadata"), "schema") + val storeNamePath = new Path(stateChkptPath, "default") + val stateSchemaPath = new Path(new Path(storeNamePath, "_metadata"), "schema") new StateSchemaV3File(hadoopConf, stateSchemaPath.toString) } @@ -993,6 +994,49 @@ class TransformWithStateSuite extends StateStoreMetricsTest } } + + test("transformWithState - verify that metadata logs are purged") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName, + SQLConf.SHUFFLE_PARTITIONS.key -> + TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString, + SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") { + withTempDir { chkptDir => + val inputData = MemoryStream[String] + val result = inputData.toDS() + .groupByKey(x => x) + .transformWithState(new 
RunningCountStatefulProcessor(), + TimeMode.None(), + OutputMode.Update()) + + testStream(result, OutputMode.Update())( + StartStream(checkpointLocation = chkptDir.getCanonicalPath), + AddData(inputData, "a"), + CheckNewAnswer(("a", "1")), + StopStream, + StartStream(checkpointLocation = chkptDir.getCanonicalPath), + AddData(inputData, "a"), + CheckNewAnswer(("a", "2")), + StopStream, + StartStream(checkpointLocation = chkptDir.getCanonicalPath), + AddData(inputData, "a"), + CheckNewAnswer(), + StopStream + ) + val operatorStateMetadataLogs = + fetchOperatorStateMetadataLog(chkptDir.getCanonicalPath, 0) + .listBatchesOnDisk + assert(operatorStateMetadataLogs.length == 1) + // Make sure that only the latest batch has the schema file + assert(operatorStateMetadataLogs.head == 2) + + val schemaV3Files = fetchStateSchemaV3File(chkptDir.getCanonicalPath, 0).listFiles() + assert(schemaV3Files.length == 1) + assert(StateSchemaV3File.getBatchFromPath(schemaV3Files.head) == 2) + } + } + } + test("transformWithState - verify OperatorStateMetadataV2 serialization and deserialization" + " works") { withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> From 97b230dcbf6a7100911a004a7a71dbb6c5394be9 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Thu, 11 Jul 2024 11:11:09 -0700 Subject: [PATCH 23/23] purging --- .../spark/sql/execution/streaming/state/SchemaHelper.scala | 7 ------- .../spark/sql/streaming/TransformWithStateSuite.scala | 4 ---- 2 files changed, 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala index 2c71f9cb91604..0a8021ab3de2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala @@ -72,18 +72,11 @@ object ColumnFamilySchemaV1 { assert(colFamilyMap.isInstanceOf[Map[_, _]], s"Expected Map but got ${colFamilyMap.getClass}") val keySchema = StructType.fromString(colFamilyMap("keySchema").asInstanceOf[String]) -<<<<<<< HEAD val valueSchema = StructType.fromString(colFamilyMap("valueSchema").asInstanceOf[String]) ColumnFamilySchemaV1( colFamilyMap("columnFamilyName").asInstanceOf[String], keySchema, valueSchema, -======= - new ColumnFamilySchemaV1( - colFamilyMap("columnFamilyName").asInstanceOf[String], - keySchema, - StructType.fromString(colFamilyMap("valueSchema").asInstanceOf[String]), ->>>>>>> 6768eea5a98 (Feedback) KeyStateEncoderSpec.fromJson(keySchema, colFamilyMap("keyStateEncoderSpec") .asInstanceOf[Map[String, Any]]), colFamilyMap.get("userKeyEncoder").map(_.asInstanceOf[String]).map(StructType.fromString) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index 92492bcc044e6..fd621fc1161f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -844,16 +844,12 @@ class TransformWithStateSuite extends StateStoreMetricsTest private def fetchColumnFamilySchemas( checkpointDir: String, operatorId: Int): List[ColumnFamilySchema] = { -<<<<<<< HEAD val operatorStateMetadataLog = fetchOperatorStateMetadataLog(checkpointDir, operatorId) val stateSchemaFilePath = operatorStateMetadataLog. getLatest().get._2. 
asInstanceOf[OperatorStateMetadataV2]. stateStoreInfo.head.stateSchemaFilePath fetchStateSchemaV3File(checkpointDir, operatorId).getWithPath(new Path(stateSchemaFilePath)) -======= - fetchStateSchemaV3File(checkpointDir, operatorId).getLatest().get._2 ->>>>>>> 6768eea5a98 (Feedback) } private def fetchStateSchemaV3File(